云服务器内容精选

  • 数据同步示例 GaussDB (DWS)侧: 新建binlog表时,enable_hstore_binlog_table参数需要设置为true,可以通过show enable_hstore_binlog_table来查询。 -- 源表(产生binlog) 1 CREATE TABLE test_binlog_source(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on, enable_binlog=true); -- 目标表 1 CREATE TABLE test_binlog_sink(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on); Flink侧: 执行如下命令进行完整数据同步: 1 2 3 4 5 6 7 8 910111213141516171819202122232425262728293031 -- 建立源表的映射表CREATE TABLE test_binlog_source ( a int, b int, c int, primary key(a) NOT ENFORCED) with ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'binlog' = 'true', 'tableName' = 'test_binlog_source', 'binlogSlotName' = 'slot', 'username'='xxx', 'password'='xxx'); ​-- 建立目标表的映射表CREATE TABLE test_binlog_sink ( a int, b int, c int, primary key(a) NOT ENFORCED) with ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'tableName' = 'test_binlog_sink', 'ignoreUpdateBefore'='false', 'connectionSize' = '1', 'username'='xxx', 'password'='xxx');​INSERT INTO test_binlog_sink select * from test_binlog_source;
  • 注意事项 当前仅8.3.0.100及以上的版本支持HStore和HStore-opt记录Binlog功能。 V3 HStore表不支持Binlog,仅V3 HStore_opt表支持Binlog,且V3处于试商用阶段,使用前需要进行评估。 目前GaussDB(DWS)只有Hstore表和HStore-opt支持Binlog功能,表需要包含主键且设置enable_binlog=on或者enable_binlog_timestamp=on。 消费的Binlog表名不要带有特殊字符,如.、""等。 如果多个任务消费同一张表的Binlog数据,需要保证每个任务的binlogSlotName唯一。 为了达到最高的消费速度,建议将任务的并发度和DWS集群DN数设置一致。 使用dws-connector-flink的Sink能力来写入Binlog数据的话,需要注意以下几点: 如果需要保证DN内的数据写入顺序则需要设置connectionSize设置为1。 如果源端有更新主键操作或者需要flink进行聚合计算的话,将ignoreUpdateBefore设置为false,否则不建议将 ignoreUpdateBefore设置为false(默认true)。
  • Binlog相关参数说明 下表仅涉及消费Binlog时的参数。 表1 消费Binlog时的参数 参数 说明 数据类型 默认值 binlog 是否读取Binlog信息 Boolean false binlogSlotName 槽位信息,可以理解一个标识。由于可能存在多个Flink任务同时消费同一张表的Binlog信息,所以该场景需要保证每个任务的binlogSlotName不同。 String Flink映射表的表名 binlogBatchReadSize 批量读取binlog的数据行数 Integer 5000 fullSyncBinlogBatchReadSize 全量读取binlog的数据行数 Integer 50000 binlogReadTimeout 增量消费Binlog数据时超时时间,单位毫秒 Integer 600000 fullSyncBinlogReadTimeout 全量消费Binlog数据时超时时间,单位毫秒 Long 1800000 binlogSleepTime 实时消费不到Binlog数据时休眠时间,单位毫秒。如果连续读取不到Binlog数据,则休眠时间为:binlogSleepTime * 次数,最大为binlogMaxSleepTime。读取到数据后,则重置。 Long 500 binlogMaxSleepTime 实时消费不到Binlog数据时最大休眠时间,单位毫秒。 Long 10000 binlogMaxRetryTimes 消费Binlog数据出错后的重试次数。 Integer 1 binlogRetryInterval 消费binlog数据出错后的重试时间间隔。重试时sleep时间:binlogRetryInterval * (1~binlogMaxRetryTimes) +Random(100)。单位毫秒。 Long 100 binlogParallelNum 消费Binlog数据时线程数,只有任务并发度小于DWS集群DN数时,该参数才有效,即此时一个并发度会消费多个DN上的数据,所以可以考虑设置该参数。 Integer 3 connectionPoolSize JDBC连接池连接大小。 Integer 5 needRedistribution 表示是否兼容扩充重分布(需要升级到对应内核版本,如果是低版本则设置为false);如果设置成true的话,flink的restart-strategy不能设置为none。 Boolean true newSystemValue 表示读取binlog数据时是否使用新的系统字段(需要升级到对应内核版本,如果是低版本则设置为false)。 Boolean true checkNodeChangeInterval 检测节点变化的间隔,只有needRedistribution=true才生效。 Long 10000 connectionSocketTimeout 连接处理超时时间(可以看成客户端执行SQL超时时间),单位毫秒;默认值为0,即不设置超时时间。 Integer 0 binlogIgnoreUpdateBefore 是否过滤Binlog记录中的before_update记录,以及delete记录是否只返回主键信息。该参数仅9.1.0.200及以上版本支持。 Boolean false binlogStartTime 设置从某个时间点开始消费Binlog(只能增量消费),格式为yyyy-MM-dd hh:mm:ss且表需要开启enable_binlog_timestamp。 该参数仅9.1.0.200及以上版本支持。 String 无 binlogSyncPointSize 增量读取binlog同步点区间的大小(增量读取binlog时,如果数据量过大可能涉及下盘,可通过调整该参数控制)。 该参数仅9.1.0.200及以上版本支持。 Integer 5000