Data Lake Insight DLI - Doris Source Table: Example

Date: 2024-11-16 13:21:44

Example

This example reads data from a Doris source table and writes it to the print connector.

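  1. Following Enhanced Datasource Connections, create an enhanced datasource connection on DLI for the VPC and subnet where Doris resides, and bind it to the Flink elastic resource pool you will use. Then, as described in the "Modifying Host Information" section, add the MRS host information to the enhanced datasource connection.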
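  2. Configure the Doris security group with an inbound rule that allows traffic from the CIDR block of the Flink queue. Then, following Testing Address Connectivity, test the queue's connectivity to each Doris address. If the addresses are reachable, the datasource connection is bound successfully; otherwise, binding failed.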
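  3. Create a Doris table and insert 10 rows. The statements are as follows:
    -- The Flink job in step 4 reads demo.dorisdemo, so create the table in database demo.
    CREATE DATABASE IF NOT EXISTS demo;
    USE demo;
    
    CREATE TABLE IF NOT EXISTS dorisdemo
    (
      `user_id` varchar(10) NOT NULL,
      `city` varchar(10),
      `age` int,
      `gender` int
    )
    DISTRIBUTED BY HASH(`user_id`) BUCKETS 10;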
    
    INSERT INTO dorisdemo VALUES ('user1', 'city1', 20, 1);
    INSERT INTO dorisdemo VALUES ('user2', 'city2', 21, 0);
    INSERT INTO dorisdemo VALUES ('user3', 'city3', 22, 1);
    INSERT INTO dorisdemo VALUES ('user4', 'city4', 23, 0);
    INSERT INTO dorisdemo VALUES ('user5', 'city5', 24, 1);
    INSERT INTO dorisdemo VALUES ('user6', 'city6', 25, 0);
    INSERT INTO dorisdemo VALUES ('user7', 'city7', 26, 1);
    INSERT INTO dorisdemo VALUES ('user8', 'city8', 27, 0);
    INSERT INTO dorisdemo VALUES ('user9', 'city9', 28, 1);
    INSERT INTO dorisdemo VALUES ('user10', 'city10', 29, 0);
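  4. Following Creating a Flink OpenSource SQL Job, create a Flink OpenSource SQL job, enter the following script, and submit the job. The script reads the Doris table and prints its rows.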
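    CREATE TABLE dorisDemo (
      `user_id` STRING NOT NULL,
      `city` STRING,
      `age` INT,
      `gender` INT
    ) WITH (
      'connector' = 'doris',
      -- Replace with the Doris FE HTTP addresses (IP:port), separated by commas.
      'fenodes' = 'FE_IP:PORT,FE_IP:PORT,FE_IP:PORT',
      -- Source table in database.table form.
      'table.identifier' = 'demo.dorisdemo',
      -- Replace with the credentials of a Doris user that can read the table.
      'username' = 'dorisUser',
      'password' = 'dorisPassword',
      -- Number of retries for requests sent to Doris.
      'doris.request.retries' = '3',
      -- Maximum number of rows read from a BE in one batch.
      'doris.batch.size' = '100'
    );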
    
    CREATE TABLE print (
      `user_id` STRING NOT NULL,
      `city` STRING,
      `age` INT,
      `gender` INT
    ) WITH (
      'connector' = 'print'
    );
    
    INSERT INTO print SELECT * FROM dorisDemo;
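  5. Check the rows written to the print result table. Each row is prefixed with +I, which marks it as an insert row. The output order is not guaranteed to match the insertion order.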
    +I[user5, city5, 24, 1]
    +I[user4, city4, 23, 0]
    +I[user3, city3, 22, 1]
    +I[user10, city10, 29, 0]
    +I[user6, city6, 25, 0]
    +I[user1, city1, 20, 1]
    +I[user9, city9, 28, 1]
    +I[user7, city7, 26, 1]
    +I[user8, city8, 27, 0]
    +I[user2, city2, 21, 0]
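To cross-check the printed rows against the source, you can query the table directly in Doris. This is a minimal sketch, assuming you connect to the Doris FE with a MySQL-compatible client and that the table lives in database demo, as the table.identifier in step 4 implies:

```sql
-- Run in Doris, not in the Flink job.
-- Assumption: the table was created in database demo (see table.identifier in step 4).
SELECT COUNT(*) AS row_cnt FROM demo.dorisdemo;

-- List the rows in a fixed order for easier comparison with the print output.
SELECT * FROM demo.dorisdemo ORDER BY age;
```

With the data from step 3, the first query should return 10, matching the ten rows printed in step 5.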
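The Doris source table behaves like any other Flink table, so the final statement of the step 4 script can filter or project it. The sketch below is a hedged variant that reuses the dorisDemo and print tables from step 4 and prints only users older than 25; whether the predicate is pushed down to Doris or evaluated in Flink depends on the connector version, which this example does not assume:

```sql
-- Replaces the last statement of the step 4 script.
-- Reuses the dorisDemo source and print sink defined in step 4.
-- Only rows with age > 25 reach the print sink.
INSERT INTO print
SELECT user_id, city, age, gender
FROM dorisDemo
WHERE age > 25;
```

With the ten sample rows, this prints four rows (user7 to user10), again in no guaranteed order.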
support.huaweicloud.com/sqlref-flink-dli/dli_08_15034.html