华为云用户手册

  • 数据同步示例 GaussDB (DWS)侧: 新建binlog表时,enable_hstore_binlog_table参数需要设置为true,可以通过show enable_hstore_binlog_table来查询。 -- 源表(产生binlog) 1 CREATE TABLE test_binlog_source(a int, b int, c int, primary key(a, b)) with(orientation=column, enable_hstore=on, enable_binlog=true); -- 目标表 1 CREATE TABLE test_binlog_sink(a int, b int, c int, primary key(a, b)) with(orientation=column, enable_hstore=on); Flink侧: 执行如下命令进行完整数据同步: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 -- 建立源表的映射表 CREATE TABLE test_binlog_source ( a int, b int, c int, primary key(a) ) with ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'binlog' = 'true', 'tableName' = 'test_binlog_source', 'binlogSlotName' = 'slot', 'username'='xxx', 'password'='xxx'); ​-- 建立目标表的映射表 CREATE TABLE test_binlog_sink ( a int, b int, c int, primary key(a) ) with ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'tableName' = 'test_binlog_sink', 'ignoreUpdateBefore'='false', 'connectionSize' = '1', 'username'='xxx', 'password'='xxx'); ​ INSERT INTO test_binlog_sink select * from test_binlog_source;
  • Binlog相关参数说明 下表仅涉及消费Binlog时的参数。 表1 消费Binlog时的参数 参数 说明 数据类型 默认值 binlog 是否读取Binlog信息 Boolean false binlogSlotName 槽位信息,可以理解一个标识。由于可能存在多个Flink任务同时消费同一张表的Binlog信息,所以该场景需要保证每个任务的binlogSlotName不同。 String Flink映射表的表名 binlogBatchReadSize 批量读取binlog的数据行数 Integer 5000 fullSyncBinlogBatchReadSize 全量读取binlog的数据行数 Integer 50000 binlogReadTimeout 增量消费Binlog数据时超时时间,单位毫秒 Integer 600000 fullSyncBinlogReadTimeout 全量消费Binlog数据时超时时间,单位毫秒 Long 1800000 binlogSleepTime 实时消费不到Binlog数据时休眠时间,单位毫秒。如果连续读取不到Binlog数据,则休眠时间为:binlogSleepTime * 次数,最大为binlogMaxSleepTime。读取到数据后,则重置。 Long 500 binlogMaxSleepTime 实时消费不到Binlog数据时最大休眠时间,单位毫秒。 Long 10000 binlogMaxRetryTimes 消费Binlog数据出错后的重试次数。 Integer 1 binlogRetryInterval 消费binlog数据出错后的重试时间间隔。重试时sleep时间:binlogRetryInterval * (1~binlogMaxRetryTimes) +Random(100)。单位毫秒。 Long 100 binlogParallelNum 消费Binlog数据时线程数,只有任务并发度小于DWS集群DN数时,该参数才有效,即此时一个并发度会消费多个DN上的数据,所以可以考虑设置该参数。 Integer 3 connectionPoolSize JDBC连接池连接大小。 Integer 5 needRedistribution 表示是否兼容扩充重分布(需要升级到对应内核版本,如果是低版本则设置为false);如果设置成true的话,flink的restart-strategy不能设置为none。 Boolean true newSystemValue 表示读取binlog数据时是否使用新的系统字段(需要升级到对应内核版本,如果是低版本则设置为false)。 Boolean true checkNodeChangeInterval 检测节点变化的间隔,只有needRedistribution=true才生效。 Long 10000 connectionSocketTimeout 连接处理超时时间(可以看成客户端执行SQL超时时间),单位毫秒;默认值为0,即不设置超时时间。 Integer 0 binlogIgnoreUpdateBefor 是否过滤Binlog记录中的before_update记录,以及delete记录是否只返回主键信息。该参数仅9.1.0.200及以上版本支持。 Boolean false binlogStartTime 设置从某个时间点开始消费Binlog(只能增量消费),格式为yyyy-MM-dd hh:mm:ss且表需要开启enable_binlog_timestamp。 该参数仅9.1.0.200及以上版本支持。 String 无 binlogSyncPointSize 增量读取binlog同步点区间的大小(增量读取binlog时,如果数据量过大可能涉及下盘,可通过调整该参数控制)。 该参数仅9.1.0.200及以上版本支持。 Integer 5000
  • 注意事项 当前仅8.3.0.100及以上的版本支持HStore和HStore-opt记录Binlog功能,且处于试商用阶段,使用前需要进行评估。 目前GaussDB(DWS)只有Hstore表支持Binlog功能,表需要包含主键且设置enable_binlog=on。 消费的Binlog表名不要带有特殊字符,如.、""等。 如果多个任务消费同一张表的Binlog数据,需要保证每个任务的binlogSlotName唯一。 为了达到最高的消费速度,建议将任务的并发度和DWS集群DN数设置一致。 使用dws-connector-flink的Sink能力来写入Binlog数据的话,需要注意以下几点: 如果需要保证DN内的数据写入顺序则需要设置connectionSize设置为1。 如果源端有更新主键操作或者需要flink进行聚合计算的话,将ignoreUpdateBefore设置为false,否则不建议将 ignoreUpdateBefore设置为false(默认true)。
  • 完整示例代码 通过DataFrame API访问 认证用的password硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-dws").getOrCreate() # Set cross-source connection parameters url = "jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres" dbtable = "customer" user = "dbadmin" password = "######" driver = "org.postgresql.Driver" # Create a DataFrame and initialize the DataFrame data. dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19)]) # Setting schema schema = StructType([StructField("id", IntegerType(), False),\ StructField("name", StringType(), False),\ StructField("age", IntegerType(), False)]) # Create a DataFrame from RDD and schema dataFrame = sparkSession.createDataFrame(dataList, schema) # Write data to the DWS table dataFrame.write \ .format("jdbc") \ .option("url", url) \ .option("dbtable", dbtable) \ .option("user", user) \ .option("password", password) \ .option("driver", driver) \ .mode("Overwrite") \ .save() # Read data jdbcDF = sparkSession.read \ .format("jdbc") \ .option("url", url) \ .option("dbtable", dbtable) \ .option("user", user) \ .option("password", password) \ .option("driver", driver) \ .load() jdbcDF.show() # close session sparkSession.stop() 通过SQL API访问 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-dws").getOrCreate() # Createa data table for DLI - associated DWS sparkSession.sql( "CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS (\ 'url'='jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres',\ 'dbtable'='customer',\ 'user'='dbadmin',\ 'password'='######',\ 'driver'='org.postgresql.Driver')") # Insert data into the DLI data table sparkSession.sql("insert into dli_to_dws values(2,'John',24)") # Read data from DLI data table jdbcDF = sparkSession.sql("select * from dli_to_dws").show() # close session sparkSession.stop()
  • 通过SQL API 访问数据源 创建DLI跨源访问 dws 的关联表。 1 2 3 4 5 6 7 sparkSession.sql( "CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS ( 'url'='jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres',\ 'dbtable'='customer',\ 'user'='dbadmin',\ 'password'='######',\ 'driver'='org.postgresql.Driver')") 建表参数详情可参考表1。 插入数据 1 sparkSession.sql("insert into dli_to_dws values(2,'John',24)") 查询数据 1 jdbcDF = sparkSession.sql("select * from dli_to_dws").show() 操作结果
  • 操作前准备 import相关依赖包 1 2 3 from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession 创建会话 1 sparkSession = SparkSession.builder.appName("datasource-dws").getOrCreate()
  • 通过DataFrame API访问数据源 连接参数配置 1 2 3 4 5 url = "jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres" dbtable = "customer" user = "dbadmin" password = "######" driver = "org.postgresql.Driver" 设置数据 1 dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19)]) 设置schema 1 2 3 schema = StructType([StructField("id", IntegerType(), False),\ StructField("name", StringType(), False),\ StructField("age", IntegerType(), False)]) 创建DataFrame 1 dataFrame = sparkSession.createDataFrame(dataList, schema) 保存数据到DWS 1 2 3 4 5 6 7 8 9 dataFrame.write \ .format("jdbc") \ .option("url", url) \ .option("dbtable", dbtable) \ .option("user", user) \ .option("password", password) \ .option("driver", driver) \ .mode("Overwrite") \ .save() mode 有四种保存类型: ErrorIfExis:如果已经存在数据,则抛出异常。 Overwrite:如果已经存在数据,则覆盖原数据。 Append:如果已经存在数据,则追加保存。 Ignore:如果已经存在数据,则不做操作。这类似于SQL中的“如果不存在则创建表”。 读取DWS上的数据 1 2 3 4 5 6 7 8 9 jdbcDF = sparkSession.read \ .format("jdbc") \ .option("url", url) \ .option("dbtable", dbtable) \ .option("user", user) \ .option("password", password) \ .option("driver", driver) \ .load() jdbcDF.show() 操作结果
  • 完整示例代码 通过SQL API访问 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import org.apache.spark.sql.SparkSession; public class java_rds { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder().appName("datasource-rds").getOrCreate(); // Create a data table for DLI-associated RDS sparkSession.sql("CREATE TABLE IF NOT EXISTS dli_to_rds USING JDBC OPTIONS ('url'='jdbc:mysql://192.168.6.150:3306','dbtable'='test.customer','user'='root','password'='**','driver'='com.mysql.jdbc.Driver')"); //*****************************SQL model*********************************** //Insert data into the DLI data table sparkSession.sql("insert into dli_to_rds values(3,'Liu',21),(4,'Joey',34)"); //Read data from DLI data table sparkSession.sql("select * from dli_to_rds"); //drop table sparkSession.sql("drop table dli_to_rds"); sparkSession.close(); } }
  • 完整示例代码 直接复制如下样例代码到py文件中后,需要注意文件内容中的“\”后面可能会有unexpected character的问题。需要将“\”后面的缩进或是空格全部删除。 通过DataFrame API访问 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-rds").getOrCreate() # Set cross-source connection parameters. url = "jdbc:mysql://to-rds-1174404952-ZgPo1nNC.datasource.com:3306" dbtable = "test.customer" user = "root" password = "######" driver = "com.mysql.jdbc.Driver" # Create a DataFrame and initialize the DataFrame data. dataList = sparkSession.sparkContext.parallelize([(123, "Katie", 19)]) # Setting schema schema = StructType([StructField("id", IntegerType(), False),\ StructField("name", StringType(), False),\ StructField("age", IntegerType(), False)]) # Create a DataFrame from RDD and schema dataFrame = sparkSession.createDataFrame(dataList, schema) # Write data to the RDS. dataFrame.write \ .format("jdbc") \ .option("url", url) \ .option("dbtable", dbtable) \ .option("user", user) \ .option("password", password) \ .option("driver", driver) \ .mode("Append") \ .save() # Read data jdbcDF = sparkSession.read \ .format("jdbc") \ .option("url", url) \ .option("dbtable", dbtable) \ .option("user", user) \ .option("password", password) \ .option("driver", driver) \ .load() jdbcDF.show() # close session sparkSession.stop() 通过SQL API访问 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-rds").getOrCreate() # Createa data table for DLI - associated RDS sparkSession.sql( "CREATE TABLE IF NOT EXISTS dli_to_rds USING JDBC OPTIONS (\ 'url'='jdbc:mysql://to-rds-1174404952-ZgPo1nNC.datasource.com:3306',\ 'dbtable'='test.customer',\ 'user'='root',\ 'password'='######',\ 'driver'='com.mysql.jdbc.Driver')") # Insert data into the DLI data table sparkSession.sql("insert into dli_to_rds values(3,'John',24)") # Read data from DLI data table jdbcDF = sparkSession.sql("select * from dli_to_rds") jdbcDF.show() # close session sparkSession.stop()
  • 完整示例代码 通过SQL API访问 MRS 的OpenTSDB # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-opentsdb").getOrCreate() # Create a DLI cross-source association opentsdb data table sparkSession.sql(\ "create table opentsdb_test using opentsdb options(\ 'Host'='10.0.0.171:4242',\ 'metric'='cts_opentsdb',\ 'tags'='city,location')") sparkSession.sql("insert into opentsdb_test values('aaa', 'abc', '2021-06-30 18:00:00', 30.0)") result = sparkSession.sql("SELECT * FROM opentsdb_test") result.show() # close session sparkSession.stop() 通过DataFrame API访问OpenTSDB # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-opentsdb").getOrCreate() # Create a DLI cross-source association opentsdb data table sparkSession.sql( "create table opentsdb_test using opentsdb options(\ 'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242',\ 'metric'='ct_opentsdb',\ 'tags'='city,location')") # Create a DataFrame and initialize the DataFrame data. dataList = sparkSession.sparkContext.parallelize([("aaa", "abc", 123456L, 30.0)]) # Setting schema schema = StructType([StructField("location", StringType()),\ StructField("name", StringType()),\ StructField("timestamp", LongType()),\ StructField("value", DoubleType())]) # Create a DataFrame from RDD and schema dataFrame = sparkSession.createDataFrame(dataList, schema) # Set cross-source connection parameters metric = "ctopentsdb" tags = "city,location" Host = "opentsdb-3xcl8dir15m58z3.cloudtable.com:4242" # Write data to the cloudtable-opentsdb dataFrame.write.insertInto("opentsdb_test") # ******* Opentsdb does not currently implement the ctas method to save data, so the save() method cannot be used.******* # dataFrame.write.format("opentsdb").option("Host", Host).option("metric", metric).option("tags", tags).mode("Overwrite").save() # Read data on CloudTable-OpenTSDB jdbdDF = sparkSession.read\ .format("opentsdb")\ .option("Host",Host)\ .option("metric",metric)\ .option("tags",tags)\ .load() jdbdDF.show() # close session sparkSession.stop()
  • 功能描述 DLI提供了一个通用接口,可用于获取用户在启动Flink作业时设置的委托的临时凭证。该接口将获取到的该作业委托的临时凭证封装到com.huaweicloud.sdk.core.auth.BasicCredentials类中。 获取到的委托的临时认证封装到com.huaweicloud.sdk.core.auth.ICredentialProvider接口的getCredentials()返回值中。 返回类型为com.huaweicloud.sdk.core.auth.BasicCredentials。 仅支持获取AK、SK、SecurityToken。 获取到AK、SK、SecurityToken后,请参考如何使用凭据管理服务替换硬编码的数据库账号密码查询凭据。
  • 约束限制 仅支持Flink1.15版本使用委托授权访问临时凭证: 在创建作业时,请配置作业使用Flink1.15版本 已在作业中配置允许DLI访问DEW的委托信息。flink.dli.job.agency.name=自定义委托名称。 自定义委托请参考自定义DLI委托权限。 请注意配置参数不需要用"" 或 '' 包裹。 Flink1.15基础镜像内置了3.1.62版本的huaweicloud-sdk-core。
  • 完整示例代码 通过SQL API访问MRS HBase 未开启kerberos认证样例代码 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate() sparkSession.sql( "CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS (\ 'ZKHost' = '192.168.0.189:2181',\ 'TableName' = 'hbtest',\ 'RowKey' = 'id:5',\ 'Cols' = 'location:info.location,city:detail.city')") sparkSession.sql("insert into testhbase values('95274','abc','Jinan')") sparkSession.sql("select * from testhbase").show() # close session sparkSession.stop() 开启kerberos认证样例代码 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark import SparkFiles from pyspark.sql import SparkSession import shutil import time import os if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("Test_HBase_SparkSql_Kerberos").getOrCreate() sc = sparkSession.sparkContext time.sleep(10) krb5_startfile = SparkFiles.get("krb5.conf") keytab_startfile = SparkFiles.get("user.keytab") path_user = os.getcwd() krb5_endfile = path_user + "/" + "krb5.conf" keytab_endfile = path_user + "/" + "user.keytab" shutil.copy(krb5_startfile, krb5_endfile) shutil.copy(keytab_startfile, keytab_endfile) time.sleep(20) sparkSession.sql( "CREATE TABLE testhbase(id string,booleanf boolean,shortf short,intf int,longf long,floatf float,doublef double) " + "using hbase OPTIONS(" + "'ZKHost'='10.0.0.146:2181'," + "'TableName'='hbtest'," + "'RowKey'='id:100'," + "'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF2.longf,floatf:CF1.floatf,doublef:CF2.doublef'," + "'krb5conf'='" + path_user + "/krb5.conf'," + "'keytab'='" + path_user+ "/user.keytab'," + "'principal'='krbtest') ") sparkSession.sql("insert into testhbase values('95274','abc','Jinan')") sparkSession.sql("select * from testhbase").show() # close session sparkSession.stop() 通过DataFrame API访问HBase # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate() # Createa data table for DLI-associated ct sparkSession.sql(\ "CREATE TABLE test_hbase(id STRING, location STRING, city STRING, booleanf BOOLEAN, shortf SHORT, intf INT, longf LONG,floatf FLOAT,doublef DOUBLE) using hbase OPTIONS ( \ 'ZKHost' = 'cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,\ cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,\ cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181',\ 'TableName' = 'table_DupRowkey1',\ 'RowKey' = 'id:5,location:6,city:7',\ 'Cols' = 'booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef')") # Create a DataFrame and initialize the DataFrame data. dataList = sparkSession.sparkContext.parallelize([("11111", "aaa", "aaa", False, 4, 3, 23, 2.3, 2.34)]) # Setting schema schema = StructType([StructField("id", StringType()), StructField("location", StringType()), StructField("city", StringType()), StructField("booleanf", BooleanType()), StructField("shortf", ShortType()), StructField("intf", IntegerType()), StructField("longf", LongType()), StructField("floatf", FloatType()), StructField("doublef", DoubleType())]) # Create a DataFrame from RDD and schema dataFrame = sparkSession.createDataFrame(dataList, schema) # Write data to the cloudtable-hbase dataFrame.write.insertInto("test_hbase") # Set cross-source connection parameters TableName = "table_DupRowkey1" RowKey = "id:5,location:6,city:7" Cols = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef" ZKHost = "cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181" # Read data on CloudTable-HBase jdbcDF = sparkSession.read.schema(schema)\ .format("hbase")\ .option("ZKHost", ZKHost)\ .option("TableName",TableName)\ .option("RowKey", RowKey)\ .option("Cols", Cols)\ .load() jdbcDF.filter("id = '12333' or id='11111'").show() # close session sparkSession.stop()
  • 通过DataFrame API访问数据源 连接配置。 1 2 3 4 val url = "jdbc:postgresql://to-dws-1174405057-EA1Kgo8H.datasource.com:8000/postgres" val username = "dbadmin" val password = "######" val dbtable = "customer" 创建DataFrame,添加数据,并重命名字段。 1 2 3 4 var dataFrame_1 = sparkSession.createDataFrame(List((8, "Jack_1", 18))) val df = dataFrame_1.withColumnRenamed("_1", "id") .withColumnRenamed("_2", "name") .withColumnRenamed("_3", "age") 导入数据到DWS。 1 2 3 4 5 6 7 df.write.format("jdbc") .option("url", url) .option("dbtable", dbtable) .option("user", username) .option("password", password) .mode(SaveMode.Append) .save() SaveMode 有四种保存类型: ErrorIfExis:如果已经存在数据,则抛出异常。 Overwrite:如果已经存在数据,则覆盖原数据。 Append:如果已经存在数据,则追加保存。 Ignore:如果已经存在数据,则不做操作。这类似于SQL中的“如果不存在则创建表”。 读取DWS上的数据。 方式一:read.format()方法 1 2 3 4 5 6 val jdbcDF = sparkSession.read.format("jdbc") .option("url", url) .option("dbtable", dbtable) .option("user", username) .option("password", password) .load() 方式二:read.jdbc()方法 1 2 3 4 val properties = new Properties() properties.put("user", username) properties.put("password", password) val jdbcDF2 = sparkSession.read.jdbc(url, dbtable, properties) 插入数据前: 插入数据后: 使用上述read.format()或者read.jdbc()方法读取到的dateFrame注册为临时表,就可使用sql语句进行数据查询了。 1 2 jdbcDF.registerTempTable("customer_test") sparkSession.sql("select * from customer_test where id = 1").show() 查询结果:
  • 完整示例代码 通过DataFrame API 访问 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-redis").getOrCreate() # Set cross-source connection parameters. host = "192.168.4.199" port = "6379" table = "person" auth = "######" # Create a DataFrame and initialize the DataFrame data. # ******* method noe ********* dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19),(2,"Tom",20)]) schema = StructType([StructField("id", IntegerType(), False),StructField("name", StringType(), False),StructField("age", IntegerType(), False)]) dataFrame_one = sparkSession.createDataFrame(dataList, schema) # ****** method two ****** # jdbcDF = sparkSession.createDataFrame([(3,"Jack", 23)]) # dataFrame = jdbcDF.withColumnRenamed("_1", "id").withColumnRenamed("_2", "name").withColumnRenamed("_3", "age") # Write data to the redis table dataFrame.write.format("redis").option("host", host).option("port", port).option("table", table).option("password", auth).mode("Overwrite").save() # Read data sparkSession.read.format("redis").option("host", host).option("port", port).option("table", table).option("password", auth).load().show() # close session sparkSession.stop() 通过SQL API 访问 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession sparkSession = SparkSession.builder.appName("datasource_redis").getOrCreate() sparkSession.sql( "CREATE TEMPORARY VIEW person (name STRING, age INT) USING org.apache.spark.sql.redis OPTIONS (\ 'host' = '192.168.4.199', \ 'port' = '6379',\ 'password' = '######',\ 'table'= 'person')".stripMargin); sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30),('Peter', 45)".stripMargin) sparkSession.sql("SELECT * FROM person".stripMargin).collect().foreach(println) # close session sparkSession.stop()
  • 通过DataFrame API访问数据源 构造schema 1 2 3 4 5 6 7 8 9 10 val attrId = new StructField("id",StringType) val location = new StructField("location",StringType) val city = new StructField("city",StringType) val booleanf = new StructField("booleanf",BooleanType) val shortf = new StructField("shortf",ShortType) val intf = new StructField("intf",IntegerType) val longf = new StructField("longf",LongType) val floatf = new StructField("floatf",FloatType) val doublef = new StructField("doublef",DoubleType) val attrs = Array(attrId, location,city,booleanf,shortf,intf,longf,floatf,doublef) 根据schema的类型构造数据 1 2 val mutableRow: Seq[Any] = Seq("12345","abc","city1",false,null,3,23,2.3,2.34) val rddData: RDD[Row] = sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1) 导入数据到HBase 1 sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("test_hbase") 读取HBase上的数据 1 2 3 4 5 6 7 8 val map = new mutable.HashMap[String, String]() map("TableName") = "table_DupRowkey1" map("RowKey") = "id:5,location:6,city:7" map("Cols") = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef" map("ZKHost")="cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181" sparkSession.read.schema(new StructType(attrs)).format("hbase").options(map.toMap).load().show() 返回结果:
  • 通过SQL API 访问数据源 创建DLI跨源访问DWS的关联表,填写连接参数。 1 sparkSession.sql("CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS ('url'='jdbc:postgresql://10.0.0.233:8000/postgres','dbtable'='test','user'='dbadmin','password'='**')"); 插入数据 1 sparkSession.sql("insert into dli_to_dws values(3,'Liu'),(4,'Xie')"); 查询数据 1 sparkSession.sql("select * from dli_to_dws").show(); 插入数据后:
  • 完整示例代码 通过SQL API 访问DWS表 import org.apache.spark.sql.SparkSession; public class java_dws { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder().appName("datasource-dws").getOrCreate(); sparkSession.sql("CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS ('url'='jdbc:postgresql://10.0.0.233:8000/postgres','dbtable'='test','user'='dbadmin','password'='**')"); //*****************************SQL model*********************************** //Insert data into the DLI data table sparkSession.sql("insert into dli_to_dws values(3,'Liu'),(4,'Xie')"); //Read data from DLI data table sparkSession.sql("select * from dli_to_dws").show(); //drop table sparkSession.sql("drop table dli_to_dws"); sparkSession.close(); } }
  • 完整示例代码 通过DataFrame API 访问 from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-mongo").getOrCreate() # Create a DataFrame and initialize the DataFrame data. dataList = sparkSession.sparkContext.parallelize([("1", "Katie", 19),("2","Tom",20)]) # Setting schema schema = StructType([StructField("id", IntegerType(), False),StructField("name", StringType(), False), StructField("age", IntegerType(), False)]) # Create a DataFrame from RDD and schema dataFrame = sparkSession.createDataFrame(dataList, schema) # Setting connection parameters url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin" uri = "mongodb://username:pwd@host:8635/db" user = "rwuser" database = "test" collection = "test" password = "######" # Write data to the mongodb table dataFrame.write.format("mongo") .option("url", url)\ .option("uri", uri)\ .option("user",user)\ .option("password",password)\ .option("database",database)\ .option("collection",collection) .mode("Overwrite").save() # Read data jdbcDF = sparkSession.read.format("mongo") .option("url", url)\ .option("uri", uri)\ .option("user",user)\ .option("password",password)\ .option("database",database)\ .option("collection",collection)\ .load() jdbcDF.show() # close session sparkSession.stop() 通过SQL API 访问 from __future__ import print_function from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-mongo").getOrCreate() # Createa data table for DLI - associated mongo sparkSession.sql( "create table test_dds(id string, name string, age int) using mongo options(\ 'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',\ 'uri' = 'mongodb://username:pwd@host:8635/db',\ 'database' = 'test',\ 'collection' = 'test', \ 'user' = 'rwuser', \ 'password' = '######')") # Insert data into the DLI-table sparkSession.sql("insert into test_dds values('3', 'Ann',23)") # Read data from DLI-table sparkSession.sql("select * from test_dds").show() # close session sparkSession.stop()
  • 降级方式 包含下述2种降级方式: 如果指定降级和自动降级都设置了,仅指定降级会生效。 示例代码如下所示: const client = HWLLSPlayer.createClient() client.startPlay(url, { ... hlsUrl: // 填写hls的播放地址 flvUrl: // 填写flv的播放地址 autoDowngrade: true, // true表示启用自动降级播放,false表示不启用自动降级播放 ... }) 指定降级 设置HWLLSClient中的startPlay接口,指定options里面参数downgradeUrl的flv或hls地址,即可实现异常时的降级播放。 如果hlsUrl和flvUrl播放地址只选填一个,会降级至指定地址;如果两个播放地址都填,则降级后的播放顺序为先HLS后FLV。 const client = HWLLSPlayer.createClient() client.startPlay(url, { ... downgradeUrl: { hlsUrl: // 填写hls的播放地址 flvUrl: // 填写flv的播放地址 } ... }) 自动降级 设置HWLLSClient中的startPlay接口,指定options里面参数autoDowngrade的值为true,即可实现接口异常时的自动降级播放。 const client = HWLLSPlayer.createClient() client.startPlay(url, { ... autoDowngrade: true, // true表示启用自动降级播放,false表示不启用自动降级播放 ... })
  • 常见问题 如果业务上App只能使用http协议,是否能够集成使用华为低时延直播Web SDK ? 部分浏览器(chrome)可以集成使用,但不推荐。由于浏览器兼容性识别是根据浏览器暴露的WebRTC对象判断的,在非https协议下,对象可能不存在。 Firefox浏览器中无法使用华为低时延直播Web SDK? Firefox浏览器使用之前需要安装H264的编解码插件。浏览器中输入about:addons,跳转到插件安装页面,查看H264插件是否安装完成,如未安装请在该页面更新安装。 集成华为低时延直播Web SDK后,无法正常使用,可能原因? 需要检查用户自定义的 域名 配置是否完成,如:推、拉流域名,权威机构签发的https证书等。 推流端设置及推流是否正常。 播放地址是否填写正确,如:appName、streamName等。 网络连接是否正常、网络防火墙配置是否有限制,如:UDP端口(8000-8063)是否放通。 华为低时延直播Web SDK,支持哪些类型浏览器? 浏览器支持详情请参见浏览器适配。 推流端推流成功后,华为低时延直播Web SDK拉流播放失败? 需要确认推流端的推流编码参数,是否为H264+无B帧。目前华为低时延直播Web SDK仅支持H264+无B帧的流,所以如果原始流为H265或者带B帧,则需要提前在租户Console上配置对应转码模板,开启转码服务,但这样会引入额外的转码延迟,并且会产生转码费用。建议推流端尽量推H264+不包含B帧的流,可以通过调整推流端软件(如OBS)的视频编码参数去除B 帧。如果使用OBS推流,可以通过设置,关闭B帧。如下图所示: 华为低时延直播Web SDK,播放报错:NotAllowedError:xxx? 由于浏览器自动播放安全策略的限制,浏览器直接拉起App并启动播放会返回该错误,在应用层需要根据该错误码,引导用户通过手动触发页面UI控件,并调用replay接口恢复播放。 开启认证策略,该如何获取token信息? 认证策略不开启不影响功能正常使用,也不影响打点和日志上传的能力。 认证策略开启可以保证打点数据和日志上传数据的安全性。 如果当前需要认证策略能力,请提交工单,联系技术支持获取appid和token。 父主题: Web SDK
  • on on(event: string, handler: function, withTimeout?: boolean): void 【功能说明】 注册客户端对象事件回调接口。 【请求参数】 event:必选,string类型,事件名称, 注册Error事件,监听打点或者日志上传的错误信息。 handler:必选,function类型,事件处理方法。 withTimeout:选填,boolean类型,是否超时报错 【返回参数】 无
  • createClient createClient(type: string): HWLLSClient | HWFlvClient | HWHlsClient 【功能说明】 创建一个直播拉流客户端对象,如果需要拉取多个直播流则需要创建多个客户端对象。 【请求参数】 type:string类型,可选。创建的拉流客户端类型。 低时延直播拉流客户端类型:webrtc。 flv直播拉流客户端类型:flv。 HLS直播拉流客户端类型:hls(预留,暂未开放)。 缺省值:webrtc。 【返回参数】 client:拉流客户端对象。
  • setLogLevel setLogLevel(level: string): boolean 【功能说明】 设置Console上打印的日志级别,如不设置日志级别,则console日志打印级别默认为error。 【请求参数】 level:string类型,必选,日志级别标识。 none:关闭全部级别的日志打印。 error:打印error级别日志。 warn:打印warn级别及更高级别日志。 info:打印info级别及更高级别日志。 debug:打印debug级别及更高级别日志。 【返回参数】 boolean:设置日志级别结果。true表示日志级别设置成功,false表示日志级别设置失败。
  • setReportConfig setReportConfig(reportConfig:ReportConfig):boolear 【功能说明】 设置打点能力以及打点和日志上传的认证策略。 【请求参数】 reportConfig:ReportConfig类型,必选。ReportConfig定义如下所示: enable:必选,布尔类型,true表示开启打点,false表示关闭打点。默认true。 tokenConfig:可选,对象定义如下所示: enable:布尔类型,true表示开启认证,false表示关闭认证。默认false。 tokenInfo:数组类型,数组内部ReportTokenInfo类。ReportTokenInfo定义如下所示: appid:string类型。传入appid。 expTimestamp:string类型。过期时间戳,系统当前UNIX时间戳加上鉴权过期时间(推荐7200秒,最长需要小于43200秒,即12个小时)。 例如:当前UNIX时间戳为:1708531200,鉴权过期时间自定义为7200秒,那么过期的时间戳为:1708538400,即表示该校验字符串在2024-02-22 02:00:00过期。 token:string类型。hmac_sha256生成的字符串。hmac_sha256(共享密钥, 过期的时间戳 + appID)。共享密钥由用户控制获取。 【返回参数】 返回值布尔值, true表示设置成功,false表示设置失败。 开启了认证策略,实际请求状态会通过on函数注册Error回调获得。
  • setParameter setParameter(parameterKey: string, parameterValue: any): boolean 【功能说明】 设置全局配置参数。 【请求参数】 参数名称 参数值 LOADING_CONFIG LoadingConfig类型,定义如下: { netQualityLoading:可选,boolean类型。true表示开启根据网络质量进行loading效果展示,默认值为false,关闭。 netQualityLoadingThreshold:可选,number类型。展示loading效果的网络质量(network-quality)的阈值,默认网络质量等级为5。 frameStuckLoading:可选,boolean类型。true表示开启根据帧卡顿时长进行loading效果展示,默认值为false frameStuckThreshold:可选,number类型。展示loading效果帧卡顿时长的阈值,单位为100ms。默认值为10,表示帧卡顿时长为1000ms。 } 注意: 需要在起播之前进行设置。 DNS_QUERY_ENABLE boolean类型,可选,默认为false,true表示开启DNS结果解析,false表示关闭DNS结果解析。 AC CES S_DOMAIN string类型,可选,默认为空,主要用于拉流环境配置,沟通华为工程师填入。 GLSB_DOMAIN string类型,可选,默认为空,主要用于GSLB环境配置,沟通华为工程师填入。 BACKGROUND_PLAY boolean类型,可选,默认为false,true表示开启后台播放,false表示关闭后台播放 【返回参数】 boolean:配置参数设置结果。true表示参数设置成功,false表示参数设置失败。
  • 浏览器适配 本章节介绍低时延直播Web SDK支持的浏览器类型、版本以及使用限制。 表1 浏览器适配详情 操作系统类型 浏览器类型 浏览器版本 Windows Chrome浏览器 67+ QQ浏览器(极速内核) 10.4+ 360安全浏览器(极速模式) 12 微信内嵌浏览器 - Firefox浏览器 90+ Edge浏览器 80+ Opera浏览器 54+ macOS Chrome浏览器 67+ 微信内嵌浏览器 - Safari浏览器 13+ Firefox浏览器 90+ Opera浏览器 56+ Android 微信内嵌浏览器( TBS内核) - 微信内嵌浏览器( XWEB内核) - 移动版Chrome浏览器 83+ 移动版QQ浏览器 12+ 华为系统浏览器 11.0.8+ iOS 微信内嵌浏览器 iOS 14.3+ 微信6.5+版本 移动版Chrome浏览器 iOS 14.3+ 移动版Safari浏览器 13+ 表2 浏览器使用限制 浏览器类型 使用限制 Chrome浏览器 1、在华为移动端设备上,Chrome浏览器(包括华为浏览器)支持WebRTC的版本为91+。 2、Android移动端WebView对WebRTC能力的支持参差不齐,受影响的因素很多,如设备厂家、浏览器内核、版本等,使用的兼容性较差,因此可用性不能保证,不建议使用这类浏览器。 Safari浏览器 Safari 13的用户可能听不到远端用户的声音。 iOS Safari 14.2和macOS Safari 14.0.1上音频可能断断续续。 Firefox浏览器 Apple M1芯片的Mac设备上Firefox不支持H.264编解码。 Opera浏览器 在华为移动端设备上,Opera浏览器支持SDK的版本为64+。 其他浏览器 由于Android设备各厂家的浏览器内核、webview、版本等因素,移动端浏览器对WebRTC的支持度不一,除可以使用表1 浏览器适配详情中列举的明确支持的浏览器外,还可以集成使用Native SDK(Andriod / iOS)。 父主题: Web SDK
  • 公网地址 表1 公网地址列表 公网地址 信息 log-collection-new.hwcloudlive.com 国内日志和打点环境地址。 log-collection-ap-southeast-3.rocket-cdn.com 海外日志和打点环境地址。 global-lll.huaweicloud.com 默认拉流主环境地址。 global-lll.huaweicloud.cn 默认拉流备用环境地址。 hcdnl-pull302-global-gslb.livehwc3.cn 默认GSLB环境地址。 父主题: 接口参考
  • 不同规格主机Agent资源占用一览 Agent运行时,不同规格的云服务器CPU、内存占用情况如表1所示。 表1 Agent资源占用一览 vCPUs规格 占用CPU资源比例(峰值) 执行病毒查杀时,占用CPU资源比例(峰值) 内存占用(峰值) 执行病毒查杀时,内存占用(均值) 1vCPUs 20% 50% 500MB 800MB 2vCPUs 10% 40% 500MB 800MB 4vCPUs 5% 35% 500MB 800MB 8vCPUs 2.5% 32.5% 500MB 800MB 12vCPUs 约1.67% 约31.67% 500MB 800MB 16vCPUs 约1.25% 约31.25% 500MB 800MB 24vCPUs 约0.84% 约30.84% 500MB 800MB 32vCPUs 约0.63% 约30.63% 500MB 800MB 48vCPUs 约0.42% 约30.42% 500MB 800MB 60vCPUs 约0.34% 约30.34% 500MB 800MB 64vCPUs 约0.32% 约30.32% 500MB 800MB
  • 请求示例 授权给用户"13gg44z4g2sglzk0egw0u726zoyzvrs8"ID为 "0d0466b0-e727-4d9c-b35d-f84bb474a37f"的密钥操作权限,授权操作为查询密钥、创建数据密钥、加密数据密钥。 { "key_id" : "0d0466b0-e727-4d9c-b35d-f84bb474a37f", "operations" : [ "describe-key", "create-datakey", "encrypt-datakey" ], "grantee_principal" : "13gg44z4g2sglzk0egw0u726zoyzvrs8", "grantee_principal_type" : "user", "retiring_principal" : "13gg44z4g2sglzk0egw0u726zoyzvrs8" }
共100000条