云服务器内容精选

  • 完整示例代码 直接复制如下样例代码到py文件中后,需要注意文件内容中的“\”后面可能会有unexpected character的问题。需要将“\”后面的缩进或是空格全部删除。 通过DataFrame API访问 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-rds").getOrCreate() # Set cross-source connection parameters. url = "jdbc:mysql://to-rds-1174404952-ZgPo1nNC.datasource.com:3306" dbtable = "test.customer" user = "root" password = "######" driver = "com.mysql.jdbc.Driver" # Create a DataFrame and initialize the DataFrame data. dataList = sparkSession.sparkContext.parallelize([(123, "Katie", 19)]) # Setting schema schema = StructType([StructField("id", IntegerType(), False),\ StructField("name", StringType(), False),\ StructField("age", IntegerType(), False)]) # Create a DataFrame from RDD and schema dataFrame = sparkSession.createDataFrame(dataList, schema) # Write data to the RDS. dataFrame.write \ .format("jdbc") \ .option("url", url) \ .option("dbtable", dbtable) \ .option("user", user) \ .option("password", password) \ .option("driver", driver) \ .mode("Append") \ .save() # Read data jdbcDF = sparkSession.read \ .format("jdbc") \ .option("url", url) \ .option("dbtable", dbtable) \ .option("user", user) \ .option("password", password) \ .option("driver", driver) \ .load() jdbcDF.show() # close session sparkSession.stop() 通过SQL API访问 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-rds").getOrCreate() # Createa data table for DLI - associated RDS sparkSession.sql( "CREATE TABLE IF NOT EXISTS dli_to_rds USING JDBC OPTIONS (\ 'url'='jdbc:mysql://to-rds-1174404952-ZgPo1nNC.datasource.com:3306',\ 'dbtable'='test.customer',\ 'user'='root',\ 'password'='######',\ 'driver'='com.mysql.jdbc.Driver')") # Insert data into the DLI data table sparkSession.sql("insert into dli_to_rds values(3,'John',24)") # Read data from DLI data table jdbcDF = sparkSession.sql("select * from dli_to_rds") jdbcDF.show() # close session sparkSession.stop()