华为云用户手册

  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据 集群未启用Kerberos认证(普通模式)。 Doris的表名是区分大小写。 使用cloudTable的doris时,'fenodes'字段值的端口请用8030,如'xx:8030'。同时安全组请放开端口8030, 8040,9030。 开启HTTPS后,需要在创建表的with子句中添加如下配置参数: 'doris.enable.https' = 'true' 'doris.ignore.https.ca' = 'true'
  • 示例 该示例是从Doris源表读取数据,并输入到 print connector。 参考增强型跨源连接,在 DLI 上根据Doris所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。参考“修改主机信息”章节描述,在增强型跨源中增加 MRS 的主机信息。 设置Doris和kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据Doris和Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 参考MRS Doris使用指南,创建doris表,并插入10条数据。创建语句如下: CREATE TABLE IF NOT EXISTS dorisdemo ( `user_id` varchar(10) NOT NULL, `city` varchar(10), `age` int, `gender` int ) DISTRIBUTED BY HASH(`user_id`) BUCKETS 10; INSERT INTO dorisdemo VALUES ('user1', 'city1', 20, 1); INSERT INTO dorisdemo VALUES ('user2', 'city2', 21, 0); INSERT INTO dorisdemo VALUES ('user3', 'city3', 22, 1); INSERT INTO dorisdemo VALUES ('user4', 'city4', 23, 0); INSERT INTO dorisdemo VALUES ('user5', 'city5', 24, 1); INSERT INTO dorisdemo VALUES ('user6', 'city6', 25, 0); INSERT INTO dorisdemo VALUES ('user7', 'city7', 26, 1); INSERT INTO dorisdemo VALUES ('user8', 'city8', 27, 0); INSERT INTO dorisdemo VALUES ('user9', 'city9', 28, 1); INSERT INTO dorisdemo VALUES ('user10', 'city10', 29, 0); 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业模拟从kafka读取数据,并关联doris维表对数据进行打宽,并输出到print。 CREATE TABLE ordersSource ( user_id string, user_name string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'kafka-topic', 'properties.bootstrap.servers' = 'kafkaIp:port,kafkaIp:port,kafkaIp:port', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE dorisDemo ( `user_id` String NOT NULL, `city` String, `age` int, `gender` int ) with ( 'connector' = 'doris', 'fenodes' = 'FE实例IP地址:端口号', 'table.identifier' = 'demo.dorisdemo', 'username' = 'dorisUsername', 'password' = 'dorisPassword', 'lookup.cache.ttl'='10 m', 'lookup.cache.max-rows' = '100' ); CREATE TABLE print ( user_id string, user_name string, `city` String, `age` int, `gender` int ) WITH ( 'connector' = 'print' ); insert into print select orders.user_id, orders.user_name, dim.city, dim.age, dim.sex from ordersSource orders left join dorisDemo for system_time as of orders.proctime as dim on orders.user_id = dim.user_id; 往kafka数据源写入2条数据。 {"user_id": "user1", "user_name": "name1"} {"user_id": "user2", "user_name": "name2"} 查看print结果表数据。 +I[user1, name1, city1, 20, 1] +I[user2, name2, city2, 21, 0]
  • 语法格式 create table hbaseSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector' = 'doris', 'fenodes' = 'FE_IP:PORT,FE_IP:PORT,FE_IP:PORT', 'table.identifier' = 'database.table', 'username' = 'dorisUsername', 'password' = 'dorisPassword' );
  • 前提条件 该场景作业需要运行在DLI的独享队列上,因此要与Doris建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖探索 用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 如果使用MRS Doris,请在增强型跨源的主机信息中添加MRS集群所有节点的主机ip信息。 详细操作请参考《 数据湖 探索用户指南》中的“修改主机信息”章节描述。 集群未启用Kerberos认证(普通模式) 使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据 集群未启用Kerberos认证(普通模式) Doris的表名是区分大小写。 使用cloudTable的doris时,'fenodes'字段值的端口请用8030,如'xx:8030'。同时安全组请放开端口8030,8040,9030。 开启HTTPS后,需要在创建表的with子句中添加如下配置参数: 'doris.enable.https' = 'true' 'doris.ignore.https.ca' = 'true' 请在Flink“作业编辑”页面选择“运行参数配置”,选择“开启Checkpoint”,否则会导致Doris结果表无法写入数据,且写入Doris的延时取决于设置的Checkpoint的间隔时间。
  • 示例 该示例是从Datagen数据源中生成数据,并将结果写入到Doris结果表中。 参考增强型跨源连接,在DLI上根据Doris所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。参考“修改主机信息”章节描述,在增强型跨源中增加MRS的主机信息。 设置Doris的安全组,添加入向规则使其对Flink的队列网段放通。分别根据Doris的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 参考测试地址连通性。 参考MRS Doris使用指南,创建doris表,创建语句如下: CREATE TABLE IF NOT EXISTS dorisdemo ( `user_id` varchar(10) NOT NULL, `city` varchar(10), `age` int, `gender` int ) DISTRIBUTED BY HASH(`user_id`) BUCKETS 10 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业脚本将Datagen作为数据源,将数据写入到Doris作为结果表中。 create table student_datagen_source( `user_id` String NOT NULL, `city` String, `age` int, `gender` int ) with ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.user_id.kind' = 'random', 'fields.user_id.length' = '7', 'fields.city.kind' = 'random', 'fields.city.length' = '7' ); CREATE TABLE dorisDemo ( `user_id` String NOT NULL, `city` String, `age` int, `gender` int ) with ( 'connector' = 'doris', 'fenodes' = 'FE_IP:PORT', 'table.identifier' = 'demo.dorisdemo', 'username' = 'dorisUser', 'password' = 'dorisPassword', 'sink.label-prefix' = 'demo', 'sink.enable-2pc' = 'true', 'sink.buffer-count' = '10' ); insert into dorisDemo select * from student_datagen_source 查看doris结果表是否已成功写入数据。 user_id city age gender 50aff04 93406c5 12 1 681a230 1f27d06 16 1 006eff4 3521ded 18 0
  • 语法格式 create table dorisSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector' = 'doris', 'fenodes' = 'FE_IP:PORT,FE_IP:PORT,FE_IP:PORT', 'table.identifier' = 'database.table', 'username' = 'dorisUsername', 'password' = 'dorisPassword' );
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据 集群未启用Kerberos认证(普通模式) Doris的表名是区分大小写。 使用cloudTable的doris时,'fenodes'字段值的端口请用8030,如'xx:8030'。同时安全组请放开端口8030, 8040,9030。 开启HTTPS后,需要在创建表的with子句中添加如下配置参数: 'doris.enable.https' = 'true' 'doris.ignore.https.ca' = 'true'
  • 示例 该示例是从Doris源表读取数据,并输入到 print connector。 参考增强型跨源连接,在DLI上根据Doris所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。参考“修改主机信息”章节描述,在增强型跨源中增加MRS的主机信息。 设置Doris的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据Doris的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 创建Doris表,并插入10条数据。创建语句如下: CREATE TABLE IF NOT EXISTS dorisdemo ( `user_id` varchar(10) NOT NULL, `city` varchar(10), `age` int, `gender` int ) DISTRIBUTED BY HASH(`user_id`) BUCKETS 10; INSERT INTO dorisdemo VALUES ('user1', 'city1', 20, 1); INSERT INTO dorisdemo VALUES ('user2', 'city2', 21, 0); INSERT INTO dorisdemo VALUES ('user3', 'city3', 22, 1); INSERT INTO dorisdemo VALUES ('user4', 'city4', 23, 0); INSERT INTO dorisdemo VALUES ('user5', 'city5', 24, 1); INSERT INTO dorisdemo VALUES ('user6', 'city6', 25, 0); INSERT INTO dorisdemo VALUES ('user7', 'city7', 26, 1); INSERT INTO dorisdemo VALUES ('user8', 'city8', 27, 0); INSERT INTO dorisdemo VALUES ('user9', 'city9', 28, 1); INSERT INTO dorisdemo VALUES ('user10', 'city10', 29, 0); 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业脚本读取Doris表,并打印。 CREATE TABLE dorisDemo ( `user_id` String NOT NULL, `city` String, `age` int, `gender` int ) with ( 'connector' = 'doris', 'fenodes' = 'FE_IP:PORT,FE_IP:PORT,FE_IP:PORT', 'table.identifier' = 'demo.dorisdemo', 'username' = 'dorisUser', 'password' = 'dorisPassword', 'doris.request.retries'='3', 'doris.batch.size' = '100' ); CREATE TABLE print ( `user_id` String NOT NULL, `city` String, `age` int, `gender` int ) with ( 'connector' = 'print' ); insert into print select * from dorisDemo; 查看print结果表数据。 +I[user5, city5, 24, 1] +I[user4, city4, 23, 0] +I[user3, city3, 22, 1] +I[user10, city10, 29, 0] +I[user6, city6, 25, 0] +I[user1, city1, 20, 1] +I[user9, city9, 28, 1] +I[user7, city7, 26, 1] +I[user8, city8, 27, 0] +I[user2, city2, 21, 0]
  • 前提条件 该场景作业需要运行在DLI的独享队列上,因此要与Doris建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 如果使用MRS Doris,请在增强型跨源的主机信息中添加MRS集群所有节点的主机ip信息。 详细操作请参考《数据湖探索用户指南》中的“修改主机信息”章节描述。 集群未启用Kerberos认证(普通模式) 使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。
  • 语法格式 create table dorisSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector' = 'doris', 'fenodes' = 'FE_IP:PORT,FE_IP:PORT,FE_IP:PORT', 'table.identifier' = 'database.table', 'username' = 'dorisUsername', 'password' = 'dorisPassword' );
  • 注意事项 创建DataGen表时,表字段类型不支持Array,Map和Row复杂类型,可以通过CREATE TABLE语句中的“COMPUTED COLUMN”来进行类似功能构造。 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
  • 参数说明 表2 参数说明 参数 是否必选 默认值 数据类型 参数说明 connector 是 无 String 指定要使用的连接器,这里是'datagen'。 rows-per-second 否 10000 Long 每秒生成的行数,用以控制数据发出速率。 number-of-rows 否 无 Long 生成数据的总行数。默认条件下,不限制生成数据的总行数。如果有字段生成器类型为序列生成器,则当生成数据的行数达到上限或者序列数字达到结束值时,都不会再生成数据。 fields.#.kind 否 random String 指定 '#' 字段的生成器。 '#' 字段必须是DataGen表中的字段,实际使用时需要将'#'替换为相应字段名。其他各参数的'#'号意义相同,不再重复描述。 参数值可以是 'sequence' 或 'random',具体含义如下: random是默认值,表示无界的随机生成器。您可以通过“fields.#.max”和“fields.#.min”参数指定随机生成数的最大和最小值。当指定的字段类型为char、varchar、string时,可以通过“fields.#.length”参数指定长度。当指定的字段类型为时间戳类型时,可以通过“fields.#.max-past”参数指定相对当前时间向过去偏移的最大值。 sequence表示有界的序列生成器。您可以通过“fields.#.start”和“fields.#.end”指定序列的起始和结束值,当序列数字达到结束值时,就不会再生成数据。 fields.#.min 否 '#'号指定的字段类型的最小值 '#'号指定的字段类型 当“fields.#.kind”字段为:random时有效。 表示随机生成器的最小值,'#' 指定的字段仅适用于数字类型。 fields.#.max 否 '#'号指定的字段类型的最大值 '#'号指定的字段类型 当“fields.#.kind”字段为:random时有效。 随机生成数的最大值,'#' 指定的字段仅适用于数字类型。 fields.#.max-past 否 0 Duration 当“fields.#.kind”字段为:random时有效。 随机生成器生成相对当前时间向过去偏移的最大值,'#' 指定的字段仅适用于时间戳类型。 fields.#.length 否 100 Integer 当“fields.#.kind”字段为:random时有效。 随机生成器生成字符的长度,'#' 指定的字段仅适用于char、varchar、string。 fields.#.start 否 无 '#'号指定的字段类型 当“fields.#.kind”字段为:sequence时有效。 序列生成器的起始值。 fields.#.end 否 无 '#'号指定的字段类型 当“fields.#.kind”字段为:sequence时有效。 序列生成器的结束值。
  • 示例 示例1:从Kafka中读取数据,并将数据插入ClickHouse中(ClickHouse版本为MRS的21.3.4.25,且MRS集群未开启Kerberos认证): 参考增强型跨源连接,在DLI上根据ClickHouse和Kafka集群所在的虚拟私有云和子网分别创建跨源连接,并绑定所要使用的Flink弹性资源池。 设置ClickHouse和Kafka集群安全组的入向规则,使其对当前将要使用的Flink作业队列网段放通。参考测试地址连通性根据ClickHouse和Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 使用ClickHouse客户端连接到ClickHouse服务端,并使用以下命令查询集群标识符cluster等其他环境参数信息。 详细操作请参考从零开始使用ClickHouse。 select cluster,shard_num,replica_num,host_name from system.clusters; 其返回信息如下图: ┌─cluster────┬────┬─shard_num─┐ │ default_cluster │ 1 │ 1 │ │ default_cluster │ 1 │ 2 │ └──────── ┴────┴────── ┘ 根据获取到的集群标识符cluster,例如当前为default_cluster ,使用以下命令在ClickHouse的default_cluster集群节点上创建数据库flink。 CREATE DATABASE flink ON CLUSTER default_cluster; 使用以下命令在default_cluster集群节点上和flink数据库下创建表名为order的ReplicatedMergeTree表。 CREATE TABLE flink.order ON CLUSTER default_cluster(order_id String,order_channel String,order_time String,pay_amount Float64,real_pay Float64,pay_time String,user_id String,user_name String,area_id String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/flink/order', '{replica}')ORDER BY order_id; 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业脚本将DMS Kafka作为数据源,ClickHouse作业结果表。 如下脚本中的加粗参数请根据实际环境修改。 create table orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create table clickhouseSink( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) with ( 'connector' = 'clickhouse', 'url' = 'jdbc:clickhouse://ClickhouseAddress1:ClickhousePort,ClickhouseAddress2:ClickhousePort/flink', 'username' = 'username', 'password' = 'password', 'table-name' = 'order', 'sink.buffer-flush.max-rows' = '10', 'sink.buffer-flush.interval' = '3s' ); insert into clickhouseSink select * from orders; 连接Kafka集群,向DMS Kafka中插入以下测试数据: {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} 使用ClickHouse客户端连接到ClickHouse,执行以下查询命令,查询写入flink数据库下order表中的数据。 select * from flink.order; 查询结果参考如下: 202103241000000001 webShop 2021-03-24 10:00:00 100 100 2021-03-24 10:02:03 0001 Alice 330106 202103241606060001 appShop 2021-03-24 16:06:06 200 180 2021-03-24 16:10:06 0001 Alice 330106 202103251202020001 miniAppShop 2021-03-25 12:02:02 60 60 2021-03-25 12:03:00 0002 Bob 330110 示例2:从Kafka中读取数据,并将数据插入ClickHouse中(ClickHouse版本为MRS的21.3.4.25,且MRS集群开启Kerberos认证) 参考增强型跨源连接,在DLI上根据ClickHouse和Kafka集群所在的虚拟私有云和子网分别创建跨源连接,并绑定所要使用的Flink弹性资源池。 设置ClickHouse和Kafka集群安全组的入向规则,使其对当前将要使用的Flink作业队列网段放通。参考测试地址连通性根据ClickHouse和Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 使用ClickHouse客户端连接到ClickHouse服务端,并使用以下命令查询集群标识符cluster等其他环境参数信息。 参考从零开始使用ClickHouse。 select cluster,shard_num,replica_num,host_name from system.clusters; 其返回信息如下图: ┌─cluster────┬────┬─shard_num─┐ │ default_cluster │ 1 │ 1 │ │ default_cluster │ 1 │ 2 │ └──────── ┴────┴────── ┘ 根据获取到的集群标识符cluster,例如当前为default_cluster ,使用以下命令在ClickHouse的default_cluster集群节点上创建数据库flink。 CREATE DATABASE flink ON CLUSTER default_cluster; 使用以下命令在default_cluster集群节点上和flink数据库下创建表名为order的ReplicatedMergeTree表。 CREATE TABLE flink.order ON CLUSTER default_cluster(order_id String,order_channel String,order_time String,pay_amount Float64,real_pay Float64,pay_time String,user_id String,user_name String,area_id String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/flink/order', '{replica}')ORDER BY order_id; 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业脚本将Kafka作为数据源,ClickHouse作业结果表。 如下脚本中的加粗参数请根据实际环境修改。 create table orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create table clickhouseSink( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) with ( 'connector' = 'clickhouse', 'url' = 'jdbc:clickhouse://ClickhouseAddress1:ClickhousePort,ClickhouseAddress2:ClickhousePort/flink?ssl=true&sslmode=none', 'table-name' = 'order', 'username' = 'username', 'password' = 'password', --DEW凭据中的key 'sink.buffer-flush.max-rows' = '10', 'sink.buffer-flush.interval' = '3s', 'dew.endpoint'='kms.xx.myhuaweicloud.com', --使用的DEW服务所在的endpoint信息 'dew.csms.secretName'='xx', --DEW服务通用凭据的凭据名称 'dew.csms.decrypt.fields'='password', --password字段值需要利用DEW凭证管理,进行解密替换 'dew.csms.version'='v1' ); insert into clickhouseSink select * from orders; 连接Kafka集群,向Kafka中插入以下测试数据: {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} 使用ClickHouse客户端连接到ClickHouse,执行以下查询命令,查询写入flink数据库下order表中的数据。 select * from flink.order; 查询结果参考如下: 202103241000000001 webShop 2021-03-24 10:00:00 100 100 2021-03-24 10:02:03 0001 Alice 330106 202103241606060001 appShop 2021-03-24 16:06:06 200 180 2021-03-24 16:10:06 0001 Alice 330106 202103251202020001 miniAppShop 2021-03-25 12:02:02 60 60 2021-03-25 12:03:00 0002 Bob 330110
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据 创建MRS的ClickHouse集群,集群版本选择MRS 3.1.0及以上版本。 ClickHouse结果表不支持删除表数据操作。 Flink中支持字段类型范围为:string、tinyint、smallint、int、bigint、float、double、date、timestamp、decimal以及Array。 其中Array中的数据类型仅支持int、bigint、string、float、double。
  • 示例 通过DataGen源表产生数据,BlackHole结果表接收传来的数据。 create table datagenSource ( user_id string, user_name string, user_age int ) with ( 'connector' = 'datagen', 'rows-per-second'='1' ); create table blackholeSink ( user_id string, user_name string, user_age int ) with ( 'connector' = 'blackhole' ); insert into blackholeSink select * from datagenSource;
  • 功能描述 BlackHole Connector允许接收所有输入记录,常用于高性能测试和UDF输出,其不是实质性Sink。Blackhole结果表是系统内置的Connector。 例如,如果您在注册其他类型的Connector结果表时报错,但您不确定是系统问题还是结果表WITH参数错误,您可以将WITH参数修改为'connector' = 'blackhole'后,单击运行。如果不再报错,则证明系统没有问题,您需要排查确认修改WITH参数是否正确。 表1 支持类别 类别 详情 支持表类型 结果表
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
  • Connector支持列表 表1 Connector支持列表 Connector 源表 维表 结果表 BlackHole 不支持 不支持 支持 ClickHouse 不支持 不支持 支持 DataGen 支持 不支持 不支持 Doris 支持 支持 支持 DWS 支持 支持 支持 Elasticsearch 不支持 不支持 支持 FileSystem 支持 不支持 支持 Hbase 支持 支持 支持 Hive 支持 支持 支持 JDBC 支持 支持 支持 Kafka 支持 不支持 支持 Print 不支持 不支持 支持 Redis 支持 支持 支持 Upsert Kafka 支持 不支持 支持
  • 表类型 源表:源表是Flink作业的数据输入表,例如Kafka等实时流数据输入。 维表:数据源表的辅助表,用于丰富和扩展源表的数据。在Flink作业中,因为数据采集端采集到的数据往往比较有限,在做数据分析之前,就要先将所需的维度信息补全,而维表就是代表存储数据维度信息的数据源。常见的用户维表有 MySQL,Redis等。 结果表:Flink作业输出的结果数据表,将每条实时处理完的数据写入的目标存储,如 MySQL,HBase 等数据库。
  • 数据类型映射 下表详细说明了这种格式支持的 SQL 类型,包括用于编码和解码的序列化类和反序列化类的详细信息。 表2 数据类型映射 Flink SQL 类型 值 CHAR / VARCHAR / STRING UTF-8(默认)编码的文本字符串。编码字符集可以通过 'raw.charset' 进行配置。 BINARY / VARBINARY / BYTES 字节序列本身。 BOOLEAN 表示布尔值的单个字节,0表示 false, 1 表示 true。 TINYINT 有符号数字值的单个字节。 SMALLINT 采用big-endian(默认)编码的两个字节。字节序可以通过 'raw.endianness' 配置。 INT 采用 big-endian (默认)编码的四个字节。字节序可以通过 'raw.endianness' 配置。 BIGINT 采用 big-endian (默认)编码的八个字节。字节序可以通过 'raw.endianness' 配置。 FLOAT 采用 IEEE 754 格式和 big-endian (默认)编码的四个字节。字节序可以通过 'raw.endianness' 配置。 DOUBLE 采用 IEEE 754 格式和 big-endian (默认)编码的八个字节。字节序可以通过 'raw.endianness' 配置。 RAW 通过 RAW 类型的底层 TypeSerializer 序列化的字节序列。
  • 参数说明 表1 参数 是否必选 默认值 类型 描述 format 是 (none) String 指定要使用的格式, 这里应该是 'raw'。 raw.charset 否 UTF-8 String 指定字符集来编码文本字符串。 raw.endianness 否 big-endian String 指定字节序来编码数字值的字节。有效值为'big-endian'和'little-endian'。 更多细节可查阅 字节序。
  • 功能描述 Raw format 允许读写原始(基于字节)值作为单个列。 Raw Format将 null 值编码成 byte[] 类型的 null。这样在 upsert-kafka 中使用时可能会有限制,因为 upsert-kafka 将 null 值视为 墓碑消息(在键上删除)。因此,如果该字段可能具有 null 值,我们建议避免使用 upsert-kafka 连接器和 raw format 作为 value.format。 Raw format 连接器是内置的。更多具体使用可参考开源社区文档:Raw Format。
  • 数据类型映射 目前,Parquet 格式类型映射与 Apache Hive 兼容,但与 Apache Spark 有所不同: Timestamp:不论精度,映射 timestamp 类型至 int96。 Decimal:根据精度,映射 decimal 类型至固定长度字节的数组。 下表列举了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。 注意:复合数据类型暂只支持写不支持读(Array、Map 与 Row)。 表2 数据类型映射 Flink数据类型 Parquet类型 Parquet逻辑类型 CHAR / VARCHAR / STRING BINARY UTF8 BOOLEAN BOOLEAN - BINARY / VARBINARY BINARY - DECIMAL FIXED_LEN_BYTE_ARRAY DECIMAL TINYINT INT32 INT_8 SMALLINT INT32 INT_16 INT INT32 - BIGINT INT64 - FLOAT FLOAT - DOUBLE DOUBLE - DATE INT32 DATE TIME INT32 TIME_MILLIS TIMESTAMP INT96 - ARRAY - LIST MAP - MAP ROW - STRUCT
  • 数据类型映射 Orc 格式类型的映射和 Apache Hive 是兼容的。下面的表格列出了 Flink 类型的数据和 Orc 类型的数据的映射关系。 表2 数据类型映射 Flink数据类型 Orc物理类型 Orc逻辑类型 CHAR bytes CHAR VARCHAR bytes VARCHAR STRING bytes STRING BOOLEAN long BOOLEAN BYTES bytes BINARY DECIMAL decimal DECIMAL TINYINT long BYTE SMALLINT long SHORT INT long INT BIGINT long LONG FLOAT double FLOAT DOUBLE double DOUBLE DATE long DATE TIMESTAMP timestamp TIMESTAMP ARRAY - LIST MAP - MAP ROW - STRUCT
  • 功能描述 Oracle GoldenGate (a.k.a ogg) 是一个实现异构 IT 环境间数据实时数据集成和复制的综合软件包。 该产品集支持高可用性解决方案、实时数据集成、事务更改数据捕获、运营和分析企业系统之间的数据复制、转换和验证。Ogg 为变更日志提供了统一的格式结构,并支持使用 JSON 序列化消息。 Flink 支持将 Ogg JSON 消息解析为 INSERT/UPDATE/DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常有用,例如 将增量数据从数据库同步到其他系统 日志审计 数据库的实时物化视图 关联维度数据库的变更历史,等等 Flink 还支持将 Flink SQL 中的 INSERT/UPDATE/DELETE 消息编码为 Ogg JSON 格式的消息, 输出到 Kafka 等存储中。 但需要注意, 目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息. 因此, Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Ogg 消息。
  • 参数说明 表1 参数说明 参数 是否必须 默认值 类型 描述 format 是 (none) String 指定要使用的格式,此处应为 'ogg-json'。 ogg-json.ignore-parse-errors 否 false Boolean 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 debezium-json.timestamp-format.standard 否 'SQL' String 声明输入和输出的时间戳格式。当前支持的格式为'SQL' 以及 'ISO-8601': 可选参数 'SQL' 将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析时间戳, 例如 '2020-12-30 12:13:14.123',且会以相同的格式输出。 可选参数 'ISO-8601' 将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入时间戳, 例如 '2020-12-30T12:13:14.123' ,且会以相同的格式输出。 ogg-json.map-null-key.mode 否 'FAIL' String 指定处理 Map 中 key 值为空的方法. 当前支持的值有 'FAIL', 'DROP' 和 'LITERAL': Option 'FAIL' 将抛出异常。 Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。 Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 ogg-json.map-null-key.literal 定义。 ogg-json.map-null-key.literal 否 'null' String 当 'ogg-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。
  • 参数说明 表1 参数说明 参数 是否必选 默认值 类型 说明 format 是 (none) String 指定使用格式,此处使用'maxwell-json'。 maxwell-json.ignore-parse-errors 否 false Boolean 跳过解析错误而不是失败的字段和行。出现错误时,字段设置为空。 maxwell-json.timestamp-format.standard 否 'SQL' String 指定输入和输出时间戳格式。当前支持的值为“SQL”和“ISO-8601”: 'SQL'将以“yyyy-MM-dd HH:mm:ss.s{precision}”格式解析输入时间戳,例如'2020-12-30 12:13:14.123' 并以相同格式输出时间戳。 'ISO-8601'将以“yyyy-MM-ddTHH:mm:ss.s{precision}”格式解析输入时间戳,例如'2020-12-30T12:13:14.123' 并以相同格式输出时间戳。 maxwell-json.map-null-key.mode 否 'FAIL' String 指定序列化map数据的null键时的处理模式。当前支持的值为“FAIL”、“DROP”和“LITERAL”: 'FAIL'将在遇到带有null键的map时抛出异常。 'DROP'将删除map数据的null键条目。 'LITERAL'将使用字符串代替null键。字符串由 maxwell-json.map-null-key.literal 选项定义。 maxwell-json.map-null-key.literal 否 'null' String 当 'maxwell-json.map-null-key.mode' 为 LITERAL 时,指定字符串以替换null键。 maxwell-json.encode.decimal-as-plain-number 否 false Boolean 将所有小数编码为普通数字,而不是可能的科学计数法。默认情况下,小数可以使用科学计数法书写。例如,0.000000027在默认情况下被编码为2.7E-8,如果将此选项设置为true,则将被写入为0.000000027。
  • 功能描述 Maxwell是一个CDC(Changelog Data Capture)工具,可以将MySql中的更改实时流式写入到Kafka等流式connector。Maxwell为changelog提供了统一的格式,而且支持使用JSON对消息进行序列化。 Flink 支持将 Maxwell JSON 消息解释为 INSERT/UPDATE/DELETE 消息到 Flink SQL 系统中。在许多情况下,这对于利用此功能很有用。
  • 注意事项 Maxwell应用允许将每个变动消息精确地传递一次。在这种情况下,Flink在消费Maxwell生成的消息时处理得很好。如果Maxwell应用程序在at-least-once模式处理,它可能向Kafka写入重复的改动消息,Flink将获得重复的消息。这可能会导致Flink查询得到错误的结果或意外的异常。因此,在这种情况下,建议将作业配置table.exec.source.cdc-events-duplicate设置为true,并在源表上定义PRIMARY KEY。Framework将生成一个额外的有状态操作符,并使用主键对变更事件进行去重,并生成一个规范化的changelog流。
共100000条