MAPREDUCE服务 MRS-Flink SQL逻辑开发建议:多个Flink作业或者insert into语句写同一张Gauss for MySQL时建议过滤回撤数据

时间:2024-09-06 10:03:29

多个Flink作业或者insert into语句写同一张Gauss for MySQL时建议过滤回撤数据

当有多个Flink作业写同一张MySQL表时,其中一个Flink作业发送回撤数据(-D、-U)到目标表删除整行数据,再插入本次更新的数据,导致其他作业写入的字段全部丢失。

  • 优化前SQL:
    create table source-A(
    id,
    user_id
    )with(
    'connector' = 'kafka'
    );
    create table source-B(
    id,
    org_id
    )with(
    'connector' = 'kafka'
    );
    create table sink-A(
    id,
    user_id
    )with(
    'connector' = 'jdbc'
    'url' = 'jdbc:mysql://****',
    'table-name' = 'sink-table'
    );
    create table sink-B(
    id,
    org_id
    )with(
    'connector' = 'jdbc'
    'url' = 'jdbc:mysql://****',
    'table-name' = 'sink-table'
    );
    insert into sink-A select id,user_id from source-A;
    insert into sink-B select id,org_id  from source-B;
  • 优化后SQL:
    create table source-A(
    id,
    user_id
    )with(
    'connector' = 'kafka'
    );
    create table source-B(
    id,
    org_id
    )with(
    'connector' = 'kafka'
    );
    create table sink-A(
    id,
    user_id
    )with(
    'connector' = 'jdbc'
    'url' = 'jdbc:mysql://****',
    'table-name' = 'sink-table',
    'filter.record.enabled' = 'true'
    );
    create table sink-B(
    id,
    org_id
    )with(
    'connector' = 'jdbc'
    'url' = 'jdbc:mysql://****',
    'table-name' = 'sink-table',
    'filter.record.enabled' = 'true'
    );
    insert into sink-A select id,user_id from source-A;
    insert into sink-B select id,org_id  from source-B;
support.huaweicloud.com/devg-rule-mrs/mrs_07_450170.html