云数据库 GAUSSDB-逻辑复制:示例
时间:2025-03-03 19:28:54
示例

逻辑复制类PGReplicationStream为非线程安全类,并发调用可能导致数据异常。
代码运行的前提条件:
- 添加JDBC用户机器IP(假设IP为10.11.12.34)到复制数据权限的白名单里,命令如下:
gs_guc reload -Z datanode -N all -I all -h 'host replication all 10.11.12.34/32 sha256'
- 将wal_level参数设置为logical,设置方法请联系管理员处理。
- 创建表t1和t2,并且对该表进行DDL或DML操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
// 以下用例以opengaussjdbc.jar为例。 // 认证用的用户名和密码直接写到代码中有很大的安全风险,建议在配置文件或者环境变量中存放(密码应密文存放,使用时解密),确保安全。 // 本示例以用户名和密码保存在环境变量中为例,运行本示例前请先在本地环境中设置环境变量(环境变量名称请根据自身情况进行设置)EXAMPLE_USERNAME_ENV和EXAMPLE_PASSWORD_ENV。 // $ip、$port、database需要用户自行修改。 import com.huawei.opengauss.jdbc.PGProperty; import com.huawei.opengauss.jdbc.jdbc.PgConnection; import com.huawei.opengauss.jdbc.replication.LogSequenceNumber; import com.huawei.opengauss.jdbc.replication.PGReplicationStream; import java.nio.ByteBuffer; import java.sql.DriverManager; import java.util.Properties; import java.util.concurrent.TimeUnit; public class LogicalReplicationDemo { private static PgConnection conn = null; public static void main(String[] args) { String driver = "com.huawei.opengauss.jdbc.Driver"; // 此处配置数据库IP以及端口,这里的端口为haPort,通常默认是所连接DN的port+1端口 String sourceURL = "jdbc:opengauss://$ip:$port/database"; // 默认逻辑复制槽的名称是:replication_slot // 测试模式:创建逻辑复制槽 int TEST_MODE_CREATE_SLOT = 1; // 测试模式:开启逻辑复制(前提条件是逻辑复制槽已经存在) int TEST_MODE_START_REPL = 2; // 测试模式:删除逻辑复制槽 int TEST_MODE_DROP_SLOT = 3; // 开启不同的测试模式 int testMode = TEST_MODE_START_REPL; try { Class.forName(driver); } catch (Exception e) { e.printStackTrace(); return; } try { Properties properties = new Properties(); PGProperty.USER.set(properties, System.getenv("EXAMPLE_USERNAME_ENV")); PGProperty.PASSWORD.set(properties, System.getenv("EXAMPLE_PASSWORD_ENV")); // 对于逻辑复制,以下三个属性是必须配置项 PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4"); PGProperty.REPLICATION.set(properties, "database"); PGProperty.PREFER_QUERY_MODE.set(properties, "simple"); conn = (PgConnection) DriverManager.getConnection(sourceURL, properties); System.out.println("connection success!"); if(testMode == TEST_MODE_CREATE_SLOT){ conn.getReplicationAPI() .createReplicationSlot() .logical() .withSlotName("replication_slot") // 这里字符串如包含大写字母则会自动转化为小写字母 .withOutputPlugin("mppdb_decoding") .make(); }else if(testMode == TEST_MODE_START_REPL) { // 开启此模式前需要创建复制槽 LogSequenceNumber waitLSN = LogSequenceNumber.valueOf("6F/E3C53568"); // LSN需要用户根据实际情况进行修改 PGReplicationStream stream = conn .getReplicationAPI() .replicationStream() .logical() .withSlotName("replication_slot") .withSlotOption("include-xids", true) .withSlotOption("skip-empty-xacts", true) .withStartPosition(waitLSN) .withSlotOption("parallel-decode-num", 10) // 解码线程并行度 .withSlotOption("white-table-list", "public.t1,public.t2") // 白名单列表 // .withSlotOption("standby-connection", true) // 强制备机解码 .withSlotOption("decode-style", "t") // 解码格式 .withSlotOption("sending-batch", 0) // 批量发送解码结果 .withSlotOption("max-txn-in-memory", 100) // 单个解码事务落盘内存阈值为100MB .withSlotOption("max-reorderbuffer-in-memory", 2) // 正在处理的解码事务落盘内存阈值为2GB .withSlotOption("exclude-users", "userA") // 不返回用户userA执行事务的逻辑日志 .withSlotOption("include-user", false) // 事务BEGIN逻辑日志不携带用户名 .withSlotOption("enable-heartbeat", true) // 开启心跳日志 .start(); while (true) { ByteBuffer byteBuffer = stream.readPending(); if (byteBuffer == null) { TimeUnit.MILLISECONDS.sleep(10L); continue; } int offset = byteBuffer.arrayOffset(); byte[] source = byteBuffer.array(); int length = source.length - offset; System.out.println(new String(source, offset, length)); // 如果需要flush lsn,根据业务实际情况调用以下接口,该接口会触发数据库复制槽落盘,对服务端解码性能有一定影响,建议调用间隔大于10s。 // LogSequenceNumber lastRecv = stream.getLastReceiveLSN(); // stream.setFlushedLSN(lastRecv); // stream.forceUpdateStatus(); } }else if(testMode == TEST_MODE_DROP_SLOT){ conn.getReplicationAPI() .dropReplicationSlot("replication_slot"); } } catch (Exception e) { e.printStackTrace(); return; } finally { try { conn.close(); } catch (Exception e) { e.printStackTrace(); } } } } |
text格式(即't'格式)解码结果示例如下:
BEGIN CS N: 2014 first_lsn: 0/2816A28 table public t1 INSERT: a[integer]:1 b[integer]:2 c[text]:'hello' COMMIT XID: 15504 BEGIN CSN: 2015 first_lsn: 0/2816C20 table public t1 UPDATE: old-key: a[integer]:1 b[integer]:2 c[text]:'hello' new-tuple: a[integer]:1 b[integer]:5 c[text]:'hello' COMMIT XID: 15505 BEGIN CSN: 2016 first_lsn: 0/2816D60 table public t1 DELETE: a[integer]:1 b[integer]:5 c[text]:'hello' COMMIT XID: 15506
json格式(即'j'格式)解码结果示例如下:
BEGIN CSN: 2014 first_lsn: 0/2816A28 {"table_name":"public.t1","op_type":"INSERT","columns_name":["a","b","c"],"columns_type":["integer","integer","text"],"columns_val":["1","2","'hello'"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]} COMMIT XID: 15504 BEGIN CSN: 2015 first_lsn: 0/2816C20 {"table_name":"public.t1","op_type":"UPDATE","columns_name":["a","b","c"],"columns_type":["integer","integer","text"],"columns_val":["1","5","'hello'"],"old_keys_name":["a","b","c"],"old_keys_type":["integer","integer","text"],"old_keys_val":["1","2","'hello'"]} COMMIT XID: 15505 BEGIN CSN: 2016 first_lsn: 0/2816D60 {"table_name":"public.t1","op_type":"DELETE","columns_name":[],"columns_type":[],"columns_val":[],"old_keys_name":["a","b","c"],"old_keys_type":["integer","integer","text"],"old_keys_val":["1","5","'hello'"]} COMMIT XID: 15506
support.huaweicloud.com/centralized-devg-v3-gaussdb/gaussdb-42-0079.html
看了此文的人还看了
CDN加速
GaussDB
文字转换成语音
免费的服务器
如何创建网站
域名网站购买
私有云桌面
云主机哪个好
域名怎么备案
手机云电脑
SSL证书申请
云点播服务器
免费OCR是什么
电脑云桌面
域名备案怎么弄
语音转文字
文字图片识别
云桌面是什么
网址安全检测
网站建设搭建
国外CDN加速
SSL免费证书申请
短信批量发送
图片OCR识别
云数据库MySQL
个人域名购买
录音转文字
扫描图片识别文字
OCR图片识别
行驶证识别
虚拟电话号码
电话呼叫中心软件
怎么制作一个网站
Email注册网站
华为VNC
图像文字识别
企业网站制作
个人网站搭建
华为云计算
免费租用云托管
云桌面云服务器
ocr文字识别免费版
HTTPS证书申请
图片文字识别转换
国外域名注册商
使用免费虚拟主机
云电脑主机多少钱
鲲鹏云手机
短信验证码平台
OCR图片文字识别
SSL证书是什么
申请企业邮箱步骤
免费的企业用邮箱
云免流搭建教程
域名价格
推荐文章
- GaussDB工具_gaussdb怎么读_高斯数据库工具_华为云
- GaussDB使用技巧_高斯数据库下载_高斯数据库使用技巧_华为云
- GaussDB视频教程_gaussdb查看表结构语句_高斯数据库视频教程_华为云
- GaussDB数据库考试_GaussDB认证_高斯数据库考试_华为云
- 分布式云原生集合示例_华为云分布式云原生_华为云UCS集合示例
- GaussDB内核_GaussDB数据库内核_高斯数据库内核_华为云
- GaussDB学习_gaussdb教程_高斯数据库学习_华为云
- GaussDB工具_gaussdb数据库_高斯数据库工具_华为云
- GaussDB数据库如何使用_高斯数据库基于什么_高斯数据库如何使用
- 云数据库 RDS for PostgreSQL数据库权限_PG数据库管理_华为云
ServerLess102