华为云用户手册

  • 前提条件 MySQL CDC要求MySQL版本为5.7或8.0.x。 该场景作业需要 DLI 与MySQL建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖探索 用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。 跨源认证简介及操作方法请参考跨源认证简介。 MySQL已开启了Binlog,并且binlog_row_image设置为FULL。 已创建MySQL用户,并授予了SELECT、 SHOW DATABASES 、REPLICATION SLAVE和REPLICATION CLIENT权限。
  • 语法格式 create table mySqlCdcSource ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'mysql-cdc', 'hostname' = 'mysqlHostname', 'username' = 'mysqlUsername', 'password' = 'mysqlPassword', 'database-name' = 'mysqlDatabaseName', 'table-name' = 'mysqlTableName' );
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 同步数据库数据的客户端,都会有一个唯一ID,即Server ID。同一个数据库下,建议每个MySQL CDC作业配置不同的Server ID。 主要原因如下: MySQL SERVER会根据该ID来维护网络连接以及Binlog位点。因此如果有大量相同的Server ID的客户端一起连接MySQL SERVER,可能导致MySQL SERVER的CPU陡增,影响线上业务稳定性。 此外,多个作业共享相同的Server ID,会导致Binlog位点错乱,多读或少读数据,因此建议每个CDC作业都配置不同的Server ID。 MySQL CDC源表暂不支持定义Watermark。如果您需要进行窗口聚合,请参考常见问题描述。 若连接DWS、MySQL等支持upsert的sink源,需要在sink表的创建语句中定义主键,请参考示例中printSink建表语句。 当使用MySQ CDM 源表时,请不要在源表参数里手动关闭debezium.connect.keep.alive,确保debezium.connect.keep.alive=true(默认值为true)。 如果手动关闭了debezium.connect.keep.alive,一旦发生拉取Binlog线程与MySQL服务器的连接连接异常,拉取Binlog线程不会尝试自动重连,这可能导致无法正常从源端拉取binlog日志。
  • 前提条件 确保已创建Kafka集群。 该场景作业需要运行在DLI的独享队列上,因此要与kafka集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖 探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。 跨源认证简介及操作方法请参考跨源认证简介。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 create table kafkaSource( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' WATERMARK FOR rowtime_column_name AS watermark-strategy_expression) ) with ( 'connector' = 'kafka', 'topic' = '', 'properties.bootstrap.servers' = '', 'properties.group.id' = '', 'scan.startup.mode' = '', 'format' = '' );
  • 常见问题 Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决? org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata 跨源未绑定或未绑定成功,或是Kafka集群安全组未配置放通DLI队列的网段地址。参考增强型跨源连接重新配置跨源,或者Kafka集群安全组放通DLI队列的网段地址。 Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决? Caused by: java.lang.RuntimeException: RealLine:45;Table 'default_catalog.default_database.printSink' declares persistable metadata columns, but the underlying DynamicTableSink doesn't implement the SupportsWritingMetadata interface. If the column should not be persisted, it can be declared with the VIRTUAL keyword. sink表中定义了metadata类型,但是Print connector并不支持把sink表中的matadata去掉即可。
  • 分区扫描功能介绍 为了加速Source任务实例中的数据读取,Flink为JDBC表提供了分区扫描功能。以下参数定义了从多个任务并行读取时如何对表进行分区。 scan.partition.column:用于对输入进行分区的列名,该列的数据类型必须是数字,日期或时间戳。 scan.partition.num: 分区数。 scan.partition.lower-bound:第一个分区的最小值。 scan.partition.upper-bound:最后一个分区的最大值。 建表时以上扫描分区参数必须同时存在或者同时不存在。 scan.partition.lower-bound和scan.partition.upper-bound参数仅用于决定分区步长,而不是用于过滤表中的行,表中的所有行都会被分区并返回。
  • 数据类型映射 表2 数据类型映射 MySQL类型 PostgreSQL类型 Flink SQL类型 TINYINT - TINYINT SMALLINT TINYINT UNSIGNED SMALLINT INT2 SMALLSERIAL SERIAL2 SMALLINT INT MEDIUMINT SMALLINT UNSIGNED INTEGER SERIAL INT BIGINT INT UNSIGNED BIGINT BIGSERIAL BIGINT BIGINT UNSIGNED - DECIMAL(20, 0) BIGINT BIGINT BIGINT FLOAT REAL FLOAT4 FLOAT DOUBLE DOUBLE PRECISION FLOAT8 DOUBLE PRECISION DOUBLE NUMERIC(p, s) DECIMAL(p, s) NUMERIC(p, s) DECIMAL(p, s) DECIMAL(p, s) BOOLEAN TINYINT(1) BOOLEAN BOOLEAN DATE DATE DATE TIME [(p)] TIME [(p)] [WITHOUT TIMEZONE] TIME [(p)] [WITHOUT TIMEZONE] DATETIME [(p)] TIMESTAMP [(p)] [WITHOUT TIMEZONE] TIMESTAMP [(p)] [WITHOUT TIMEZONE] CHAR(n) VARCHAR(n) TEXT CHAR(n) CHARACTER(n) VARCHAR(n) CHARACTER VARYING(n) TEXT STRING BINARY VARBINARY BLOB BYTEA BYTES - ARRAY ARRAY
  • 参数说明 表1 参数说明 参数 是否必选 默认值 类型 说明 connector 是 无 String 指定要使用的连接器,当前固定为'jdbc'。 url 是 无 String 数据库的URL。 table-name 是 无 String 读取数据库中的数据所在的表名。 driver 否 无 String 连接数据库所需要的驱动。若未配置,则会自动通过URL提取。 username 否 无 String 数据库认证用户名,需要和'password'一起配置。 password 否 无 String 数据库认证密码,需要和'username'一起配置。 scan.partition.column 否 无 String 用于对输入进行分区的列名。分区扫描参数,具体请参考分区扫描功能介绍。 scan.partition.num 否 无 Integer 分区的个数。分区扫描参数,具体请参考分区扫描功能介绍。 scan.partition.lower-bound 否 无 Integer 第一个分区的最小值。分区扫描参数,具体请参考分区扫描功能介绍。 scan.partition.upper-bound 否 无 Integer 最后一个分区的最大值。分区扫描参数,具体请参考分区扫描功能介绍。 scan.fetch-size 否 0 Integer 每次从数据库拉取数据的行数。若指定为0,则会忽略sql hint。 scan.auto-commit 否 true Boolean 是否设置自动提交,以确定事务中的每个statement是否自动提交 pwd_auth_name 否 无 String DLI侧创建的Password类型的跨源认证名称。用户若配置该配置项则不用在SQL中配置账号和密码。
  • 语法格式 create table jbdcSource ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' watermark for rowtime_column_name as watermark-strategy_expression) ) with ( 'connector' = 'jdbc', 'url' = '', 'table-name' = '', 'username' = '', 'password' = '' );
  • 常见问题 Q:Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决? java.lang.IllegalArgumentException: offset (0) + length (8) exceed the capacity of the array: 6 A:如果HBase表中的数据是以其他方式导入的话,那么其存储是以String格式存储的,所以使用其他的数据格式将会报该错误。需要将Flink创建HBase源表中非string类型的字段的字段类型重新改为String即可。
  • 前提条件 该场景作业需要运行在DLI的独享队列上,因此要与HBase建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 若使用 MRS HBase,请在增强型跨源的主机信息中添加MRS集群所有节点的主机IP信息。 详细操作请参考《数据湖探索用户指南》中的“修改主机信息”章节描述。 Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。 跨源认证简介及操作方法请参考跨源认证简介。
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 创建HBase源表的列簇必须定义为ROW类型,字段名对应列簇名(column family),嵌套的字段名对应列限定符名(column qualifier)。 用户只需在表结构中声明查询中使用的的列簇和列限定符。除了ROW类型的列,剩下的原子数据类型字段(比如,STRING, BIGINT)将被识别为HBase的rowkey,一张表中只能声明一个rowkey。rowkey字段的名字可以是任意的,如果是保留关键字,需要用反引号进行转义。
  • 语法格式 create table hbaseSource ( attr_name attr_type (',' attr_name attr_type)* (',' watermark for rowtime_column_name as watermark-strategy_expression) ','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'hbase-2.2', 'table-name' = '', 'zookeeper.quorum' = '' );
  • 数据类型映射 HBase以字节数组存储所有数据,在读和写过程中要序列化和反序列化数据。 Flink的HBase连接器利用HBase(Hadoop) 的工具类org.apache.hadoop.hbase.util.Bytes进行字节数组和Flink数据类型转换。 Flink的HBase连接器将所有数据类型(除字符串外)null值编码成空字节。对于字符串类型,null值的字面值由null-string-literal选项值决定。 表2 数据类型映射表 Flink数据类型 HBase转换 CHAR/VARCHAR/STRING byte[] toBytes(String s) String toString(byte[] b) BOOLEAN byte[] toBytes(boolean b) boolean toBoolean(byte[] b) BINARY/VARBINARY 返回 byte[]。 DECIMAL byte[] toBytes(BigDecimal v) BigDecimal toBigDecimal(byte[] b) TINYINT new byte[] { val } bytes[0] // returns first and only byte from bytes SMALLINT byte[] toBytes(short val) short toShort(byte[] bytes) INT byte[] toBytes(int val) int toInt(byte[] bytes) BIGINT byte[] toBytes(long val) long toLong(byte[] bytes) FLOAT byte[] toBytes(float val) float toFloat(byte[] bytes) DOUBLE byte[] toBytes(double val) double toDouble(byte[] bytes) DATE 从 1970-01-01 00:00:00 UTC 开始的天数,int 值。 TIME 从 1970-01-01 00:00:00 UTC 开始天的毫秒数,int 值。 TIMESTAMP 从 1970-01-01 00:00:00 UTC 开始的毫秒数,long 值。 ARRAY 不支持 MAP/MULTISET 不支持 ROW 不支持
  • 参数说明 表1 参数说明 参数 是否必选 默认值 数据类型 说明 connector 是 无 String connector类型,需配置为'gaussdb'。 url 是 无 String jdbc连接地址。“url”参数中的ip地址请使用DWS的内网地址。 使用gsjdbc4驱动连接时,格式为:jdbc:postgresql://${ip}:${port}/${dbName} 。 使用gsjdbc200驱动连接时,格式为:jdbc:gaussdb://${ip}:${port}/${dbName}。 table-name 是 无 String 操作的DWS表名。如果该DWS表在某schema下,则具体可以参考如果该DWS表在某schema下的说明。 driver 否 org.postgresql.Driver String jdbc连接驱动,默认为: org.postgresql.Driver。 使用gsjdbc4驱动连接时,加载的数据库驱动类为:org.postgresql.Driver。 使用gsjdbc200驱动连接时,加载的数据库驱动类为:com.huawei.gauss200.jdbc.Driver。 username 否 无 String DWS数据库认证用户名,需要和'password'参数一起配置。 password 否 无 String DWS数据库认证密码,需要和'username'参数一起配置。 scan.partition.column 否 无 String 用于对输入进行分区的列名。 注意:该参数与scan.partition.lower-bound、scan.partition.upper-bound、 scan.partition.num参数必须同时配置或者同时都不配置。 scan.partition.lower-bound 否 无 Integer 第一个分区的最小值。 与scan.partition.column、scan.partition.upper-bound、 scan.partition.num必须同时配置或者同时都不配置。 scan.partition.upper-bound 否 无 Integer 最后一个分区的最大值。 与scan.partition.column、scan.partition.lower-bound、 scan.partition.num必须同时配置或者同时都不配置。 scan.partition.num 否 无 Integer 分区的个数。 与scan.partition.column、scan.partition.upper-bound、 scan.partition.upper-bound必须同时配置或者同时都不配置。 scan.fetch-size 否 0 Integer 每次从数据库拉取数据的行数。默认值为0,表示不限制。 pwd_auth_name 否 无 String DLI侧创建的Password类型的跨源认证名称。 使用跨源认证则无需在作业中配置账号和密码。
  • 常见问题 Q:作业运行失败,运行日志中有如下报错信息,应该怎么解决? java.io.IOException: unable to open JDBC writer ... Caused by: org.postgresql.util.PSQLException: The connection attempt failed. ... Caused by: java.net.SocketTimeoutException: connect timed out A:应考虑是跨源没有绑定,或者跨源没有绑定成功。 参考增强型跨源连接章节,重新配置跨源。参考DLI跨源连接DWS失败进行问题排查。
  • 前提条件 请务必确保您的账户下已在 数据仓库 服务(DWS)里创建了DWS集群。 如何创建DWS集群,请参考《数据仓库服务管理指南》中“创建集群”章节。 请确保已创建DWS数据库表。 该场景作业需要运行在DLI的独享队列上,因此要与DWS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。 跨源认证简介及操作方法请参考跨源认证简介。
  • 功能描述 DLI将Flink作业从数据仓库服务(DWS)中读取数据。DWS数据库内核兼容PostgreSQL,PostgreSQL数据库可存储更加复杂类型的数据,支持空间信息服务、多版本并发控制(MVCC)、高并发,适用场景包括位置应用、金融保险、互联网电商等。 数据仓库服务(Data Warehouse Service,简称DWS)是一种基于基础架构和平台的在线数据处理数据库,为用户提供海量数据挖掘和分析服务。DWS的更多信息,请参见《数据仓库服务管理指南》。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' watermark for rowtime_column_name as watermark-strategy_expression) ) with ( 'connector' = 'gaussdb', 'url' = '', 'table-name' = '', 'username' = '', 'password' = '' );
  • 参数说明 表1 参数说明 参数 是否必选 默认值 数据类型 参数说明 connector 是 无 String 指定要使用的连接器,这里是'datagen'。 rows-per-second 否 10000 Long 每秒生成的行数,用以控制数据发出速率。 fields.#.kind 否 random String 指定 '#' 字段的生成器。 '#' 字段必须是DataGen表中的字段,实际使用时需要将'#'替换为相应字段名。其他各参数的'#'号意义相同,不再重复描述。 参数值可以是 'sequence' 或 'random',具体含义如下: random是默认的生成器,您可以通过“fields.#.max”和“fields.#.min”参数指定随机生成的最大和最小值。 当指定的字段类型为char、varchar、string时,可以同时通过“fields.#.length”字段指定长度。random是无界的生成器。 sequence生成器,您可以通过“fields.#.start”和“fields.#.end”指定序列的起始和结束值。sequence是有界的生成器,当序列数字达到结束值,读取结束。 fields.#.min 否 '#'号指定的字段类型的最小值 '#'号指定的字段类型 当“fields.#.kind”字段为:random时有效。 表示随机生成器的最小值,'#' 指定的字段仅适用于数字类型。 fields.#.max 否 '#'号指定的字段类型的最大值 '#'号指定的字段类型 当“fields.#.kind”字段为:random时有效。 随机生成器的最大值,'#' 指定的字段仅适用于数字类型。 fields.#.length 否 100 Integer 当“fields.#.kind”字段为:random时有效。 随机生成器生成字符的长度,#' 指定的字段仅适用于char、varchar、string。 fields.#.start 否 无 '#'号指定的字段类型 当“fields.#.kind”字段为:sequence时有效。 序列生成器的起始值。 fields.#.end 否 无 '#'号指定的字段类型 当“fields.#.kind”字段为:sequence时有效。 序列生成器的结束值。
  • 创建表相关语法 表1 创建表相关语法 语法分类 功能描述 创建源表 DataGen源表 DWS源表 Hbase源表 JDBC源表 Kafka源表 MySQL CDC源表 Postgres CDC源表 Redis源表 Upsert Kafka源表 创建结果表 BlackHole结果表 ClickHouse结果表 DWS结果表 Elasticsearch结果表 Hbase结果表 JDBC结果表 Kafka结果表 Print结果表 Redis结果表 Upsert Kafka结果表 创建维表 DWS维表 Hbase维表 JDBC维表 Redis维表 Format Avro Canal Confluent Avro CS V Debezium JSON Maxwell Raw
  • 语法说明 COMPUTED COLUMN 计算列是一个使用 “column_name AS computed_column_expression” 语法生成的虚拟列。它由使用同一表中其他列的非查询表达式生成,并且不会在表中进行物理存储。例如,一个计算列可以使用 cost AS price * quantity 进行定义,这个表达式可以包含物理列、常量、函数或变量的任意组合,但这个表达式不能存在任何子查询。 在 Flink 中计算列一般用于为 CREATE TABLE 语句定义 时间属性。 处理时间属性 可以简单地通过使用了系统函数 PROCTIME() 的 proc AS PROCTIME() 语句进行定义。 另一方面,由于事件时间列可能需要从现有的字段中获得,因此计算列可用于获得事件时间列。例如,原始字段的类型不是 TIMESTAMP(3) 或嵌套在 JSON 字符串中。 注意: 定义在一个数据源表( source table )上的计算列会在从数据源读取数据后被计算,它们可以在 SELECT 查询语句中使用。 计算列不可以作为 INSERT 语句的目标,在 INSERT 语句中,SELECT 语句的 schema 需要与目标表不带有计算列的 schema 一致。
  • 语法支持类型 DLI SQL语法支持以下数据类型: STRING,BOOLEAN,BYTES,DECIMAL,TINYINT,SMALLINT,INTEGER,BIGINT,FLOAT,DOUBLE,DATE,TIME,TIMESTAMP,TIMESTAMP WITH LOCAL TIME ZONE,INTERVAL,ARRAY,MULTISET,MAP,ROW 在SQL语法中这些类型用于定义表中列的数据类型。 父主题: SQL语法约束与定义
  • 示例 输入一条记录("student1", "student2, student3"),输出两条记录("student1", "student2") 和 ("student1", "student3") 。 create table s1(attr1 string, attr2 string) with (......); insert into s2 select attr1, b1 from s1 left join lateral table(split_cursor(attr2, ',')) as T(b1) on true;
  • 语法说明 string_split(target, separator) 表1 string_split参数说明 参数 数据类型 说明 target STRING 待处理的目标字符串。 说明: 如果target为NULL,则返回一个空行。 如果target包含两个或多个连续出现的分隔符时,则返回长度为零的空子字符串。 如果target未包含指定分隔符,则返回目标字符串。 separator VARCHAR 指定的分隔符,当前仅支持单字符分隔。
  • 示例 准备测试输入数据 表2 测试源表disSource数据和分隔符 target(STRING) separator (VARCHAR) test-flink - flink - one-two-ww-three - 输入测试SQL语句 create table disSource( target STRING, separator VARCHAR ) with ( "connector.type" = "dis", "connector.region" = "xxx", "connector.channel" = "ygj-dis-in", "format.type" = 'csv' ); create table disSink( target STRING, item STRING ) with ( 'connector.type' = 'dis', 'connector.region' = 'xxx', 'connector.channel' = 'ygj-dis-out', 'format.type' = 'csv' ); insert into disSink select target, item from disSource, lateral table(string_split(target, separator)) as T(item); 查看测试结果 表3 disSink结果表数据 target(STRING) item(STRING) test-flink test test-flink flink flink flink one-two-ww-three one one-two-ww-three two one-two-ww-three ww one-two-ww-three three
  • 注意事项 若使用MRS集群的OpenTSDB,请确保以下几点: OpenTSDB的ip地址和端口请从OpenTSDB服务配置中查看配置项“tsd.network.bind”和“tsd.network.port”分别获取。 若OpenTSDB服务配置项“tsd.https.enabled”的值为true,则sql语句中的“connector.tsdb-link-address”参数值格式为https://ip:port。若“tsd.https.enabled”为false,则“connector.tsdb-link-address”参数值格式可以为http://ip:port或者ip:port。 在建立增强型跨源连接时,需要将MRS集群中的/etc/hosts主机和ip映射信息添加到“主机信息”参数中。 当配置项支持参数化时,表示将记录中的一列或者多列作为该配置项的一部分。例如当配置项设置为car_${car_brand}时,如果一条记录的car_brand列值为BMW,则该配置项在该条记录下为car_BMW。 若支持动态列,则其形式需要为${columnName},其中columnName为相应的字段名。
  • 示例 create table sink1( attr1 bigint, attr2 int, attr3 int ) with ( 'connector.type' = 'opentsdb', 'connector.region' = '', 'connector.tsdb-metrics' = '', 'connector.tsdb-timestamps' = '${attr1}', 'connector.tsdb-values' = '${attr2};10', 'connector.tsdb-tags' = 'key1:value1,key2:value2;key3:value3', 'connector.tsdb-link-address' = '' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector的类型,只能为opentsdb。 connector.region 是 OpenTSDB服务所在的区域。 connector.tsdb-metrics 是 数据点的metric,支持参数化。 其个数为要为1或者和“connector.tsdb-values”个数相同。 多个metric请使用“;”分隔。 connector.tsdb-timestamps 是 数据点的timestamp,仅支持指定动态列。 数据类型支持int、bigint、string,仅支持数据形式。 其个数需要为1或者和“connector.tsdb-values”的个数相同。 多个timestamp请使用“;”分隔。 connector.tsdb-values 是 数据点的value,支持指定动态列或者常数值。 多个values请使用“;”分隔。 connector.tsdb-tags 是 数据点的tags,每个tags里面至少一个标签值,最多8个标签值,多个标签使用“,”分隔,支持参数化。 其个数需要为1或者和“connector.tsdb-values”的个数相同。 多个tags请使用“;”分隔。 connector.batch-insert-data-num 否 表示一次性批量写入的数据量,即数据条数,值必须为正整数,默认值为8。 connector.tsdb-link-address 是 待插入数据所属集群的OpenTSDB连接地址。
共100000条