数据湖探索 DLI-JDBC结果表:示例

时间:2025-02-12 15:00:46

示例

使用Kafka发送数据,通过JDBC结果表将Kafka数据再输出到MySQL数据库中。

  1. 参考增强型跨源连接,在 DLI 上根据MySQL和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。
  2. 设置MySQL和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据MySQL和Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。
  3. 登录MySQL,并使用下述命令在flink库下创建orders表。
    CREATE TABLE `flink`.`orders` (`order_id` VARCHAR(32) NOT NULL,`order_channel` VARCHAR(32) NULL,`order_time` VARCHAR(32) NULL,`pay_amount` DOUBLE UNSIGNED NOT NULL,`real_pay` DOUBLE UNSIGNED NULL,`pay_time` VARCHAR(32) NULL,`user_id` VARCHAR(32) NULL,`user_name` VARCHAR(32) NULL,`area_id` VARCHAR(32) NULL,PRIMARY KEY (`order_id`))ENGINE = InnoDBDEFAULT CHARACTER SET = utf8mb4COLLATE = utf8mb4_general_ci;
  4. 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。
    注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改
    CREATE TABLE kafkaSource (  order_id string,  order_channel string,  order_time string,   pay_amount double,  real_pay double,  pay_time string,  user_id string,  user_name string,  area_id string) WITH (  'connector' = 'kafka',  'topic' = 'KafkaTopic',  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',  'properties.group.id' = 'GroupId',  'scan.startup.mode' = 'latest-offset',  'format' = 'json');CREATE TABLE jdbcSink (  order_id string,  order_channel string,  order_time string,  pay_amount double,  real_pay double,  pay_time string,  user_id string,  user_name string,  area_id string) WITH (  'connector' = 'jdbc',  'url' = 'jdbc:mysql://MySQLAddress:MySQLPort/flink',--其中url中的flink表示MySQL中orders表所在的数据库名  'table-name' = 'orders',  'username' = 'MySQLUsername',  'password' = 'MySQLPassword',  'sink.buffer-flush.max-rows' = '1');insert into jdbcSink select * from kafkaSource;
  5. 连接Kafka集群,向Kafka相应的topic中发送如下测试数据:
    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
  6. 查看表中数据,在MySQL中执行sql查询语句。
    select * from orders;
    其结果参考如下(注意,以下数据为从MySQL中复制的结果,并不是MySQL中的数据样式)。
    202103241000000001,webShop,2021-03-24 10:00:00,100.0,100.0,2021-03-24 10:02:03,0001,Alice,330106202103241606060001,appShop,2021-03-24 16:06:06,200.0,180.0,2021-03-24 16:10:06,0001,Alice,330106
support.huaweicloud.com/sqlref-flink-dli/dli_08_0397.html