华为云用户手册

  • 语法格式 create table hbaseSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector' = 'doris', 'fenodes' = 'FE_IP:PORT,FE_IP:PORT,FE_IP:PORT', 'table.identifier' = 'database.table', 'username' = 'dorisUsername', 'password' = 'dorisPassword' );
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据 集群未启用Kerberos认证(普通模式)。 Doris的表名是区分大小写。 使用cloudTable的doris时,'fenodes'字段值的端口请用8030,如'xx:8030'。同时安全组请放开端口8030, 8040,9030。 开启HTTPS后,需要在创建表的with子句中添加如下配置参数: 'doris.enable.https' = 'true' 'doris.ignore.https.ca' = 'true'
  • 示例 该示例是从Doris源表读取数据,并输入到 print connector。 参考增强型跨源连接,在 DLI 上根据Doris所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。参考“修改主机信息”章节描述,在增强型跨源中增加 MRS 的主机信息。 设置Doris和kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据Doris和Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 参考MRS Doris使用指南,创建doris表,并插入10条数据。创建语句如下: CREATE TABLE IF NOT EXISTS dorisdemo ( `user_id` varchar(10) NOT NULL, `city` varchar(10), `age` int, `gender` int ) DISTRIBUTED BY HASH(`user_id`) BUCKETS 10; INSERT INTO dorisdemo VALUES ('user1', 'city1', 20, 1); INSERT INTO dorisdemo VALUES ('user2', 'city2', 21, 0); INSERT INTO dorisdemo VALUES ('user3', 'city3', 22, 1); INSERT INTO dorisdemo VALUES ('user4', 'city4', 23, 0); INSERT INTO dorisdemo VALUES ('user5', 'city5', 24, 1); INSERT INTO dorisdemo VALUES ('user6', 'city6', 25, 0); INSERT INTO dorisdemo VALUES ('user7', 'city7', 26, 1); INSERT INTO dorisdemo VALUES ('user8', 'city8', 27, 0); INSERT INTO dorisdemo VALUES ('user9', 'city9', 28, 1); INSERT INTO dorisdemo VALUES ('user10', 'city10', 29, 0); 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业模拟从kafka读取数据,并关联doris维表对数据进行打宽,并输出到print。 CREATE TABLE ordersSource ( user_id string, user_name string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'kafka-topic', 'properties.bootstrap.servers' = 'kafkaIp:port,kafkaIp:port,kafkaIp:port', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE dorisDemo ( `user_id` String NOT NULL, `city` String, `age` int, `gender` int ) with ( 'connector' = 'doris', 'fenodes' = 'FE实例IP地址:端口号', 'table.identifier' = 'demo.dorisdemo', 'username' = 'dorisUsername', 'password' = 'dorisPassword', 'lookup.cache.ttl'='10 m', 'lookup.cache.max-rows' = '100' ); CREATE TABLE print ( user_id string, user_name string, `city` String, `age` int, `gender` int ) WITH ( 'connector' = 'print' ); insert into print select orders.user_id, orders.user_name, dim.city, dim.age, dim.gender from ordersSource orders left join dorisDemo for system_time as of orders.proctime as dim on orders.user_id = dim.user_id; 往kafka数据源写入2条数据。 {"user_id": "user1", "user_name": "name1"} {"user_id": "user2", "user_name": "name2"} 查看print结果表数据。 +I[user1, name1, city1, 20, 1] +I[user2, name2, city2, 21, 0]
  • 数据类型映射 表3 数据类型映射 MySQL类型 PostgreSQL类型 Flink SQL类型 TINYINT - TINYINT SMALLINT TINYINT UNSIGNED SMALLINT INT2 SMALLSERIAL SERIAL2 SMALLINT INT MEDIUMINT SMALLINT UNSIGNED INTEGER SERIAL INT BIGINT INT UNSIGNED BIGINT BIGSERIAL BIGINT BIGINT UNSIGNED - DECIMAL(20, 0) BIGINT BIGINT BIGINT FLOAT REAL FLOAT4 FLOAT DOUBLE DOUBLE PRECISION FLOAT8 DOUBLE PRECISION DOUBLE NUMERIC(p, s) DECIMAL(p, s) NUMERIC(p, s) DECIMAL(p, s) DECIMAL(p, s) BOOLEAN TINYINT(1) BOOLEAN BOOLEAN DATE DATE DATE TIME [(p)] TIME [(p)] [WITHOUT TIMEZONE] TIME [(p)] [WITHOUT TIMEZONE] DATETIME [(p)] TIMESTAMP [(p)] [WITHOUT TIMEZONE] TIMESTAMP [(p)] [WITHOUT TIMEZONE] CHAR(n) VARCHAR(n) TEXT CHAR(n) CHARACTER(n) VARCHAR(n) CHARACTER VARYING(n) TEXT STRING BINARY VARBINARY BLOB BYTEA BYTES - ARRAY ARRAY
  • 示例 示例1:使用JDBC作为数据源,Print作为结果表,从RDS MySQL数据库中读取数据,并写入到Print结果表中。 参考增强型跨源连接,根据RDS MySQL所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。 设置RDS MySQL的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根RDS的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 登录RDS MySQL,并使用下述命令在flink库下创建orders表,并插入数据。创建数据库的操作可以参考创建RDS数据库。 在flink数据库下创建orders表: CREATE TABLE `flink`.`orders` ( `order_id` VARCHAR(32) NOT NULL, `order_channel` VARCHAR(32) NULL, PRIMARY KEY (`order_id`) ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci; 插入表数据: insert into orders( order_id, order_channel ) values ('1', 'webShop'), ('2', 'miniAppShop'); 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 认证用的username和password硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据 CREATE TABLE jdbcSource ( order_id string, order_channel string ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://MySQLAddress:MySQLPort/flink',--flink为RDS MySQL创建的数据库名 'table-name' = 'orders', 'username' = 'MySQLUsername', 'password' = 'MySQLPassword', 'scan.fetch-size' = '10', 'scan.auto-commit' = 'true' ); CREATE TABLE printSink ( order_id string, order_channel string ) WITH ( 'connector' = 'print' ); insert into printSink select * from jdbcSource; 查看taskmanager.out文件中的数据结果,数据结果参考如下: +I(1,webShop) +I(2,miniAppShop) 示例2:使用DataGen源表发送数据,通过JDBC结果表将数据输出到MySQL数据库中。 参考增强型跨源连接,根据RDS MySQL所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。 设置RDS MySQL的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根RDS的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 登录RDS MySQL,并使用下述命令在flink库下创建orders表,并插入数据。创建数据库的操作可以参考创建RDS数据库。 在flink数据库下创建orders表: CREATE TABLE `flink`.`orders` ( `order_id` VARCHAR(32) NOT NULL, `order_channel` VARCHAR(32) NULL, PRIMARY KEY (`order_id`) ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci; 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE dataGenSource ( order_id string, order_channel string ) WITH ( 'connector' = 'datagen', 'fields.order_id.kind' = 'sequence', 'fields.order_id.start' = '1', 'fields.order_id.end' = '1000', 'fields.order_channel.kind' = 'random', 'fields.order_channel.length' = '5' ); CREATE TABLE jdbcSink ( order_id string, order_channel string, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://MySQLAddress:MySQLPort/flink',--其中url中的flink表示MySQL中orders表所在的数据库名 'table-name' = 'orders', 'username' = 'MySQLUsername', 'password' = 'MySQLPassword', 'sink.buffer-flush.max-rows' = '1' ); insert into jdbcSink select * from dataGenSource; 查看表中数据,在MySQL中执行sql查询语句 select * from orders; 示例3:从DataGen源表中读取数据,将JDBC表作为维表,并将二者生成的表信息写入Print结果表中。 参考增强型跨源连接,根据RDS MySQL所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。 设置RDS MySQL的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根RDS的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 登录RDS MySQL,并使用下述命令在flink库下创建orders表,并插入数据。创建数据库的操作可以参考创建RDS数据库。 在flink数据库下创建orders表: CREATE TABLE `flink`.`orders` ( `order_id` VARCHAR(32) NOT NULL, `order_channel` VARCHAR(32) NULL, PRIMARY KEY (`order_id`) ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci; 插入表数据: insert into orders( order_id, order_channel ) values ('1', 'webShop'), ('2', 'miniAppShop'); 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。该作业脚本将DataGen为数据源,JDBC作为维表,数据写入到Print结果表。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE dataGenSource ( order_id string, order_time timestamp, proctime as Proctime() ) WITH ( 'connector' = 'datagen', 'fields.order_id.kind' = 'sequence', 'fields.order_id.start' = '1', 'fields.order_id.end' = '2' ); --创建维表 CREATE TABLE jdbcTable ( order_id string, order_channel string ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://JDBCAddress:JDBCPort/flink',--flink为RDS MySQL中orders表所在的数据库名 'table-name' = 'orders', 'username' = 'JDBCUserName', 'password' = 'JDBCPassWord', 'lookup.cache.max-rows' = '100', 'lookup.cache.ttl' = '1000', 'lookup.cache.caching-missing-key' = 'false', 'lookup.max-retries' = '5' ); CREATE TABLE printSink ( order_id string, order_time timestamp, order_channel string ) WITH ( 'connector' = 'print' ); insert into printSink SELECT dataGenSource.order_id, dataGenSource.order_time, jdbcTable.order_channel from dataGenSource left join jdbcTable for system_time as of dataGenSource.proctime on dataGenSource.order_id = jdbcTable.order_id; 查看taskmanager.out文件中的数据结果,数据结果参考如下: +I(1, xxx, webShop) +I(2, xxx, miniAppShop)
  • 分区扫描功能介绍 为了加速Source任务实例中的数据读取,Flink为JDBC表提供了分区扫描功能。以下参数定义了从多个任务并行读取时如何对表进行分区。 scan.partition.column:用于对输入进行分区的列名,该列的数据类型必须是数字,日期或时间戳。 scan.partition.num: 分区数。 scan.partition.lower-bound:第一个分区的最小值。 scan.partition.upper-bound:最后一个分区的最大值。 建表时以上扫描分区参数必须同时存在或者同时不存在。 scan.partition.lower-bound和scan.partition.upper-bound参数仅用于决定分区步长,而不是用于过滤表中的行,表中的所有行都会被分区并返回。
  • Lookup Cache功能介绍 JDBC连接器可以用在时态表关联中作为一个可lookup的维表,当前只支持同步的查找模式。 默认情况下,Lookup cache是未启用的,所有请求都会发送到外部数据库。您可以设置Lookup.cache.max-rows和Lookup.cache.ttl参数来启用。Lookup cache的主要目的是用于提高时态表关联JDBC连接器的性能。 当Lookup cache被启用时,每个进程(即TaskManager)将维护一个缓存。Flink将优先查找缓存,只有当缓存未查找到时才向外部数据库发送请求,并使用返回的数据更新缓存。当缓存命中最大缓存行Lookup.cache.max-rows或当行超过最大存活时间Lookup.cache.ttl时,缓存中最先添加的条目将被标记为过期。缓存中的记录可能不是最新的,用户可以将Lookup.cache.ttl设置为一个更小的值以获得更好的刷新数据,但这可能会增加发送到数据库的请求数。所以要做好吞吐量和正确性之间的平衡。 默认情况下,Flink会缓存主键的空查询结果,您可以通过将Lookup.cache.caching-missing-key设置为false来切换行为。
  • 参数说明 表2 参数说明 参数 是否必选 默认值 类型 说明 connector 是 无 String 指定要使用的连接器,当前固定为'jdbc'。 url 是 无 String 数据库的URL。 连接MySQL数据库时,格式为:jdbc:mysql://MySQLAddress:MySQLPort/dbName 。 连接PostgreSQL数据库时,格式为:jdbc:postgresql://PostgreSQLAddress:PostgreSQLPort/dbName。 table-name 是 无 String 读取数据库中的数据所在的表名。 driver 否 无 String 连接数据库所需要的驱动。如果未配置,则会自动通过URL提取。 MySQL数据库默认驱动为com.mysql.jdbc.Driver。 PostgreSQL数据库默认驱动为org.postgresql.Driver。 username 否 无 String 数据库认证用户名,需要和'password'一起配置。 password 否 无 String 数据库认证密码,需要和'username'一起配置。 connection.max-retry-timeout 否 60s Duration 尝试连接数据库服务器最大重试超时时间,不应小于1s。 scan.partition.column 否 无 String 用于对输入进行分区的列名。分区扫描参数,具体请参考分区扫描功能介绍。 scan.partition.num 否 无 Integer 分区的个数。分区扫描参数,具体请参考分区扫描功能介绍。 scan.partition.lower-bound 否 无 Integer 第一个分区的最小值。分区扫描参数,具体请参考分区扫描功能介绍。 scan.partition.upper-bound 否 无 Integer 最后一个分区的最大值。分区扫描参数,具体请参考分区扫描功能介绍。 scan.fetch-size 否 0 Integer 每次从数据库拉取数据的行数。如果指定为0,则会忽略sql hint。 scan.auto-commit 否 true Boolean 是否设置自动提交,以确定事务中的每个statement是否自动提交 lookup.cache.max-rows 否 无 Integer lookup cache的最大行数,如果超过该值,缓存中最先添加的条目将被标记为过期。 默认情况下,lookup cache是未开启的。具体请参考Lookup Cache功能介绍。 lookup.cache.ttl 否 无 Duration lookup cache中每一行记录的最大存活时间,如果超过该时间,缓存中最先添加的条目将被标记为过期。 默认情况下,lookup cache是未开启的。具体请参考Lookup Cache功能介绍。 lookup.cache.caching-missing-key 否 true Boolean 是否缓存空查询结果,默认为true。具体请参考Lookup Cache功能介绍。 lookup.max-retries 否 3 Integer 查询数据库失败的最大重试次数。 sink.buffer-flush.max-rows 否 100 Integer flush前缓存记录的最大值,可以设置为 '0' 来禁用它。 sink.buffer-flush.interval 否 1s Duration flush间隔时间,超过该时间后异步线程将flush数据。可以设置为 '0' 来禁用它。如果想完全异步地处理缓存的flush事件,可以将 'sink.buffer-flush.max-rows' 设置为 '0' ,并配置适当的flush时间间隔。 sink.max-retries 否 3 Integer 写入到数据库失败后的最大重试次数。 sink.parallelism 否 无 Integer 用于定义JDBC sink算子的并行度。默认情况下,并行度是由框架决定,即与上游并行度一致。
  • 注意事项 JDBC结果表如果定义了主键,将以upsert模式与外部系统交换UPDATE/DELETE消息;否则,它将以append模式与外部系统交换消息,不支持消费UPDATE/DELETE消息。 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
  • 语法格式 create table jbdcTable ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' watermark for rowtime_column_name as watermark-strategy_expression) ) with ( 'connector' = 'jdbc', 'url' = '', 'table-name' = '', 'username' = '', 'password' = '' );
  • 示例 示例1:从Kafka中读取数据,并将数据插入ClickHouse中(ClickHouse版本为MRS的21.3.4.25,且MRS集群未开启Kerberos认证): 参考增强型跨源连接,在DLI上根据ClickHouse和Kafka集群所在的虚拟私有云和子网分别创建跨源连接,并绑定所要使用的Flink弹性资源池。 设置ClickHouse和Kafka集群安全组的入向规则,使其对当前将要使用的Flink作业队列网段放通。参考测试地址连通性根据ClickHouse和Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 使用ClickHouse客户端连接到ClickHouse服务端,并使用以下命令查询集群标识符cluster等其他环境参数信息。 详细操作请参考从零开始使用ClickHouse。 select cluster,shard_num,replica_num,host_name from system.clusters; 其返回信息如下图: ┌─cluster────┬────┬─shard_num─┐ │ default_cluster │ 1 │ 1 │ │ default_cluster │ 1 │ 2 │ └──────── ┴────┴────── ┘ 根据获取到的集群标识符cluster,例如当前为default_cluster ,使用以下命令在ClickHouse的default_cluster集群节点上创建数据库flink。 CREATE DATABASE flink ON CLUSTER default_cluster; 使用以下命令在default_cluster集群节点上和flink数据库下创建表名为order的ReplicatedMergeTree表。 CREATE TABLE flink.order ON CLUSTER default_cluster(order_id String,order_channel String,order_time String,pay_amount Float64,real_pay Float64,pay_time String,user_id String,user_name String,area_id String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/flink/order', '{replica}')ORDER BY order_id; 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业脚本将DMS Kafka作为数据源,ClickHouse作业结果表。 如下脚本中的加粗参数请根据实际环境修改。 create table orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create table clickhouseSink( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) with ( 'connector' = 'clickhouse', 'url' = 'jdbc:clickhouse://ClickhouseAddress1:ClickhousePort,ClickhouseAddress2:ClickhousePort/flink', 'username' = 'username', 'password' = 'password', 'table-name' = 'order', 'sink.buffer-flush.max-rows' = '10', 'sink.buffer-flush.interval' = '3s' ); insert into clickhouseSink select * from orders; 连接Kafka集群,向DMS Kafka中插入以下测试数据: {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} 使用ClickHouse客户端连接到ClickHouse,执行以下查询命令,查询写入flink数据库下order表中的数据。 select * from flink.order; 查询结果参考如下: 202103241000000001 webShop 2021-03-24 10:00:00 100 100 2021-03-24 10:02:03 0001 Alice 330106 202103241606060001 appShop 2021-03-24 16:06:06 200 180 2021-03-24 16:10:06 0001 Alice 330106 202103251202020001 miniAppShop 2021-03-25 12:02:02 60 60 2021-03-25 12:03:00 0002 Bob 330110 示例2:从Kafka中读取数据,并将数据插入ClickHouse中(ClickHouse版本为MRS的21.3.4.25,且MRS集群开启Kerberos认证) 参考增强型跨源连接,在DLI上根据ClickHouse和Kafka集群所在的虚拟私有云和子网分别创建跨源连接,并绑定所要使用的Flink弹性资源池。 设置ClickHouse和Kafka集群安全组的入向规则,使其对当前将要使用的Flink作业队列网段放通。参考测试地址连通性根据ClickHouse和Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 使用ClickHouse客户端连接到ClickHouse服务端,并使用以下命令查询集群标识符cluster等其他环境参数信息。 参考从零开始使用ClickHouse。 select cluster,shard_num,replica_num,host_name from system.clusters; 其返回信息如下图: ┌─cluster────┬────┬─shard_num─┐ │ default_cluster │ 1 │ 1 │ │ default_cluster │ 1 │ 2 │ └──────── ┴────┴────── ┘ 根据获取到的集群标识符cluster,例如当前为default_cluster ,使用以下命令在ClickHouse的default_cluster集群节点上创建数据库flink。 CREATE DATABASE flink ON CLUSTER default_cluster; 使用以下命令在default_cluster集群节点上和flink数据库下创建表名为order的ReplicatedMergeTree表。 CREATE TABLE flink.order ON CLUSTER default_cluster(order_id String,order_channel String,order_time String,pay_amount Float64,real_pay Float64,pay_time String,user_id String,user_name String,area_id String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/flink/order', '{replica}')ORDER BY order_id; 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业脚本将Kafka作为数据源,ClickHouse作业结果表。 如下脚本中的加粗参数请根据实际环境修改。 create table orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create table clickhouseSink( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) with ( 'connector' = 'clickhouse', 'url' = 'jdbc:clickhouse://ClickhouseAddress1:ClickhousePort,ClickhouseAddress2:ClickhousePort/flink?ssl=true&sslmode=none', 'table-name' = 'order', 'username' = 'username', 'password' = 'password', --DEW凭据中的key 'sink.buffer-flush.max-rows' = '10', 'sink.buffer-flush.interval' = '3s', 'dew.endpoint'='kms.xx.xx.com', --使用的DEW服务所在的endpoint信息 'dew.csms.secretName'='xx', --DEW服务通用凭据的凭据名称 'dew.csms.decrypt.fields'='password', --password字段值需要利用DEW凭证管理,进行解密替换 'dew.csms.version'='v1' ); insert into clickhouseSink select * from orders; 连接Kafka集群,向Kafka中插入以下测试数据: {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} 使用ClickHouse客户端连接到ClickHouse,执行以下查询命令,查询写入flink数据库下order表中的数据。 select * from flink.order; 查询结果参考如下: 202103241000000001 webShop 2021-03-24 10:00:00 100 100 2021-03-24 10:02:03 0001 Alice 330106 202103241606060001 appShop 2021-03-24 16:06:06 200 180 2021-03-24 16:10:06 0001 Alice 330106 202103251202020001 miniAppShop 2021-03-25 12:02:02 60 60 2021-03-25 12:03:00 0002 Bob 330110
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据 创建MRS的ClickHouse集群,集群版本选择MRS 3.1.0及以上版本。 ClickHouse结果表不支持删除表数据操作。 Flink中支持字段类型范围为:string、tinyint、smallint、int、bigint、float、double、date、timestamp、decimal以及Array。 其中Array中的数据类型仅支持int、bigint、string、float、double。
  • 参数说明 当下游消费Hudi过慢,上游写入端会把Hudi文件归档,导致File Not Found问题。设置合理的消费参数避免File Not Found问题。 优化建议: 调大read.tasks。 如果有限流,调大限流参数。 调大上游compaction、archive、clean参数。 表2 参数名称 参数 是否必选 默认值 数据类型 参数说明 connector 是 无 String 读取表类型。需要填写'hudi' path 是 无 String 表存储的路径。如obs://xx/xx table.type 是 COPY_ON_WRITE String Hudi表类型。 MERGE_ON_READ COPY_ON_WRITE hoodie.datasource.write.recordkey.field 是 无 String 表的主键。 write.precombine.field 是 无 String 数据合并字段。 read.tasks 否 4 Integer 读hudi表task并行度。 read.streaming.enabled 是 false Boolean 设置 true 开启流式增量模式,false批量读。建议值为true read.streaming.start-commit 否 默认从最新 commit String Stream和Batch增量消费,指定“yyyyMMddHHmmss”格式时间的开始消费位置(闭区间) hoodie.datasource.write.keygenerator.type 否 COMPLEX Enum 上游表主键生成类型: SIMPLE(默认值) COMPLEX TIMESTAMP CUSTOM NON_PARTITION GLOBAL_DELETE read.streaming.check-interval 否 60 Integer 流读监测上游新提交的周期(秒),流量大时建议使用默认值,默认值:60。 read.end-commit 否 默认到最新 commit String Batch增量消费,通过参数“read.streaming.start-commit”指定起始消费位置,通过参数“read.end-commit”指定结束消费位置,为闭区间,即包含起始、结束的Commit,默认到最新Commit。 read.rate.limit 否 0 Integer 流读Hudi的限流速率,单位为每秒的条数。默认值:0,表示不限速。该值为总限速大小,每个算子的限速大小需除以读算子个数(read.tasks)。 changelog.enabled 否 false Boolean 是否消费所有变更(包含中间变更)。 参数取值如下: true:支持消费所有变更。 false:不消费所有变更,即UPSERT语义,所有的消息仅保证最后一条合并消息,中间的变更可能会被merge掉。 说明 只有MOR表支持,在该模式下Hudi会保留消息的所有变更(I/-U/U/D) 非changelog模式,流读单次的batch数据集会merge中间变更;批读(快照读)会合并所有的中间结果,不管中间状态是否已被写入,都将被忽略。 开启changelog.enabled参数后,异步的压缩任务仍然会将中间变更合并成1条数据,所以如果流读消费不够及时,被压缩后只能读到最后一条记录。但是,可以通过调整压缩的频率,预留一定的时间buffer给 reader,比如调整compaction.delta_commits:5和compaction.delta_seconds: 3600压缩参数。
  • 注意事项 建议Hudi作为Source表时设置限流 Hudi表作为Source表时,为防止数据上限超过流量峰值导致作业出现异常,建议设置限流(read.rate.limit),限流上限应该为业务上线压测的峰值。 及时对Hudi表进行Compaction,防止Hudi source算子checkpoint完成时间过长 当Hudi Source算子checkpoint完成时间长时,检查该Hudi表Compaction是否正常。因为当长时间不做Compaction时list性能会变差。 流读Hudi MOR表时,建议开启log index特性提升Flink流读性能 Hudi的Mor表可以通过log index提升读写性能, Sink和Source表添加属性 'hoodie.log.index.enabled'='true' 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • 语法格式 create table hudiSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector' = 'hudi', 'path' = 'obs://xx', 'table.type' = 'xx', 'hoodie.datasource.write.recordkey.field' = 'xx', 'write.precombine.field' = 'xx', 'read.streaming.enabled' = 'xx' ... );
  • 功能描述 Flink SQL读取Hudi表数据。 Hudi是一种 数据湖 的存储格式,在Hadoop文件系统之上提供了更新数据和删除数据的能力以及消费变化数据的能力。支持多种计算引擎,提供IUD接口,在HDFS的数据集上提供了插入更新和增量拉取的功能。 表1 支持类别 类别 详情 支持Flink表类型 源表、结果表。 支持Hudi表类型 MOR表,COW表。 支持读写类型 批量读,批量写,流式读,流式写。 更多具体使用可参考开源社区文档:Hudi。
  • 常见问题 Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决? org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata 跨源未绑定或未绑定成功,或是Kafka集群安全组未配置放通DLI队列的网段地址。重新配置跨源,或者Kafka集群安全组放通DLI队列的网段地址。 具体操作请参考增强型跨源连接。 Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决? Caused by: java.lang.RuntimeException: RealLine:45;Table 'default_catalog.default_database.printSink' declares persistable metadata columns, but the underlying DynamicTableSink doesn't implement the SupportsWritingMetadata interface. If the column should not be persisted, it can be declared with the VIRTUAL keyword. sink表中定义了metadata类型,但是Print connector并不支持把sink表中的matadata去掉即可。
  • 功能描述 Kafka 连接器提供从 Kafka topic 中消费和写入数据的能力。 Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。 表1 支持类别 类别 详情 支持表类型 源表、结果表 支持数据格式 CS V JSON Apache Avro Confluent Avro Debezium CDC Canal CDC Maxwell CDC OGG CDC Raw
  • 示例3:将DMS Kafka作为源表,Print作为结果表(适用于Kafka集群已开启SASL_SSL场景) 创建DMS的kafka集群,开启SASL_SSL,并下载SSL证书,将下载的证书client.jks上传到OBS桶中。 其中,properties.sasl.jaas.config字段包含账号密码,使用DEW进行加密。 CREATE TABLE ordersSource ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:9093,KafkaAddress2:9093', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'properties.connector.auth.open' = 'true', 'properties.ssl.truststore.location' = 'obs://xx/client.jks', -- 用户上传证书的位置 'properties.sasl.mechanism' = 'PLAIN', 'properties.security.protocol' = 'SASL_SSL', 'properties.sasl.jaas.config' = 'xx', -- dew凭据管理中的key,其值如:org.apache.kafka.common.security.plain.PlainLoginModule required username=xx password=xx; 'format' = 'json', 'dew.endpoint' = 'kms.xx.com', --使用的DEW服务所在的endpoint信息 'dew.csms.secretName' = 'xx', --DEW服务通用凭据的凭据名称 'dew.csms.decrypt.fields' = 'properties.sasl.jaas.config', --其中properties.sasl.jaas.config字段值,需要利用DEW凭证管理,进行解密替换 'dew.csms.version' = 'v1' ); CREATE TABLE ordersSink ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'print' ); insert into ordersSink select * from ordersSource;
  • 注意事项 更多具体使用可参考开源社区文档:Apache Kafka SQL 连接器。 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据 with参数中字段只能使用单引号,不能使用双引号。 建表时数据类型的使用请参考Format章节。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 create table kafkaSource( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' WATERMARK FOR rowtime_column_name AS watermark-strategy_expression) ) with ( 'connector' = 'kafka', 'topic' = '', 'properties.bootstrap.servers' = '', 'properties.group.id' = '', 'scan.startup.mode' = '', 'format' = '' );
  • Topic和Partition的探测 topic 和 topic-pattern 配置项决定了 source 消费的 topic 或 topic 的匹配规则。topic 配置项可接受使用分号间隔的 topic 列表,例如 topic-1;topic-2。 topic-pattern 配置项使用正则表达式来探测匹配的 topic。例如 topic-pattern 设置为 test-topic-[0-9],则在作业启动时,所有匹配该正则表达式的 topic(以 test-topic- 开头,以一位数字结尾)都将被 consumer 订阅。 为允许 consumer 在作业启动之后探测到动态创建的 topic,请将 scan.topic-partition-discovery.interval 配置为一个非负值。这将使 consumer 能够探测匹配名称规则的 topic 中新的 partition。 topic列表和topic匹配规则只适用于 source。对于sink端,Flink目前只支持单一topic。
  • 示例2:将json格式DMS Kafka作为源表,输出到Kafka sink中(适用于Kafka集群未开启SASL_SSL场景) 将Kafka作为源表,Kafka作为结果表,从Kafka中读取编码格式为json数据类型的数据,输出到日志文件中。 参考增强型跨源连接,根据Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。 设置Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,并提交运行。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE kafkaSource( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaSourceTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE kafkaSink ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaSinkTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'format' = 'json' ); insert into kafkaSink select * from kafkaSource; 向Kafka的源表的topic中发送如下数据: {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} 读取Kafka的结果表的topic,其数据结果参考如下: {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
  • Flink Opensource SQL语法参考简介 表1 Flink Opensource SQL语法参考简介 Flink版本 说明 语法参考 Flink 1.15 Flink 1.15版本在语法设计上实现了更高的兼容性,与主流开源技术标准保持一致。 Flink 1.15版本新增读写Hive、Hudi等Connector。 Flink 1.15版本数据同步迁移场景,优先推荐使用DataArts的数据集成。 Flink 1.15版本支持集成DEW-C SMS 凭证管理,提供隐私保护方案。 Flink 1.15版本支持Flink Jar作业最小化提交。 Flink OpenSource SQL1.15语法概览 Flink 1.12 Flink 1.12新增支持DataGen源表、DWS源表、JDBC源表、MySQL CDC源表、Postgres CDC源表、Redis源表、Upsert Kafka源表、Hbase源表。 Flink 1.12新增支持小文件合并功能。 Flink 1.12新增支持Redis维表、RDS维表。 Flink OpenSource SQL1.12语法概览 Flink 1.10 支持DIS源表结果表,支持DWS源表、结果表和维表,支持REDIS,支持自定义connector,支持TUMBLE窗口周期触发和控制延迟。 Flink OpenSource SQL1.10语法概览
  • 步骤二:添加队列到弹性资源池 在已创建的弹性资源池的“操作”列,单击“添加队列”进入弹性资源池添加的队列的操作界面。 首先配置弹性资源池队列的基本信息,具体参数参考如下。 名称:添加的队列的名称。 类型:根据作业需要选择队列类型。本示例选择为:通用队列。 SQL队列类型:用于运行Spark SQL和HetuEngine作业。 通用队列类型:用于运行Flink和Spark Jar作业。 其他参数请根据需要配置。 图3 添加队列 配置完基本参数后,单击“下一步”,在队列的扩缩容策略配置界面,修改扩缩容策略配置:最小CU:64、最大CU:64。 图4 队列扩缩容策略配置 单击“确定”完成添加队列操作。
  • 背景信息 DLI与内网数据源的网络联通通常指的是华为云内部服务的场景,例如DLI连接MRS、RDS、 CSS 、Kafka、DWS时,需要打通DLI和外部数据源之间的网络。使用DLI提供的增强型跨源连接,采用对等连接的方式打通DLI与目的数据源的VPC网络,实现数据互通。 本节操作介绍适用增强型跨源连接配置DLI与内网数据源的网络联通的操作指导。 创建增强型跨源连接网络不通的问题,可以根据本指导的整体流程和步骤进行排查验证。
  • 前提条件 已创建DLI队列。创建队列详见创建DLI队列操作指导。 队列的计费类型必须为:“包年/包月”,“按需计费”(按需计费需勾选“专属资源模式”。) 仅“包年/包月”资源、“专属资源模式”的“按需计费”资源才能创建增强型跨源链接。 已创建对应的外部数据源集群。具体对接的外部数据源根据业务自行选择。 表1 创建各外部数据源参考 服务名 参考文档链接 RDS 购买RDS for MySQ L实例 DWS 创建DWS集群 DMS Kafka 创建Kafka实例 CSS 创建CSS集群 MRS 创建MRS集群 绑定跨源的DLI队列网段和其他数据源子网网段不能重合。 系统default队列不支持创建跨源连接。
  • 前提条件 已创建DLI的SQL队列。创建DLI队列的操作可以参考创建DLI队列。 创建DLI队列时队列类型需要选择为“SQL队列”。 已创建云数据库RDS的MySQL的数据库实例。具体创建RDS集群的操作可以参考创建RDS MySQL数据库实例。 本示例RDS数据库引擎:MySQL 本示例RDS MySQL数据库版本:5.7。 已创建 CDM 迁移集群。创建CDM集群的操作可以参考创建CDM集群。 如果目标数据源为云下的数据库,则需要通过公网或者专线打通网络。通过公网互通时,需确保CDM集群已绑定EIP、CDM云上安全组出方向放通云下数据源所在的主机、数据源所在的主机可以访问公网且防火墙规则已开放连接端口。 数据源为云上服务RDS、MRS时,网络互通需满足如下条件: i. CDM集群与云上服务处于不同区域的情况下,需要通过公网或者专线打通网络。通过公网互通时,需确保CDM集群已绑定EIP,数据源所在的主机可以访问公网且防火墙规则已开放连接端口。 ii. CDM集群与云上服务同区域情况下,同虚拟私有云、同子网、同安全组的不同实例默认网络互通;如果同虚拟私有云但是子网或安全组不同,还需配置路由规则及安全组规则。 配置路由规则请参见如何配置路由规则章节,配置安全组规则请参见如何配置安全组规则章节。 iii. 此外,您还必须确保该云服务的实例与CDM集群所属的企业项目必须相同,如果不同,需要修改工作空间的企业项目。 本示例CDM集群的虚拟私有云、子网以及安全组和RDS MySQL实例保持一致。
  • 操作流程 使用DLI提交SQL作业查询OBS数据的基本流程如表1所示。 开始进行如下操作前,请务必参考准备工作完成必要操作。 表1 使用DLI提交SQL作业查询OBS数据的操作步骤 操作步骤 说明 步骤1:上传数据至OBS 使用DLI查询数据前,需要将数据文件上传至OBS中。 步骤2:创建弹性资源池并添加队列 创建提交作业所需的计算资源。 步骤3:创建数据库 DLI元数据是SQL作业开发的基础。在执行作业前您需要根据业务场景定义数据库和表。 步骤4:创建表 数据库创建完成后,需要在数据库db1中基于OBS上的样本数据创建表。 步骤5:查询数据 使用标准SQL语句进行数据的查询和分析。
  • 步骤3:创建数据库 在进行数据查询之前还需要创建一个数据库,例如db1。 “default”为内置数据库,不能创建名为“default”的数据库。 在DLI管理控制台,单击左侧导航栏中的“SQL编辑器”,可进入SQL作业“数据库”页面。 在“SQL编辑器”页面右侧的编辑窗口中,输入如下SQL语句,单击“执行”。阅读并同意隐私协议,单击“确定”。 create database db1; 数据库创建成功后,左侧单击“ ”刷新数据库页面,新建的数据库db1会在“数据库”列表中出现。 在DLI管理控制台第一次单击“执行”操作时,需要阅读隐私协议,确认同意后才能执行作业,且后续“执行”操作将不会再提示阅读隐私协议。
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全