Data Lake Insight (DLI) - ClickHouse: Examples


Examples

  • Example 1: Read data from Kafka and write the data to ClickHouse (the ClickHouse version is 21.3.4.25 on MRS, and Kerberos authentication is disabled for the MRS cluster):
    1. Referring to Enhanced Datasource Connections, create a datasource connection on DLI for the VPC and subnet of the ClickHouse cluster and another for those of the Kafka cluster, and bind the Flink elastic resource pool to be used.
    2. Add inbound rules to the security groups of the ClickHouse and Kafka clusters to allow access from the CIDR block of the Flink queue to be used. Referring to Testing Address Connectivity, test the queue's connectivity to the ClickHouse and Kafka addresses. If the addresses are reachable, the datasource connection has been bound successfully; otherwise, the binding failed.
    3. Connect to the ClickHouse server with the ClickHouse client and run the following command to query the cluster identifier (cluster) and other environment parameters.

      For detailed operations, see Using ClickHouse from Scratch.

      select cluster,shard_num,replica_num,host_name from system.clusters;
      The returned information is as follows:
      ┌─cluster─────────┬─shard_num─┬─replica_num─┐
      │ default_cluster │         1 │           1 │
      │ default_cluster │         1 │           2 │
      └─────────────────┴───────────┴─────────────┘

      Based on the obtained cluster identifier (here, default_cluster), run the following command to create the database flink on the nodes of the ClickHouse default_cluster cluster.

      CREATE DATABASE flink ON CLUSTER default_cluster;
    4. Run the following command to create a ReplicatedMergeTree table named order in the flink database on the default_cluster cluster nodes.
      CREATE TABLE flink.order ON CLUSTER default_cluster (
        order_id String,
        order_channel String,
        order_time String,
        pay_amount Float64,
        real_pay Float64,
        pay_time String,
        user_id String,
        user_name String,
        area_id String
      ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/flink/order', '{replica}')
      ORDER BY order_id;
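      Optionally, you can verify that the database and table now exist on the cluster. This quick check is a minimal sketch and not part of the original procedure:

      SHOW DATABASES;
      SHOW TABLES FROM flink;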
    5. Referring to Creating a Flink OpenSource SQL Job, create a Flink OpenSource SQL job, enter the following job script, and submit it for running. The script uses DMS Kafka as the data source and ClickHouse as the result (sink) table.

      Change the placeholder parameters in the following script (such as KafkaTopic, KafkaAddress1:KafkaPort, GroupId, username, and password) based on the actual environment.

      create table orders (
        order_id string,
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'KafkaTopic',
        'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
        'properties.group.id' = 'GroupId',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
      );
      
      create table clickhouseSink(
        order_id string,
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string
      ) with (
        'connector' = 'clickhouse',
        'url' = 'jdbc:clickhouse://ClickhouseAddress1:ClickhousePort,ClickhouseAddress2:ClickhousePort/flink',
        'username' = 'username',
        'password' = 'password',
        'table-name' = 'order',
        'sink.buffer-flush.max-rows' = '10',
        'sink.buffer-flush.interval' = '3s'
      );
      
      insert into clickhouseSink select * from orders;
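      The INSERT statement above can be any Flink SQL query over the source table. For example, a hypothetical variant (not part of the original job) that writes only orders whose real payment is at least 100 would be:

      insert into clickhouseSink select * from orders where real_pay >= 100;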
    6. Connect to the Kafka cluster and send the following test data to the DMS Kafka topic:
      {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
      
      {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
      
      {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
    7. Connect to ClickHouse with the ClickHouse client and run the following query to view the data written to the order table in the flink database.
      select * from flink.order;

      The query result is similar to the following:

      202103241000000001 webShop 2021-03-24 10:00:00 100 100 2021-03-24 10:02:03 0001 Alice 330106
      
      202103241606060001 appShop 2021-03-24 16:06:06 200 180 2021-03-24 16:10:06 0001 Alice 330106 
      
      202103251202020001 miniAppShop 2021-03-25 12:02:02 60 60 2021-03-25 12:03:00 0002 Bob 330110 
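      As an optional follow-up that is not part of the original procedure, the written data can also be analyzed directly in ClickHouse, for example with a per-channel summary:

      SELECT order_channel, count() AS order_count, sum(real_pay) AS total_real_pay
      FROM flink.order
      GROUP BY order_channel;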
  • Example 2: Read data from Kafka and write the data to ClickHouse (the ClickHouse version is 21.3.4.25 on MRS, and Kerberos authentication is enabled for the MRS cluster):
    1. Referring to Enhanced Datasource Connections, create a datasource connection on DLI for the VPC and subnet of the ClickHouse cluster and another for those of the Kafka cluster, and bind the Flink elastic resource pool to be used.
    2. Add inbound rules to the security groups of the ClickHouse and Kafka clusters to allow access from the CIDR block of the Flink queue to be used. Referring to Testing Address Connectivity, test the queue's connectivity to the ClickHouse and Kafka addresses. If the addresses are reachable, the datasource connection has been bound successfully; otherwise, the binding failed.
    3. Connect to the ClickHouse server with the ClickHouse client and run the following command to query the cluster identifier (cluster) and other environment parameters.

      See Using ClickHouse from Scratch.

      select cluster,shard_num,replica_num,host_name from system.clusters;
      The returned information is as follows:
      ┌─cluster─────────┬─shard_num─┬─replica_num─┐
      │ default_cluster │         1 │           1 │
      │ default_cluster │         1 │           2 │
      └─────────────────┴───────────┴─────────────┘

      Based on the obtained cluster identifier (here, default_cluster), run the following command to create the database flink on the nodes of the ClickHouse default_cluster cluster.

      CREATE DATABASE flink ON CLUSTER default_cluster;
    4. Run the following command to create a ReplicatedMergeTree table named order in the flink database on the default_cluster cluster nodes.
      CREATE TABLE flink.order ON CLUSTER default_cluster (
        order_id String,
        order_channel String,
        order_time String,
        pay_amount Float64,
        real_pay Float64,
        pay_time String,
        user_id String,
        user_name String,
        area_id String
      ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/flink/order', '{replica}')
      ORDER BY order_id;
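      Optionally, you can confirm the replication metadata of the new table through the system.replicas table. This check is a sketch and not part of the original procedure:

      SELECT database, table, zookeeper_path, replica_name
      FROM system.replicas
      WHERE database = 'flink' AND table = 'order';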
    5. Referring to Creating a Flink OpenSource SQL Job, create a Flink OpenSource SQL job, enter the following job script, and submit it for running. The script uses Kafka as the data source and ClickHouse as the result (sink) table.

      Change the placeholder parameters in the following script (such as KafkaTopic, KafkaAddress1:KafkaPort, username, password, and the DEW endpoint and secret name) based on the actual environment.

      create table orders (
        order_id string,
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'KafkaTopic',
        'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
        'properties.group.id' = 'GroupId',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
      );
      
      create table clickhouseSink(
        order_id string,
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string
      ) with (
        'connector' = 'clickhouse',
        'url' = 'jdbc:clickhouse://ClickhouseAddress1:ClickhousePort,ClickhouseAddress2:ClickhousePort/flink?ssl=true&sslmode=none',
        'table-name' = 'order',
        'username' = 'username',
        'password' = 'password', -- key of the credential stored in DEW
        'sink.buffer-flush.max-rows' = '10',
        'sink.buffer-flush.interval' = '3s',
        'dew.endpoint'='kms.xx.myhuaweicloud.com', -- endpoint of the DEW service to use
        'dew.csms.secretName'='xx', -- name of the generic credential in the DEW secret management service
        'dew.csms.decrypt.fields'='password', -- the value of the password field is decrypted and replaced using DEW credential management
        'dew.csms.version'='v1'
      );
      
      insert into clickhouseSink select * from orders;
    6. Connect to the Kafka cluster and send the following test data to Kafka:
      {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
      
      {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
      
      {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
    7. Connect to ClickHouse with the ClickHouse client and run the following query to view the data written to the order table in the flink database.
      select * from flink.order;

      The query result is similar to the following:

      202103241000000001 webShop 2021-03-24 10:00:00 100 100 2021-03-24 10:02:03 0001 Alice 330106
      
      202103241606060001 appShop 2021-03-24 16:06:06 200 180 2021-03-24 16:10:06 0001 Alice 330106 
      
      202103251202020001 miniAppShop 2021-03-25 12:02:02 60 60 2021-03-25 12:03:00 0002 Bob 330110 
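      A quick sanity check, not part of the original procedure, is to compare the number of rows in the table with the number of test records sent to Kafka:

      SELECT count() FROM flink.order;
      -- Expected result: 3, one row for each test record produced above.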