云服务器内容精选

  • 查询binlog 通过DWS提供的系统函数,可以直接查询目标表在指定DN上binlog信息,以及是否被下游消费完毕等信息。 1 2 3 4 5 6 7 8 9 10 -- 模拟Flink调用系统函数获取同步点,参数分别表示 表名、槽位名、是否checkPoint点位,目标DN(为0表示所有DN)。 select * from pg_catalog.pgxc_get_binlog_sync_point('hstore_binlog_source', 'slot1', false, 0); select * from pg_catalog.pgxc_get_binlog_sync_point('hstore_binlog_source', 'slot1', true, 0); -- 进行增删改产生增量binlog。 INSERT INTO hstore_binlog_source VALUES(100, 1, 1); delete hstore_binlog_source where c1 = 100; INSERT INTO hstore_binlog_source VALUES(200, 1, 1); update hstore_binlog_source set c2 =2 where c1 = 200; -- 模拟Flink调用系统函数查询指定 CS N区间的Binlog,参数分别表示表名,目标DN(为0表示所有DN),起始CSN点位, 终止CSN点位。 select * from pgxc_get_binlog_changes('hstore_binlog_source', 0, 0 , 9999999999);
  • 开启Binlog 通过在建HStore表时指定表级参数enable_binlog,开启HStore表的Binlog功能。 1 2 3 4 5 6 7 8 9 10 CREATE TABLE hstore_binlog_source ( c1 INT PRIMARY KEY, c2 INT, c3 INT ) WITH ( ORIENTATION = COLUMN, enable_hstore_opt=true, enable_binlog=on, binlog_ttl = 86400 );
  • Binlog格式与原理 表1 binlog字段格式 字段名称 字段类型 含义 gs_binlog_sync_point BIGINT Binlog系统字段,表示该记录的同步点值,普通GTM模式下,该值唯一且有序。 gs_binlog_event_sequence BIGINT Binlog的系统字段, 用于表示同一事务类操作的先后顺序。 gs_binlog_event_type CHAR Binlog的系统字段, 表示当前记录的操作类型。 type可能有以下几种取值: 'I' 即INSERT, 表示当前Binlog是插入一条新记录。 'd' 即DELETE,表示当前Binlog是删除一条记录。 'B' 即BEFORE_UPDATE,表示当前Binlog是更新前的记录。 'U'即AFTER_UPDATE,表示当前Binlog是更新后的记录。 gs_binlog_timestamp_us BIGINT Binlog的系统字段, 表示当前记录入库时的时间戳。 只有开启binlog时间戳功能时会有,没开启binlog时间戳时为空。仅9.1.0.200及以上版本支持。 user_column_1 用户列 用户的自定义数据列 ... ... ... usert_column_n 用户列 用户的自定义数据列
  • 开启Binlog时间戳功能 如果需要读取指定时间点之后binlog的功能,通过在建HStore表时指定表级参数enable_binlog_timestamp,开启HStore表的Binlog时间戳功能。仅9.1.0.200及以上版本支持。 1 2 3 4 5 6 7 8 9 10 CREATE TABLE hstore_binlog_source( c1 INT PRIMARY KEY, c2 INT, c3 INT ) WITH ( ORIENTATION = COLUMN, enable_hstore_opt=true, enable_binlog_timestamp =on, binlog_ttl = 86400 );
  • 约束与限制 当前仅8.3.0.100及以上版本支持HStore和Hstore-opt记录Binlog功能,且V3表处于试商用阶段,使用前需要联系技术支持进行评估。 使用Binlog的前置条件是必须存在主键约束,并且为HStore表或者Hstore-opt表,分布方式只能是Hash分布。 Binlog表仅记录insert/delete/update(upsert)等DML操作进行记录,不会记录DDL。 当前Binlog表不支持的操作: Insert overwrite、修改分布列、给临时表开启Binlog、exchange/merge/split partition。 当前Binlog表并不限制用户进行以下DDL操作,但进行操作后会导致增量数据与同步点位信息会被清空,需要评估后再执行: ADD COLUMN 增加列、DROP COLUMN 删除列、SET TYPE 修改列、TRUNCATE 清空表数据。 Binlog表在线或者离线扩容期间会等待Binlog记录的消费,只有Binlog记录消费完毕才可以继续进行接下来的扩缩容步骤,默认等待时间为1小时,可通过guc参数binlog_consume_timeout来设置,如果等待超时或者等待出错都会退出扩缩容过程,认为该表扩缩容失败。 VACUUM FULL Binlog表时,会等待Binlog记录的消费,只有Binlog记录消费完毕才可以进行接下来的VACUUM FULL操作,默认等待时间为1小时,可通过guc参数binlog_consume_timeout来设置,如果等待超时或者等待出错都会退出VACUUM FULL过程,认为该表VACUUM FULL失败。且由于需要等待Binlog记录消费完毕,所以即使VACUUM FULL一个分区表,也会对分区的主表上7级锁,阻塞整个表的插入更新或者删除。 Binlog表在备份恢复期间,仅会被当做普通HStore表进行备份,恢复后辅助表的增量数据与同步点信息会清空,需要重新开始同步。 支持Binlog时间戳功能,通过设置enable_binlog_timestamp打开,同样只有HStore和Hstore-opt两种表支持打开。该约束仅9.1.0.200及以上版本支持。
  • 数据同步示例 GaussDB (DWS)侧: 新建binlog表时,enable_hstore_binlog_table参数需要设置为true,可以通过show enable_hstore_binlog_table来查询。 -- 源表(产生binlog) 1 CREATE TABLE test_binlog_source(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on, enable_binlog=true); -- 目标表 1 CREATE TABLE test_binlog_sink(a int, b int, c int, primary key(a)) with(orientation=column, enable_hstore_opt=on); Flink侧: 执行如下命令进行完整数据同步: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 -- 建立源表的映射表 CREATE TABLE test_binlog_source ( a int, b int, c int, primary key(a) NOT ENFORCED ) with ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'binlog' = 'true', 'tableName' = 'test_binlog_source', 'binlogSlotName' = 'slot', 'username'='xxx', 'password'='xxx'); ​-- 建立目标表的映射表 CREATE TABLE test_binlog_sink ( a int, b int, c int, primary key(a) NOT ENFORCED ) with ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'tableName' = 'test_binlog_sink', 'ignoreUpdateBefore'='false', 'connectionSize' = '1', 'username'='xxx', 'password'='xxx'); ​ INSERT INTO test_binlog_sink select * from test_binlog_source;
  • Binlog相关参数说明 下表仅涉及消费Binlog时的参数。 表1 消费Binlog时的参数 参数 说明 数据类型 默认值 binlog 是否读取Binlog信息 Boolean false binlogSlotName 槽位信息,可以理解一个标识。由于可能存在多个Flink任务同时消费同一张表的Binlog信息,所以该场景需要保证每个任务的binlogSlotName不同。 String Flink映射表的表名 binlogBatchReadSize 批量读取binlog的数据行数 Integer 5000 fullSyncBinlogBatchReadSize 全量读取binlog的数据行数 Integer 50000 binlogReadTimeout 增量消费Binlog数据时超时时间,单位毫秒 Integer 600000 fullSyncBinlogReadTimeout 全量消费Binlog数据时超时时间,单位毫秒 Long 1800000 binlogSleepTime 实时消费不到Binlog数据时休眠时间,单位毫秒。如果连续读取不到Binlog数据,则休眠时间为:binlogSleepTime * 次数,最大为binlogMaxSleepTime。读取到数据后,则重置。 Long 500 binlogMaxSleepTime 实时消费不到Binlog数据时最大休眠时间,单位毫秒。 Long 10000 binlogMaxRetryTimes 消费Binlog数据出错后的重试次数。 Integer 1 binlogRetryInterval 消费binlog数据出错后的重试时间间隔。重试时sleep时间:binlogRetryInterval * (1~binlogMaxRetryTimes) +Random(100)。单位毫秒。 Long 100 binlogParallelNum 消费Binlog数据时线程数,只有任务并发度小于DWS集群DN数时,该参数才有效,即此时一个并发度会消费多个DN上的数据,所以可以考虑设置该参数。 Integer 3 connectionPoolSize JDBC连接池连接大小。 Integer 5 needRedistribution 表示是否兼容扩充重分布(需要升级到对应内核版本,如果是低版本则设置为false);如果设置成true的话,flink的restart-strategy不能设置为none。 Boolean true newSystemValue 表示读取binlog数据时是否使用新的系统字段(需要升级到对应内核版本,如果是低版本则设置为false)。 Boolean true checkNodeChangeInterval 检测节点变化的间隔,只有needRedistribution=true才生效。 Long 10000 connectionSocketTimeout 连接处理超时时间(可以看成客户端执行SQL超时时间),单位毫秒;默认值为0,即不设置超时时间。 Integer 0 binlogIgnoreUpdateBefore 是否过滤Binlog记录中的before_update记录,以及delete记录是否只返回主键信息。该参数仅9.1.0.200及以上版本支持。 Boolean false binlogStartTime 设置从某个时间点开始消费Binlog(只能增量消费),格式为yyyy-MM-dd hh:mm:ss且表需要开启enable_binlog_timestamp。 该参数仅9.1.0.200及以上版本支持。 String 无 binlogSyncPointSize 增量读取binlog同步点区间的大小(增量读取binlog时,如果数据量过大可能涉及下盘,可通过调整该参数控制)。 该参数仅9.1.0.200及以上版本支持。 Integer 5000
  • 注意事项 当前仅8.3.0.100及以上的版本支持HStore和HStore-opt记录Binlog功能,且V3处于试商用阶段,使用前需要进行评估。 目前GaussDB(DWS)只有Hstore表和HStore-opt支持Binlog功能,表需要包含主键且设置enable_binlog=on或者enable_binlog_timestamp=on。 消费的Binlog表名不要带有特殊字符,如.、""等。 如果多个任务消费同一张表的Binlog数据,需要保证每个任务的binlogSlotName唯一。 为了达到最高的消费速度,建议将任务的并发度和DWS集群DN数设置一致。 使用dws-connector-flink的Sink能力来写入Binlog数据的话,需要注意以下几点: 如果需要保证DN内的数据写入顺序则需要设置connectionSize设置为1。 如果源端有更新主键操作或者需要flink进行聚合计算的话,将ignoreUpdateBefore设置为false,否则不建议将 ignoreUpdateBefore设置为false(默认true)。
  • Binlog格式与原理 表1 binlog字段格式 字段名称 字段类型 含义 sync_point BIGINT Binlog系统字段,表示该记录的同步点值,普通GTM模式下,该值唯一且有序。 event_sequence BIGINT Binlog的系统字段, 用于表示同一事务类操作的先后顺序。 type CHAR Binlog的系统字段, 表示当前记录的操作类型。 type可能有以下几种取值: 'I' 即INSERT, 表示当前Binlog是插入一条新记录。 'd' 即DELETE,表示当前Binlog是删除一条记录。 'B' 即BEFORE_UPDATE,表示当前Binlog是更新前的记录。 'U'即AFTER_UPDATE,表示当前Binlog是更新后的记录。 user_column_1 用户列 用户的自定义数据列 ... ... ... usert_column_n 用户列 用户的自定义数据列
  • 开启Binlog 通过在建HStore表时指定表级参数enable_binlog,开启HStore表的Binlog功能。 1 2 3 4 5 6 7 8 9 10 CREATE TABLE hstore_binlog_source ( c1 INT PRIMARY KEY, c2 INT, c3 INT ) WITH ( ORIENTATION = COLUMN, enable_hstore=true, enable_binlog=on, binlog_ttl = 86400 );
  • 约束与限制 Binlog功能当前为beta版本,受限商用,如需使用请联系技术支持。 当前仅8.3.0.100及以上版本支持HStore和HStore-opt记录Binlog功能。 使用Binlog的前置条件是必须存在主键约束,并且为HStore或Hstore-opt表,分布方式只能是Hash分布。 Binlog表仅记录insert/delete/update(upsert)等DML操作进行记录,不会记录DDL。 当前Binlog表不支持的操作: Insert overwrite、修改分布列、给临时表开启Binlog、exchange/merge/split partition。 当前Binlog表并不限制用户进行以下DDL操作,但进行操作后会导致增量数据与同步点位信息会被清空,需要评估后再执行: ADD COLUMN 增加列、DROP COLUMN 删除列、SET TYPE 修改列、VACUUM FULL 重整数据、TRUNCATE 清空表数据。 Binlog表在线扩容期间会等待Binlog记录的消费,只有Binlog记录消费完毕后才可以继续进行接下来的扩缩容步骤,默认等待时间为1小时,可通过binlog_consume_timeout来设置,如果等待超时或者等待出错都会退出扩缩容过程,认为该表扩缩容失败。 Binlog表在备份恢复期间,仅会被当做普通HStore表进行备份,恢复后辅助表的增量数据与同步点信息会清空,需要重新开始同步。
  • 数据同步示例 GaussDB(DWS)侧: 新建binlog表时,enable_hstore_binlog_table参数需要设置为true,可以通过show enable_hstore_binlog_table来查询。 -- 源表(产生binlog) 1 CREATE TABLE test_binlog_source(a int, b int, c int, primary key(a, b)) with(orientation=column, enable_hstore=on, enable_binlog=true); -- 目标表 1 CREATE TABLE test_binlog_sink(a int, b int, c int, primary key(a, b)) with(orientation=column, enable_hstore=on); Flink侧: 执行如下命令进行完整数据同步: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 -- 建立源表的映射表 CREATE TABLE test_binlog_source ( a int, b int, c int, primary key(a) ) with ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'binlog' = 'true', 'tableName' = 'test_binlog_source', 'binlogSlotName' = 'slot', 'username'='xxx', 'password'='xxx'); ​-- 建立目标表的映射表 CREATE TABLE test_binlog_sink ( a int, b int, c int, primary key(a) ) with ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'tableName' = 'test_binlog_sink', 'ignoreUpdateBefore'='false', 'connectionSize' = '1', 'username'='xxx', 'password'='xxx'); ​ INSERT INTO test_binlog_sink select * from test_binlog_source;
  • Binlog相关参数说明 下表仅涉及消费Binlog时的参数。 表1 消费Binlog时的参数 参数 说明 数据类型 默认值 binlog 是否读取Binlog信息 Boolean false binlogSlotName 槽位信息,可以理解一个标识。由于可能存在多个Flink任务同时消费同一张表的Binlog信息,所以该场景需要保证每个任务的binlogSlotName不同。 String Flink映射表的表名 binlogBatchReadSize 批量读取binlog的数据行数 Integer 5000 fullSyncBinlogBatchReadSize 全量读取binlog的数据行数 Integer 50000 binlogReadTimeout 增量消费Binlog数据时超时时间,单位毫秒 Integer 600000 fullSyncBinlogReadTimeout 全量消费Binlog数据时超时时间,单位毫秒 Long 1800000 binlogSleepTime 实时消费不到Binlog数据时休眠时间,单位毫秒。如果连续读取不到Binlog数据,则休眠时间为:binlogSleepTime * 次数,最大为binlogMaxSleepTime。读取到数据后,则重置。 Long 500 binlogMaxSleepTime 实时消费不到Binlog数据时最大休眠时间,单位毫秒。 Long 10000 binlogMaxRetryTimes 消费Binlog数据出错后的重试次数。 Integer 1 binlogRetryInterval 消费binlog数据出错后的重试时间间隔。重试时sleep时间:binlogRetryInterval * (1~binlogMaxRetryTimes) +Random(100)。单位毫秒。 Long 100 binlogParallelNum 消费Binlog数据时线程数,只有任务并发度小于DWS集群DN数时,该参数才有效,即此时一个并发度会消费多个DN上的数据,所以可以考虑设置该参数。 Integer 3 connectionPoolSize JDBC连接池连接大小。 Integer 5 needRedistribution 表示是否兼容扩充重分布(需要升级到对应内核版本,如果是低版本则设置为false);如果设置成true的话,flink的restart-strategy不能设置为none。 Boolean true newSystemValue 表示读取binlog数据时是否使用新的系统字段(需要升级到对应内核版本,如果是低版本则设置为false)。 Boolean true checkNodeChangeInterval 检测节点变化的间隔,只有needRedistribution=true才生效。 Long 10000 connectionSocketTimeout 连接处理超时时间(可以看成客户端执行SQL超时时间),单位毫秒;默认值为0,即不设置超时时间。 Integer 0 binlogIgnoreUpdateBefor 是否过滤Binlog记录中的before_update记录,以及delete记录是否只返回主键信息。该参数仅9.1.0.200及以上版本支持。 Boolean false binlogStartTime 设置从某个时间点开始消费Binlog(只能增量消费),格式为yyyy-MM-dd hh:mm:ss且表需要开启enable_binlog_timestamp。 该参数仅9.1.0.200及以上版本支持。 String 无 binlogSyncPointSize 增量读取binlog同步点区间的大小(增量读取binlog时,如果数据量过大可能涉及下盘,可通过调整该参数控制)。 该参数仅9.1.0.200及以上版本支持。 Integer 5000
  • 注意事项 当前仅8.3.0.100及以上的版本支持HStore和HStore-opt记录Binlog功能,且处于试商用阶段,使用前需要进行评估。 目前GaussDB(DWS)只有Hstore表支持Binlog功能,表需要包含主键且设置enable_binlog=on。 消费的Binlog表名不要带有特殊字符,如.、""等。 如果多个任务消费同一张表的Binlog数据,需要保证每个任务的binlogSlotName唯一。 为了达到最高的消费速度,建议将任务的并发度和DWS集群DN数设置一致。 使用dws-connector-flink的Sink能力来写入Binlog数据的话,需要注意以下几点: 如果需要保证DN内的数据写入顺序则需要设置connectionSize设置为1。 如果源端有更新主键操作或者需要flink进行聚合计算的话,将ignoreUpdateBefore设置为false,否则不建议将 ignoreUpdateBefore设置为false(默认true)。