云服务器内容精选

  • 配置方法(Kafka Client) 登录 DataArts Studio 控制台,找到所需要的DataArts Studio实例,单击实例卡片上的“进入控制台”,进入概览页面。 单击第一个工作空间A的“数据开发”,系统跳转至数据开发页面,新建数据开发作业job1。分别选择Dummy节点和Kafka Client节点,选中连线图标并拖动,编排如图7所示的作业。 Dummy节点不执行任何操作,本例选择Dummy节点仅为演示操作,实际使用中您可以用其他作业节点替代。 Kafka Client节点用于发送消息。您需要选择Kafka连接和Topic名称,并将发送数据配置为EL表达式job1,#{DateUtil.getDay(Job.startTime)}。则当本作业执行完成后,将使用Kafka Client发送一条字符串消息:job1,作业执行日期。例如2月15日作业job1执行,实际的消息则为:job1,15。 作业调度等其他作业参数无需配置,保持默认即可。 图7 job1作业Kafka Client节点配置 在另一个工作空间B,新建数据开发作业job_agent。分别选择Dummy节点和Subjob节点,选中连线图标并拖动,编排图8所示的作业。 图8 job_agent作业调度配置 Dummy节点不执行任何操作,本例选择Dummy节点用于设置Dummy节点到Subjob节点之间连线的IF条件。 Subjob节点用于将需要后续执行的作业job2作为子作业引用执行。实际使用中您可以引用已有作业,也可以使用其他作业节点替代Subjob节点。 作业的调度方式设置为“事件驱动调度”,连接名称和Topic选择为工作空间B中的Kafka连接和Topic,需要与工作空间A中job1作业中Kafka Client节点所选择的Kafka连接和Topic相对应,用于通过Kafka消息触发作业运行。 IF判断条件设置,用于校验Kafka Client节点发送的消息是否符合预期,符合才会继续执行Subjob节点,否则跳过。 右键单击连线,选择“设置条件”,在弹出的“编辑参数表达式”文本框中输入IF判断条件,失败策略保持默认即可。IF判断条件为通过EL表达式语法填写三元表达式,当三元表达式结果为true的时候,才会执行连线后面的节点,否则后续节点将被跳过。 #{StringUtil.equals(StringUtil.split(Job.eventData,',')[1],'21')} 该IF判断条件表示,仅当从Kafka通道获取的消息逗号后的部分为“21”时,即每月21日时,才执行后续的作业节点。 如果您需要匹配多条消息记录,可以添加多个Dummy节点并分别添加到Subjob节点的IF条件,然后将数据开发组件配置项中的“多IF策略”设置为“逻辑或”即可。 图9 编辑参数表达式 测试运行作业job_agent,在工作空间A的作业job1未运行的情况下,前往实例监控中查看执行结果是否符合预期。 由于作业job1未运行即未发送消息,则job_agent作业中的Subjob节点被跳过,证明IF条件判断生效。 图10 Subjob节点被跳过 启动调度job_agent。然后测试运行工作空间A作业job1,待job1实例运行成功后,前往工作空间B实例监控中查看作业运行结果是否符合预期。 job_agent被触发运行。 如果当天日期和IF条件中的日期匹配,则job_agent作业中的Subjob节点成功运行、子作业job2也执行完成。否则Subjob节点被跳过。 图11 Subjob节点成功运行
  • 方案说明 DataArts Studio数据开发模块支持以事件触发的方式运行作业,因此通过DIS或者 MRS Kafka作为作业依赖纽带,可以跨空间实现作业调度。 如下图,工作空间A中的job1运行完成后,可以使用DIS Client或Kafka Client发送消息触发中继作业job_agent;job_agent配置事件触发调度,根据DIS Client或Kafka Client发送的消息触发运行后,判断消息是否符合预期,符合则触发job2作业运行,否则不再触发job2运行。 图1 调度方案
  • 配置方法(DIS Client) 登录DataArts Studio控制台,找到所需要的DataArts Studio实例,单击实例卡片上的“进入控制台”,进入概览页面。 单击第一个工作空间A的“数据开发”,系统跳转至数据开发页面,新建数据开发作业job1。分别选择Dummy节点和DIS Client节点,选中连线图标并拖动,编排如图2所示的作业。 Dummy节点不执行任何操作,本例选择Dummy节点仅为演示操作,实际使用中您可以用其他作业节点替代。 DIS Client节点用于发送消息。您需要选择DIS所属Region和通道,并将发送数据配置为EL表达式job1,#{DateUtil.getDay(Job.startTime)}。则当本作业执行完成后,将使用DIS Client发送一条字符串消息:job1,作业执行日期。例如2月15日作业job1执行,实际的消息则为:job1,15。 作业调度等其他作业参数无需配置,保持默认即可。 图2 job1作业DIS Client节点配置 在另一个工作空间B,新建数据开发作业job_agent。分别选择Dummy节点和Subjob节点,选中连线图标并拖动,编排图3所示的作业。 图3 job_agent作业调度配置 Dummy节点不执行任何操作,本例选择Dummy节点用于设置Dummy节点到Subjob节点之间连线的IF条件。 Subjob节点用于将需要后续执行的作业job2作为子作业引用执行。实际使用中您可以引用已有作业,也可以使用其他作业节点替代Subjob节点。 作业的调度方式设置为“事件驱动调度”,DIS通道名称选择为工作空间A中job1作业中DIS Client节点所选择的通道,用于通过DIS消息触发作业运行。 IF判断条件设置,用于校验DIS Client节点发送的消息是否符合预期,符合才会继续执行Subjob节点,否则跳过。 右键单击连线,选择“设置条件”,在弹出的“编辑参数表达式”文本框中输入IF判断条件,失败策略保持默认即可。IF判断条件为通过EL表达式语法填写三元表达式,当三元表达式结果为true的时候,才会执行连线后面的节点,否则后续节点将被跳过。 #{StringUtil.equals(StringUtil.split(Job.eventData,',')[1],'21')} 该IF判断条件表示,仅当从DIS通道获取的消息逗号后的部分为“21”时,即每月21日时,才执行后续的作业节点。 如果您需要匹配多条消息记录,可以添加多个Dummy节点并分别添加到Subjob节点的IF条件,然后将数据开发组件配置项中的“多IF策略”设置为“逻辑或”即可。 图4 编辑参数表达式 测试运行作业job_agent,在工作空间A的作业job1未运行的情况下,前往实例监控中查看执行结果是否符合预期。 由于作业job1未运行即未发送消息,则job_agent作业中的Subjob节点被跳过,证明IF条件判断生效。 图5 Subjob节点被跳过 启动调度job_agent。然后测试运行工作空间A作业job1,待job1实例运行成功后,前往工作空间B实例监控中查看作业运行结果是否符合预期。 job_agent被触发运行。 如果当天日期和IF条件中的日期匹配,则job_agent作业中的Subjob节点成功运行、子作业job2也执行完成。否则Subjob节点被跳过。 图6 Subjob节点成功运行
  • Where子句中的时间宏变量 以SQOOP. CDM _20171016表为例,该表中存在表示时间的列DS,如图2所示。 图2 表数据 假设当前时间为“2017-10-16”,要导出前一天的数据(即DS=‘2017-10-15’),则可以在创建作业时配置“Where子句”为DS='${dateformat(yyyy-MM-dd,-1,DAY)}',即可将符合DS=‘2017-10-15’条件的数据导出。
  • 时间宏变量和定时任务配合完成增量同步 这里列举两个简单的使用场景: 数据库表中存在表示时间的列DS,类型为“varchar(30)”,插入的时间格式类似于“2017-xx-xx”。 定时任务中,重复周期为1天,每天的凌晨0点执行定时任务。配置“Where子句”为DS='${dateformat(yyyy-MM-dd,-1,DAY)}',这样就可以在每天的凌晨0点导出前一天产生的所有数据。 数据库表中存在表示时间的列time,类型为“Number”,插入的时间格式为时间戳。 定时任务中,重复周期为1天,每天的凌晨0点执行定时任务。配置“Where子句”为time between ${timestamp(-1,DAY)} and ${timestamp()},这样就可以在每天的凌晨0点导出前一天产生的所有数据。 其它的配置方式原理相同。
  • 时间变量宏定义具体展示 假设当前时间为“2017-10-16 09:00:00”,时间变量宏定义具体如表1所示。 表1 时间变量宏定义具体展示 宏变量 含义 实际显示效果 ${dateformat(yyyy-MM-dd)} 以yyyy-MM-dd格式返回当前时间。 2017-10-16 ${dateformat(yyyy/MM/dd)} 以yyyy/MM/dd格式返回当前时间。 2017/10/16 ${dateformat(yyyy_MM_dd HH:mm:ss)} 以yyyy_MM_dd HH:mm:ss格式返回当前时间。 2017_10_16 09:00:00 ${dateformat(yyyy-MM-dd HH:mm:ss, -1, DAY)} 以yyyy-MM-dd HH:mm:ss格式返回时间,时间为当前时间的前一天。 2017-10-15 09:00:00 ${timestamp()} 返回当前时间的时间戳,即1970年1月1日(00:00:00 GMT)到当前时间的毫秒数。 1508115600000 ${timestamp(-10, MINUTE)} 返回当前时间点10分钟前的时间戳。 1508115000000 ${timestamp(dateformat(yyyyMMdd))} 返回今天0点的时间戳。 1508083200000 ${timestamp(dateformat(yyyyMMdd,-1,DAY))} 返回昨天0点的时间戳。 1507996800000 ${timestamp(dateformat(yyyyMMddHH))} 返回当前整小时的时间戳。 1508115600000
  • 路径和表名的时间宏变量 如图1所示,如果将: 源端的“表名”配置为“CDM_/${dateformat(yyyy-MM-dd)}”。 目的端的“写入目录”配置为“/opt/ttxx/${timestamp()}”。 经过宏定义转换,这个作业表示:将Oracle数据库的“SQOOP.CDM_20171016”表中数据,迁移到HDFS的“/opt/ttxx/1508115701746”目录中。 图1 源表名和写入目录配置为时间宏变量 目前也支持一个表名或路径名中有多个宏定义变量,例如“/opt/ttxx/${dateformat(yyyy-MM-dd)}/${timestamp()}”,经过转换后为“/opt/ttxx/2017-10-16/1508115701746”。
  • timestamp timestamp支持两种形式的参数: timestamp() 返回当前时间的时间戳,即从1970年到现在的毫秒数,如1508078516286。 timestamp(dateOffset, dateType) 返回经过时间偏移后的时间戳,“dateOffset”和“dateType”表示日期的偏移量以及偏移量的类型。 例如当前日期为“2017-10-16 09:00:00”,则“timestamp(-10, MINUTE)”返回当前时间点10分钟前的时间戳,即“1508115000000”。
  • dateformat dateformat支持两种形式的参数: dateformat(format) format表示返回日期的格式,格式定义参考"java.text.SimpleDateFormat.java"中的定义。 例如当前日期为“2017-10-16 09:00:00”,则"yyyy-MM-dd HH:mm:ss"表示“2017-10-16 09:00:00”。 dateformat(format, dateOffset, dateType) format表示返回日期的格式。 dateOffset表示日期的偏移量。 dateType表示日期的偏移量的类型。 目前dateType支持以下几种类型:SECOND(秒),MINUTE(分钟),HOUR(小时),DAY(天),MONTH(月),YEAR(年)。 其中MONTH(月),YEAR(年)的偏移量类型存在特殊场景: 对于年、月来说,若进行偏移后实际没有该日期,则按照日历取该月最大的日期。 不支持在源端和目的端的“时间过滤”参数中的起始时间、终止时间使用年、月的偏移。 例如当前日期为"2023-03-01 09:00:00",则: "dateformat(yyyy-MM-dd HH:mm:ss, -1, YEAR)"表示当前时间的前一年,也就是"2022-03-01 09:00:00"。 "dateformat(yyyy-MM-dd HH:mm:ss, -3, MONTH)"表示当前时间的前三月,也就是"2022-12-01 09:00:00"。 "dateformat(yyyy-MM-dd HH:mm:ss, -1, DAY)"表示当前时间的前一天,也就是"2023-02-28 09:00:00"。 "dateformat(yyyy-MM-dd HH:mm:ss, -1, HOUR)"表示当前时间的前一小时,也就是"2023-03-01 08:00:00"。 "dateformat(yyyy-MM-dd HH:mm:ss, -1, MINUTE)"表示当前时间的前一分钟,也就是"2023-03-01 08:59:00"。 "dateformat(yyyy-MM-dd HH:mm:ss, -1, SECOND)"表示当前时间的前一秒,也就是"2023-03-01 08:59:59"。
  • 文件格式的公共参数 启动作业标识文件 这个主要用于自动化场景中,CDM配置了定时任务,周期去读取源端文件,但此时源端的文件正在生成中,CDM此时读取会造成重复写入或者是读取失败。所以,可以在源端作业参数中指定启动作业标识文件为“ok.txt”,在源端生成文件成功后,再在文件目录下生成“ok.txt”,这样CDM就能读取到完整的文件。 另外,可以设置超时时间,在超时时间内,CDM会周期去查询标识文件是否存在,超时后标识文件还不存在的话,则作业任务失败。 启动作业标识文件本身不会被迁移。 作业成功标识文件 文件系统为目的端的时候,当任务成功时,在目的端的目录下,生成一个空的文件,标识文件名由用户来指定。一般和“启动作业标识文件”搭配使用。 这里需要注意的是,不要和传输的文件混淆,例如传输文件为“finish.txt”,但如果作业成功标识文件也设置为“finish.txt”,这样会造成这两个文件相互覆盖。 过滤器 使用CDM迁移文件的时候,可以使用过滤器来过滤文件。支持通过通配符或时间过滤器来过滤文件。 选择通配符时,CDM只迁移满足过滤条件的目录或文件。 选择时间过滤器时,只有文件的修改时间晚于输入的时间才会被传输。 例如用户的“/table/”目录下存储了很多数据表的目录,并且按天进行了划分DRIVING_BEHAVIOR_20180101~DRIVING_BEHAVIOR_20180630,保存了DRIVING_BEHAVIOR从1月到6月的所有数据。如果只想迁移DRIVING_BEHAVIOR的3月份的表数据,那么需要在作业第一步指定源目录为“/table”,过滤类型选择“通配符”,然后指定“路径过滤器”为“DRIVING_BEHAVIOR_201803*”。
  • 文件格式问题解决方法 数据库的数据导出到 CS V文件,由于数据中含有分隔符逗号,造成导出的CSV文件中数据混乱。 CDM提供了以下几种解决方法: 指定字段分隔符 使用数据库中不存在的字符,或者是极少见的不可打印字符来作为字段分隔符。例如可以在目的端指定“字段分隔符”为“%01”,这样导出的字段分隔符就是“\u0001”,详情可见表1。 使用包围符 在目的端作业参数中开启“使用包围符”,这样数据库中如果字段包含了字段分隔符,在导出到CSV文件的时候,CDM会使用包围符将该字段括起来,使之作为一个字段的值写入CSV文件。 数据库的数据包含换行符 场景:使用CDM先将MySQL中的某张表(表的某个字段值中包含了换行符\n)导出到CSV格式的文件中,然后再使用CDM将导出的CSV文件导入到MRS HBase,发现导出的CSV文件中出现了数据被截断的情况。 解决方法:指定换行符。 在使用CDM将MySQL的表数据导出到CSV文件时,指定目的端的换行符为“%01”(确保这个值不会出现在字段值中),这样导出的CSV文件中换行符就是“%01”。然后再使用CDM将CSV文件导入到MRS HBase时,指定源端的换行符为“%01”,这样就避免了数据被截断的问题。
  • 二进制格式 如果想要在文件系统间按原样复制文件,则可以选择二进制格式。二进制格式传输文件到文件的速率高、性能稳定,且不需要在作业第二步进行字段匹配。 文件传输的目录结构 CDM的文件传输,支持单文件,也支持一次传输目录下所有的文件。传输到目的端后,目录结构会保持原样。 增量迁移文件 使用CDM进行二进制传输文件时,目的端有一个参数“重复文件处理方式”,可以用作文件的增量迁移,具体请参见文件增量迁移。 增量迁移文件的时候,选择“重复文件处理方式”为“跳过重复文件”,这样如果源端有新增的文件,或者是迁移过程中出现了失败,只需要再次运行任务,已经迁移过的文件就不会再次迁移。 写入到临时文件 二进制迁移文件时候,可以在目的端指定是否写入到临时文件。如果指定了该参数,在文件复制过程中,会将文件先写入到一个临时文件中,迁移成功后,再进行rename或move操作,在目的端恢复文件。 生成文件MD5值 对每个传输的文件都生成一个MD5值,并将该值记录在一个新文件中,新文件以“.md5”作为后缀,并且可以指定MD5值生成的目录。
  • JSON格式 这里主要介绍JSON文件格式的以下内容: CDM支持解析的JSON类型 记录节点 从JSON文件复制数据 CDM支持解析的JSON类型:JSON对象、JSON数组。 JSON对象:JSON文件包含单个对象,或者以行分隔/串连的多个对象。 单一对象JSON { "took" : 190, "timed_out" : false, "total" : 1000001, "max_score" : 1.0 } 行分隔的JSON对象 {"took" : 188, "timed_out" : false, "total" : 1000003, "max_score" : 1.0 } {"took" : 189, "timed_out" : false, "total" : 1000004, "max_score" : 1.0 } 串连的JSON对象 { "took": 190, "timed_out": false, "total": 1000001, "max_score": 1.0 } { "took": 191, "timed_out": false, "total": 1000002, "max_score": 1.0 } JSON数组:JSON文件是包含多个JSON对象的数组。 [{ "took" : 190, "timed_out" : false, "total" : 1000001, "max_score" : 1.0 }, { "took" : 191, "timed_out" : false, "total" : 1000001, "max_score" : 1.0 }] 记录节点 记录数据的根节点。该节点对应的数据为JSON数组,CDM会以同一模式从该数组中提取数据。多层嵌套的JSON节点以字符“.”分割。 从JSON文件复制数据 示例一 从行分隔/串连的多个对象中提取数据。JSON文件包含了多个JSON对象,例如: { "took": 190, "timed_out": false, "total": 1000001, "max_score": 1.0 } { "took": 191, "timed_out": false, "total": 1000002, "max_score": 1.0 } { "took": 192, "timed_out": false, "total": 1000003, "max_score": 1.0 } 如果您想要从该JSON对象中提取数据,使用以下格式写入到数据库,只需要在作业第一步指定文件格式为“JSON格式”,指定JSON类型为“JSON对象”,然后在作业第二步进行字段匹配即可。 表2 示例 took timedOut total maxScore 190 false 1000001 1.0 191 false 1000002 1.0 192 false 1000003 1.0 示例二 从记录节点中提取数据。JSON文件包含了单个的JSON对象,但是其中有效的数据在一个数据节点下,例如: { "took": 190, "timed_out": false, "hits": { "total": 1000001, "max_score": 1.0, "hits": [{ "_id": "650612", "_source": { "name": "tom", "books": ["book1","book2","book3"] } }, { "_id": "650616", "_source": { "name": "tom", "books": ["book1","book2","book3"] } }, { "_id": "650618", "_source": { "name": "tom", "books": ["book1","book2","book3"] } }] } } 如果想以如下格式写入到数据库,则需要在作业第一步指定文件格式为“JSON格式”,指定JSON类型为“JSON对象”,并且指定记录节点为“hits.hits”,然后在作业第二步进行字段匹配。 表3 示例 ID SourceName SourceBooks 650612 tom ["book1","book2","book3"] 650616 tom ["book1","book2","book3"] 650618 tom ["book1","book2","book3"] 示例三 从JSON数组中提取数据。JSON文件是包含了多个JSON对象的JSON数组,例如: [{ "took" : 190, "timed_out" : false, "total" : 1000001, "max_score" : 1.0 }, { "took" : 191, "timed_out" : false, "total" : 1000002, "max_score" : 1.0 }] 如果想以如下格式写入到数据库,需要在作业第一步指定文件格式为“JSON格式”,指定JSON类型为“JSON数组”,然后在作业第二步进行字段匹配。 表4 示例 took timedOut total maxScore 190 false 1000001 1.0 191 false 1000002 1.0 示例四 在解析JSON文件的时候搭配转换器。在示例二前提下,想要把hits.max_score字段附加到所有记录中,即以如下格式写入到数据库中: 表5 示例 ID SourceName SourceBooks MaxScore 650612 tom ["book1","book2","book3"] 1.0 650616 tom ["book1","book2","book3"] 1.0 650618 tom ["book1","book2","book3"] 1.0 则需要在作业第一步指定文件格式为“JSON格式”,指定JSON类型为“JSON对象”,并且指定记录节点为“hits.hits”,然后在作业第二步添加转换器,操作步骤如下: 单击添加字段,新增一个字段。 图2 添加字段 在添加的新字段后面,单击添加字段转换器。 图3 添加字段转换器 创建“表达式转换”的转换器,表达式输入“1.0”,然后保存。 图4 配置字段转换器
  • 事务模式迁移 CDM的事务模式迁移,是指当CDM作业执行失败时,将数据回滚到作业开始之前的状态,自动清理目的表中的数据。 参数位置:创建表/文件迁移的作业时,如果目的端为关系型数据库,在目的端作业配置的高级属性中,可以通过“先导入阶段表”参数选择是否启用事务模式。 参数原理:如果启用,在作业执行时CDM会自动创建临时表,先将数据导入到该临时表,导入成功后再通过数据库的事务模式将数据迁移到目标表中;导入失败则将目的表回滚到作业开始之前的状态。 图1 事务模式迁移 如果“导入开始前”选择“清除部分数据”或“清除全部数据”,CDM的事务模式不会回滚已经删除的数据。 父主题: 进阶实践