数据湖探索 DLI-scala样例代码:通过DataFrame API访问数据源

时间:2024-11-16 13:16:39

通过DataFrame API访问数据源

  1. 构造schema
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    // Column definitions for the sample table: three string columns followed by one
    // column for each primitive type being demonstrated. Every column is nullable,
    // which is also what StructField defaults to when the flag is omitted.
    val attrId   = StructField("id", StringType, nullable = true)
    val location = StructField("location", StringType, nullable = true)
    val city     = StructField("city", StringType, nullable = true)
    val booleanf = StructField("booleanf", BooleanType, nullable = true)
    val shortf   = StructField("shortf", ShortType, nullable = true)
    val intf     = StructField("intf", IntegerType, nullable = true)
    val longf    = StructField("longf", LongType, nullable = true)
    val floatf   = StructField("floatf", FloatType, nullable = true)
    val doublef  = StructField("doublef", DoubleType, nullable = true)
    // Field order here is the column order of any StructType built from this array.
    val attrs: Array[StructField] =
      Array(attrId, location, city, booleanf, shortf, intf, longf, floatf, doublef)
    
  2. 根据schema的类型构造数据
    1
    2
    // One sample row; values line up positionally with the schema fields
    // id, location, city, booleanf, shortf, intf, longf, floatf, doublef.
    // Spark validates each value's runtime type against the declared column type, so the
    // LongType column needs a Long literal (23L) and the FloatType column a Float literal
    // (2.3f): a bare 23 is an Int and a bare 2.3 is a Double, which are rejected when the
    // row is encoded. The shortf slot is deliberately null to show a missing value.
    val mutableRow: Seq[Any] = Seq("12345", "abc", "city1", false, null, 3, 23L, 2.3f, 2.34)
    // Distribute that single row as an RDD with exactly one partition.
    val rddData: RDD[Row] =
      sparkSession.sparkContext.parallelize(Seq(Row.fromSeq(mutableRow)), 1)
    
  3. 导入数据到HBase
    1
    // Pair the row RDD with a schema built from attrs, then insert the resulting
    // DataFrame into the table named "test_hbase".
    sparkSession
      .createDataFrame(rddData, StructType(attrs))
      .write
      .insertInto("test_hbase")
    
  4. 读取HBase上的数据
    1
    2
    3
    4
    5
    6
    7
    8
    // Connection and mapping options handed to the "hbase" data source.
    // All options are built in one expression; `map` keeps its mutable.HashMap type.
    val map = mutable.HashMap[String, String](
      "TableName" -> "table_DupRowkey1",
      // NOTE(review): presumably "<column>:<length>" for each row-key part — confirm
      // against the connector documentation.
      "RowKey" -> "id:5,location:6,city:7",
      // Each entry pairs a DataFrame column with a "CF1.<name>" target
      // (presumably column family CF1 — confirm).
      "Cols" -> Seq(
        "booleanf:CF1.booleanf",
        "shortf:CF1.shortf",
        "intf:CF1.intf",
        "longf:CF1.longf",
        "floatf:CF1.floatf",
        "doublef:CF1.doublef"
      ).mkString(","),
      // Comma-separated host:port list. A plain "..." literal cannot span several lines
      // in Scala, so the hosts are listed separately and joined, which also keeps stray
      // newlines and indentation out of the value.
      "ZKHost" -> Seq(
        "cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181",
        "cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181",
        "cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181"
      ).mkString(",")
    )
    // Read with the explicit schema built from attrs and print the rows to stdout.
    sparkSession.read
      .schema(new StructType(attrs))
      .format("hbase")
      .options(map.toMap)
      .load()
      .show()
    

    返回结果:

support.huaweicloud.com/devg-dli/dli_09_0063.html