华为云用户手册

  • 约束限制 如果在字段映射界面, CDM 通过获取样值的方式无法获得所有列(例如从HBase/CloudTable/MongoDB导出数据时,CDM有较大概率无法获得所有列),则可以单击后选择“添加新字段”来手动增加,确保导入到目的端的数据完整。 关系数据库、Hive、 MRS Hudi及 DLI 做源端时,不支持获取样值功能。 SQLServer作为目的端数据源时,不支持timestamp类型字段的写入,需修改为其他时间类型字段写入(如datetime)。 当作业源端为OBS、迁移 CS V文件时,并且配置“解析首行为列名”参数的场景下显示列名。 当使用二进制格式进行文件到文件的迁移时,没有字段映射这一步。 自动创表场景下,需在目的端表中提前手动新增字段,再在字段映射里新增字段。 添加完字段后,新增的字段在界面不显示样值,不会影响字段值的传输,CDM会将字段值直接写入目的端。 如果字段映射关系不正确,您可以通过拖拽字段、单击对字段批量映射两种方式来调整字段映射关系。 如果是导入到 数据仓库 服务(DWS),则还需在目的字段中选择分布列,建议按如下顺序选取分布列: 有主键可以使用主键作为分布列。 多个数据段联合做主键的场景,建议设置所有主键作为分布列。 在没有主键的场景下,如果没有选择分布列,DWS会默认第一列作为分布列,可能会有数据倾斜风险。 如CDM不支持源端迁移字段类型,请参见不支持数据类型转换规避指导将字段类型转换为CDM支持的类型。
  • 创建表/文件迁移作业 在创建表/文件迁移作业时,选择已创建的源端连接器、目的端连接器。 图1 配置作业 单击“下一步”,进入“字段映射”配置页面后,单击源字段图标。 图2 配置字段映射 选择“自定义字段”页签,填写字段名称及字段值后单击“确认”按钮,例如: 名称:InputTime。 值:${timestamp()},更多时间宏变量请参见表1。 图3 添加字段 表1 时间变量宏定义具体展示 宏变量 含义 实际显示效果 ${dateformat(yyyy-MM-dd)} 以yyyy-MM-dd格式返回当前时间。 2017-10-16 ${dateformat(yyyy/MM/dd)} 以yyyy/MM/dd格式返回当前时间。 2017/10/16 ${dateformat(yyyy_MM_dd HH:mm:ss)} 以yyyy_MM_dd HH:mm:ss格式返回当前时间。 2017_10_16 09:00:00 ${dateformat(yyyy-MM-dd HH:mm:ss, -1, DAY)} 以yyyy-MM-dd HH:mm:ss格式返回时间,时间为当前时间的前一天。 2017-10-15 09:00:00 ${dateformat(yyyy-MM-dd, -1, DAY)} 00:00:00 以yyyy-MM-dd HH:mm:ss格式返回时间,时间为当前时间的前一天0点。 2017-10-15 00:00:00 ${dateformat(yyyy-MM-dd, -1, DAY)} 12:00:00 以yyyy-MM-dd HH:mm:ss格式返回时间,时间为当前时间的前一天12点。 2017-10-15 12:00:00 ${dateformat(yyyy-MM-dd, -N, DAY)} 00:00:00 以yyyy-MM-dd HH:mm:ss格式返回时间,时间为当前时间的前N天的0点。 N为3时: 2017-10-13 00:00:00 ${dateformat(yyyy-MM-dd, -N, DAY)} 12:00:00 以yyyy-MM-dd HH:mm:ss格式返回时间,时间为当前时间的前N天的12点。 N为3时: 2017-10-13 12:00:00 ${timestamp()} 返回当前时间的时间戳,即1970年1月1日(00:00:00 GMT)到当前时间的毫秒数。 1508115600000 ${timestamp(-10, MINUTE)} 返回当前时间点10分钟前的时间戳。 1508115000000 ${timestamp(dateformat(yyyyMMdd))} 返回今天0点的时间戳。 1508083200000 ${timestamp(dateformat(yyyyMMdd,-1,DAY))} 返回昨天0点的时间戳。 1507996800000 ${timestamp(dateformat(yyyyMMddHH))} 返回当前整小时的时间戳。 1508115600000 添加完字段后,新增的字段在界面不显示样值,不会影响字段值的传输,CDM会将字段值直接写入目的端。 这里“添加字段”中“自定义字段”的功能,要求源端连接器为JDBC连接器、HBase连接器、MongoDB连接器、ElasticSearch连接器、Kafka连接器,或者目的端为HBase连接器。 添加完字段后,请确保自定义入库时间字段与目的端表字段类型相匹配。 单击“下一步”配置任务参数,一般情况下全部保持默认即可。 单击“保存并运行”,回到作业管理的表/文件迁移界面,在作业管理界面可查看作业执行进度和结果。 作业执行成功后,单击作业操作列的“历史记录”,可查看该作业的历史执行记录、读取和写入的统计数据。 在历史记录界面单击“日志”,可查看作业的日志信息。 前往目的端数据源查看数据迁移的入库时间。
  • 文件格式问题解决方法 数据库的数据导出到CSV文件,由于数据中含有分隔符逗号,造成导出的CSV文件中数据混乱。 CDM提供了以下几种解决方法: 指定字段分隔符 使用数据库中不存在的字符,或者是极少见的不可打印字符来作为字段分隔符。例如可以在目的端指定“字段分隔符”为“%01”,这样导出的字段分隔符就是“\u0001”,详情可见表1。 使用包围符 在目的端作业参数中开启“使用包围符”,这样数据库中如果字段包含了字段分隔符,在导出到CSV文件的时候,CDM会使用包围符将该字段括起来,使之作为一个字段的值写入CSV文件。 数据库的数据包含换行符 场景:使用CDM先将MySQL中的某张表(表的某个字段值中包含了换行符\n)导出到CSV格式的文件中,然后再使用CDM将导出的CSV文件导入到MRS HBase,发现导出的CSV文件中出现了数据被截断的情况。 解决方法:指定换行符。 在使用CDM将MySQL的表数据导出到CSV文件时,指定目的端的换行符为“%01”(确保这个值不会出现在字段值中),这样导出的CSV文件中换行符就是“%01”。然后再使用CDM将CSV文件导入到MRS HBase时,指定源端的换行符为“%01”,这样就避免了数据被截断的问题。
  • 二进制格式 如果想要在文件系统间按原样复制文件,则可以选择二进制格式。二进制格式传输文件到文件的速率高、性能稳定,且不需要在作业第二步进行字段匹配。 文件传输的目录结构 CDM的文件传输,支持单文件,也支持一次传输目录下所有的文件。传输到目的端后,目录结构会保持原样。 增量迁移文件 使用CDM进行二进制传输文件时,目的端有一个参数“重复文件处理方式”,可以用作文件的增量迁移,具体请参见文件增量迁移。 增量迁移文件的时候,选择“重复文件处理方式”为“跳过重复文件”,这样如果源端有新增的文件,或者是迁移过程中出现了失败,只需要再次运行任务,已经迁移过的文件就不会再次迁移。 写入到临时文件 二进制迁移文件时候,可以在目的端指定是否写入到临时文件。如果指定了该参数,在文件复制过程中,会将文件先写入到一个临时文件中,迁移成功后,再进行rename或move操作,在目的端恢复文件。 生成文件MD5值 对每个传输的文件都生成一个MD5值,并将该值记录在一个新文件中,新文件以“.md5”作为后缀,并且可以指定MD5值生成的目录。
  • 文件格式的公共参数 启动作业标识文件 这个主要用于自动化场景中,CDM配置了定时任务,周期去读取源端文件,但此时源端的文件正在生成中,CDM此时读取会造成重复写入或者是读取失败。所以,可以在源端作业参数中指定启动作业标识文件为“ok.txt”,在源端生成文件成功后,再在文件目录下生成“ok.txt”,这样CDM就能读取到完整的文件。 另外,可以设置超时时间,在超时时间内,CDM会周期去查询标识文件是否存在,超时后标识文件还不存在的话,则作业任务失败。 启动作业标识文件本身不会被迁移。 作业成功标识文件 文件系统为目的端的时候,当任务成功时,在目的端的目录下,生成一个空的文件,标识文件名由用户来指定。一般和“启动作业标识文件”搭配使用。 这里需要注意的是,不要和传输的文件混淆,例如传输文件为“finish.txt”,但如果作业成功标识文件也设置为“finish.txt”,这样会造成这两个文件相互覆盖。 过滤器 使用CDM迁移文件的时候,可以使用过滤器来过滤文件。支持通过通配符或时间过滤器来过滤文件。 选择通配符时,CDM只迁移满足过滤条件的目录或文件。 选择时间过滤器时,只有文件的修改时间晚于输入的时间才会被传输。 例如用户的“/table/”目录下存储了很多数据表的目录,并且按天进行了划分DRIVING_BEHAVIOR_20180101~DRIVING_BEHAVIOR_20180630,保存了DRIVING_BEHAVIOR从1月到6月的所有数据。如果只想迁移DRIVING_BEHAVIOR的3月份的表数据,那么需要在作业第一步指定源目录为“/table”,过滤类型选择“通配符”,然后指定“路径过滤器”为“DRIVING_BEHAVIOR_201803*”。
  • JSON格式 这里主要介绍JSON文件格式的以下内容: CDM支持解析的JSON类型 记录节点 从JSON文件复制数据 CDM支持解析的JSON类型:JSON对象、JSON数组。 JSON对象:JSON文件包含单个对象,或者以行分隔/串连的多个对象。 单一对象JSON { "took" : 190, "timed_out" : false, "total" : 1000001, "max_score" : 1.0 } 行分隔的JSON对象 {"took" : 188, "timed_out" : false, "total" : 1000003, "max_score" : 1.0 } {"took" : 189, "timed_out" : false, "total" : 1000004, "max_score" : 1.0 } 串连的JSON对象 { "took": 190, "timed_out": false, "total": 1000001, "max_score": 1.0 } { "took": 191, "timed_out": false, "total": 1000002, "max_score": 1.0 } JSON数组:JSON文件是包含多个JSON对象的数组。 [{ "took" : 190, "timed_out" : false, "total" : 1000001, "max_score" : 1.0 }, { "took" : 191, "timed_out" : false, "total" : 1000001, "max_score" : 1.0 }] 记录节点 记录数据的根节点。该节点对应的数据为JSON数组,CDM会以同一模式从该数组中提取数据。多层嵌套的JSON节点以字符“.”分割。 从JSON文件复制数据 示例一 从行分隔/串连的多个对象中提取数据。JSON文件包含了多个JSON对象,例如: { "took": 190, "timed_out": false, "total": 1000001, "max_score": 1.0 } { "took": 191, "timed_out": false, "total": 1000002, "max_score": 1.0 } { "took": 192, "timed_out": false, "total": 1000003, "max_score": 1.0 } 如果您想要从该JSON对象中提取数据,使用以下格式写入到数据库,只需要在作业第一步指定文件格式为“JSON格式”,指定JSON类型为“JSON对象”,然后在作业第二步进行字段匹配即可。 表2 示例 took timedOut total maxScore 190 false 1000001 1.0 191 false 1000002 1.0 192 false 1000003 1.0 示例二 从记录节点中提取数据。JSON文件包含了单个的JSON对象,但是其中有效的数据在一个数据节点下,例如: { "took": 190, "timed_out": false, "hits": { "total": 1000001, "max_score": 1.0, "hits": [{ "_id": "650612", "_source": { "name": "tom", "books": ["book1","book2","book3"] } }, { "_id": "650616", "_source": { "name": "tom", "books": ["book1","book2","book3"] } }, { "_id": "650618", "_source": { "name": "tom", "books": ["book1","book2","book3"] } }] } } 如果想以如下格式写入到数据库,则需要在作业第一步指定文件格式为“JSON格式”,指定JSON类型为“JSON对象”,并且指定记录节点为“hits.hits”,然后在作业第二步进行字段匹配。 表3 示例 ID SourceName SourceBooks 650612 tom ["book1","book2","book3"] 650616 tom ["book1","book2","book3"] 650618 tom ["book1","book2","book3"] 示例三 从JSON数组中提取数据。JSON文件是包含了多个JSON对象的JSON数组,例如: [{ "took" : 190, "timed_out" : false, "total" : 1000001, "max_score" : 1.0 }, { "took" : 191, "timed_out" : false, "total" : 1000002, "max_score" : 1.0 }] 如果想以如下格式写入到数据库,需要在作业第一步指定文件格式为“JSON格式”,指定JSON类型为“JSON数组”,然后在作业第二步进行字段匹配。 表4 示例 took timedOut total maxScore 190 false 1000001 1.0 191 false 1000002 1.0 示例四 在解析JSON文件的时候搭配转换器。在示例二前提下,想要把hits.max_score字段附加到所有记录中,即以如下格式写入到数据库中: 表5 示例 ID SourceName SourceBooks MaxScore 650612 tom ["book1","book2","book3"] 1.0 650616 tom ["book1","book2","book3"] 1.0 650618 tom ["book1","book2","book3"] 1.0 则需要在作业第一步指定文件格式为“JSON格式”,指定JSON类型为“JSON对象”,并且指定记录节点为“hits.hits”,然后在作业第二步添加转换器,操作步骤如下: 单击添加字段,新增一个字段。 图2 添加字段 在添加的新字段后面,单击添加字段转换器。 图3 添加字段转换器 创建“表达式转换”的转换器,表达式输入“1.0”,然后保存。 图4 配置字段转换器
  • Apache server日志 日志样例: [Mon Jan 08 20:43:51.854334 2018] [mpm_event:notice] [pid 36465:tid 140557517657856] AH00489: Apache/2.4.12 (Unix) OpenSSL/1.0.1t configured -- resuming normal operations 正则表达式为: ^\[(.*)\] \[(.*)\] \[(.*)\] (.*).* 解析结果如下: 表5 Apache server日志解析结果 列号 样值 1 Mon Jan 08 20:43:51.854334 2018 2 mpm_event:notice 3 pid 36465:tid 140557517657856 4 AH00489: Apache/2.4.12 (Unix) OpenSSL/1.0.1t configured -- resuming normal operations
  • Log4J日志 日志样例: 2018-01-11 08:50:59,001 INFO [org.apache.sqoop.core.SqoopConfiguration.configureClassLoader(SqoopConfiguration.java:251)] Adding jars to current classloader from property: org.apache.sqoop.classpath.extra 正则表达式为: ^(\d.*\d) (\w*) \[(.*)\] (\w.*).* 解析出的结果如下: 表1 Log4J日志解析结果 列号 样值 1 2018-01-11 08:50:59,001 2 INFO 3 org.apache.sqoop.core.SqoopConfiguration.configureClassLoader(SqoopConfiguration.java:251) 4 Adding jars to current classloader from property: org.apache.sqoop.classpath.extra
  • Django日志 日志样例: [08/Jan/2018 20:59:07 ] settings INFO Welcome to Hue 3.9.0 正则表达式为: ^\[(.*)\] (\w*) (\w*) (.*).* 解析结果如下: 表4 Django日志解析结果 列号 样值 1 08/Jan/2018 20:59:07 2 settings 3 INFO 4 Welcome to Hue 3.9.0
  • Log4J审计日志 日志样例: 2018-01-11 08:51:06,156 INFO [org.apache.sqoop.audit.FileAuditLogger.logAuditEvent(FileAuditLogger.java:61)] user=sqoop.anonymous.user ip=189.xxx.xxx.75 op=show obj=version objId=x 正则表达式为: ^(\d.*\d) (\w*) \[(.*)\] user=(\w.*) ip=(\w.*) op=(\w.*) obj=(\w.*) objId=(.*).* 解析结果如下: 表2 Log4J审计日志解析结果 列号 样值 1 2018-01-11 08:51:06,156 2 INFO 3 org.apache.sqoop.audit.FileAuditLogger.logAuditEvent(FileAuditLogger.java:61) 4 sqoop.anonymous.user 5 189.xxx.xxx.75 6 show 7 version 8 x
  • Tomcat日志 日志样例: 11-Jan-2018 09:00:06.907 INFO [main] org.apache.catalina.startup.VersionLoggerListener.log OS Name: Linux 正则表达式为: ^(\d.*\d) (\w*) \[(.*)\] ([\w\.]*) (\w.*).* 解析结果如下: 表3 Tomcat日志解析结果 列号 样值 1 11-Jan-2018 09:00:06.907 2 INFO 3 main 4 org.apache.catalina.startup.VersionLoggerListener.log 5 OS Name:Linux
  • 指定文件名迁移 从FTP/SFTP/OBS导出文件时,CDM支持指定文件名迁移,用户可以单次迁移多个指定的文件(最多50个),导出的多个文件只能写到目的端的同一个目录。 在创建表/文件迁移作业时,如果源端数据源为FTP/SFTP/OBS,CDM源端的作业参数“源目录或文件”支持输入多个文件名(最多50个),文件名之间默认使用“|”分隔,您也可以自定义文件分隔符,从而实现文件列表迁移。 迁移文件或对象时支持文件级增量迁移(通过配置跳过重复文件实现),但不支持断点续传。 例如要迁移3个文件,第2个文件迁移到一半时由于网络原因失败,再次启动迁移任务时,会跳过第1个文件,从第2个文件开始重新传,但不能从第2个文件失败的位置重新传。 文件迁移时,单个任务支持千万数量的文件,如果待迁移目录下文件过多,建议拆分到不同目录并创建多个任务。 父主题: 数据迁移进阶实践
  • 表达式转换 使用JSP表达式语言(Expression Language)对当前字段或整行数据进行转换。JSP表达式语言可以用来创建算术和逻辑表达式。在表达式内可以使用整型数,浮点数,字符串,常量true、false和null。 数据进行转换过程中,替换内容包含特殊字符时,需要先使用\将该字符转义成普通字符。 表达式支持以下两个环境变量: value:当前字段值。 row:当前行,数组类型。 表达式支持的工具类用法罗列如下,未列出即表示不支持: 如果当前字段为字符串类型,将字符串全部转换为小写,例如将“aBC”转换为“abc”。 表达式:StringUtils.lowerCase(value) 将当前字段的字符串全部转为大写。 表达式:StringUtils.upperCase(value) 如果想将第1个日期字段格式从“2018-01-05 15:15:05”转换为“20180105”。 表达式:DateUtils.format(DateUtils.parseDate(row[0],"yyyy-MM-dd HH:mm:ss"),"yyyyMMdd") 如果想将时间戳转换成“yyyy-MM-dd hh:mm:ss”格式的日期字符串的类型,例如字段值为“1701312046588”,转换后为“2023-11-30 10:40:46”。 表达式:DateUtils.format(NumberUtils.toLong(value),"yyyy-MM-dd HH:mm:ss") 如果想将“yyyy-MM-dd hh:mm:ss”格式的日期字符串转换成时间戳的类型。 表达式:DateUtils.getTime(DateUtils.parseDate(value,"yyyy-MM-dd hh:mm:ss")) 如果当前字段值为“yyyy-MM-dd”格式的日期字符串,需要截取年,例如字段值为“2017-12-01”,转换后为“2017”。 表达式:StringUtils.substringBefore(value,"-") 如果当前字段值为数值类型,转换后值为当前值的两倍。 表达式:value*2 如果当前字段值为“true”,转换后为“Y”,其它值则转换后为“N”。 表达式:value=="true"?"Y":"N" 如果当前字段值为字符串类型,当为空时,转换为“Default”,否则不转换。 表达式:empty value? "Default":value 如果想将日期字段格式从“2018/01/05 15:15:05”转换为“2018-01-05 15:15:05”。 表达式:DateUtils.format(DateUtils.parseDate(value,"yyyy/MM/dd HH:mm:ss"),"yyyy-MM-dd HH:mm:ss") 获取一个36位的UUID(Universally Unique Identifier,通用唯一识别码)。 表达式:CommonUtils.randomUUID() 如果当前字段值为字符串类型,将首字母转换为大写,例如将“cat”转换为“Cat”。 表达式:StringUtils.capitalize(value) 如果当前字段值为字符串类型,将首字母转换为小写,例如将“Cat”转换为“cat”。 表达式:StringUtils.uncapitalize(value) 如果当前字段值为字符串类型,使用空格填充为指定长度,并且将字符串居中,当字符串长度不小于指定长度时不转换,例如将“ab”转换为长度为4的“ab”。 表达式:StringUtils.center(value,4) 删除字符串末尾的一个换行符(包括“\n”、“\r”或者“\r\n”),例如将“abc\r\n\r\n”转换为“abc\r\n”。 表达式:StringUtils.chomp(value) 如果字符串中包含指定的字符串,则返回布尔值true,否则返回false。例如“abc”中包含“a”,则返回true。 表达式:StringUtils.contains(value,"a") 如果字符串中包含指定字符串的任一字符,则返回布尔值true,否则返回false。例如“zzabyycdxx”中包含“z”或“a”任意一个,则返回true。 表达式:StringUtils.containsAny(value,"za") 如果字符串中不包含指定的所有字符,则返回布尔值true,包含任意一个字符则返回false。例如“abz”中包含“xyz”里的任意一个字符,则返回false。 表达式:StringUtils.containsNone(value,"xyz") 如果当前字符串只包含指定字符串中的字符,则返回布尔值true,包含任意一个其它字符则返回false。例如“abab”只包含“abc”中的字符,则返回true。 表达式:StringUtils.containsOnly(value,"abc") 如果字符串为空或null,则转换为指定的字符串,否则不转换。例如将空字符转换为null。 表达式:StringUtils.defaultIfEmpty(value,null) 如果字符串以指定的后缀结尾(包括大小写),则返回布尔值true,否则返回false。例如“abcdef”后缀不为null,则返回false。 表达式:StringUtils.endsWith(value,null) 如果字符串和指定的字符串完全一样(包括大小写),则返回布尔值true,否则返回false。例如比较字符串“abc”和“ABC”,则返回false。 表达式:StringUtils.equals(value,"ABC") 从字符串中获取指定字符串的第一个索引,没有则返回整数-1。例如从“aabaabaa”中获取“ab”的第一个索引1。 表达式:StringUtils.indexOf(value,"ab") 从字符串中获取指定字符串的最后一个索引,没有则返回整数-1。例如从“aFkyk”中获取“k”的最后一个索引4。 表达式:StringUtils.lastIndexOf(value,"k") 从字符串中指定的位置往后查找,获取指定字符串的第一个索引,没有则转换为“-1”。例如“aabaabaa”中索引3的后面,第一个“b”的索引是5。 表达式:StringUtils.indexOf(value,"b",3) 从字符串获取指定字符串中任一字符的第一个索引,没有则返回整数-1。例如从“zzabyycdxx”中获取“z”或“a”的第一个索引0。 表达式:StringUtils.indexOfAny(value,"za") 如果字符串仅包含Unicode字符,返回布尔值true,否则返回false。例如“ab2c”中包含非Unicode字符,返回false。 表达式:StringUtils.isAlpha(value) 如果字符串仅包含Unicode字符或数字,返回布尔值true,否则返回false。例如“ab2c”中仅包含Unicode字符和数字,返回true。 表达式:StringUtils.isAlphanumeric(value) 如果字符串仅包含Unicode字符、数字或空格,返回布尔值true,否则返回false。例如“ab2c”中仅包含Unicode字符和数字,返回true。 表达式:StringUtils.isAlphanumericSpace(value) 如果字符串仅包含Unicode字符或空格,返回布尔值true,否则返回false。例如“ab2c”中包含Unicode字符和数字,返回false。 表达式:StringUtils.isAlphaSpace(value) 如果字符串仅包含ASCII可打印字符,返回布尔值true,否则返回false。例如“!ab-c~”返回true。 表达式:StringUtils.isAsciiPrintable(value) 如果字符串为空或null,返回布尔值true,否则返回false。 表达式:StringUtils.isEmpty(value) 如果字符串中仅包含Unicode数字,返回布尔值true,否则返回false。 表达式:StringUtils.isNumeric(value) 获取字符串最左端的指定长度的字符,例如获取“abc”最左端的2位字符“ab”。 表达式:StringUtils.left(value,2) 获取字符串最右端的指定长度的字符,例如获取“abc”最右端的2位字符“bc”。 表达式:StringUtils.right(value,2) 将指定字符串拼接至当前字符串的左侧,需同时指定拼接后的字符串长度,如果当前字符串长度不小于指定长度,则不转换。例如将“yz”拼接到“bat”左侧,拼接后长度为8,则转换后为“yzyzybat”。 表达式:StringUtils.leftPad(value,8,"yz") 将指定字符串拼接至当前字符串的右侧,需同时指定拼接后的字符串长度,如果当前字符串长度不小于指定长度,则不转换。例如将“yz”拼接到“bat”右侧,拼接后长度为8,则转换后为“batyzyzy”。 表达式:StringUtils.rightPad(value,8,"yz") 如果当前字段为字符串类型,获取当前字符串的长度,如果该字符串为null,则返回0。 表达式:StringUtils.length(value) 如果当前字段为字符串类型,删除其中所有的指定字符串,例如从“queued”中删除“ue”,转换后为“qd”。 表达式:StringUtils.remove(value,"ue") 如果当前字段为字符串类型,移除当前字段末尾指定的子字符串。指定的子字符串若不在当前字段的末尾,则不转换,例如移除当前字段“www.domain.com”后的“.com”。 表达式:StringUtils.removeEnd(value,".com") 如果当前字段为字符串类型,移除当前字段开头指定的子字符串。指定的子字符串若不在当前字段的开头,则不转换,例如移除当前字段“www.domain.com”前的“www.”。 表达式:StringUtils.removeStart(value,"www.") 如果当前字段为字符串类型,替换当前字段中所有的指定字符串,例如将“aba”中的“a”用“z”替换,转换后为“zbz”。 表达式:StringUtils.replace(value,"a","z") 替换内容包含特殊字符时,需要先把该字符转义成普通字符,例如,客户想通过该表达式把字符串中 \t 去掉时,需要配置为: StringUtils.replace(value,"\\t","")(即把 \ 再次转义)。 如果当前字段为字符串类型,一次替换字符串中的多个字符,例如将字符串“hello”中的“h”用“j”替换,“o”用“y”替换,转换后为“jelly”。 表达式:StringUtils.replaceChars(value,"ho","jy") 如果字符串以指定的前缀开头(区分大小写),则返回布尔值true,否则返回false,例如当前字符串“abcdef”以“abc”开头,则返回true。 表达式:StringUtils.startsWith(value,"abc") 如果当前字段为字符串类型,去除字段中首、尾处所有指定的字符,例如去除“abcyx”中首尾所有的“x”、“y”、“z”和“b”,转换后为“abc”。 表达式:StringUtils.strip(value,"xyzb") 如果当前字段为字符串类型,去除字段末尾所有指定的字符,例如去除当前字段末尾的“abc”字符串。 表达式:StringUtils.stripEnd(value,"abc") 如果当前字段为字符串类型,去除字段开头所有指定的字符,例如去除当前字段开头的所有空格。 表达式:StringUtils.stripStart(value,null) 如果当前字段为字符串类型,获取字符串指定位置后(索引从0开始,包括指定位置的字符)的子字符串,指定位置如果为负数,则从末尾往前计算位置,末尾第一位为-1。例如获取“abcde”索引为2的字符(即c)及之后的字符串,则转换后为“cde”。 表达式:StringUtils.substring(value,2) 如果当前字段为字符串类型,获取字符串指定区间(索引从0开始,区间起点包括指定位置的字符,区间终点不包含指定位置的字符)的子字符串,区间位置如果为负数,则从末尾往前计算位置,末尾第一位为-1。例如获取“abcde”第2个字符(即c)及之后、第4个字符(即e)之前的字符串,则转换后为“cd”。 表达式:StringUtils.substring(value,2,4) 如果当前字段为字符串类型,获取当前字段里第一个指定字符后的子字符串。例如获取“abcba”中第一个“b”之后的子字符串,转换后为“cba”。 表达式:StringUtils.substringAfter(value,"b") 如果当前字段为字符串类型,获取当前字段里最后一个指定字符后的子字符串。例如获取“abcba”中最后一个“b”之后的子字符串,转换后为“a”。 表达式:StringUtils.substringAfterLast(value,"b") 如果当前字段为字符串类型,获取当前字段里第一个指定字符前的子字符串。例如获取“abcba”中第一个“b”之前的子字符串,转换后为“a”。 表达式:StringUtils.substringBefore(value,"b") 如果当前字段为字符串类型,获取当前字段里最后一个指定字符前的子字符串。例如获取“abcba”中最后一个“b”之前的子字符串,转换后为“abc”。 表达式:StringUtils.substringBeforeLast(value,"b") 如果当前字段为字符串类型,获取嵌套在指定字符串之间的子字符串,没有匹配的则返回null。例如获取“tagabctag”中“tag”之间的子字符串,转换后为“abc”。 表达式:StringUtils.substringBetween(value,"tag") 如果当前字段为字符串类型,删除当前字符串两端的控制字符(char≤32),例如删除字符串前后的空格。 表达式:StringUtils.trim(value) 将当前字符串转换为字节,如果转换失败,则返回0。 表达式:NumberUtils.toByte(value) 将当前字符串转换为字节,如果转换失败,则返回指定值,例如指定值配置为1。 表达式:NumberUtils.toByte(value,1) 将当前字符串转换为Double数值,如果转换失败,则返回0.0d。 表达式:NumberUtils.toDouble(value) 将当前字符串转换为Double数值,如果转换失败,则返回指定值,例如指定值配置为1.1d。 表达式:NumberUtils.toDouble(value,1.1d) 将当前字符串转换为Float数值,如果转换失败,则返回0.0f。 表达式:NumberUtils.toFloat(value) 将当前字符串转换为Float数值,如果转换失败,则返回指定值,例如配置指定值为1.1f。 表达式:NumberUtils.toFloat(value,1.1f) 将当前字符串转换为Int数值,如果转换失败,则返回0。 表达式:NumberUtils.toInt(value) 将当前字符串转换为Int数值,如果转换失败,则返回指定值,例如配置指定值为1。 表达式:NumberUtils.toInt(value,1) 将字符串转换为Long数值,如果转换失败,则返回0。 表达式:NumberUtils.toLong(value) 将当前字符串转换为Long数值,如果转换失败,则返回指定值,例如配置指定值为1L。 表达式:NumberUtils.toLong(value,1L) 将字符串转换为Short数值,如果转换失败,则返回0。 表达式:NumberUtils.toShort(value) 将当前字符串转换为Short数值,如果转换失败,则返回指定值,例如配置指定值为1。 表达式:NumberUtils.toShort(value,1) 将当前IP字符串转换为Long数值,例如将“10.78.124.0”转换为Long数值是“172915712”。 表达式:CommonUtils.ipToLong(value) 从网络读取一个IP与物理地址映射文件,并存放到Map集合,这里的URL是IP与地址映射文件存放地址,例如“http://10.114.205.45:21203/sqoop/IpList.csv”。 表达式:HttpsUtils.downloadMap("url") 将IP与地址映射对象缓存起来并指定一个key值用于检索,例如“ipList”。 表达式:CommonUtils.setCache("ipList",HttpsUtils.downloadMap("url")) 取出缓存的IP与地址映射对象。 表达式:CommonUtils.getCache("ipList") 判断是否有IP与地址映射缓存。 表达式:CommonUtils.cacheExists("ipList") 根据IP取出对应的详细地址:国家_省份_城市_运营商,例如“1xx.78.124.0”对应的地址为“中国_广东_深圳_电信”,取不到对应地址则默认“**_**_**_**”。如果需要,可通过StringUtil类表达式对地址进行进一步拆分。 表达式:CommonUtils.getMapValue(CommonUtils.ipToLong(value),CommonUtils.cacheExists("ipLis")?CommonUtils.getCache("ipLis"):CommonUtils.setCache("ipLis",HttpsUtils.downloadMap("url"))) 根据指定的偏移类型(month/day/hour/minute/second)及偏移量(正数表示增加,负数表示减少),将指定格式的时间转换为一个新时间,例如将“2019-05-21 12:00:00”增加8个小时。 表达式:DateUtils.getCurrentTimeByZone("yyyy-MM-dd HH:mm:ss",value, "hour", 8) 如果value值为空或者null时,则返回字符串“aaa”,否则返回value。 表达式:StringUtils.defaultIfEmpty(value,"aaa")
  • 约束限制 作业源端开启“使用SQL语句”参数时不支持配置转换器。 如果在字段映射界面,CDM通过获取样值的方式无法获得所有列(例如从HBase/CloudTable/MongoDB导出数据时,CDM有较大概率无法获得所有列),则可以单击后选择“添加新字段”来手动增加,确保导入到目的端的数据完整。 关系数据库、Hive、MRS Hudi及DLI做源端时,不支持获取样值功能。 SQLServer作为目的端数据源时,不支持timestamp类型字段的写入,需修改为其他时间类型字段写入(如datetime)。 当作业源端为OBS、迁移CSV文件时,并且配置“解析首行为列名”参数的场景下显示列名。 当使用二进制格式进行文件到文件的迁移时,没有配置字段转换器这一步。 自动创表场景下,需在目的端表中提前手动新增字段,再在字段映射里新增字段。 添加完字段后,新增的字段在界面不显示样值,不会影响字段值的传输,CDM会将字段值直接写入目的端。 如果字段映射关系不正确,您可以通过拖拽字段、单击对字段批量映射两种方式来调整字段映射关系。 创建表达式转换器时,表达式的功能是对该字段的数据进行处理,故不建议使用时间宏,如需使用,请根据以下场景处理(源端是文件类的配置时仅支持方式一): 方式一:新建表达式转换器时,表达式需要用''包围。 ${dateformat(yyyy-MM-dd)}不加引号使用时,解析成2017-10-16之后还会进行运算,将'-'识别为减号,导致结果为1991,须使用'${dateformat(yyyy-MM-dd)}',即'2017-10-16'。 图2 使用''包围表达式 方式二:源字段中新增自定义字段,在样值中填写时间宏变量,重新进行字段映射处理。 图3 源字段新增自定义字段 如果是导入到数据仓库服务(DWS),则还需在目的字段中选择分布列,建议按如下顺序选取分布列: 有主键可以使用主键作为分布列。 多个数据段联合做主键的场景,建议设置所有主键作为分布列。 在没有主键的场景下,如果没有选择分布列,DWS会默认第一列作为分布列,可能会有数据倾斜风险。
  • MD5校验文件一致性 CDM数据迁移以抽取-写入模式进行,CDM首先从源端抽取数据,然后将数据写入到目的端。在迁移文件到OBS时,迁移模式如图1所示。 图1 迁移文件到OBS 在这个过程中,CDM支持使用MD5检验文件一致性。 抽取时 该功能支持源端为OBS、HDFS、FTP、SFTP、HTTP。可校验CDM抽取的文件,是否与源文件一致。 该功能由源端作业参数“MD5文件名后缀”控制(“文件格式”为“二进制格式”时生效),配置为源端文件系统中的MD5文件名后缀。 当源端数据文件同一目录下有对应后缀的保存md5值的文件,例如build.sh和build.sh.md5在同一目录下。若配置了“MD5文件名后缀”,则只迁移有MD5值的文件至目的端,没有MD5值或者MD5不匹配的数据文件将迁移失败,MD5文件自身不被迁移。 若未配置“MD5文件名后缀”,则迁移所有文件。 写入时 该功能目前只支持目的端为OBS。可校验写入OBS的文件,是否与CDM抽取的文件一致。 该功能由目的端作业参数“校验MD5值”控制,读取文件后写入OBS时,通过HTTP Header将MD5值提供给OBS做写入校验,并将校验结果写入OBS桶(该桶可以不是存储迁移文件的桶)。如果源端没有MD5文件则不校验。 迁移文件到文件系统时,目前只支持校验CDM抽取的文件是否与源文件一致(即只校验抽取的数据)。 迁移文件到OBS时,支持抽取和写入文件时都校验。 如果选择使用MD5校验,则无法使用KMS加密。 父主题: 数据迁移进阶实践
  • AES-256-GCM加密 目前只支持AES-256-GCM(NoPadding)。该加密算法在目的端为加密,在源端为解密,支持的源端与目的端数据源如下。 源端支持的数据源:HDFS(使用二进制格式传输时支持)。 目的端支持的数据源:HDFS(使用二进制格式传输时支持)。 下面分别以HDFS导出加密文件时解密、导入文件到HDFS时加密为例,介绍AES-256-GCM加解密的使用方法。 源端配置解密 创建从HDFS导出文件的CDM作业时,源端数据源选择HDFS、文件格式选择二进制格式后,在“源端作业配置”的“高级属性”中,配置如下参数。 加密方式:选择“AES-256-GCM”。 数据加密 密钥:这里的密钥必须与加密时配置的密钥一致,否则解密出来的数据会错误,且系统不会提示异常。 初始化向量:这里的初始化向量必须与加密时配置的初始化向量一致,否则解密出来的数据会错误,且系统不会提示异常。 这样CDM从HDFS导出加密过的文件时,写入目的端的文件便是解密后的明文文件。 目的端配置加密 创建CDM导入文件到HDFS的作业时,目的端数据源选择HDFS、文件格式选择二进制格式后,在“目的端作业配置”的“高级属性”中,配置如下参数。 加密方式:选择“AES-256-GCM”。 数据加密密钥:用户自定义密钥,密钥由长度64的十六进制数组成,不区分大小写但必须64位,例如“DD0AE00DFECD78BF051BCFDA25BD4E320DB0A7AC75A1F3FC3D3C56A457DCDC1B”。 初始化向量:用户自定义初始化向量,初始化向量由长度32的十六进制数组成,不区分大小写但必须32位,例如“5C91687BA886EDCD12ACBC3FF19A3C3F”。 这样在CDM导入文件到HDFS时,目的端HDFS上的文件便是经过AES-256-GCM算法加密后的文件。
  • KMS加密 源端解密不支持KMS。 CDM目前只支持导入文件到OBS时,目的端使用KMS加密,表/文件迁移和整库迁移都支持。在“目的端作业配置”的“高级属性”中配置。 KMS密钥需要先在数据加密服务创建,具体操作请参见《数据加密服务 用户指南》。 当启用KMS加密功能后,用户上传对象时,数据会加密成密文存储在OBS。用户从OBS下载加密对象时,存储的密文会先在OBS服务端解密为明文,再提供给用户。 如果选择使用KMS加密,则无法使用MD5校验一致性。 如果这里使用其它项目的KMS ID,则需要修改“项目ID”参数为KMS ID所属的项目ID;如果KMS ID与CDM在同一个项目下,“项目ID”参数保持默认即可。 使用KMS加密后,OBS上对象的加密状态不可以修改。 使用中的KMS密钥不可以删除,如果删除将导致加密对象不能下载。
  • 事务模式迁移 CDM的事务模式迁移,是指当CDM作业执行失败时,将数据回滚到作业开始之前的状态,自动清理目的表中的数据。 参数位置:创建表/文件迁移的作业时,如果目的端为关系型数据库,在目的端作业配置的高级属性中,可以通过“先导入阶段表”参数选择是否启用事务模式。 参数原理:如果启用,在作业执行时CDM会自动创建临时表,先将数据导入到该临时表,导入成功后再通过数据库的事务模式将数据迁移到目标表中;导入失败则将目的表回滚到作业开始之前的状态。 图1 事务模式迁移 如果“导入开始前”选择“清除部分数据”或“清除全部数据”,CDM的事务模式不会回滚已经删除的数据。 父主题: 数据迁移进阶实践
  • HBase/CloudTable增量迁移 使用CDM导出HBase(包括MRS HBase、 FusionInsight HBase、Apache HBase)或者 表格存储服务 (CloudTable)的数据时,支持导出指定时间段内的数据,配合CDM的定时任务,可以实现HBase/CloudTable的增量迁移。 如果配置了时间宏变量,通过 DataArts Studio 数据开发调度CDM迁移作业时,系统会将时间宏变量替换为“数据开发作业计划启动时间-偏移量”,而不是“CDM作业实际启动时间-偏移量”。 在创建CDM表/文件迁移的作业,源连接选择为HBase连接或CloudTable连接时,高级属性的可选参数中可以配置时间区间。 图1 HBase时间区间 起始时间(包含该值),格式为“yyyy-MM-dd HH:mm:ss”,表示只抽取该时间及以后的数据。 终止时间(不包含该值),格式为“yyyy-MM-dd HH:mm:ss”,表示只抽取该时间以前的数据。 这2个参数支持配置为时间宏变量,例如: 起始时间配置为${dateformat(yyyy-MM-dd HH:mm:ss, -1, DAY)}时,表示只导出昨天以后的数据。 终止时间配置为${dateformat(yyyy-MM-dd HH:mm:ss)}时,表示只导出当前时间以前的数据。 这2个参数同时配置后,CDM就只导出前一天内的数据,再将该作业配置为每天0点执行一次,就可以增量同步每天新生成的数据。 父主题: 增量迁移原理介绍
  • 时间变量宏定义具体展示 假设当前时间为“2017-10-16 09:00:00”,时间变量宏定义具体如表1所示。 表中示例实际使用时必须嵌在''中使用,比如需要以yyyy-MM-dd格式返回当前时间时,参数为'${dateformat(yyyy-MM-dd)}'。 表1 时间变量宏定义具体展示 宏变量 含义 实际显示效果 ${dateformat(yyyy-MM-dd)} 以yyyy-MM-dd格式返回当前时间。 2017-10-16 ${dateformat(yyyy/MM/dd)} 以yyyy/MM/dd格式返回当前时间。 2017/10/16 ${dateformat(yyyy_MM_dd HH:mm:ss)} 以yyyy_MM_dd HH:mm:ss格式返回当前时间。 2017_10_16 09:00:00 ${dateformat(yyyy-MM-dd HH:mm:ss, -1, DAY)} 以yyyy-MM-dd HH:mm:ss格式返回时间,时间为当前时间的前一天。 2017-10-15 09:00:00 ${dateformat(yyyy-MM-dd, -1, DAY)} 00:00:00 以yyyy-MM-dd HH:mm:ss格式返回时间,时间为当前时间的前一天0点。 2017-10-15 00:00:00 ${dateformat(yyyy-MM-dd, -1, DAY)} 12:00:00 以yyyy-MM-dd HH:mm:ss格式返回时间,时间为当前时间的前一天12点。 2017-10-15 12:00:00 ${dateformat(yyyy-MM-dd, -N, DAY)} 00:00:00 以yyyy-MM-dd HH:mm:ss格式返回时间,时间为当前时间的前N天的0点。 N为3时: 2017-10-13 00:00:00 ${dateformat(yyyy-MM-dd, -N, DAY)} 12:00:00 以yyyy-MM-dd HH:mm:ss格式返回时间,时间为当前时间的前N天的12点。 N为3时: 2017-10-13 12:00:00 ${timestamp()} 返回当前时间的时间戳,即1970年1月1日(00:00:00 GMT)到当前时间的毫秒数。 1508115600000 ${timestamp(-10, MINUTE)} 返回当前时间点10分钟前的时间戳。 1508115000000 ${timestamp(dateformat(yyyyMMdd))} 返回今天0点的时间戳。 1508083200000 ${timestamp(dateformat(yyyyMMdd,-1,DAY))} 返回昨天0点的时间戳。 1507996800000 ${timestamp(dateformat(yyyyMMddHH))} 返回当前整小时的时间戳。 1508115600000
  • 路径和表名的时间宏变量 如图1所示,如果将: 源端的“表名”配置为“CDM_/${dateformat(yyyy-MM-dd)}”。 目的端的“写入目录”配置为“/opt/ttxx/${timestamp()}”。 经过宏定义转换,这个作业表示:将Oracle数据库的“SQOOP.CDM_20171016”表中数据,迁移到HDFS的“/opt/ttxx/1508115701746”目录中。 图1 源表名和写入目录配置为时间宏变量 目前也支持一个表名或路径名中有多个宏定义变量,例如“/opt/ttxx/${dateformat(yyyy-MM-dd)}/${timestamp()}”,经过转换后为“/opt/ttxx/2017-10-16/1508115701746”。
  • Where子句中的时间宏变量 以SQOOP.CDM_20171016表为例,该表中存在表示时间的列DS,如图2所示。 图2 表数据 假设当前时间为“2017-10-16”,要导出前一天的数据(即DS=‘2017-10-15’),则可以在创建作业时配置“Where子句”为DS='${dateformat(yyyy-MM-dd,-1,DAY)}',即可将符合DS=‘2017-10-15’条件的数据导出。
  • timestamp timestamp支持两种形式的参数: timestamp() 返回当前时间的时间戳,即从1970年到现在的毫秒数,如1508078516286。 timestamp(dateOffset, dateType) 返回经过时间偏移后的时间戳,“dateOffset”和“dateType”表示日期的偏移量以及偏移量的类型。 例如当前日期为“2017-10-16 09:00:00”,则“timestamp(-10, MINUTE)”返回当前时间点10分钟前的时间戳,即“1508115000000”。
  • 时间宏变量和定时任务配合完成增量同步 这里列举两个简单的使用场景: 数据库表中存在表示时间的列DS,类型为“varchar(30)”,插入的时间格式类似于“2017-xx-xx”。 定时任务中,重复周期为1天,每天的凌晨0点执行定时任务。配置“Where子句”为DS='${dateformat(yyyy-MM-dd,-1,DAY)}',这样就可以在每天的凌晨0点导出前一天产生的所有数据。 数据库表中存在表示时间的列time,类型为“Number”,插入的时间格式为时间戳。 定时任务中,重复周期为1天,每天的凌晨0点执行定时任务。配置“Where子句”为time between ${timestamp(-1,DAY)} and ${timestamp()},这样就可以在每天的凌晨0点导出前一天产生的所有数据。 其它的配置方式原理相同。
  • dateformat dateformat支持两种形式的参数: dateformat(format) format表示返回日期的格式,格式定义参考"java.text.SimpleDateFormat.java"中的定义。 例如当前日期为“2017-10-16 09:00:00”,则"yyyy-MM-dd HH:mm:ss"表示“2017-10-16 09:00:00”。 dateformat(format, dateOffset, dateType) format表示返回日期的格式。 dateOffset表示日期的偏移量。 dateType表示日期的偏移量的类型。 目前dateType支持以下几种类型:SECOND(秒),MINUTE(分钟),HOUR(小时),DAY(天),MONTH(月),YEAR(年)。 其中MONTH(月),YEAR(年)的偏移量类型存在特殊场景: 对于年、月来说,若进行偏移后实际没有该日期,则按照日历取该月最大的日期。 不支持在源端和目的端的“时间过滤”参数中的起始时间、终止时间使用年、月的偏移。 例如当前日期为"2023-03-01 09:00:00",则: "dateformat(yyyy-MM-dd HH:mm:ss, -1, YEAR)"表示当前时间的前一年,也就是"2022-03-01 09:00:00"。 "dateformat(yyyy-MM-dd HH:mm:ss, -3, MONTH)"表示当前时间的前三月,也就是"2022-12-01 09:00:00"。 "dateformat(yyyy-MM-dd HH:mm:ss, -1, DAY)"表示当前时间的前一天,也就是"2023-02-28 09:00:00"。 "dateformat(yyyy-MM-dd HH:mm:ss, -1, HOUR)"表示当前时间的前一小时,也就是"2023-03-01 08:00:00"。 "dateformat(yyyy-MM-dd HH:mm:ss, -1, MINUTE)"表示当前时间的前一分钟,也就是"2023-03-01 08:59:00"。 "dateformat(yyyy-MM-dd HH:mm:ss, -1, SECOND)"表示当前时间的前一秒,也就是"2023-03-01 08:59:59"。
  • 文件/路径过滤器 参数位置:在创建表/文件迁移作业时,如果源端数据源为文件类型,那么源端作业参数的高级属性中可以看到“过滤类型”参数,该参数可选择:通配符或正则表达式。 参数原理:“过滤类型”选择“通配符”时,CDM就可以通过用户配置的通配符过滤文件或路径,CDM只迁移满足指定条件的文件或路径。 配置样例: 例如源端文件名带有时间字段“2017-10-15 20:25:26”,这个时刻生成的文件为“/opt/data/file_20171015202526.data”,则在创建作业时,参数配置如下: 过滤类型:选择“通配符”。 文件过滤器:配置为“*${dateformat(yyyyMMdd,-1,DAY)}*”(这是CDM支持的日期宏变量格式,详见时间宏变量使用解析)。 图1 文件过滤 配置作业定时自动执行,“重复周期”为1天。 这样每天就可以把昨天生成的文件都导入到目的端目录,实现增量同步。 文件增量迁移场景下,“路径过滤器”的使用方法同“文件过滤器”一样,需要路径名称里带有时间字段,这样可以定期增量同步指定目录下的所有文件。
  • 时间过滤 参数位置:在创建表/文件迁移作业时,如果源端数据源为文件类型,那么源端作业配置下的高级属性中,“时间过滤”参数选择“是”。 参数原理:“起始时间”和“终止时间”参数中输入时间值后,只有修改时间介于起始时间和终止时间之间(时间区间为左闭右开,即等于起始时间也在区间之内)的文件才会被CDM迁移。 配置样例: 例如需要CDM只同步2021年1月1日~2022年1月1日生成的文件到目的端,则参数配置如下: 时间过滤器:选择为“是”。 起始时间:配置为2021-01-01 00:00:00(格式要求为yyyy-MM-dd HH:mm:ss)。 终止时间:配置为2022-01-01 00:00:00(格式要求为yyyy-MM-dd HH:mm:ss)。 图2 时间过滤 这样CDM作业就只迁移2021年1月1日~2022年1月1日时间段内生成的文件,下次作业再启动时就可以实现增量同步。
  • 注意事项 由于CDM版本不同,某些属性可能不支持,比如fromJobConfig.BatchJob。当创建任务报错时,需要在请求体中删除该属性。如下图所示: 图12 修改属性 CDM节点配置为创建作业时,节点运行会检测是否有同名CDM作业。 如果CDM作业未运行,则按照请求体内容更新同名作业。 如果同名CDM作业正在运行中,则等待作业运行完成。此时该CDM作业可能被其他任务启动,可能会导致数据抽取不符合预期(如作业配置未更新、运行时间宏未替换正确等),因此请注意不要启动或者创建多个同名作业。
  • 创建数据开发作业 单击工作空间的“数据开发”,进入DataArts Studio数据开发模块。 创建子作业“分表作业”,选择CDM节点,节点属性中作业类型配置为“创建新作业”,并将步骤2中复制的作业JSON粘贴到“CDM作业消息体”中。 图6 配置CDM作业消息体 编辑“CDM作业消息体”。 由于源表有三个,分别为mail001、mail002、mail003,因此需要将作业JSON中的“fromJobConfig.tableName”属性值配置为“mail${num}”,即源表名是通过参数配置。如下图所示: 图7 编辑JSON 由于数据迁移作业名不能重复,因此修改JSON中作业名称“name”属性值配置为“mail${num}”,目的是创建多个CDM集成作业,避免作业名称重复。如下图所示: 如果需要创建分库的作业,也可将作业JSON中的源连接修改为变量,方便替换。 图8 编辑JSON 添加作业参数num,用于作业JSON中调用。如下图所示: 图9 添加作业参数num 添加完成后单击“保存并提交版本”,以保存子作业。 创建主作业“集成管理”,选择For Each节点,每次循环调用分表作业,分别将参数001、002、003传递给子作业,生成不同的分表抽取任务。 关键配置如下: 子作业:选择“分表作业” 数据集:[['001'],['002'],['003']] 子作业参数:@@#{Loop.current[0]}@@ 此处子作业参数的EL表达式需要添加@@。如果不加@@包围,数据集001会被识别为1,导致源表名不存在的问题。 如下图所示: 图10 配置关键参数 配置完成后单击“保存并提交版本”,以保存主作业。 创建主作业和子作业完成后,通过测试运行主作业“集成管理”,检查数据集成作业创建情况。运行成功后,创建并运行CDM子作业成功。 图11 查看作业创建情况
  • 适用场景 业务系统中,数据源往往会采用分表的形式,以减少单表大小,支持复杂的业务应用场景。 在这种情况下,通过CDM进行数据集成时,需要针对每张表创建一个数据迁移作业。您可以参考本教程,通过数据开发模块的For Each节点和CDM节点,配合作业参数,实现批量创建分表迁移作业。 本教程中,源端MySQL数据库中存在三张分表,分别是mail01、mail02和mail03,且表结构一致,数据内容不同。目的端为MRS Hive服务。
共100000条