云服务器内容精选

  • 通过DataFrame API访问数据源 构造schema 1 2 3 4 5 6 7 8 9 10 val attrId = new StructField("id",StringType) val location = new StructField("location",StringType) val city = new StructField("city",StringType) val booleanf = new StructField("booleanf",BooleanType) val shortf = new StructField("shortf",ShortType) val intf = new StructField("intf",IntegerType) val longf = new StructField("longf",LongType) val floatf = new StructField("floatf",FloatType) val doublef = new StructField("doublef",DoubleType) val attrs = Array(attrId, location,city,booleanf,shortf,intf,longf,floatf,doublef) 根据schema的类型构造数据 1 2 val mutableRow: Seq[Any] = Seq("12345","abc","city1",false,null,3,23,2.3,2.34) val rddData: RDD[Row] = sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1) 导入数据到HBase 1 sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("test_hbase") 读取HBase上的数据 1 2 3 4 5 6 7 8 val map = new mutable.HashMap[String, String]() map("TableName") = "table_DupRowkey1" map("RowKey") = "id:5,location:6,city:7" map("Cols") = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef" map("ZKHost")="cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181" sparkSession.read.schema(new StructType(attrs)).format("hbase").options(map.toMap).load().show() 返回结果: