数据湖探索 DLI-scala样例代码:通过DataFrame API访问数据源

时间:2024-11-16 13:16:39

通过DataFrame API访问数据源

  1. 连接配置。
    1
    2
    3
    4
    val url = "jdbc:postgresql://to-dws-1174405057-EA1Kgo8H.datasource.com:8000/postgres"
    val username = "dbadmin"
    val password = "######"
    val dbtable = "customer"
    
  2. 创建DataFrame,添加数据,并重命名字段。
    1
    2
    3
    4
    var dataFrame_1 = sparkSession.createDataFrame(List((8, "Jack_1", 18)))
    val df = dataFrame_1.withColumnRenamed("_1", "id")
                        .withColumnRenamed("_2", "name")
                        .withColumnRenamed("_3", "age")
    
  3. 导入数据到DWS。
    1
    2
    3
    4
    5
    6
    7
    df.write.format("jdbc")
      .option("url", url)
      .option("dbtable", dbtable)
      .option("user", username)
      .option("password", password)
      .mode(SaveMode.Append)
      .save()
    

    SaveMode 有四种保存类型:

    • ErrorIfExis:如果已经存在数据,则抛出异常。
    • Overwrite:如果已经存在数据,则覆盖原数据。
    • Append:如果已经存在数据,则追加保存。
    • Ignore:如果已经存在数据,则不做操作。这类似于SQL中的“如果不存在则创建表”。
  4. 读取DWS上的数据。
    • 方式一:read.format()方法
      1
      2
      3
      4
      5
      6
      val jdbcDF = sparkSession.read.format("jdbc")
                       .option("url", url)
                       .option("dbtable", dbtable)
                       .option("user", username)
                       .option("password", password)
                       .load()
      
    • 方式二:read.jdbc()方法
      1
      2
      3
      4
      val properties = new Properties()
       properties.put("user", username)
       properties.put("password", password)
       val jdbcDF2 = sparkSession.read.jdbc(url, dbtable, properties)
      

    插入数据前:

    插入数据后:

    使用上述read.format()或者read.jdbc()方法读取到的dateFrame注册为临时表,就可使用sql语句进行数据查询了。

    1
    2
    jdbcDF.registerTempTable("customer_test")
     sparkSession.sql("select * from customer_test where id = 1").show()
    

    查询结果:

support.huaweicloud.com/devg-dli/dli_09_0069.html