云服务器内容精选

  • 完整示例代码 通过DataFrame API 访问 from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-mongo").getOrCreate() # Create a DataFrame and initialize the DataFrame data. dataList = sparkSession.sparkContext.parallelize([("1", "Katie", 19),("2","Tom",20)]) # Setting schema schema = StructType([StructField("id", IntegerType(), False),StructField("name", StringType(), False), StructField("age", IntegerType(), False)]) # Create a DataFrame from RDD and schema dataFrame = sparkSession.createDataFrame(dataList, schema) # Setting connection parameters url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin" uri = "mongodb://username:pwd@host:8635/db" user = "rwuser" database = "test" collection = "test" password = "######" # Write data to the mongodb table dataFrame.write.format("mongo") .option("url", url)\ .option("uri", uri)\ .option("user",user)\ .option("password",password)\ .option("database",database)\ .option("collection",collection) .mode("Overwrite").save() # Read data jdbcDF = sparkSession.read.format("mongo") .option("url", url)\ .option("uri", uri)\ .option("user",user)\ .option("password",password)\ .option("database",database)\ .option("collection",collection)\ .load() jdbcDF.show() # close session sparkSession.stop() 通过SQL API 访问 from __future__ import print_function from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-mongo").getOrCreate() # Createa data table for DLI - associated mongo sparkSession.sql( "create table test_dds(id string, name string, age int) using mongo options(\ 'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',\ 'uri' = 'mongodb://username:pwd@host:8635/db',\ 'database' = 'test',\ 'collection' = 'test', \ 'user' = 'rwuser', \ 'password' = '######')") # Insert data into the DLI-table sparkSession.sql("insert into test_dds values('3', 'Ann',23)") # Read data from DLI-table sparkSession.sql("select * from test_dds").show() # close session sparkSession.stop()