华为云用户手册

  • 逻辑模型转换为物理模型 完成逻辑模型的创建后,您可以将逻辑模型转换为物理模型,支持转换为已有的物理模型。 在 DataArts Studio 控制台首页,选择对应工作空间的“数据架构”模块,进入数据架构页面。 在数据架构控制台,单击左侧导航树中的“逻辑建模”,进入后选择“逻辑模型”进入逻辑模型页面。 在总览图中找到所需要的逻辑模型,将光标移动到该卡片上,单击该模型的转换按钮。逻辑模型只支持转换为关系建模的模型。 图8 逻辑模型转化为物理模型 在“转换为物理模型”对话框中,配置如下参数,然后单击“确定”。 图9 转换为物理模型 逻辑模型转换为物理模型时,系统会先校验是否有前缀。 表6 参数描述 参数名称 说明 *模型名称 逻辑模型所需转换的物理模型的名称。在下拉列表中选择一个已有的模型。 *更新已有表 当选择了模型名称后才显示该参数。 不更新 更新 如果选择更新已有表,则需要选择“物理表更新方式”。 不删除多余字段 删除多余字段 *数据连接类型 在下拉列表中选择数据连接类型。 数据连接 选择所需要的数据连接。同一个关系模型一般建议使用统一的数据连接。 如果您还未创建与数据源之间的数据连接,请前往DataArts Studio管理中心控制台进行创建,详情请参见配置DataArts Studio数据连接参数。 数据库 选择数据库。如果您还未创建数据库,可以前往DataArts Studio数据开发控制台进行创建,详情请参见新建数据库。 选择逻辑实体 全部:将所有的逻辑实体转换为物理表。 部分:将选择的部分逻辑实体转换为物理表。 队列 DLI 队列。该参数仅DLI连接类型有效。 Schema DWS和POSTGRESQL的模式。该参数仅支持DWS和POSTGRESQL连接类型。 描述 描述信息。支持的长度为0~600个字符。
  • 功能 MRS Kafka主要是查询Topic未消费的消息数。 Kafka是一个分布式的、分区的、多副本的消息发布-订阅系统,它具有消息持久化、高吞吐、分布式、多客户端支持、实时等特性,适用于离线和在线的消息消费,如常规的消息收集、网站活性跟踪、聚合统计系统运营数据(监控数据)、日志收集等大量数据的互联网服务的数据收集场景。Kafka作为一个消息发布-订阅系统,为整个大数据平台多个子系统之间数据的传递提供了高速数据流转方式。
  • 功能 DLI Flink作业专为实时数据流处理设计,适用于低时延、需要快速响应的场景。适用于实时监控、在线分析等场景。 DLI Flink Job节点用于创建和执行一个DLI Flink作业,或者查询DLI作业是否正在运行,实现实时流式大数据分析。 DLI Flink流式作业提交到DLI之后,若处于运行中的状态,则认为节点执行成功。若作业配置了周期调度,则会周期检查该Flink作业是否依然处于运行中的状态,如果处于运行状态,则认为节点执行成功。
  • 属性 表1 属性说明 属性 类型 描述 示例 dataArray String Loop.dataArray表示For Each节点“数据集”中定义的二维数组。 一般定义格式为#{Loop.dataArray[0][0]}、#{Loop.dataArray[0][1]}等类似样式。其中[0][0]表示数组中第一行的第一个值,[0][1]表示第一行的第二个值,以此类推。 作为For Each节点的“子作业参数”取值,表示For Each循环中,始终取“数据集”中二维数组的第二行的第一个值。 #{Loop.dataArray[1][0]} current String For Each节点在处理数据集的时候,是一行一行进行处理的。Loop.current表示当前遍历到的For Each节点“数据集”中定义的二维数组的某一行,该数据行为一维数组。 一般定义格式为#{Loop.current[0]}、#{Loop.current[1]}或其他。其中[0]表示遍历到的当前行的第一个值,[1]表示遍历到的当前行的第二个值,以此类推。 作为For Each节点的“子作业参数”取值,表示For Each循环遍历中,取“数据集”中二维数组的当前遍历行的第二个值。 #{Loop.current[1]} offset Int For循环当前的偏移量,从0开始。 Loop.dataArray[Loop.offset] = Loop.current。 获取For Each循环当前的偏移量,即遍历次数,从0开始。 #{Loop.offset}
  • 建立主机数据连接 开发Python脚本前,我们需要建立一个到弹性 云服务器ECS 的连接。 参考访问DataArts Studio实例控制台,登录DataArts Studio管理控制台。 在DataArts Studio控制台首页,选择对应工作空间的“管理中心”模块,进入管理中心页面。 在管理中心页面,单击“数据连接”,进入数据连接页面并单击“创建数据连接”。 参见表1配置相关参数,创建主机连接名称为“ecs”的数据连接,如图1所示。 表1 主机连接 参数 是否必选 说明 数据连接类型 是 主机连接固定选择为主机连接。 数据连接名称 是 数据连接的名称,只能包含字母、数字、下划线和中划线,且长度不超过100个字符。 描述 否 为更好地识别数据连接,此处加以描述信息,长度不能超过100个字符。 标签 否 标识数据连接的属性。设置标签后,便于统一管理。 说明: 标签的名称,只能包含中文、英文字母、数字和下划线,不能以下划线开头,且长度不能超过100个字符。 适用组件 是 选择此连接适用的组件。勾选组件后,才能在相应组件内使用本连接。 基础与网络连通配置 主机地址 是 Linux操作系统主机的IP地址。 请参考查看云服务器详细信息获取。 绑定Agent 是 选择 CDM 集群,CDM集群提供Agent。如果没有可用的CDM集群,请参考创建CDM集群进行创建。 说明: CDM集群作为管理中心数据连接Agent时,单集群的并发活动线程最大为200。即当多个数据连接共用同一Agent时,通过这些数据连接提交SQL脚本、Shell脚本、Python脚本等任务的同时运行上限为200,超出的任务将排队等待。建议您按照业务量情况规划多个Agent分担压力。 在调度Shell、Python脚本时,Agent会访问E CS 主机,如果Shell、Python脚本的调度频率很高,ECS主机会将Agent的内网IP加入黑名单。为了保障作业的正常调度,强烈建议您使用ECS主机的root用户将绑定Agent(即CDM集群)的内网IP加到/etc/hosts.allow文件里面。 CDM集群的内网IP获取方式请参见查看并修改CDM集群配置。 端口 是 主机的SSH端口号。 Linux操作系统主机的默认登录端口为22,如有修改可通过主机路径“/etc/ssh/sshd_config”文件中的port字段确认端口号。 KMS密钥 是 通过KMS加解密数据源认证信息,选择KMS中的任一默认密钥或自定义密钥即可。 说明: 第一次通过DataArts Studio或KPS使用KMS加密时,会自动生成默认密钥dlf/default或kps/default。关于默认密钥的更多信息,请参见什么是默认密钥。 仅支持通过对称密钥加密,暂不支持非对称密钥。 数据源认证及其他功能配置 用户名 是 主机的登录用户名。 登录方式 是 选择主机的登录方式: 密钥对 密码 密钥对 是 “登录方式”为“密钥对”时,显示该配置项。 主机的登录方式为密钥对时,您需要获取并上传其私钥文件至OBS,在此处选择对应的OBS路径(OBS路径中不能存在中文字符)。 说明: 此处上传的私钥文件应和主机上配置的公钥是一个密钥对,详情请参见密钥对使用场景介绍。 密钥对密码 是 如果密钥对未设置密码,则不需要填写该配置项。 密码 是 “登录方式”为“密码”时,显示该配置项。 主机的登录方式为密码时,填写主机的登录密码。 主机连接描述 否 主机连接的描述信息。 图1 新建主机连接 关键参数说明: 主机地址:已开通ECS主机中开通的ECS主机的IP地址。 绑定Agent:已开通批量数据迁移增量包中开通的CDM集群。 单击“测试”,测试数据连接的连通性。如果无法连通,数据连接将无法创建。 测试通过后,单击“确定”,创建数据连接。
  • 在作业中引用Python脚本 创建一个作业。 选择Python节点,并配置节点属性。 选择已创建好的Python脚本,配置相关节点参数。在“参数”里面可以配置脚本参数,例如: 配置的参数是指执行Python语句时,向语句传递的参数,参数之间使用空格分隔,例如:Microsoft Oracle。此处的“参数”需要在Python语句中引用,否则配置无效。 图5 配置Python节点属性 单击“测试运行”,查看该作业的运行结果。 图6 查看作业运行结果 单击“保存”,作业配置信息创建完成。 单击“提交”,提交版本后对该作业进行调度。
  • 环境准备 已开通弹性云服务器,并创建ECS,ECS主机名为“ecs-dgc”。 本示例主机选择“CentOS 8.0 64bit with ARM(40GB)”的公共镜像,并且使用ECS自带的Python环境,您可登录主机后使用python命令确认服务器的Python环境。 已开通数据集成增量包,CDM集群名为“cdm-dlfpyhthon”,提供数据开发模块与ECS主机通信的代理。 请确保ECS主机与CDM集群网络互通,互通需满足如下条件: CDM集群与ECS主机同区域情况下,同虚拟私有云、同子网、同安全组的不同实例默认网络互通;如果同虚拟私有云但是子网或安全组不同,还需配置路由规则及安全组规则,配置路由规则请参见如何配置路由规则章节,配置安全组规则请参见如何配置安全组规则章节。 CDM集群与ECS主机处于不同区域的情况下,需要通过公网或者专线打通网络。通过公网互通时,需确保CDM集群已绑定EIP,数据源所在的主机可以访问公网且防火墙规则已开放连接端口。 此外,您还必须确保该ECS主机与CDM集群所属的企业项目必须相同,如果不同,需要修改工作空间的企业项目。
  • 数据准备 下载Flink作业资源包"wordcount.jar",下载地址:https://github.com/huaweicloudDocs/dgc/blob/master/WordCount.jar 下载的Flink作业资源包需要进行JAR包完整性校验。Windows操作系统下,打开本地命令提示符框,输入如下命令,在本地生成已下载JAR包的SHA256值,其中,“D:\wordcount.jar”为JAR包的本地存放路径和JAR包名,请根据实际情况修改。 certutil -hashfile D:\wordcount.jar SHA256 命令执行结果示例,如下所示: SHA256 的 D:\wordcount.jar 哈希: 0859965cb007c51f0d9ddaf7c964604eb27c39e2f1f56e082acb20c8eb05ccc4 CertUtil: -hashfile 命令成功完成。 对比所下载JAR包的SHA256值和下面JAR包的SHA256值。如果一致,则表示下载过程不存在篡改和丢包。 SHA256值:0859965cb007c51f0d9ddaf7c964604eb27c39e2f1f56e082acb20c8eb05ccc4 准备数据文件“in.txt”,内容为一段英文单词。
  • 操作步骤 将作业资源包和数据文件传入OBS桶中。 本例中,WordCount.jar文件上传路径为:lkj_test/WordCount.jar;word.txt 文件上传路径为:lkj_test/input/word.txt。 创建一个数据开发模块的批处理作业,作业名称为“job_MRS_Flink”。 图1 新建作业 进入到作业开发页面,拖动“MRS Flink”节点到画布中并单击,配置节点的属性。 图2 配置MRS Flink节点属性 参数设置说明: --Flink作业名称 wordcount --MRS集群名称 选择一个MRS集群 --运行程序参数 -c org.apache.flink.streaming.examples.wordcount.WordCount --Flink作业资源包 wordcount --输入数据路径 obs://dlf-test/lkj_test/input/word.txt --输出数据路径 obs://dlf-test/lkj_test/output.txt 其中: obs://dlf-test/lkj_test/input/word.txt为wordcount.jar的传入参数路径,可以把需要统计的单词写到里面; obs://dlf-test/lkj_test/output.txt为输出参数文件的路径(如已存在output.txt文件,会报错)。 单击“测试运行”,执行该MRS Flink作业。 待测试完成,执行“提交”。 在“作业监控”界面,查看作业执行结果。 查看OBS桶中返回的记录(没设置返回可跳过)。
  • 案例二:通过MRS Spark Python作业实现打印输出"hello python" 前提条件: 开发者具有OBS相关路径的访问权限。 数据准备: 准备脚本文件"zt_test_sparkPython1.py",具体内容如下: from pyspark import SparkContext, SparkConf conf = SparkConf().setAppName("master"). setMaster("yarn") sc = SparkContext(conf=conf) print("hello python") sc.stop() 操作步骤: 将脚本文件传入OBS桶中。 创建一个数据开发模块的批处理作业。 进入到作业开发页面,拖动“MRS Spark Python”节点到画布中并单击,配置节点的属性。 参数设置说明: --master yarn --deploy-mode cluster obs://obs-tongji/python/zt_test_sparkPython1.py 其中:zt_test_sparkPython1.py 为脚本所在路径 单击“测试运行”,执行该脚本作业。 待测试完成,执行“提交”。 在“作业监控”界面,查看作业执行结果。 图8 查看作业执行结果 日志验证。 运行成功后,登录MRS manager后在YARN上查看日志,发现有hello python的输出。 图9 查看YARN上日志
  • 开发DWS SQL作业 DWS SQL脚本开发完成后,我们为DWS SQL脚本构建一个周期执行的作业,使得该脚本能定期执行。 创建一个批处理作业,作业名称为“job_dws_sql”。 然后进入到作业开发页面,拖动DWS SQL节点到画布中并单击,配置节点的属性。 图2 配置DWS SQL节点属性 关键属性说明: SQL脚本:关联开发DWS SQL脚本中开发完成的DWS SQL脚本“dws_sql”。 数据连接:默认选择SQL脚本“dws_sql”中设置的数据连接,支持修改。 数据库:默认选择SQL脚本“dws_sql”中设置的数据库,支持修改。 脚本参数:通过EL表达式获取"yesterday"的值,EL表达式如下: #{Job.getYesterday("yyyy-MM-dd")} 节点名称:默认显示为SQL脚本“dws_sql”的名称,支持修改。 作业编排完成后,单击,测试运行作业。 如果运行成功,单击画布空白处,在右侧的“调度配置”页面,配置作业的调度策略。 图3 配置调度方式 说明: 2021/08/06至2021/08/31,每天2点执行一次作业。 单击“提交”,执行调度作业,实现作业每天自动运行。
  • 开发Hive SQL作业 Hive SQL脚本开发完成后,我们为Hive SQL脚本构建一个周期执行的作业,使得该脚本能定期执行。 创建一个数据开发模块的批处理作业,作业名称为“job_hive_sql”。 图2 创建job_hive_sql作业 然后进入到作业开发页面,拖动MRS Hive SQL节点到画布中并单击,配置节点的属性。 图3 配置MRS Hive SQL节点属性 关键属性说明: 节点名称:默认显示为SQL脚本“hive_sql”的名称,支持修改。 SQL脚本:关联开发Hive SQL脚本中开发完成的Hive SQL脚本“hive_sql”。 数据连接:默认选择SQL脚本“hive_sql”中设置的数据连接,支持修改。 数据库:默认选择SQL脚本“hive_sql”中设置的数据库,支持修改。 作业编排完成后,单击,测试运行作业。 如果运行成功,单击画布空白处,在右侧的“调度配置”页面,配置作业的调度策略。 图4 配置调度方式 该作业调度时间在2021/01/01至2021/01/25,每天2点调度一次作业。 最后我们需要提交版本,执行调度作业,实现作业每天自动运行。
  • 提交Spark作业 用户需要在数据开发模块中创建一个作业,通过作业的DLI Spark节点提交Spark作业。 创建一个数据开发批处理作业,右键单击目录,单击“新建作业”,作业名称为“job_DLI_Spark”。 图2 创建作业 然后进入作业开发页面,拖动DLI Spark节点到画布并单击,配置节点的属性。 图3 配置节点属性 关键属性说明: DLI队列:DLI中创建的DLI队列。 作业运行资源:DLI Spark节点运行时,限制最大可以使用的CPU、内存资源。 作业主类:DLI Spark节点的主类,本例的主类是“org.apache.spark.examples.SparkPi”。 Spark程序资源包:3中创建的资源。 作业编排完成后,单击,测试运行作业。 图4 作业日志(仅参考) 如果日志运行正常,保存作业并提交版本。
  • 场景说明 用户在使用DLI服务时,大部分时间会使用SQL对数据进行分析处理,有时候处理的逻辑特别复杂,无法通过SQL处理,那么可以通过Spark作业进行分析处理。本章节通过一个例子演示如何在数据开发模块中提交一个Spark作业。 操作流程如下: 创建DLI集群,通过DLI集群的物理资源来运行Spark作业。 获取Spark作业的演示JAR包,并在数据开发模块中关联到此JAR包。 创建数据开发模块作业,通过DLI Spark节点提交Spark作业。
  • 还原资产 在DataArts Studio控制台首页,选择对应工作空间的“数据开发”模块,进入数据开发页面。 在数据开发模块控制台的左侧导航栏,选择“备份管理”。 选择“还原管理”页签,单击“还原备份”。 在还原备份对话框中,从OBS桶中选择待还原的资产存储路径,设置重名处理策略。 待还原的资产存储路径为备份资产中生成的文件路径。 您可在还原资产前修改备份路径下的backup.json文件,支持修改连接名(connectionName)、数据库名(database)和集群名(clusterName)。 图2 还原资产 单击“确定”。
  • 备份资产 参考访问DataArts Studio实例控制台,登录DataArts Studio管理控制台。 在DataArts Studio控制台首页,选择对应工作空间的“数据开发”模块,进入数据开发页面。 在数据开发主界面的左侧导航栏,选择“备份管理”。 单击“启动每日备份”,打开“OBS文件浏览”页面,选择OBS文件夹,设置备份数据的存储位置。 图1 备份管理 每日备份在每日0点开始备份昨日的所有作业、脚本、资源和环境变量,启动当日不会备份昨日的作业、脚本、资源和环境变量。 选择OBS存储路径时,若仅选择至桶名层级,则备份对象自动存储在以“备份日期”命名的文件夹内。环境变量,资源,脚本和作业分别存储在1_env,2_resources,3_scripts和4_jobs文件夹内。 备份成功后,在以“备份日期”命名的文件夹内,自动生成backup.json文件,该文件按照节点类型存储了作业信息,支持恢复作业前进行修改。 启动每日备份后,若想结束备份任务,您可以单击右边的“停止每日备份”。
  • 作业实例运行状态 表4 作业实例运行状态说明 运行状态 场景描述 等待运行 如果作业实例依赖的前置作业实例未最终完成(未最终完成的状态包括:未生成实例、等待运行、运行失败),该实例处于等待运行。 运行中 作业正常运行中。说明前置的依赖作业都已完成,该作业调度时间已到。 运行成功 作业真正成功执行了业务逻辑,并且最终成功(包含失败重试的成功)。 “运行成功”包括了“成功”、“强制成功”、“忽略失败”三种运行状态。 强制成功 作业实例处于失败或取消状态时,进行手动执行强制成功。 忽略失败成功 如下图所示,节点B设置了失败处理策略,当B执行失败了,会跳过B继续执行C,当存在这种节点运行失败,整个作业执行完成了就是忽略失败成功。 图10 失败处理策略-继续执行下一节点 运行异常 这种运行状态场景较少。如下图所示,节点B设置了失败处理策略,当B执行失败了,作业实例立即挂起,不会继续执行C,作业实例进入异常运行状态。 图11 失败处理策略-挂起当前作业执行计划 已暂停 这种运行状态场景较少。当某个作业的实例正在运行,测试人员在作业监控界面,手工暂停作业调度。此时,该作业正在运行的实例会进入已暂停状态。 已取消 等待运行状态的作业实例,进行手工停止,则实例处于已取消状态。 如果作业实例依赖的直接上游作业被停止调度了,该作业实例会自动进入已取消状态。作业A依赖作业B,作业B被停止调度,作业A实例生成后会自动取消。 冻结 对于未来时间内尚未生成的作业实例,进行冻结后,该作业实例会进入冻结状态。 失败 作业执行失败。执行失败的作业,可以查看失败原因,比如作业的哪个节点执行失败。
  • SQL复杂度 SQL复杂度查看支持实时处理单任务Flink SQL(包括MRS Flink SQL)作业。 SQL复杂度:系统会先自动统计SQL语句中的关键字,再折算为SQL复杂度。 统计SQL关键字。 SQL关键字个数=JOIN个数+GROUP BY个数+ORDER BY个数+DISTINCT个数+窗口函数个数+MAX((INSERT个数|UPDATE个数|DELETE个数), 1)。 如果SQL关键字个数远高于20,会导致解析消耗大量时间,且作业长期处于排队状态,建议您优化SQL,控制SQL关键字个数。 SQL复杂度计算。 SQL关键字个数小于等于3,复杂度为1。 SQL关键字个数小于等于6,且大于等于4,复杂度为1.5。 SQL关键字个数小于等于19,且大于等于7,复杂度为2。 SQL关键字个数大于等于20,复杂度为4。 SQL作业复杂度示例,以下述SQL为例。 SELECT DISTINCT total1 FROM(SELECT id1, COUNT(f1) AS total1 FROM in1 GROUP BY id1 ) tmp1 ORDER BY total1 DESC LIMIT 100; 其中: DISTINCT个数为1 GROUP BY个数为1 ORDER BY个数为1 MAX((INSERT个数|UPDATE个数|DELETE个数), 1) = MAX(0|0|0, 1) = 1 SQL关键字个数 = 1+1+1+1 = 4 由此可知,SQL关键字个数小于等于6,且大于等于4,复杂度为1.5,即SQL复杂度为1.5。
  • SQL复杂度 SQL复杂度查看支持批处理的单任务Spark SQL作业。 SQL复杂度:系统会先自动统计SQL语句中的关键字,再折算为SQL复杂度。 统计SQL关键字。 SQL关键字个数=JOIN个数+GROUP BY个数+ORDER BY个数+DISTINCT个数+窗口函数个数+MAX((INSERT个数|UPDATE个数|DELETE个数), 1)。 如果SQL关键字个数远高于20,会导致解析消耗大量时间,且作业长期处于排队状态,建议您优化SQL,控制SQL关键字个数。 SQL复杂度计算。 SQL关键字个数小于等于3,复杂度为1。 SQL关键字个数小于等于6,且大于等于4,复杂度为1.5。 SQL关键字个数小于等于19,且大于等于7,复杂度为2。 SQL关键字个数大于等于20,复杂度为4。 SQL作业复杂度示例,以下述SQL为例。 SELECT DISTINCT total1 FROM(SELECT id1, COUNT(f1) AS total1 FROM in1 GROUP BY id1 ) tmp1 ORDER BY total1 DESC LIMIT 100; 其中: DISTINCT个数为1 GROUP BY个数为1 ORDER BY个数为1 MAX((INSERT个数|UPDATE个数|DELETE个数), 1) = MAX(0|0|0, 1) = 1 SQL关键字个数 = 1+1+1+1 = 4 由此可知,SQL关键字个数小于等于6,且大于等于4,复杂度为1.5,即SQL复杂度为1.5。
  • 举例 字符串变量str的内容如下: { "cities": [{ "name": "city1", "areaCode": "1000" }, { "name": "city2", "areaCode": "2000" }, { "name": "city3", "areaCode": "3000" }] }
  • 方法 表1 方法说明 方法 描述 示例 Object parse(String jsonStr) 将json字符串转换为对象。 假设变量a为JSON字符串,将json字符串转换为对象,EL表达式如下: #{JSONUtil.parse(a)} String toString(Object jsonObject) 将对象转换为json字符串。 假设变量b为对象,将对象转换为json字符串,EL表达式如下: #{JSONUtil.toString(b)} Object path(String jsonStr,String jsonPath) 返回json字符串指定路径下的字段值。类似于XPath,path方法可以通过路径检索或设置JSON,其路径中可以使用.或[]等访问成员、数值,例如:tables[0].table_name。 字符串变量str的内容如下: { "cities": [{ "name": "city1", "areaCode": "1000" }, { "name": "city2", "areaCode": "2000" }, { "name": "city3", "areaCode": "3000" }] } 获取city1的电话区号,EL表达式如下: #{JSONUtil.path(str,"cities[0].areaCode")}
  • 方法 表1 方法说明 方法 描述 示例 String format(Date date, String pattern) 将Date类型时间按指定pattern格式为字符串。 将作业调度计划的时间,转换为毫秒格式。 #{DateUtil.format(Job.planTime,"yyyy-MM-dd HH:mm:ss:SSS")} 将作业调度计划减一天的时间,转换为周格式。 #{DateUtil.format(DateUtil.addDays(Job.planTime,-1),"yyyyw")} Job.planTime为2024年1月7日时,返回值为20241。 #{DateUtil.format(DateUtil.addDays(Job.planTime,-1),"yyyyww")} Job.planTime为2024年1月7日时,返回值为202401。 Date addMonths(Date date, int amount) 给date添加指定月数后,返回新Date对象,amount可以是负数。 将作业调度计划减一个月的时间,转换为月份格式。 #{DateUtil.format(DateUtil.addMonths(Job.planTime,-1),"yyyy-MM")} Date addDays(Date date, int amount) 给date添加指定天数后,返回新Date对象,amount可以是负数。 将作业调度计划减一天的时间,转换为年月日格式。 #{DateUtil.format(DateUtil.addDays(Job.planTime,-1),"yyyy-MM-dd")} 将作业调度计划减一天的时间,转换为周格式。(非跨年业务场景使用小写y) #{DateUtil.format(DateUtil.addDays(Job.planTime,-1),"yyyyw")} Job.planTime为2024年1月7日时,返回值为20241。 #{DateUtil.format(DateUtil.addDays(Job.planTime,-1),"yyyyww")} Job.planTime为2024年1月7日时,返回值为202401。 将作业调度计划减一天的时间,转换为周格式。(跨年业务场景使用大写Y) #{DateUtil.format(DateUtil.addDays(Job.planTime,-7),"YYYYww")} Job.planTime为2024年1月7日时,返回值为202401。 说明: yyyy和YYYY的使用区别: YYYY表示周年的年份,它可以正确处理跨年的周数。 yyyy表示日历年份,它可能会导致跨年的周数不准确。 因此,在处理跨年业务场景时,年份使用大写的YYYY,除此之外,请使用小写的yyyy。 目前自然周是周天到周六,暂不支持自然周是从星期一到星期天。 Date addHours(Date date, int amount) 给date添加指定小时数后,返回新Date对象,amount可以是负数。 将作业调度计划减一小时的时间,转换为小时格式。 #{DateUtil.format(DateUtil.addHours(Job.planTime,-1),"yyyy-MM-dd HH")} Date addMinutes(Date date, int amount) 给date添加指定分钟数后,返回新Date对象,amount可以是负数。 将作业调度计划减一分钟的时间,转换为分钟格式。 #{DateUtil.format(DateUtil.addMinutes(Job.planTime,-1),"yyyy-MM-dd HH:mm")} int getDay(Date date) 从date获取天,例如:date为2018-09-14,则返回14。 从作业调度计划获取具体的天。 #{DateUtil.getDay(Job.planTime)} int getMonth(Date date) 从date获取月,例如:date为2018-09-14,则返回9。 从日期获取具体的月。 #{DateUtil.getMonth(Job.planTime)} int getQuarter(Date date) 从date获取季度,例如:date为2018-09-14,则返回3。 从日期获取具体的季度。 #{DateUtil.getQuarter(Job.planTime)} int getYear(Date date) 从date获取年,例如:date为2018-09-14,则返回2018。 从日期获取具体的年。 #{DateUtil.getYear(Job.planTime)} Date now() 返回当前时间。 以秒格式返回当前的时间。 #{DateUtil.format(DateUtil.now(),"yyyy-MM-dd HH:mm:ss")} long getTime(Date date) 将Date类型时间转换为long类型时间戳。 将作业调度计划时间转换为时间戳。 #{DateUtil.getTime(Job.planTime)} Date parseDate(String str, String pattern) 字符串按pattern转换为Date类型,pattern为日期、时间模式,请参考日期和时间模式。 将字符串类型的作业启动时间转换为秒格式。 #{DateUtil.parseDate(Job.getPlanTime("yyyy-MM-dd HH:mm:ss:SSS"),"yyyy-MM-dd HH:mm:ss")}
  • 属性和方法 表1 属性说明 属性 类型 描述 name String 作业名称。 planTime java.util.Date 作业调度计划时间,即周期调度配置的时间,例如每天凌晨1:01调度作业。 startTime java.util.Date 作业执行时间,有可能与planTime同一个时间,也有可能晚于planTime(由于作业引擎繁忙等)。 eventData String 当作业使用事件驱动调度时,从通道获取的消息。 projectId String 当前数据开发模块所处项目ID。 表2 方法说明 方法 描述 示例 String getNodeStatus(String nodeName) 获取指定节点运行状态,成功状态返回success,失败状态返回fail。 例如,判断节点是否运行成功,可以使用如下判断条件,其中test为节点名称: #{(Job.getNodeStatus("test")) == "success" } 获取test节点运行状态。 #{Job.getNodeStatus("test")} String getNodeOutput(String nodeName) 获取指定节点的输出。此方法只能获取前面依赖节点的输出。 获取test节点输出。 #{Job.getNodeOutput("test")} 当前一节点执行无结果时,输出结果为“null”。 当前一节点的输出结果是一个字段时,输出结果形如[["000"]]所示。此时可通过EL表达式分割字符串结果,获取前一节点输出的字段值,但注意输出结果类型为String。需要输出原数据类型时,仍需通过For Each节点及其支持的Loop内嵌对象EL表达式获取。 #{StringUtil.split(StringUtil.split(StringUtil.split(Job.getNodeOutput("前一节点名"),"]")[0],"[")[0],"\\"")[0]} 当前一节点的输出结果是多个(两个及以上)字段时,输出结果形如[["000"],["001"]]所示。此时需要结合For Each节点及其支持的Loop内嵌对象EL表达式如#{Loop.current[0]},循环获取输出结果,详见获取SQL节点的输出结果值。 String getParam(String key) 获取作业参数。 注意此方法只能直接获取当前作业里配置的参数值,并不能获取到父作业传递过来的参数值,也不能获取到工作空间里面配置的全局变量,作用域仅为本作业。 这种情况下建议使用表达式${job_param_name},既可以获取到父作业传递过来的参数值,也可以获取到全局配置的变量。 获取参数test的值: #{Job.getParam("test")} String getPlanTime(String pattern) 获取指定pattern的计划时间字符串,pattern为日期、时间模式,请参考日期和时间模式。 获取作业调度计划时间,具体到毫秒: #{Job.getPlanTime("yyyy-MM-dd HH:mm:ss:SSS")} String getYesterday(String pattern) 获取执行pattern的计划时间前一天的时间字符串,pattern为日期、时间模式,请参考日期和时间模式。 获取作业调度计划时间的前一天的时间,具体到日期: #{Job.getYesterday("yyyy-MM-dd HH:mm:ss:SSS")} String getLastHour(String pattern) 获取执行pattern的计划时间前一小时的时间字符串,pattern为日期、时间模式,请参考日期和时间模式。 获取作业调度计划时间前一小时的时间,具体到小时: #{Job.getLastHour("yyyy-MM-dd HH:mm:ss:SSS")} String getRunningData(String nodeName) 获取指定节点运行中记录的数据,当前只支持获取DLI SQL节点SQL语句运行的作业id。此方法只能获取前面依赖节点的输出。 例如,想要获取DLI节点第3条语句的job ID(DLI节点名为DLI_INSERT_DATA),可以这样使用:#{JSONUtil.path(Job.getRunningData("DLI_INSERT_DATA"),"jobIds[2]")}。 获取指定DLI SQL节点test中第三条语句的job ID: #{JSONUtil.path(Job.getRunningData("test"),"jobIds[2]")} String getInsertJobId(String nodeName) 返回指定DLI SQL或Transform Load节点第一个Insert SQL语句的作业ID,不指定参数nodeName时,获取前面一个节点第一个DLI Insert SQL语句的作业ID,如果无法获取到作业ID,返回null值。 获取DLI SQL节点test中第一个Insert SQL语句的job ID: #{Job.getInsertJobId("test")} String getPreviousWorkday(Integer num, String pattern) 按照指定的pattern返回计划时间前第num个工作日的时间字符串,num只可为正整数。若没有获取到符合条件的结果则返回null值。 该EL表达式适用于按照日历选择自定义日期进行周期调度。 获取作业调度前五天的工作日的日期。 #{Job.getPreviousWorkday(5, "yyyyMMdd")} String getPreviousNonWorkday(Integer num, String pattern) 按照指定的pattern返回计划时间前第num个非工作日的时间字符串,num只可为正整数。若没有获取到符合条件的结果则返回null值。 该EL表达式适用于按照日历选择自定义日期进行周期调度。 获取作业调度前一天的非工作日的日期。 #{Job.getPreviousNonWorkday(1, "yyyyMMdd")} String getCalendarPreviousWorkday(String calendarName, Integer num, String pattern) 按照指定的pattern返回指定日历计划时间前第num个工作日的时间字符串,num只可为正整数。若没有获取到符合条件的结果则返回null值。 该EL表达式适用于按照日历选择自定义日期进行周期调度。 获取指定日历作业调度计划时间前五天的工作日的日期。 #{Job.getCalendarPreviousWorkday(5, "yyyyMMdd")} String getCalendarPreviousNonWorkday(String calendarName, Integer num, String pattern) 按照指定的pattern返回指定日历计划时间前第num个非工作日的时间字符串,num只可为正整数。若没有获取到符合条件的结果则返回null值。 该EL表达式适用于按照日历选择自定义日期进行周期调度。 获取指定日历作业调度计划时间前一天的非工作日的日期。 #{Job.getCalendarPreviousNonWorkday(1, "yyyyMMdd")} String getCalendarNextWorkday(String calendarName, Integer num, String pattern) 按照指定的pattern返回指定日历计划时间后第num个工作日的时间字符串,num只可为正整数。若没有获取到符合条件的结果则返回null值。 该EL表达式适用于按照日历选择自定义日期进行周期调度。 获取指定日历作业调度计划时间后五天的工作日的日期。 #{Job.getCalendarNextWorkday(5, "yyyyMMdd")} String getCalendarNextNonWorkday(String calendarName, Integer num, String pattern) 按照指定的pattern返回指定日历计划时间后第num个非工作日的时间字符串,num只可为正整数。若没有获取到符合条件的结果则返回null值。 该EL表达式适用于按照日历选择自定义日期进行周期调度。 获取指定日历作业调度计划时间后一天的非工作日的日期。 #{Job.getPreviousNonWorkday(1, "yyyyMMdd")}
  • 调试方法介绍 下面介绍几种EL表达式的调试方法,能够在调试过程中方便地看到替换结果。 后文以#{DateUtil.now()}表达式为例进行介绍。 使用DIS Client节点。 前提:您需要具备DIS通道。 方法:选择DIS Client节点,将EL表达式直接写在要发送的数据中,单击“测试运行”,然后在节点上右键查看日志,日志中会把EL表达式的值打印出来。 使用Kafka Client节点。 前提:您需要具备MRS集群,且集群有Kafka组件。 方法:选择Kafka Client节点,将EL表达式直接写在要发送的数据中,单击“测试运行”,然后在节点上右键查看日志,日志中会把EL表达式的值打印出来。 使用Shell节点。 前提:您需要具备弹性云服务器ECS。 方法:创建一个主机连接,将EL表达式直接通过echo打印出来,单击“测试运行”之后查看日志,日志中会打印出EL表达式的值。 使用Create OBS节点。 如果上述方法均不可用,则可以通过Create OBS去创建一个OBS目录,目录名称就是EL表达式的值,单击“测试运行”后,再去OBS界面查看创建出来的目录名称。
  • 举例 在Rest Client节点的参数“URL参数”中使用EL表达式“tableName=#{JSONUtil.path(Job.getNodeOutput("get_cluster"),"tables[0].table_name")}”,如图1所示。 表达式说明如下: 获取作业中“get_cluster”节点的执行结果(“Job.getNodeOutput("get_cluster")”),执行结果是一个JSON字符串。 通过JSON路径(“tables[0].table_name”),获取JSON字符串中字段的值。 图1 表达式示例 EL表达式在数据开发过程中被广泛应用,您可以参考最佳实践查看更多应用EL表达式的进阶实践。
  • 配置节点调度任务(实时作业) 配置实时处理作业的节点调度任务,支持单次调度、周期调度、事件驱动调度三种方式。操作方法如下: 单击画布中的节点,在右侧显示“调度配置”页签,单击此页签,展开配置页面,配置如表4所示的参数。 表4 节点调度配置 参数 说明 调度方式 选择作业的调度方式: 单次调度:手动触发作业单次运行。 周期调度:周期性自动运行作业。 事件驱动调度:根据外部条件触发作业运行。 “周期调度”的参数 生效时间 调度任务的生效时间段。 调度周期 选择调度任务的执行周期,并配置相关参数: 分钟 小时 天 周 月 调度周期需要合理设置,如CDM、ETL作业的调度周期至少应在5分钟以上,并根据作业表的数据量、源端表更新频次等调整。 已经在运行中的作业,可以修改其调度周期。 跨周期依赖 选择作业下实例之间的依赖关系。 不依赖上一调度周期 选择“并发数”。多个作业实例并行执行的个数。如果并发数配置为1,前一个批次执行完成后(包括成功、取消、或失败),下一批次才开始执行。 自依赖(上一调度周期的作业实例执行成功下一周期才会执行,否则处于等待运行状态。) “事件驱动调度”的参数 触发事件类型 选择触发作业运行的事件类型。 DIS通道名称 选择DIS通道,当指定的DIS通道有新消息时,数据开发模块将新消息传递给作业,触发该作业运行。 当“触发事件类型”选择“DIS”或“OBS”时才需要配置。 连接名称 选择数据连接,需先在“管理中心”创建kafka数据连接。当“触发事件类型”选择“KAFKA”时才需要配置。 Topic 选择需要发往kafka的消息Topic。当“触发事件类型”选择“KAFKA”时才需要配置。 OBS路径 选择要监听的OBS路径,如果该路径下有新增文件,则触发调度;新增的文件的路径名,可以通过变量Job.trigger.obsNewFiles引用。前提条件:该OBS路径已经配置DIS 消息通知 。 最大处理文件数 节点被驱动运行时,每批次最多可处理的文件数。当“触发事件类型”选择“OBS”时才需要配置。 消费组 消费者组是kafka提供的可扩展且具有容错性的消费者机制。 它是一个组,所以内部可以有多个消费者,这些消费者共用一个ID,一个组内的所有消费者共同协作,完成对订阅的主题的所有分区进行消费。其中一个主题中的一个分区只能由一个消费者消费。 说明: 一个消费者组可以有多个消费者。 Group ID是一个字符串,在一个kafka集群中,它标识唯一的一个消费者组。 每个消费者组订阅的所有主题中,每个主题的每个分区只能由一个消费者消费。消费者组之间不影响。 当触发事件类型选择了DIS或KAFKA时,会自动关联出消费组的ID,用户也可以手动修改。 事件处理并发数 选择作业并行处理的数量,最大并发数为10。 事件检测间隔 配置时间间隔,检测DIS通道下是否有新的消息。时间间隔单位可以配置为秒或分钟。 读取策略 从上次位置读起 从最新位置读起 当“触发事件类型”选择“DIS”或“KAFKA”时才需要配置。 失败策略 选择节点执行失败后的策略: 挂起 忽略失败,继续调度
  • 约束限制 调度周期需要合理设置,单个作业最多允许5个实例并行执行,如果作业实际执行时间大于作业配置的调度周期,会导致后面批次的作业实例堆积,从而出现计划时间和开始时间相差大。例如CDM、ETL作业的调度周期至少应在5分钟以上,并根据作业表的数据量、源端表更新频次等调整。 如果通过DataArts Studio数据开发调度CDM迁移作业,CDM迁移作业处也配置了定时任务,则两种调度均会生效。为了业务运行逻辑统一和避免调度冲突,推荐您启用数据开发调度即可,无需配置CDM定时任务。
  • 功能 通过MRS Hive SQL节点执行数据开发模块中预先定义的Hive SQL脚本。该节点可以传递SQL语句到Hive中执行,支持DML与DDL SQL语句。 Hive是建立在Hadoop上的 数据仓库 基础构架。它可以用来进行数据提取转化加载,这是一种可以存储、查询和分析存储在Hadoop中的大规模数据的机制。Hive定义了简单的类SQL查询语言,称为HQL,它允许熟悉SQL的用户查询数据。 MRS Hive SQL节点的具体使用教程,请参见开发一个Hive SQL作业。 MRS Hive SQL节点不支持Hive的事务表。
  • 使用场景 当某参数被多个作业调用时,可将此参数提取出来作为默认配置项,无需每个作业都配置该参数。 表1 配置项列表 配置项 影响模块 修改默认配置项是否对存量业务有影响 主要用途 配置工作空间模式 作业调度 普通模式 业务日期模式 是 系统支持按照作业调度计划时间去运行,同时支持按照业务日期去运行。 配置周期调度 作业调度 是 当前作业所依赖的作业执行失败后,当前作业的处理策略。 依赖的作业停止时,当前作业实例处理策略。 配置多IF策略 作业调度 是 节点执行依赖多个IF条件的处理策略。 配置软硬锁策略 脚本/作业开发 否 作业或脚本的抢锁操作依赖于软硬锁处理策略。 脚本变量定义 脚本开发 是 脚本变量的格式定义。SQL脚本的变量格式有${}和${dlf.}两种。 配置数据导出策略 脚本/作业开发 否 对SQL执行结果框中的数据配置下载或转储的策略。 所有用户都可以 所有用户都不能 仅工作空间管理员可以 禁用作业节点名称同步变化 作业开发 否 DataArts Studio作业中的节点关联脚本或者其他服务的作业时,节点名称不会同步变化。 是否使用简易变量集 作业开发 是 简易变量集提供了一系列自定义的变量,实现在任务调度时间内参数的动态替换。 忽略失败的通知策略 运维调度 否 对于运行状态为忽略失败的作业,支持发送的通知类型。 节点超时是否重试 作业运行 是 作业节点运行超时导致的失败也会重试。 实例超时是否忽略等待时间 作业运行 是 实例运行时超时计算将忽略等待时间。 MRS jar包参数拆分规则 作业开发 是 MRS MapReduce算子和MRS Spark算子jar包参数中字符串参数(使用""括起来的参数)拆分规则。 等待运行实例同步作业版本策略 运维调度 是 已生成的等待运行的作业实例,此时发布新的作业版本后,实例是否会使用最新的作业版本运行。 Hive SQL及Spark SQL执行方式 脚本/作业开发 否 SQL语句放置在OBS中:将OBS路径返回给MRS。 SQL语句放置在请求的消息体中:将脚本内容返回给MRS。 补数据优先级设置 运维调度-补数据 否 设置补数据作业的优先级。当系统资源不充足时,可以优先满足优先级较高的作业的计算资源,数字越大优先级越高,当前只支持对DLI SQL算子设置优先级。 历史作业实例取消策略 运维调度 是 配置等待运行作业实例的超期天数。当作业实例等待运行的时间,超过了所配置的期限天数时,作业实例将取消执行。超期天数,最小需配置2天,即至少需要等待2天,才可取消未运行的作业实例。超期天数默认为60天,单位:天。 历史作业实例告警策略 运维调度 否 配置“通知管理”中通知告警能监控的天数范围。 通知管理中配置的告警通知能监控的作业实例天数范围,默认配置为7天,即对7天内满足触发条件的作业实例都能正常上报通知告警,但7天之前的作业实例不会再上报告警。 作业告警通知主题 通知配置 否 按责任人发送通知时所使用的主题。 作业算子失败重试默认策略 运维调度 是 设置作业算子失败重试默认策略。 作业每次重试失败即告警 运维调度 否 当作业配置失败告警的时候,该配置项会触发作业每次重试失败即告警,可作用于全部作业、实时作业和批作业。 若选择不支持,则作业达到最大失败重试次数时才触发失败告警。 作业运行自动传递脚本名称 作业开发(作业运行) 否 开关打开后,系统自动传参将生效:将对当前空间内作业运行时,将Hive SQL脚本set mapreduce.job.name=脚本名称,自动传递至MRS。 作业依赖规则 作业调度 否 作业能被其他空间作业依赖,需要该空间作业列表的查询权限。工作空间内的默认角色均有该权限,自定义角色需要在有数据开发下的作业查询权限。 脚本执行历史展示 脚本/作业开发 否 对脚本执行历史结果进行权限管控。 仅自己可见:脚本执行历史只显示本用户的执行历史。 所有用户可见:脚本执行历史显示所有用户的执行历史。 作业测试运行使用的身份 作业开发(作业测试运行) 否 配置作业测试运行使用的身份。 公共委托或 IAM 账号:使用配置的公共委托或公共IAM账号身份执行作业。 个人账号:使用点击测试作业用户的身份执行作业。 Spark SQL作业/脚本默认模板配置 Spark SQL脚本/作业开发 否 Spark SQL作业/脚本配置运行,是否允许用户设置任意参数。 Hive SQL作业/脚本默认模板配置 Hive SQL脚本/作业开发 否 Hive SQL作业/脚本配置运行,是否允许用户设置任意参数。 作业/脚本变更管理 作业/脚本的导入和导出 否 工作空间是否开启作业/脚本变更管理。 是:表示作业/脚本变化时记录变更事件,支持根据时间点增量导出和导入所有变化的作业/脚本。 否:表示作业/脚本变化时不记录变更事件,只支持选定作业/脚本的导出和导入。 Flink调试OBS桶 Flink SQL实时作业开发 否 在进行Flink SQL作业调试时,调试OBS桶必须选择并行桶。 展示层脱敏配置 脚本开发和作业开发 否 进行脚本和作业开发执行结果、表数据预览操作时,支持配置是否开启前端展示层脱敏。 查看日志打开方式 查看日志 否 支持配置查看日志的打开方式,在新标签页或者弹窗打开日志进行查看。 标签不存在处理策略 作业开发 否 通过标签不存在处理策略配置项,可以管控在作业中是否可以直接创建标签。
  • 功能 通过Rest Client节点执行一个华为云内的RESTful请求。该节点可执行各种服务请求,支持GET、POST、PUT、DELETE请求类型。 Rest Client算子的具体使用教程,请参见获取Rest Client算子返回值教程。 当由于网络限制,Rest Client某些API无法调通时,可以尝试使用Shell脚本进行API调用。您需要拥有弹性云服务器ECS,并确保ECS主机和待调用的API之间网络可通,然后在DataArts Studio创建主机连接,通过Shell脚本使用CURL命令进行API调用。 Rest Client算子目前不支持大量的response返回体,目前代码限制30M。
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全