数据湖探索 DLI-使用Spark作业访问DLI元数据:Python样例代码

时间:2024-11-16 13:16:39

Python样例代码

#!/usr/bin/python
# -*- coding: UTF-8 -*-

from __future__ import print_function

import sys

from pyspark.sql import SparkSession

if __name__ == "__main__":
    url = sys.argv[1]
    creatTbl = "CREATE TABLE test_sparkapp.dli_rds USING JDBC OPTIONS ('url'='jdbc:mysql://%s'," \
              "'driver'='com.mysql.jdbc.Driver','dbtable'='test.test'," \
              " 'passwdauth' = 'DatasourceRDSTest_pwd','encryption' = 'true')" % url

    spark = SparkSession \
        .builder \
        .enableHiveSupport() \
.config("spark.sql.session.state.builder","org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") \       
.config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") \  
.config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") \
        .appName("python Spark test catalog") \
        .getOrCreate()

    spark.sql("CREATE database if not exists test_sparkapp").collect()
    spark.sql("drop table if exists test_sparkapp.dli_rds").collect()
    spark.sql(creatTbl).collect()
    spark.sql("select * from test_sparkapp.dli_rds").show()
    spark.sql("insert into table test_sparkapp.dli_rds select 12,'aaa'").collect()
    spark.sql("select * from test_sparkapp.dli_rds").show()
    spark.sql("insert overwrite table test_sparkapp.dli_rds select 1111,'asasasa'").collect()
    spark.sql("select * from test_sparkapp.dli_rds").show()
    spark.sql("drop table test_sparkapp.dli_rds").collect()
    spark.stop()
support.huaweicloud.com/devg-dli/dli_09_0176.html