数据湖探索 DLI-pyspark样例代码:完整示例代码

时间:2024-06-20 11:23:10

完整示例代码

  • 通过SQL API访问 MRS HBase
    • 未开启Kerberos认证样例代码
      # _*_ coding: utf-8 _*_
      """Sample: access MRS HBase through the Spark SQL API (Kerberos disabled).

      Creates a DLI table mapped onto the HBase table ``hbtest``, inserts one
      row with SQL and prints the table contents.
      """
      from __future__ import print_function
      from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType
      from pyspark.sql import SparkSession

      if __name__ == "__main__":
          # Create a SparkSession.
          sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate()

          # Map a DLI table onto the HBase table "hbtest".
          #   'RowKey' names the DLI column used as the HBase row key.
          #   'Cols' pairs each remaining DLI column with what appears to be
          #   "<column family>.<qualifier>" (confirm against the DLI HBase docs).
          # Adjacent string literals are joined at compile time. The DDL
          # therefore does not depend on backslash-newline escapes inside a
          # literal, which would paste source indentation into the SQL and
          # break if trailing whitespace followed the backslash.
          sparkSession.sql(
              "CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS ("
              "'ZKHost' = '192.168.0.189:2181',"
              "'TableName' = 'hbtest',"
              "'RowKey' = 'id:5',"
              "'Cols' = 'location:info.location,city:detail.city')")

          sparkSession.sql("insert into testhbase values('95274','abc','Jinan')")

          sparkSession.sql("select * from testhbase").show()
          # Close the session.
          sparkSession.stop()
    • 开启Kerberos认证样例代码
      # _*_ coding: utf-8 _*_
      from __future__ import print_function
      from pyspark import SparkFiles
      from pyspark.sql import SparkSession
      import shutil
      import time
      import os
      
      if __name__ == "__main__":
          # Create a SparkSession session.
          sparkSession = SparkSession.builder.appName("Test_HBase_SparkSql_Kerberos").getOrCreate()
          sc = sparkSession.sparkContext
          time.sleep(10)
      
          krb5_startfile = SparkFiles.get("krb5.conf")
          keytab_startfile = SparkFiles.get("user.keytab")
          path_user = os.getcwd()
          krb5_endfile = path_user + "/" + "krb5.conf"
          keytab_endfile = path_user + "/" + "user.keytab"
          shutil.copy(krb5_startfile, krb5_endfile)
          shutil.copy(keytab_startfile, keytab_endfile)
          time.sleep(20)
      
          sparkSession.sql(
            "CREATE TABLE testhbase(id string,booleanf boolean,shortf short,intf int,longf long,floatf float,doublef double) " +
            "using hbase OPTIONS(" +
            "'ZKHost'='10.0.0.146:2181'," +
            "'TableName'='hbtest'," +
            "'RowKey'='id:100'," +
            "'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF2.longf,floatf:CF1.floatf,doublef:CF2.doublef'," +
            "'krb5conf'='" + path_user + "/krb5.conf'," +
            "'keytab'='" + path_user+ "/user.keytab'," +
            "'principal'='krbtest') ")
      
            sparkSession.sql("insert into testhbase values('95274','abc','Jinan')")
      
          sparkSession.sql("select * from testhbase").show()
          # close session
          sparkSession.stop()
  • 通过DataFrame API访问HBase
    # _*_ coding: utf-8 _*_
    """Sample: access CloudTable HBase through the Spark DataFrame API.

    Creates a DLI table mapped onto an HBase table and writes one row to it
    from a DataFrame. It then reads the HBase table back directly with the
    "hbase" data source and prints the matching rows.
    """
    from __future__ import print_function
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType
    from pyspark.sql import SparkSession

    if __name__ == "__main__":
        # Create a SparkSession.
        sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate()

        # Cross-source connection parameters. The DDL and the direct read
        # below both use them, so the two cannot drift apart.
        # The ZooKeeper quorum is one comma-separated string with no embedded
        # whitespace.
        zk_host = ",".join([
            "cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181",
            "cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181",
            "cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181",
        ])
        table_name = "table_DupRowkey1"
        row_key = "id:5,location:6,city:7"
        cols = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef"

        # Create a DLI table associated with the HBase table.
        ddl = (
            "CREATE TABLE test_hbase(id STRING, location STRING, city STRING, booleanf BOOLEAN, "
            "shortf SHORT, intf INT, longf LONG, floatf FLOAT, doublef DOUBLE) using hbase OPTIONS ("
            "'ZKHost' = '{zk_host}',"
            "'TableName' = '{table_name}',"
            "'RowKey' = '{row_key}',"
            "'Cols' = '{cols}')"
        ).format(zk_host=zk_host, table_name=table_name, row_key=row_key, cols=cols)
        sparkSession.sql(ddl)

        # Create a DataFrame and initialize its data.
        dataList = sparkSession.sparkContext.parallelize([("11111", "aaa", "aaa", False, 4, 3, 23, 2.3, 2.34)])

        # Schema with the same column names and types as the test_hbase table.
        schema = StructType([StructField("id", StringType()),
                             StructField("location", StringType()),
                             StructField("city", StringType()),
                             StructField("booleanf", BooleanType()),
                             StructField("shortf", ShortType()),
                             StructField("intf", IntegerType()),
                             StructField("longf", LongType()),
                             StructField("floatf", FloatType()),
                             StructField("doublef", DoubleType())])

        # Create a DataFrame from the RDD and the schema.
        dataFrame = sparkSession.createDataFrame(dataList, schema)

        # Write the data to CloudTable HBase through the DLI table.
        dataFrame.write.insertInto("test_hbase")

        # Read the HBase table directly with the "hbase" data source.
        hbase_df = sparkSession.read.schema(schema) \
            .format("hbase") \
            .option("ZKHost", zk_host) \
            .option("TableName", table_name) \
            .option("RowKey", row_key) \
            .option("Cols", cols) \
            .load()
        hbase_df.filter("id = '12333' or id='11111'").show()

        # Close the session.
        sparkSession.stop()
https://support.huaweicloud.com/devg-dli/dli_09_0078.html