Data Warehouse Service GaussDB(DWS) - Using DWS as a Result Table: Writing Directly to DNs with Flink SQL

Time: 2025-01-26 10:51:37

Writing Directly to DNs with Flink SQL

This capability depends on the Flink SQL DISTRIBUTEBY feature, which MRS provides. For details, see Flink SQL Syntax Enhancements.
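As a rough illustration of the syntax only: the hint sits directly after SELECT and names the output field to distribute by, which is the same form the INSERT statement in step 4 uses. The table and column names here are hypothetical, and the MRS documentation remains the reference for the exact behavior.

```sql
-- Hypothetical table and columns, for illustration only.
-- The hint follows SELECT and names the field used to distribute rows
-- across the downstream parallel subtasks.
SELECT /*+ DISTRIBUTEBY('id') */ id, name
FROM source_table;
```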

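The connector provides a UDF that computes the downstream parallel subtask from the distribution column value. Combined with the Flink SQL DISTRIBUTEBY feature, this partitions the data by DN. Example: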

  1. Register the UDF in the SQL job.
    CREATE TEMPORARY FUNCTION dn_hash AS 'com.huaweicloud.dws.connectors.flink.partition.DnHashFunction';
  2. Write the source SQL as usual.
    CREATE TABLE users(
        id         BIGINT,
        name       STRING,
        age        INT,
        text       STRING,
        created_at TIMESTAMP(3),
        updated_at TIMESTAMP(3)
    ) WITH (
        'connector' = 'datagen',
        'fields.id.kind' = 'sequence',
        'fields.id.start' = '1',
        'fields.id.end' = '1000',
        'fields.name.length' = '10',
        'fields.age.min' = '18',
        'fields.age.max' = '60',
        'fields.text.length' = '5'
    );
  3. The sink table definition needs one additional field of type INT to receive the UDF result. In this example the field is named dn_hash.
    CREATE TABLE dws_users(
        dn_hash    INT,
        id         BIGINT,
        name       STRING,
        age        INT,
        text       STRING,
        created_at TIMESTAMP(3),
        updated_at TIMESTAMP(3),
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'dws',
        'url' = '%s',
        'tableName' = 'test.users',
        'username' = '%s',
        'autoFlushBatchSize' = '50000',
        'password' = '%s'
    );
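  4. In the INSERT INTO statement, call the UDF to obtain the downstream operator subtask for each row, and apply DISTRIBUTEBY to the returned value to partition the data. Each row is then sent to the downstream parallel subtask that the UDF result designates.
    INSERT INTO dws_users
    SELECT /*+ DISTRIBUTEBY('dn_hash') */
        dn_hash('test.users', 10, 1024, id) AS dn_hash,
        *
    FROM users;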
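The steps above assume that the target table test.users already exists in GaussDB(DWS). The source does not show its DDL, so the following is only a sketch: the column types are plausible counterparts of the Flink types, and hash distribution on id is an assumption based on id being the column passed to dn_hash. The second statement is a common way to see how many rows each DN holds after the job has run. It shows how the rows are spread across DNs, not which path they took to get there.

```sql
-- Assumed GaussDB(DWS) definition of the target table (not from the source).
CREATE TABLE test.users (
    id         BIGINT PRIMARY KEY,
    name       VARCHAR(64),
    age        INTEGER,
    "text"     VARCHAR(64),
    created_at TIMESTAMP(3),
    updated_at TIMESTAMP(3)
) DISTRIBUTE BY HASH (id);

-- Per-DN row counts after loading; xc_node_id identifies the DN storing each row.
SELECT xc_node_id, count(*) AS row_count
FROM test.users
GROUP BY xc_node_id
ORDER BY xc_node_id;
```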
support.huaweicloud.com/tg-dws/dws_07_0184.html