云服务器内容精选

  • 调测并保存作业 作业编排和配置完成后,请执行以下操作: 批处理作业 单击画布上方的测试运行按钮,测试作业。如果测试未通过,请您查看作业节点的运行日志,进行定位处理。 用户可以查看该作业的测试运行日志,单击“查看日志”可以进入查看日志界面查看日志的详细信息记录。 作业未提交版本之前,进行手动测试运行,作业监控里面的作业运行实例版本显示是0。 测试通过后,单击画布上方的保存按钮,保存作业的配置信息。 保存后,在右侧的版本里面,会自动生成一个保存版本,支持版本回滚。保存版本时,一分钟内多次保存只记录一次版本。对于中间数据比较重要时,可以通过“新增版本”按钮手动增加保存版本。 实时处理作业 单击画布上方的保存按钮,保存作业的配置信息。 保存后,在右侧的版本里面,会自动生成一个保存版本,支持版本回滚。保存版本时,一分钟内多次保存只记录一次版本。对于中间数据比较重要时,可以通过“新增版本”按钮手动增加保存版本。
  • 配置作业基本信息 为作业配置责任人、优先级信息后,用户可根据责任人、优先级来检索相应的作业。操作方法如下: 单击画布右侧“作业基本信息”页签,展开配置页面,配置如表2所示的参数。 表2 作业基本信息 参数 说明 责任人 自动匹配创建作业时配置的作业责任人,此处支持修改。 执行用户 当“作业调度身份是否可配置”设置为“是”,该参数可见。 执行作业的用户。如果输入了执行用户,则作业以执行用户身份执行;如果没有输入执行用户,则以提交作业启动的用户身份执行。 作业委托 当“作业调度身份是否可配置”设置为“是”,该参数可见。 配置委托后,作业执行过程中,以委托的身份与其他服务交互。 作业优先级 自动匹配创建作业时配置的作业优先级,此处支持修改。 实例超时时间 配置作业实例的超时时间,设置为0或不配置时,该配置项不生效。如果您为作业设置了异常通知,当作业实例执行时间超过超时时间,将触发异常通知,发送消息给用户,作业不会中断,继续运行。 实例超时是否忽略等待时间 配置实例超时是否忽略等待时间。 如果勾选上,表示实例运行时等待时间不会被计入超时时间,可前往默认项设置修改此策略。 如果未选上,表示实例运行时等待时间会被计入超时时间。 自定义字段 配置自定义字段的参数名称和参数值。 作业标签 配置作业的标签,用以分类管理作业。 单击“新增”,可给作业重新添加一个标签。也可选择管理作业标签中已配置的标签。
  • 配置作业参数 作业参数为全局参数,可用于作业中的任意节点。操作方法如下: Pipeline模式的批处理作业和实时处理作业,单击画布的空白处,在右侧显示“作业参数配置”页签,单击此页签,展开配置页面,配置如表3所示的参数。 表3 作业参数配置 功能 说明 变量 新增 单击“新增”,在文本框中填写作业参数的名称和参数值。 参数名称 名称只能包含字符:英文字母、数字、中划线和下划线。 参数值 字符串类的参数直接填写字符串,例如:str1 数值类的参数直接填写数值或运算表达式。 参数配置完成后,在作业中的引用格式为:${参数名称} 说明: 如果作业中有两个节点,比如第一个Rest Client节点返回了body,第二个节点使用返回的data。如果这个data的长度超过1000000个字符,内容就会被截断。在配置作业参数时,作业的参数值的结果最大不超过1000000个字符。 编辑参数表达式 在参数值文本框后方,单击,编辑参数表达式,更多表达式请参见表达式概述。 修改 在参数名和参数值的文本框中直接修改。 掩码显示 在参数值为密钥等情况下,从安全角度,请单击将参数值掩码显示。 删除 在参数值文本框后方,单击,删除作业参数。 常量 新增 单击“新增”,在文本框中填写作业常量的名称和参数值。 参数名称 名称只能包含字符:英文字母、数字、中划线和下划线。 参数值 字符串类的参数直接填写字符串,例如:str1 数值类的参数直接填写数值或运算表达式。 参数配置完成后,在作业中的引用格式为:${参数名称} 编辑参数表达式 在参数值文本框后方,单击,编辑参数表达式,更多表达式请参见表达式概述。 修改 在参数名和参数值的文本框中直接修改,修改完成后,请保存。 删除 在参数值文本框后方,单击,删除作业常量。 工作空间环境变量 查看工作空间已配置的变量和常量。 单击“作业参数预览”页签,展开预览页面,配置如表4所示的参数。 MRS Flink Job、 DLI Flink Job、DLI SQL、DWS SQL、MRS HetuEngine、MRS ClickHouse SQL、MRS Hive SQL、MRS Impala SQL、MRS Presto SQL、MRS Spark SQL、RDS SQL的算子脚本参数支持参数预览。 表4 作业参数预览 功能 说明 当前时间 仅单次调度才显示。系统默认为当前时间。 事件触发时间 仅事件驱动调度才显示。系统默认为事件触发时间。 周期调度 仅周期调度才显示。系统默认为调度周期。 具体时间 仅周期调度才显示。周期调度配置的具体运行时间。 起始日期 仅周期调度才显示。周期调度的生效时间。 后N个实例 作业运行调度的实例个数。 单次调度场景默认为1。 事件驱动调度场景默认为1。 周期调度场景 当实例数大于10时,系统最多展示10个日期实例,系统会自动提示“当前参数预览最多支持查看10个实例”。 在作业参数预览中,如果作业参数配置存在语法异常情况系统会给出提示信息。 如果参数配置了依赖作业实际运行时产生的数据,参数预览功能中无法模拟此类数据,则该数据不展示。
  • 常见问题 Q:作业运行失败,运行日志中有如下报错信息,应该怎么解决? java.lang.NoSuchMethodError: org.apache.flink.api.java.ClosureCleaner.clean(Ljava/lang/Object;Z)V A:该问题是因为所选择的huaweicloud-dis-flink-connector_2.11版本过低导致,请选择2.0.1及以上版本。 Q:运行作业读取DIS数据时,无法读出数据且Taskmanager的运行日志中有如下报错信息,应该怎么解决? ERROR com.huaweicloud.dis.adapter.common.consumer.Coordinator [] - Failed to getCheckpointAsync, error : [400 : {"errorCode":"DIS.4332","message":"app not found. "}], request : [{"stream_name":"xx","partition_id":"shardId-0000000000","checkpoint_type":"LAST_READ","app_name":"xx"}] A: 该问题是因为读取DIS数据所使用的group.id在DIS的Apps中并没有提前创建。
  • 环境准备 已在DLI控制台购买了通用队列。 已购买了DIS通道。开通DIS通道。 用户在使用Flink 1.12版本,则依赖的Dis connector版本需要不低于2.0.1,详细代码参考DISFlinkConnector相关依赖,如何配置connector,详细参考自定义Flink Streaming作业。 若读取DIS,且配置groupId,则需要提前在DIS的“App管理”中创建所需的App名称。 请勿将disToDis.properties放在生成的jar包中,在代码里有关于disToDis.properties的路径,如果放在jar包中,代码会找不到disToDis.properties路径。
  • 环境准备 登录MRS管理控制台,创建MRS集群,选择“开启kerberos”,勾选“Kafka”, “HBase”, “HDFS”等。请参见《 MapReduce服务 用户指南》的“购买自定义集群”的章节创建MRS。 “安全组规则”开通对应UDP/TCP端口。详细内容请参考《私有云用户指南》中的“添加安全组规则”章节。 进入MRS manager管理界面: 创建机机账号,需确保该用户含有“hdfs_admin”, “hbase_admin”权限,下载该用户认证凭据,其中包含“user.keytab” 和 “krb5.conf” 文件。 由于人机账号的keytab会随用户密码过期而失效,故建议使用机机账号进行配置。 单击“服务管理”,下载客户端,单击“确定”。 在MRS节点上下载配置文件,所需集群配置文件包含“hbase-site.xml”和“hiveclient.properties”。 创建弹性资源池和队列。 弹性资源池与队列为DLI作业提供计算资源,创建弹性资源池,弹性资源池添加队列。 使用该DLI独享队列与MRS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖探索 用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 获取MRS集群全部节点的ip和 域名 映射,在DLI跨源连接修改主机信息中配置host映射。 如何添加IP域名映射,请参见《 数据湖 探索用户指南》中“修改主机信息”章节。 Kafka服务端的端口如果监听在hostname上,则需要将Kafka Broker节点的hostname和IP的对应关系添加到DLI队列中。Kafka Broker节点的hostname和IP请联系Kafka服务的部署人员。
  • 相关操作 怎样设置作业的参数? 在FLink作业列表中选择待编辑的作业。 单击操作列“编辑”。 在参数区域输入参数信息。 指定类的参数列表,参数之间使用空格分隔。 参数输入格式:--key1 value1 --key2 value2 例如:控制台入输入的参数 --bootstrap.server 192.168.168.xxx:9092 通过ParameterTool解析后的参数如下所示: 图5 解析后的参数 怎样查看作业日志? 在FLink作业列表中点击作业名称,进入作业详情页面。 单击“运行日志”,即可在控制台查看作业日志。 此处只展示最新的运行日志,更多信息请查看保存日志的OBS桶。
  • Java样例代码 本示例操作步骤采用Java进行编码,具体完整的样例代码参考如下: package com.huawei.dli.demo; import org.apache.spark.sql.SparkSession; public class DliCatalogTest { public static void main(String[] args) { SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .appName("java_spark_demo") .getOrCreate(); spark.sql("create database if not exists test_sparkapp").collect(); spark.sql("drop table if exists test_sparkapp.dli_testtable").collect(); spark.sql("create table test_sparkapp.dli_testtable(id INT, name STRING)").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (123,'jason')").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (456,'merry')").collect(); spark.sql("drop table if exists test_sparkapp.dli_testobstable").collect(); spark.sql("create table test_sparkapp.dli_testobstable(age INT, name STRING) using csv options (path 'obs://dli-test-obs01/testdata.csv')").collect(); spark.stop(); } }
  • scala样例代码 object DliCatalogTest { def main(args:Array[String]): Unit = { val sql = args(0) val runDdl = Try(args(1).toBoolean).getOrElse(true) System.out.println(s"sql is $sql runDdl is $runDdl") val sparkConf = new SparkConf(true) sparkConf .set("spark.sql.session.state.builder","org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .set("spark.sql.catalog.class","org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") sparkConf.setAppName("dlicatalogtester") val spark = SparkSession.builder .config(sparkConf) .enableHiveSupport() .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .appName("SparkTest") .getOrCreate() System.out.println("catalog is " + spark.sessionState.catalog.toString) if (runDdl) { val df = spark.sql(sql).collect() } else { spark.sql(sql).show() } spark.close() } }
  • Python样例代码 #!/usr/bin/python # -*- coding: UTF-8 -*- from __future__ import print_function import sys from pyspark.sql import SparkSession if __name__ == "__main__": url = sys.argv[1] creatTbl = "CREATE TABLE test_sparkapp.dli_rds USING JDBC OPTIONS ('url'='jdbc:mysql://%s'," \ "'driver'='com.mysql.jdbc.Driver','dbtable'='test.test'," \ " 'passwdauth' = 'DatasourceRDSTest_pwd','encryption' = 'true')" % url spark = SparkSession \ .builder \ .enableHiveSupport() \ .config("spark.sql.session.state.builder","org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") \ .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") \ .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") \ .appName("python Spark test catalog") \ .getOrCreate() spark.sql("CREATE database if not exists test_sparkapp").collect() spark.sql("drop table if exists test_sparkapp.dli_rds").collect() spark.sql(creatTbl).collect() spark.sql("select * from test_sparkapp.dli_rds").show() spark.sql("insert into table test_sparkapp.dli_rds select 12,'aaa'").collect() spark.sql("select * from test_sparkapp.dli_rds").show() spark.sql("insert overwrite table test_sparkapp.dli_rds select 1111,'asasasa'").collect() spark.sql("select * from test_sparkapp.dli_rds").show() spark.sql("drop table test_sparkapp.dli_rds").collect() spark.stop()
  • 步骤8:查看作业运行结果 在Spark作业管理界面显示已提交的作业运行状态。初始状态显示为“启动中”。 如果作业运行成功则作业状态显示为“已成功”,通过以下操作查看创建的数据库和表。 可以在DLI控制台,左侧导航栏,单击“SQL编辑器”。在“数据库”中已显示创建的数据库“test_sparkapp”。 图14 查看创建的数据库 双击数据库名,可以在数据库下查看已创建成功的DLI和OBS表。 图15 查看表 双击DLI表名dli_testtable,单击“执行”查询DLI表数据。 图16 查询DLI表数据 注释掉DLI表查询语句,双击OBS表名dli_testobstable,单击“执行”查询OBS表数据。 图17 查询OBS表数据 如果作业运行失败则作业状态显示为“已失败”,单击“操作”列“更多”下的“Driver日志”,显示当前作业运行的日志,分析报错原因。 图18 查看Driver日志 原因定位解决后,可以在作业“操作”列,单击“编辑”,修改作业相关参数后,单击“执行”重新运行该作业即可。
  • 开发流程 DLI进行Spark作业访问DLI元数据开发流程参考如下: 图1 Spark作业访问DLI元数据开发流程 表2 开发流程说明 序号 阶段 操作界面 说明 1 创建DLI通用队列 DLI控制台 创建作业运行的DLI队列。 2 OBS桶文件配置 OBS控制台 如果是创建OBS表,则需要上传文件数据到OBS桶下。 配置Spark创建表的元数据信息的存储路径。该文件夹路径用来存储Spark创建表的元数据信息“spark.sql.warehouse.dir”。 3 新建Maven工程,配置pom文件 IntelliJ IDEA 参考样例代码说明,编写程序代码创建DLI表或OBS表。 4 编写程序代码 5 调试,编译代码并导出Jar包 6 上传Jar包到OBS和DLI OBS控制台 将生成的Spark Jar包文件上传到OBS目录下和DLI程序包中。 7 创建Spark Jar作业 DLI控制台 在DLI控制台创建Spark Jar作业并提交运行作业。 8 查看作业运行结果 DLI控制台 查看作业运行状态和作业运行日志。
  • 步骤2:OBS桶文件配置 如果需要创建OBS表,则需要先上传数据到OBS桶目录下。 本次演示的样例代码创建了OBS表,测试数据内容参考如下示例,创建名为的testdata.csv文件。 12,Michael 27,Andy 30,Justin 进入OBS管理控制台,在“桶列表”下,单击已创建的OBS桶名称,本示例桶名为“dli-test-obs01”,进入“概览”页面。 单击左侧列表中的“对象”,选择“上传对象”,将testdata.csv文件上传到OBS桶根目录下。 在OBS桶根目录下,单击“新建文件夹”,创建名为“warehousepath”的文件夹。该文件夹路径用来存储Spark创建表的元数据信息“spark.sql.warehouse.dir”。
  • 步骤1:创建DLI通用队列 第一次提交Spark作业,需要先创建队列,例如创建名为“sparktest”的队列,队列类型选择为“通用队列”。 在DLI管理控制台的左侧导航栏中,选择“队列管理”。 单击“队列管理”页面右上角“购买队列”进行创建队列。 创建名为“sparktest”的队列,队列类型选择为“通用队列”。创建队列详细介绍请参考创建队列。 图2 创建队列 单击“立即购买”,确认配置。 配置确认无误,单击“提交”完成队列创建。
  • 环境准备 在进行Spark 作业访问DLI元数据开发前,请准备以下开发环境。 表1 Spark Jar作业开发环境 准备项 说明 操作系统 Windows系统,支持Windows7以上版本。 安装JDK JDK使用1.8版本。 安装和配置IntelliJ IDEA IntelliJ IDEA为进行应用开发的工具,版本要求使用2019.1或其他兼容版本。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。