数据湖探索 DLI-pyspark样例代码:通过DataFrame API访问数据源

时间:2024-11-16 13:16:39

通过DataFrame API访问数据源

  1. 连接参数配置
    1
    2
    3
    4
    5
    url = "jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres"
    dbtable = "customer"
    user = "dbadmin"
    password = "######"
    driver = "org.postgresql.Driver"
    
  2. 设置数据
    1
    dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19)])
    
  3. 设置schema
    1
    2
    3
    schema = StructType([StructField("id", IntegerType(), False),\                
                         StructField("name", StringType(), False),\            
                         StructField("age", IntegerType(), False)])
    
  4. 创建DataFrame
    1
    dataFrame = sparkSession.createDataFrame(dataList, schema)
    
  5. 保存数据到DWS
    1
    2
    3
    4
    5
    6
    7
    8
    9
    dataFrame.write \   
        .format("jdbc") \  
        .option("url", url) \  
        .option("dbtable", dbtable) \  
        .option("user", user) \ 
        .option("password", password) \ 
        .option("driver", driver) \ 
        .mode("Overwrite") \  
        .save()
    

    mode 有四种保存类型:

    • ErrorIfExis:如果已经存在数据,则抛出异常。
    • Overwrite:如果已经存在数据,则覆盖原数据。
    • Append:如果已经存在数据,则追加保存。
    • Ignore:如果已经存在数据,则不做操作。这类似于SQL中的“如果不存在则创建表”。
  6. 读取DWS上的数据
    1
    2
    3
    4
    5
    6
    7
    8
    9
    jdbcDF = sparkSession.read \
        .format("jdbc") \
        .option("url", url) \
        .option("dbtable", dbtable) \
        .option("user", user) \
        .option("password", password) \
        .option("driver", driver) \
        .load()
    jdbcDF.show()
    
  7. 操作结果

support.huaweicloud.com/devg-dli/dli_09_0087.html