Data Warehouse Service GaussDB(DWS) - Using DWS as a Result Table: Example

Time: 2025-01-26 10:51:37

Example

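This example reads data from a Kafka source and writes it to a DWS result table, with the batching interval capped at 10 seconds and each batch limited to 30,000 records. The steps are as follows: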

  1. Create the table public.dws_order in the GaussDB(DWS) database:

    create table public.dws_order(
      order_id VARCHAR,
      order_channel VARCHAR,
      order_time VARCHAR,
      pay_amount FLOAT8,
      real_pay FLOAT8,
      pay_time VARCHAR,
      user_id VARCHAR,
      user_name VARCHAR,
      area_id VARCHAR
    );

  2. Consume data from the Kafka topic order_test as the source and use public.dws_order as the result table. The Kafka data is in JSON format, and its field names correspond one-to-one to the database column names:

    -- Kafka source table: reads JSON records from the order_test topic
    CREATE TABLE kafkaSource (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'order_test',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );

    -- DWS result table: maps to dws_order created in step 1
    CREATE TABLE dwsSink (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'dws',
      'url' = 'jdbc:gaussdb://DWSAddress:DWSPort/DWSdbName',
      'tableName' = 'dws_order',
      'username' = 'DWSUserName',
      'password' = 'DWSPassword',
      -- batching interval of at most 10 seconds
      'autoFlushMaxInterval' = '10s',
      -- at most 30000 records per batch
      'autoFlushBatchSize' = '30000'
    );

    insert into dwsSink select * from kafkaSource;

  3. Write test data to Kafka:

    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}

  4. Wait 10 seconds, then query the GaussDB(DWS) table:

    select * from dws_order;

    The query should return the test record written to Kafka in step 3.
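If the table already holds other rows, a narrower check may be easier to read than `select *`. The sketch below is an illustrative addition, not part of the original procedure: it assumes the table from step 1 and the `order_id` value from the test message in step 3, and the `row_count` alias is a hypothetical name chosen here.

```sql
-- Look up the test record by its order_id (value taken from the step 3 message).
SELECT order_id, order_channel, pay_amount, real_pay, pay_time, user_name
FROM public.dws_order
WHERE order_id = '202103241000000001';

-- Count the rows written so far to confirm that a batch has been flushed.
SELECT count(*) AS row_count
FROM public.dws_order;
```

If the first query returns no rows, the batch may not have been flushed yet, or the message may have been written before the job started (the source uses `'scan.startup.mode' = 'latest-offset'`, so earlier messages are not read).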

support.huaweicloud.com/tg-dws/dws_07_0184.html