数据仓库服务 GAUSSDB(DWS)-Flink实时消费Binlog:数据同步示例

时间:2024-12-06 11:14:13

数据同步示例

  • GaussDB (DWS)侧:

    新建binlog表时,需要将enable_hstore_binlog_table参数设置为true,可以通过执行show enable_hstore_binlog_table;来查询该参数的当前取值。

    -- 源表(产生binlog)
    -- Source table on the GaussDB(DWS) side: column-oriented, with
    -- enable_hstore_opt switched on and enable_binlog=true so that the table
    -- produces binlog.
    CREATE TABLE test_binlog_source (
        a INT,
        b INT,
        c INT,
        PRIMARY KEY (a)
    )
    WITH (
        orientation = column,
        enable_hstore_opt = on,
        enable_binlog = true
    );
    

    -- 目标表
    -- Target table on the GaussDB(DWS) side: same columns and primary key as
    -- test_binlog_source, column-oriented with enable_hstore_opt switched on,
    -- but without enable_binlog.
    CREATE TABLE test_binlog_sink (
        a INT,
        b INT,
        c INT,
        PRIMARY KEY (a)
    )
    WITH (
        orientation = column,
        enable_hstore_opt = on
    );
    
  • Flink侧:

    执行如下命令进行完整数据同步:

    -- Create the Flink mapping table for the source table.
    -- 'url', 'username' and 'password' hold placeholder values (ip:port, xxx)
    -- that must be replaced with the real connection settings.
    CREATE TABLE test_binlog_source (
        a INT,
        b INT,
        c INT,
        PRIMARY KEY (a) NOT ENFORCED
    ) WITH (
        'connector'      = 'dws',
        'url'            = 'jdbc:gaussdb://ip:port/gaussdb',
        'username'       = 'xxx',
        'password'       = 'xxx',
        'tableName'      = 'test_binlog_source',
        'binlog'         = 'true',
        'binlogSlotName' = 'slot'
    );
       
    -- Create the Flink mapping table for the target table.
    -- 'url', 'username' and 'password' hold placeholder values (ip:port, xxx)
    -- that must be replaced with the real connection settings.
    CREATE TABLE test_binlog_sink (
        a INT,
        b INT,
        c INT,
        PRIMARY KEY (a) NOT ENFORCED
    ) WITH (
        'connector'          = 'dws',
        'url'                = 'jdbc:gaussdb://ip:port/gaussdb',
        'username'           = 'xxx',
        'password'           = 'xxx',
        'tableName'          = 'test_binlog_sink',
        'ignoreUpdateBefore' = 'false',
        'connectionSize'     = '1'
    );
    
    -- Run the synchronisation: write every row read from the source mapping
    -- table into the sink mapping table.
    -- Columns are listed explicitly (instead of SELECT *) so that a later
    -- change to either table definition cannot silently shift the
    -- column-to-column mapping.
    INSERT INTO test_binlog_sink
    SELECT
        a,
        b,
        c
    FROM test_binlog_source;
    
support.huaweicloud.com/HyDevg-910-dws/dws_15_00017.html