华为云用户手册

  • 语法格式 在Spark Jar作业编辑界面,选择配置优化参数,配置信息如下: 不同的OBS桶,使用不同的AKSK认证信息。 可以使用如下配置方式,根据桶指定不同的AKSK信息,参数说明详见表1。 spark.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.access.key= USER_AK_ CS MS_KEY spark.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.secret.key= USER_SK_C SMS _KEY spark.hadoop.fs.obs.security.provider = com.dli.provider.UserObsBasicCredentialProvider spark.hadoop.fs.dew.csms.secretName= CredentialName spark.hadoop.fs.dew.endpoint=ENDPOINT spark.hadoop.fs.dew.csms.version=VERSION_ID spark.hadoop.fs.dew.csms.cache.time.second =CACHE_TIME spark.dli.job.agency.name=USER_AGENCY_NAME
  • 修改 DLI Livy工具配置文件 上传指定的DLI Livy工具jar资源包到OBS桶路径下。 登录OBS控制台,在指定的OBS桶下创建一个存放Livy工具jar包的资源目录。例如:“obs://bucket/livy/jars/”。 进入3.a中DLI Livy工具所在ECS服务器的安装目录,获取以下jar包,将获取的jar包上传到1.a创建的OBS桶资源目录下。 例如,当前Livy工具安装路径为“/opt/livy”,则当前需要上传的jar包名称如下: /opt/livy/rsc-jars/livy-api-0.7.2.0107.jar /opt/livy/rsc-jars/livy-rsc-0.7.2.0107.jar /opt/livy/repl_2.11-jars/livy-core_2.11-0.7.2.0107.jar /opt/livy/repl_2.11-jars/livy-repl_2.11-0.7.2.0107.jar 修改DLI Livy工具配置文件。 编辑修改配置文件“ /opt/livy/conf/livy-client.conf”。 vi /opt/livy/conf/livy-client.conf 添加如下内容,并根据注释修改配置项。 #当前ECS的私有IP地址,也可以使用ifconfig命令查询。 livy.rsc.launcher.address = X.X.X.X #当前ECS服务器放通的端口号 livy.rsc.launcher.port.range = 30000~32767 编辑修改配置文件“ /opt/livy/conf/livy.conf”。 vi /opt/livy/conf/livy.conf 添加如下内容。根据注释说明修改具体的配置项。 livy.server.port = 8998 livy.spark.master = yarn livy.server.contextLauncher.custom.class=org.apache.livy.rsc.DliContextLauncher livy.server.batch.custom.class=org.apache.livy.server.batch.DliBatchSession livy.server.interactive.custom.class=org.apache.livy.server.interactive.DliInteractiveSession livy.server.sparkApp.custom.class=org.apache.livy.utils.SparkDliApp livy.server.recovery.mode = recovery livy.server.recovery.state-store = filesystem #以下文件路径请根据情况修改 livy.server.recovery.state-store.url = file:///opt/livy/store/ livy.server.session.timeout-check = true livy.server.session.timeout = 1800s livy.server.session.state-retain.sec = 1800s livy.dli.spark.version = 2.3.2 livy.dli.spark.scala-version = 2.11 # 填入存储livy jar包资源的OBS桶路径。 livy.repl.jars = obs://bucket/livy/jars/livy-core_2.11-0.7.2.0107.jar, obs://bucket/livy/jars/livy-repl_2.11-0.7.2.0107.jar livy.rsc.jars = obs://bucket/livy/jars/livy-api-0.7.2.0107.jar, obs://bucket/livy/jars/livy-rsc-0.7.2.0107.jar 编辑修改配置文件“/opt/livy/conf/spark-defaults.conf”。 vi /opt/livy/conf/spark-defaults.conf 添加如下必选参数内容。配置项参数填写说明,详见表1。 # 以下参数均支持在提交作业时覆盖。 spark.yarn.isPython=true spark.pyspark.python=python3 # 当前参数值为生产环境web地址 spark.dli.user.uiBaseAddress=https://console.huaweicloud.com/dli/web # 队列所在的region。 spark.dli.user.regionName=XXXX # dli endpoint 地址。 spark.dli.user.dliEndPoint=XXXX # 用于指定队列,填写已创建DLI的队列名。 spark.dli.user.queueName=XXXX # 提交作业使用的access key。 spark.dli.user.access.key=XXXX # 提交作业使用的secret key。 spark.dli.user.secret.key=XXXX # 提交作业使用的projectId。 spark.dli.user.projectId=XXXX 表1 spark-defaults.conf必选参数说明 参数名 参数填写说明 spark.dli.user.regionName DLI队列所在的区 域名 。 从地区和终端节点获取,对应“区域”列就是regionName。 spark.dli.user.dliEndPoint DLI队列所在的终端节点。 从地区和终端节点获取,对应的“终端节点(Endpoint)”就是该参数取值。 spark.dli.user.queueName DLI队列名称。 spark.dli.user.access.key 对应用户的访问密钥。该用户需要有Spark作业相关权限,权限说明详见权限管理。 密钥获取方式请参考获取AK/SK。 spark.dli.user.secret.key spark.dli.user.projectId 参考获取项目ID获取项目ID。 以下参数为可选参数,请根据参数说明和实际情况配置。详细参数说明请参考Spark Configuration。 表2 spark-defaults.conf可选参数说明 Spark作业参数 对应Spark批处理参数 备注 spark.dli.user.file file 如果是对接notebook工具场景时不需要设置。 spark.dli.user.className class_name 如果是对接notebook工具场景时不需要设置。 spark.dli.user.scType sc_type 推荐使用livy原生配置。 spark.dli.user.args args 推荐使用livy原生配置。 spark.submit.pyFiles python_files 推荐使用livy原生配置。 spark.files files 推荐使用livy原生配置。 spark.dli.user.modules modules - spark.dli.user.image image 提交作业使用的 自定义镜像 ,仅容器集群支持该参数,默认不设置。 spark.dli.user.autoRecovery auto_recovery - spark.dli.user.maxRetryTimes max_retry_times - spark.dli.user.catalogName catalog_name 访问元数据时,需要将该参数配置为dli。
  • 通过DLI Livy工具提交Spark作业到DLI 本示例演示通过curl命令使用DLI Livy工具将Spark作业提交到DLI。 将开发好的Spark作业程序jar包上传到OBS路径下。 例如,本示例上传“spark-examples_2.11-XXXX.jar”到“obs://bucket/path”路径下。 以root用户登录到安装DLI Livy工具的ECS服务器。 执行curl命令通过DLI Livy工具提交Spark作业请求到DLI。 ECS_IP为当前安装DLI Livy工具所在的弹性云服务器的私有IP地址。 curl --location --request POST 'http://ECS_IP:8998/batches' \ --header 'Content-Type: application/json' \ --data '{ "driverMemory": "3G", "driverCores": 1, "executorMemory": "2G", "executorCores": 1, "numExecutors": 1, "args": [ "1000" ], "file": "obs://bucket/path/spark-examples_2.11-XXXX.jar", "className": "org.apache.spark.examples.SparkPi", "conf": { "spark.dynamicAllocation.minExecutors": 1, "spark.executor.instances": 1, "spark.dynamicAllocation.initialExecutors": 1, "spark.dynamicAllocation.maxExecutors": 2 } }'
  • 准备工作 创建DLI队列。在“队列类型”中选择“通用队列”,即Spark作业的计算资源。具体请参考创建队列。 准备一个linux弹性 云服务器ECS ,用于安装DLI Livy。 ECS需要放通30000至32767端口、8998端口。具体操作请参考添加安全组规则。 ECS需安装Java JDK,JDK版本建议为1.8。配置Java环境变量JAVA_HOME。 查询弹性云服务器ECS详细信息,获取ECS的“私有IP地址”。 使用增强型跨源连接打通DLI队列和Livy实例所在的VPC网络。具体操作可以参考增强型跨源连接。
  • DLI Livy工具下载及安装 本次操作下载的DLI Livy版本为apache-livy-0.7.2.0107-bin.tar.gz,后续版本变化请根据实际情况修改。 单击下载链接,获取DLI Livy工具压缩包。 使用WinSCP工具,将获取的工具压缩包上传到准备好的ECS服务器目录下。 使用root用户登录ECS服务器,执行以下命令安装DLI Livy工具。 执行以下命令创建工具安装路径。 mkdir livy安装路径 例如新建路径/opt/livy:mkdir /opt/livy。后续操作步骤均默认以/opt/livy安装路径演示,请根据实际情况修改。 解压工具压缩包到安装路径。 tar --extract --file apache-livy-0.7.2.0107-bin.tar.gz --directory /opt/livy --strip-components 1 --no-same-owner 执行以下命令修改配置文件名称。 cd /opt/livy/conf mv livy-client.conf.template livy-client.conf mv livy.conf.template livy.conf mv livy-env.sh.template livy-env.sh mv log4j.properties.template log4j.properties mv spark-blacklist.conf.template spark-blacklist.conf touch spark-defaults.conf
  • 常见问题 问题一:查询OBS分区表报错,报错信息如下: DLI.0005: There should be at least one partition pruning predicate on partitioned table `xxxx`.`xxxx`.; 问题根因:查询OBS分区表时没有携带分区字段。 解决方案:查询OBS分区表时,where条件中至少包含一个分区字段。 问题二:使用DataSource语法指定OBS文件路径创建OBS表,insert数据到OBS表,显示作业运行失败,报:“DLI.0007: The output path is a file, don't support INSERT...SELECT” 错误。 问题示例语句参考如下: CREATE TABLE testcsvdatasource (name string, id int) USING csv OPTIONS (path "obs://dli-test-021/data/test.csv"); 问题根因:创建OBS表指定的OBS路径为具体文件,导致不能插入数据。例如上述示例中的OBS路径为:"obs://dli-test-021/data/test.csv"。 解决方案:使用DataSource语法创建OBS表指定的OBS文件路径改为文件目录即可,后续即可通过insert插入数据。上述示例,建表语句可以修改为: CREATE TABLE testcsvdatasource (name string, id int) USING csv OPTIONS (path "obs://dli-test-021/data"); 问题三:使用Hive语法创建OBS分区表时,提示语法格式不对。例如,如下使用Hive语法创建以classNo为分区的OBS表: CREATE TABLE IF NOT EXISTS testtable(name STRING, score DOUBLE, classNo INT) PARTITIONED BY (classNo) STORED AS TEXTFILE LOCATION 'obs://dli-test-021/data7'; 问题根因:使用Hive语法创建OBS分区表时,分区字段不能出现在表名后的字段列表中,只能定义在PARTITIONED BY后。 解决方案:使用Hive语法创建OBS分区表时,分区字段指定在PARTITIONED BY后。例如: CREATE TABLE IF NOT EXISTS testtable(name STRING, score DOUBLE) PARTITIONED BY (classNo INT) STORED AS TEXTFILE LOCATION 'obs://dli-test-021/data7';
  • DataSource和Hive两种语法创建OBS表的区别 DataSource语法和Hive语法主要区别在于支持的表数据存储格式范围、支持的分区数等有差异。两种语法创建OBS表主要差异点参见表1。 表1 DataSource语法和Hive语法创建OBS表的差异点 语法 支持的数据类型范围 创建分区表时分区字段差异 支持的分区数 DataSource语法 支持ORC,PARQUET,JSON,CSV,AVRO类型 创建分区表时,分区字段在表名和PARTITIONED BY后都需要指定。具体可以参考DataSource语法创建单分区OBS表。 单表分区数最多允许7000个。 Hive语法 支持TEXTFILE, AVRO, ORC, SEQUENCEFILE, RCFILE, PARQUET 创建分区表时,指定的分区字段不能出现在表后,只能通过PARTITIONED BY指定分区字段名和类型。具体可以参考Hive语法创建OBS分区表。 单表分区数最多允许100000个。 创建OBS表的DataSource语法可以参考使用DataSource语法创建OBS表。 创建OBS表的Hive语法可以参考使用Hive语法创建OBS表。
  • 使用DataSource语法创建OBS表 以下通过创建CSV格式的OBS表举例,创建其他数据格式的OBS表方法类似,此处不一一列举。 创建OBS非分区表 指定OBS数据文件,创建csv格式的OBS表。 按照以下文件内容创建“test.csv”文件,并将“test.csv”文件上传到OBS桶“dli-test-021”的根目录下。 Jordon,88,23 Kim,87,25 Henry,76,26 登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择“testdb”,执行以下命令创建OBS表。 CREATE TABLE testcsvdatasource (name STRING, score DOUBLE, classNo INT ) USING csv OPTIONS (path "obs://dli-test-021/test.csv"); 如果是通过指定的数据文件创建的OBS表,后续不支持在DLI通过insert表操作插入数据。OBS文件内容和表数据保持同步。 查询已创建的“testcsvdatasource”表数据。 select * from testcsvdatasource; 图1 查询结果 本地修改原始的OBS表文件“test.csv”,增加一行“Aarn,98,20”数据,重新替换OBS桶目录下的“test.csv”文件。 Jordon,88,23 Kim,87,25 Henry,76,26 Aarn,98,20 在DLI的SQL编辑器中再次查询“testcsvdatasource”表数据,DLI上可以查询到新增的“Aarn,98,20”数据。 select * from testcsvdatasource; 图2 查询结果 指定OBS数据文件目录,创建csv格式的OBS表。 指定的OBS数据目录不包含数据文件。 在OBS桶“dli-test-021”根目录下创建数据文件目录“data”。 登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择“testdb”。在DLI的“testdb”数据库下创建OBS表“testcsvdata2source”。 CREATE TABLE testcsvdata2source (name STRING, score DOUBLE, classNo INT) USING csv OPTIONS (path "obs://dli-test-021/data"); 通过insert语句插入表数据。 insert into testcsvdata2source VALUES('Aarn','98','20'); insert作业运行成功后,查询OBS表“testcsvdata2source”数据。 select * from testcsvdata2source; 图3 查询结果 在OBS桶的“obs://dli-test-021/data”目录下刷新后查询,生成了csv数据文件,文件内容为insert插入的数据内容。 图4 查询结果 指定的OBS数据目录包含数据文件。 在OBS桶“dli-test-021”根目录下创建数据文件目录“data2”。创建如下内容的测试数据文件“test.csv”,并上传文件到“obs://dli-test-021/data2”目录下。 Jordon,88,23 Kim,87,25 Henry,76,26 登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择“testdb”。在DLI的“testdb”数据库下创建OBS表“testcsvdata3source”。 CREATE TABLE testcsvdata3source (name STRING, score DOUBLE, classNo INT) USING csv OPTIONS (path "obs://dli-test-021/data2"); 通过insert语句插入表数据。 insert into testcsvdata3source VALUES('Aarn','98','20'); insert作业运行成功后,查询OBS表“testcsvdata3source”数据。 select * from testcsvdata3source; 图5 查询结果 在OBS桶的“obs://dli-test-021/data2”目录下刷新后查询,生成了一个csv数据文件,内容为insert插入的表数据内容。 图6 查询结果 创建OBS分区表 创建单分区OBS表 在OBS桶“dli-test-021”根目录下创建数据文件目录“data3”。 登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择“testdb”。在DLI的“testdb”数据库下创建以“classNo”列为分区的OBS分区表“testcsvdata4source”,指定OBS目录“obs://dli-test-021/data3”。 CREATE TABLE testcsvdata4source (name STRING, score DOUBLE, classNo INT) USING csv OPTIONS (path "obs://dli-test-021/data3") PARTITIONED BY (classNo); 在OBS桶的“obs://dli-test-021/data3”目录下创建“classNo=25”的分区目录。根据以下文件内容创建数据文件“test.csv”,并上传到OBS的“obs://dli-test-021/data3/classNo=25”目录下。 Jordon,88,25 Kim,87,25 Henry,76,25 在SQL编辑器中执行以下命令,导入分区数据到OBS表“testcsvdata4source ”。 ALTER TABLE testcsvdata4source ADD PARTITION (classNo = 25) LOCATION 'obs://dli-test-021/data3/classNo=25'; 查询OBS表“testcsvdata4source ”classNo分区为“25”的数据: select * from testcsvdata4source where classNo = 25; 图7 查询结果 插入如下数据到OBS表“testcsvdata4source ”: insert into testcsvdata4source VALUES('Aarn','98','25'); insert into testcsvdata4source VALUES('Adam','68','24'); 查询OBS表“testcsvdata4source ”classNo分区为“25”和“24”的数据。 分区表在进行查询时where条件中必须携带分区字段,否则会查询失败,报:DLI.0005: There should be at least one partition pruning predicate on partitioned table。 select * from testcsvdata4source where classNo = 25; 图8 查询结果 select * from testcsvdata4source where classNo = 24; 图9 查询结果 在OBS桶的“obs://dli-test-021/data3”目录下点击刷新,该目录下生成了对应的分区文件,分别存放新插入的表数据。 图10 OBS上classNo分区为“25”文件数据 图11 OBS上classNo分区为“24”文件数据 创建多分区OBS表 在OBS桶“dli-test-021”根目录下创建数据文件目录“data4”。 登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择“testdb”。在“testdb”数据库下创建以“classNo”和“dt”列为分区的OBS分区表“testcsvdata5source”,指定OBS目录“obs://dli-test-021/data4”。 CREATE TABLE testcsvdata5source (name STRING, score DOUBLE, classNo INT, dt varchar(16)) USING csv OPTIONS (path "obs://dli-test-021/data4") PARTITIONED BY (classNo,dt); 给 testcsvdata5source表插入如下测试数据: insert into testcsvdata5source VALUES('Aarn','98','25','2021-07-27'); insert into testcsvdata5source VALUES('Adam','68','25','2021-07-28'); 根据classNo分区列查询testcsvdata5source数据。 select * from testcsvdata5source where classNo = 25; 图12 查询结果 根据dt分区列查询testcsvdata5source数据。 select * from testcsvdata5source where dt like '2021-07%'; 图13 查询结果 在OBS桶“obs://dli-test-021/data4”目录下刷新后查询,会生成如下数据文件: 文件目录1:obs://dli-test-021/data4/xxxxxx/classNo=25/dt=2021-07-27 图14 查询结果 文件目录2:obs://dli-test-021/data4/xxxxxx/classNo=25/dt=2021-07-28 图15 查询结果 在OBS桶的“obs://dli-test-021/data4”目录下创建“classNo=24”的分区目录,再在“classNo=24”目录下创建子分区目录“dt=2021-07-29”。根据以下文件内容创建数据文件“test.csv”,并上传到OBS的“obs://dli-test-021/data4/classNo=24/dt=2021-07-29”目录下。 Jordon,88,24,2021-07-29 Kim,87,24,2021-07-29 Henry,76,24,2021-07-29 在SQL编辑器中执行以下命令,导入分区数据到OBS表“testcsvdata5source ”。 ALTER TABLE testcsvdata5source ADD PARTITION (classNo = 24,dt='2021-07-29') LOCATION 'obs://dli-test-021/data4/classNo=24/dt=2021-07-29'; 根据classNo分区列查询testcsvdata5source数据。 select * from testcsvdata5source where classNo = 24; 图16 查询结果 根据dt分区列查询所有“2021-07”月的所有数据。 select * from testcsvdata5source where dt like '2021-07%'; 图17 查询结果
  • 约束限制 仅支持Spark3.3.1版本(Spark通用队列场景)使用委托授权访问临时凭证: 在创建作业时,请配置作业使用Spark3.3.1版本 已在作业中配置允许DLI访问DEW的委托信息。spark.dli.job.agency.name=自定义委托名称。 自定义委托请参考自定义DLI委托权限。 请注意配置参数不需要用"" 或 '' 包裹。 Spark3.3.1基础镜像内置了3.1.62版本的huaweicloud-sdk-core。
  • 功能描述 DLI提供了一个通用接口,可用于获取用户在启动Spark作业时设置的委托的临时凭证。该接口将获取到的该作业委托的临时凭证封装到com.huaweicloud.sdk.core.auth.BasicCredentials类中。 获取到的委托的临时认证封装到com.huaweicloud.sdk.core.auth.ICredentialProvider接口的getCredentials()返回值中。 返回类型为com.huaweicloud.sdk.core.auth.BasicCredentials。 仅支持获取AK、SK、SecurityToken。 获取到AK、SK、SecurityToken后,请参考如何使用凭据管理服务替换硬编码的数据库账号密码查询凭据。
  • 整体作业开发流程 整体作业开发流程参考图1。 图1 作业开发流程 步骤1:创建队列:创建DLI作业运行的队列。 步骤2:创建RDS Postgres数据库:创建RDS Postgres的数据库和表。 步骤3:创建DWS数据库和表:创建用于接收数据的DWS数据库和表。 步骤4:创建增强型跨源连接:DLI上创建连接RDS和DWS的跨源连接,打通网络。 步骤5:运行作业:DLI上创建和运行Flink OpenSource作业。 步骤6:发送数据和查询结果:RDS Postgres的表上插入数据,在DWS上查看运行结果。
  • 步骤3:创建DWS数据库和表 连接已创建的DWS集群。 请参考使用gsql命令行客户端连接DWS集群。 执行以下命令连接DWS集群的默认数据库“gaussdb”: gsql -d gaussdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -r gaussdb:DWS集群默认数据库。 DWS集群连接地址:请参见获取集群连接地址进行获取。如果通过公网地址连接,请指定为集群“公网访问地址”或“公网访问域名”,如果通过内网地址连接,请指定为集群“内网访问地址”或“内网访问域名”。如果通过弹性负载均衡连接,请指定为“弹性负载均衡地址”。 dbadmin:创建集群时设置的默认管理员用户名。 -W:默认管理员用户的密码。 在命令行窗口输入以下命令创建数据库“testdwsdb”。 CREATE DATABASE testdwsdb; 执行以下命令,退出gaussdb数据库,连接新创建的数据库“testdwsdb”。 \q gsql -d testdwsdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -r 执行以下命令创建表。 create schema test; set current_schema= test; drop table if exists dws_order; CREATE TABLE dws_order ( order_id VARCHAR, order_channel VARCHAR, order_time VARCHAR, pay_amount FLOAT8, real_pay FLOAT8, pay_time VARCHAR, user_id VARCHAR, user_name VARCHAR, area_id VARCHAR );
  • 步骤3:创建DWS数据库和表 参考使用gsql命令行客户端连接DWS集群连接已创建的DWS集群。 执行以下命令连接DWS集群的默认数据库“gaussdb”: gsql -d gaussdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -r gaussdb:DWS集群默认数据库。 DWS集群连接地址:请参见获取集群连接地址进行获取。如果通过公网地址连接,请指定为集群“公网访问地址”或“公网访问域名”,如果通过内网地址连接,请指定为集群“内网访问地址”或“内网访问域名”。如果通过弹性负载均衡连接,请指定为“弹性负载均衡地址”。 dbadmin:创建集群时设置的默认管理员用户名。 password :默认管理员用户的密码。 在命令行窗口输入以下命令创建数据库“testdwsdb”。 CREATE DATABASE testdwsdb; 执行以下命令,退出gaussdb数据库,连接新创建的数据库“testdwsdb”。 \q gsql -d testdwsdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -r 执行以下命令创建表。 create schema test; set current_schema= test; drop table if exists qualified_cars; CREATE TABLE qualified_cars ( car_id VARCHAR, car_owner VARCHAR, car_age INTEGER , average_speed FLOAT8, total_miles FLOAT8 );
  • 整体作业开发流程 整体作业开发流程参考图1。 图1 作业开发流程 步骤1:创建队列:创建DLI作业运行的队列。 步骤2:创建Kafka的Topic:创建Kafka生产消费数据的Topic。 步骤3:创建DWS数据库和表:创建DWS数据库和表信息。 步骤4:创建增强型跨源连接:DLI上创建连接Kafka和DWS的跨源连接,打通网络。 步骤5:运行作业:DLI上创建和运行Flink OpenSource作业。 步骤6:发送数据和查询结果:Kafka上发送流数据,在RDS上查看运行结果。
  • 步骤6:发送数据和查询结果 使用Kafka客户端向步骤2:创建Kafka的Topic中的Topic发送数据,模拟实时数据流。 Kafka生产和发送数据的方法请参考DMS - 连接实例生产消费信息。 发送样例数据如下: {"car_id":"3027", "car_owner":"lilei", "car_age":"7", "average_speed":"76", "total_miles":"15000"} {"car_id":"3028", "car_owner":"hanmeimei", "car_age":"6", "average_speed":"92", "total_miles":"17000"} {"car_id":"3029", "car_owner":"Ann", "car_age":"10", "average_speed":"81", "total_miles":"230000"} 连接已创建的DWS集群。 具体操作请参考使用gsql命令行客户端连接DWS集群。 执行以下命令连接DWS集群的默认数据库“testdwsdb”: gsql -d testdwsdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -r 查询DWS的表数据。 select * from test.qualified_cars; 查询结果参考如下: car_id car_owner car_age average_speed total_miles 3027 lilei 7 76.0 15000.0
  • 步骤5:调试、编译代码并导出Jar包 双击IntelliJ IDEA工具右侧的“Maven”,参考下图分别双击“clean”、“compile”对代码进行编译。 编译成功后,双击“package”对代码进行打包。 图9 编译打包 打包成功后,生成的Jar包会放到target目录下,以备后用。本示例将会生成到:“D:\DLITest\SparkJarObs\target”下名为“SparkJarObs-1.0-SNAPSHOT.jar”。 图10 导出jar包
  • 步骤2:上传数据到OBS桶 根据如下数据,创建people.json文件。 {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19} 进入OBS管理控制台,在“桶列表”下,单击已创建的OBS桶名称,本示例桶名为“dli-test-obs01”。 单击“上传对象”,将people.json文件上传到OBS桶根目录下。 在OBS桶根目录下,单击“新建文件夹”,创建名为“result”的文件夹。 单击“result”的文件夹,在“result”下单击“新建文件夹”,创建名为“parquet”的文件夹。
  • 步骤8:查看作业运行结果 在Spark作业管理界面显示已提交的作业运行状态。初始状态显示为“启动中”。 如果作业运行成功则作业状态显示为“已成功”,单击“操作”列“更多”下的“Driver日志”,显示当前作业运行的日志。 图14 diver日志 图15 “Driver日志”中的作业执行日志 如果作业运行成功,本示例进入OBS桶下的“result/parquet”目录,查看已生成预期的parquet文件。 图16 obs桶文件 如果作业运行失败,单击“操作”列“更多”下的“Driver日志”,显示具体的报错日志信息,根据报错信息定位问题原因。 例如,如下截图信息因为创建Spark Jar作业时主类名没有包含包路径,报找不到类名“SparkDemoObs”。 图17 报错信息 可以在“操作”列,单击“编辑”,修改“主类”参数为正确的:com.huawei.dli.demo.SparkDemoObs,单击“执行”重新运行该作业即可。
  • 开发流程 DLI进行Spark Jar作业开发流程参考如下: 图1 Spark Jar作业开发流程 表2 开发流程说明 序号 阶段 操作界面 说明 1 创建DLI通用队列 DLI控制台 创建作业运行的DLI队列。 2 上传数据到OBS桶 OBS控制台 将测试数据上传到OBS桶下。 3 新建Maven工程,配置pom文件 IntelliJ IDEA 参考样例代码说明,编写程序代码读取OBS数据。 4 编写程序代码 5 调试,编译代码并导出Jar包 6 上传Jar包到OBS和DLI OBS控制台 DLI控制台 将生成的Spark Jar包文件上传到OBS目录下和DLI程序包中。 7 创建Spark Jar作业 DLI控制台 在DLI控制台创建Spark Jar作业并提交运行作业。 8 查看作业运行结果 DLI控制台 查看作业运行状态和作业运行日志。
  • 环境准备 在进行Spark Jar作业开发前,请准备以下开发环境。 表1 Spark Jar作业开发环境 准备项 说明 操作系统 Windows系统,支持Windows7以上版本。 安装JDK JDK使用1.8版本。 安装和配置IntelliJ IDEA IntelliJ IDEA为进行应用开发的工具,版本要求使用2019.1或其他兼容版本。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。
  • 常见问题 Q:作业运行失败,运行日志中有如下报错信息,应该怎么解决? java.lang.NoSuchMethodError: org.apache.flink.api.java.ClosureCleaner.clean(Ljava/lang/Object;Z)V A:该问题是因为所选择的huaweicloud-dis-flink-connector_2.11版本过低导致,请选择2.0.1及以上版本。 Q:运行作业读取DIS数据时,无法读出数据且Taskmanager的运行日志中有如下报错信息,应该怎么解决? ERROR com.huaweicloud.dis.adapter.common.consumer.Coordinator [] - Failed to getCheckpointAsync, error : [400 : {"errorCode":"DIS.4332","message":"app not found. "}], request : [{"stream_name":"xx","partition_id":"shardId-0000000000","checkpoint_type":"LAST_READ","app_name":"xx"}] A: 该问题是因为读取DIS数据所使用的group.id在DIS的Apps中并没有提前创建。
  • 环境准备 已在DLI控制台购买了通用队列。 已购买了DIS通道。开通DIS通道。 用户在使用Flink 1.12版本,则依赖的Dis connector版本需要不低于2.0.1,详细代码参考DISFlinkConnector相关依赖,如何配置connector,详细参考自定义Flink Streaming作业。 若读取DIS,且配置groupId,则需要提前在DIS的“App管理”中创建所需的App名称。 请勿将disToDis.properties放在生成的jar包中,在代码里有关于disToDis.properties的路径,如果放在jar包中,代码会找不到disToDis.properties路径。
  • 场景描述 该场景为根据商品的实时点击量,获取每小时内点击量最高的3个商品及其相关信息。商品的实时点击量数据为输入源发送到Kafka中,再将Kafka数据的分析结果输出到RDS中。 例如,输入如下样例数据: {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 08:01:00", "product_id":"0002", "product_name":"name1"} {"user_id":"0002", "user_name":"Bob", "event_time":"2021-03-24 08:02:00", "product_id":"0002", "product_name":"name1"} {"user_id":"0002", "user_name":"Bob", "event_time":"2021-03-24 08:06:00", "product_id":"0004", "product_name":"name2"} {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 08:10:00", "product_id":"0003", "product_name":"name3"} {"user_id":"0003", "user_name":"Cindy", "event_time":"2021-03-24 08:15:00", "product_id":"0005", "product_name":"name4"} {"user_id":"0003", "user_name":"Cindy", "event_time":"2021-03-24 08:16:00", "product_id":"0005", "product_name":"name4"} {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 08:56:00", "product_id":"0004", "product_name":"name2"} {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 09:05:00", "product_id":"0005", "product_name":"name4"} {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 09:10:00", "product_id":"0006", "product_name":"name5"} {"user_id":"0002", "user_name":"Bob", "event_time":"2021-03-24 09:13:00", "product_id":"0006", "product_name":"name5"} 预期输出: 2021-03-24 08:00:00 - 2021-03-24 08:59:59,0002,name1,2 2021-03-24 08:00:00 - 2021-03-24 08:59:59,0004,name2,2 2021-03-24 08:00:00 - 2021-03-24 08:59:59,0005,name4,2 2021-03-24 09:00:00 - 2021-03-24 09:59:59,0006,name5,2 2021-03-24 09:00:00 - 2021-03-24 09:59:59,0005,name4,1
  • 步骤6:发送数据和查询结果 使用Kafka客户端向步骤2:创建Kafka的Topic中的Topic发送数据,模拟实时数据流。 Kafka生产和发送数据的方法请参考:DMS - 连接实例生产消费信息。 发送样例数据如下: {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 08:01:00", "product_id":"0002", "product_name":"name1"} {"user_id":"0002", "user_name":"Bob", "event_time":"2021-03-24 08:02:00", "product_id":"0002", "product_name":"name1"} {"user_id":"0002", "user_name":"Bob", "event_time":"2021-03-24 08:06:00", "product_id":"0004", "product_name":"name2"} {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 08:10:00", "product_id":"0003", "product_name":"name3"} {"user_id":"0003", "user_name":"Cindy", "event_time":"2021-03-24 08:15:00", "product_id":"0005", "product_name":"name4"} {"user_id":"0003", "user_name":"Cindy", "event_time":"2021-03-24 08:16:00", "product_id":"0005", "product_name":"name4"} {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 08:56:00", "product_id":"0004", "product_name":"name2"} {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 09:05:00", "product_id":"0005", "product_name":"name4"} {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 09:10:00", "product_id":"0006", "product_name":"name5"} {"user_id":"0002", "user_name":"Bob", "event_time":"2021-03-24 09:13:00", "product_id":"0006", "product_name":"name5"} 登录RDS控制台,单击RDS数据库实例,单击创建的数据库名,如“testrdsdb”,在创建的表“clicktop”所在行的“操作”列,单击“SQL查询”,输入以下查询语句。 select * from `clicktop`; 在“SQL查询”界面,单击“执行SQL”,查看RDS表数据已写入成功。 图2 RDS表数据
  • 整体作业开发流程 整体作业开发流程参考图1。 图1 作业开发流程 步骤1:创建队列:创建DLI作业运行的队列。 步骤2:创建Kafka的Topic:创建Kafka生产消费数据的Topic。 步骤3:创建RDS数据库和表:创建RDS MySQL数据库和表信息。 步骤4:创建增强型跨源连接:DLI上创建连接Kafka和RDS的跨源连接,打通网络。 步骤5:运行作业:DLI上创建和运行Flink OpenSource作业。 步骤6:发送数据和查询结果:Kafka上发送流数据,在RDS上查看运行结果。
  • 流生态开发支持的数据格式 DLI Flink作业支持如下数据格式: Avro,Avro_merge,BLOB,CSV,EMAIL,JSON,ORC,Parquet,XML。 表4 数据格式和支持的输入输出流 数据格式 支持的输入流 支持的输出流 Avro - OBS输出流 Avro_merge - OBS输出流 BLOB DIS输入流 MRS Kafka输入流 开源Kafka输入流 - CSV DIS输入流 OBS输入流 开源Kafka输入流 DIS输出流 OBS输出流 DWS输出流(通过OBS方式转储) 开源Kafka输出流 文件系统输出流 EMAIL DIS输入流 - JSON DIS输入流 OBS输入流 MRS Kafka输入流 开源Kafka输入流 DIS输出流 OBS输出流 MRS Kafka输出流 开源Kafka输出流 ORC - OBS输出流 DWS输出流(通过OBS方式转储) Parquet - OBS输出流 文件系统输出流 XML DIS输入流 -
  • 概述 流生态系统基于Flink和Spark双引擎,完全兼容Flink/Storm/Spark开源社区版本接口,并且在此基础上做了特性增强和性能提升,为用户提供易用、低时延、高吞吐的 数据湖探索 数据湖 探索的流生态开发包括云服务生态、开源生态和自拓展生态: 云服务生态 DLI服务在Stream SQL中支持与其他服务的连通。用户可以直接使用SQL从这些服务中读写数据,如DIS、OBS、CloudTable、MRS、RDS、 SMN 、DCS等。 开源生态 通过对等连接建立与其他VPC的网络连接后,用户可以在DLI的租户独享集群中访问所有Flink和Spark支持的数据源与输出源,如Kafka、Hbase、ElasticSearch等。 自拓展生态 用户可通过编写代码实现从想要的云生态或者开源生态获取数据,作为Flink作业的输入数据。
  • 云服务生态开发 表1 云服务生态开发一览表 数据源 SQL 自定义作业 输入流:从其他服务或数据库中获取数据 输出流:将处理后的数据写入到其他服务或数据库中 表格存储服务 CloudTable HBase输入流 HBase输出流 OpenTSDB输出流 - 云搜索服务 CSS - Elasticsearch输出流 - 分布式缓存服务 DCS - DCS输出流 自定义作业交互 文档数据库服务 DDS - DDS输出流 - 数据接入服务 DIS DIS输入流 DIS输出流 - 分布式消息服务 DMS DMS输入流 DMS输出流 - 数据仓库服务 DWS - DWS输出流(通过JDBC方式转储) DWS输出流(通过OBS方式转储) 自定义作业交互 MapReduce服务 MRS MRS Kafka输入流 MRS Kafka输出流 MRS HBase输出流 自定义作业交互 对象存储服务 OBS OBS输入流 OBS输出流 - 关系型数据库 RDS - RDS输出流 - 消息通知 服务 SMN - SMN输出流 -
  • 语法格式 在Flink jar作业编辑界面,选择配置优化参数,配置信息如下: 不同的OBS桶,使用不同的AKSK认证信息。 可以使用如下配置方式,根据桶指定不同的AKSK信息,参数说明详见表1。 flink.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.access.key=USER_AK_CSMS_KEY flink.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.secret.key=USER_SK_CSMS_KEY flink.hadoop.fs.obs.security.provider=com.dli.provider.UserObsBasicCredentialProvider flink.hadoop.fs.dew.csms.secretName=CredentialName flink.hadoop.fs.dew.endpoint=ENDPOINT flink.hadoop.fs.dew.csms.version=VERSION_ID flink.hadoop.fs.dew.csms.cache.time.second=CACHE_TIME flink.dli.job.agency.name=USER_AGENCY_NAME
  • 前提条件 已在DEW服务创建通用凭证,并存入凭据值。具体操作请参考:创建通用凭据。 已创建DLI访问DEW的委托并完成委托授权。该委托需具备以下权限: DEW中的查询凭据的版本与凭据值ShowSecretVersion接口权限,csms:secretVersion:get。 DEW中的查询凭据的版本列表ListSecretVersions接口权限,csms:secretVersion:list。 DEW解密凭据的权限,kms:dek:decrypt。 委托权限示例请参考自定义DLI委托权限和常见场景的委托权限策略。 仅支持Flink1.15版本使用DEW管理访问凭据,在创建作业时,请配置作业使用Flink1.15版本、且已在作业中配置允许DLI访问DEW的委托信息。自定义委托及配置请参考自定义DLI委托权限。 使用该功能,所有涉及OBS的桶,都需要进行配置AKSK。
共100000条