数据仓库服务 GaussDB(DWS)-Flink实时消费Binlog:数据同步示例

时间:2025-02-12 15:01:49

数据同步示例

  • GaussDB (DWS)侧:

    新建binlog表时,enable_hstore_binlog_table参数需要设置为true,可以通过show enable_hstore_binlog_table来查询。

    -- 源表(产生binlog)

    1
    CREATE TABLE test_binlog_source(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on, enable_binlog=true);

    -- 目标表

    1
    CREATE TABLE test_binlog_sink(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on);
  • Flink侧:

    执行如下命令进行完整数据同步:

     1 2 3 4 5 6 7 8 910111213141516171819202122232425262728293031
    -- 建立源表的映射表CREATE TABLE test_binlog_source (    a int,   b int,   c int,   primary key(a) NOT ENFORCED) with (   'connector' = 'dws',   'url' = 'jdbc:gaussdb://ip:port/gaussdb',   'binlog' = 'true',   'tableName' = 'test_binlog_source',      'binlogSlotName' = 'slot',      'username'='xxx',      'password'='xxx');   -- 建立目标表的映射表CREATE TABLE test_binlog_sink (     a int,   b int,   c int,   primary key(a) NOT ENFORCED) with (   'connector' = 'dws',   'url' = 'jdbc:gaussdb://ip:port/gaussdb',   'tableName' = 'test_binlog_sink',      'ignoreUpdateBefore'='false',      'connectionSize' = '1',   'username'='xxx',   'password'='xxx');INSERT INTO test_binlog_sink select * from test_binlog_source;
support.huaweicloud.com/HyDevg-910-dws/dws_15_00017.html