数据湖探索 DLI-JDBC维表:示例

时间:2024-07-01 21:07:20

示例

从Kafka源表中读取数据,将JDBC表作为维表,并将二者生成的表信息写入Kafka结果表中,其具体步骤如下:

  1. 参考增强型跨源连接,在 DLI 上根据MySQL和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。
  2. 设置MySQL和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据MySQL和Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。
  3. 连接MySQL数据库实例,在flink数据库中创建相应的表,作为维表,表名为area_info,SQL语句如下:
    CREATE TABLE `flink`.`area_info` (
    	`area_id` VARCHAR(32) NOT NULL,
    	`area_province_name` VARCHAR(32) NOT NULL,
    	`area_city_name` VARCHAR(32) NOT NULL,
    	`area_county_name` VARCHAR(32) NOT NULL,
    	`area_street_name` VARCHAR(32) NOT NULL,
    	`region_name` VARCHAR(32) NOT NULL,
    	PRIMARY KEY (`area_id`)
    )	ENGINE = InnoDB
    	DEFAULT CHARACTER SET = utf8mb4
    	COLLATE = utf8mb4_general_ci;
  4. 连接MySQL数据库实例,向JDBC维表area_info中插入测试数据,其语句如下:
    insert into flink.area_info
      (area_id, area_province_name, area_city_name, area_county_name, area_street_name, region_name) 
      values
      ('330102', 'a1', 'b1', 'c1', 'd1', 'e1'),
      ('330106', 'a1', 'b1', 'c2', 'd2', 'e1'),
      ('330108', 'a1', 'b1', 'c3', 'd3', 'e1'),  ('330110', 'a1', 'b1', 'c4', 'd4', 'e1');
  5. 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。该作业脚本将Kafka为数据源,JDBC作为维表,数据写入到Kafka结果表。
    注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    CREATE TABLE orders (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string,
      proctime as Proctime()
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'KafkaSourceTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'jdbc-order',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    --创建地址维表
    create table area_info (
        area_id string, 
        area_province_name string,
        area_city_name string,
        area_county_name string, 
        area_street_name string, 
        region_name string 
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://JDBCAddress:JDBCPort/flink',--其中url中的flink表示MySQL中area_info表所在的数据库名
      'table-name' = 'area_info',
      'username' = 'JDBCUserName',
      'password' = 'JDBCPassWord'
    );
    
    --根据地址维表生成详细的包含地址的订单信息宽表
    create table order_detail(
        order_id string,
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string,
        area_province_name string,
        area_city_name string,
        area_county_name string,
        area_street_name string,
        region_name string
    ) with (
      'connector' = 'kafka',
      'topic' = 'KafkaSinkTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'format' = 'json'
    );
    
    insert into order_detail
        select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name,
               area.area_id, area.area_province_name, area.area_city_name, area.area_county_name,
               area.area_street_name, area.region_name  from orders 
               left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id;
    
  6. 连接Kafka集群,向Kafka的source topic中插入如下测试数据:
    {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    
    {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
    
    {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
  7. 连接Kafka集群,在Kafka的sink topic读取数据,结果参考如下:
    {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106","area_province_name":"a1","area_city_name":"b1","area_county_name":"c2","area_street_name":"d2","region_name":"e1"}
    
    {"order_id":"202103251202020001","order_channel":"miniAppShop","order_time":"2021-03-25 12:02:02","pay_amount":60.0,"real_pay":60.0,"pay_time":"2021-03-25 12:03:00","user_id":"0002","user_name":"Bob","area_id":"330110","area_province_name":"a1","area_city_name":"b1","area_county_name":"c4","area_street_name":"d4","region_name":"e1"}
    
    {"order_id":"202103251505050001","order_channel":"qqShop","order_time":"2021-03-25 15:05:05","pay_amount":500.0,"real_pay":400.0,"pay_time":"2021-03-25 15:10:00","user_id":"0003","user_name":"Cindy","area_id":"330108","area_province_name":"a1","area_city_name":"b1","area_county_name":"c3","area_street_name":"d3","region_name":"e1"}
support.huaweicloud.com/sqlreference-dli/dli_08_0405.html