数据仓库服务 GAUSSDB(DWS)-做结果表:示例

时间:2024-10-26 11:41:17

示例

该示例是从kafka数据源中读取数据,写入DWS结果表中,并指定攒批时间不超过10秒,每批数据最大30000条,其具体步骤如下:

  1. GaussDB (DWS)数据库中创建表public.dws_order
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    create table public.dws_order(
      order_id VARCHAR,
      order_channel VARCHAR,
      order_time VARCHAR,
      pay_amount FLOAT8,
      real_pay FLOAT8,
      pay_time VARCHAR,
      user_id VARCHAR,
      user_name VARCHAR,
      area_id VARCHAR
      );
    
  2. 消费Kafka中order_test topic中的数据作为数据源,public.dws_order作为结果表,Kafka数据为JSON格式,并且字段名称和数据库字段名称一一对应:
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    CREATE TABLE kafkaSource (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'order_test',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    CREATE TABLE dwsSink (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'dws',
      'url' = 'jdbc:gaussdb://DWSAddress:DWSPort/DWSdbName',
      'tableName' = 'dws_order',
      'username' = 'DWSUserName',
      'password' = 'DWSPassword',
      'autoFlushMaxInterval' = '10s',
      'autoFlushBatchSize' = '30000'
    );
    
    insert into dwsSink select * from kafkaSource;
    
  3. 给Kafka写入测试数据:
    1
    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    
  4. 等10秒后在GaussDB(DWS)表中查询结果:
    1
     select * from dws_order
    

    结果如下:

support.huaweicloud.com/tg-dws/dws_07_0184.html