云数据库 GAUSSDB-逻辑复制:示例

时间:2025-03-03 19:28:54

示例

逻辑复制类PGReplicationStream为非线程安全类,并发调用可能导致数据异常。

代码运行的前提条件:
  1. 添加JDBC用户机器IP(假设IP为10.11.12.34)到复制数据权限的白名单里,命令如下:
    gs_guc reload -Z datanode -N all -I all -h 'host replication all 10.11.12.34/32 sha256'
  2. 将wal_level参数设置为logical,设置方法请联系管理员处理。
  3. 创建表t1和t2,并且对该表进行DDL或DML操作。
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// 以下用例以opengaussjdbc.jar为例。
// 认证用的用户名和密码直接写到代码中有很大的安全风险,建议在配置文件或者环境变量中存放(密码应密文存放,使用时解密),确保安全。
// 本示例以用户名和密码保存在环境变量中为例,运行本示例前请先在本地环境中设置环境变量(环境变量名称请根据自身情况进行设置)EXAMPLE_USERNAME_ENV和EXAMPLE_PASSWORD_ENV。
// $ip、$port、database需要用户自行修改。

import com.huawei.opengauss.jdbc.PGProperty;
import com.huawei.opengauss.jdbc.jdbc.PgConnection;
import com.huawei.opengauss.jdbc.replication.LogSequenceNumber;
import com.huawei.opengauss.jdbc.replication.PGReplicationStream;

import java.nio.ByteBuffer;
import java.sql.DriverManager;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class LogicalReplicationDemo {
    private static PgConnection conn = null;
    public static void main(String[] args) {
        String driver = "com.huawei.opengauss.jdbc.Driver";
        // 此处配置数据库IP以及端口,这里的端口为haPort,通常默认是所连接DN的port+1端口
        String sourceURL = "jdbc:opengauss://$ip:$port/database";    

        // 默认逻辑复制槽的名称是:replication_slot
        // 测试模式:创建逻辑复制槽
        int TEST_MODE_CREATE_SLOT = 1;
        // 测试模式:开启逻辑复制(前提条件是逻辑复制槽已经存在)
        int TEST_MODE_START_REPL = 2;
        // 测试模式:删除逻辑复制槽
        int TEST_MODE_DROP_SLOT = 3;
        // 开启不同的测试模式
        int testMode = TEST_MODE_START_REPL;

        try {
            Class.forName(driver);
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }

        try {
            Properties properties = new Properties();
            PGProperty.USER.set(properties, System.getenv("EXAMPLE_USERNAME_ENV"));
            PGProperty.PASSWORD.set(properties, System.getenv("EXAMPLE_PASSWORD_ENV"));
     // 对于逻辑复制,以下三个属性是必须配置项
            PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
            PGProperty.REPLICATION.set(properties, "database");
            PGProperty.PREFER_QUERY_MODE.set(properties, "simple");
            conn = (PgConnection) DriverManager.getConnection(sourceURL, properties);
            System.out.println("connection success!");

            if(testMode == TEST_MODE_CREATE_SLOT){
                conn.getReplicationAPI()
                        .createReplicationSlot()
                        .logical()
                        .withSlotName("replication_slot") // 这里字符串如包含大写字母则会自动转化为小写字母
                        .withOutputPlugin("mppdb_decoding")
                        .make();
            }else if(testMode == TEST_MODE_START_REPL) {
                // 开启此模式前需要创建复制槽
                LogSequenceNumber waitLSN = LogSequenceNumber.valueOf("6F/E3C53568"); // LSN需要用户根据实际情况进行修改
                PGReplicationStream stream = conn
                        .getReplicationAPI()
                        .replicationStream()
                        .logical()
                        .withSlotName("replication_slot")
                        .withSlotOption("include-xids", true)
                        .withSlotOption("skip-empty-xacts", true)
                        .withStartPosition(waitLSN)
                        .withSlotOption("parallel-decode-num", 10) // 解码线程并行度
                        .withSlotOption("white-table-list", "public.t1,public.t2") // 白名单列表
                        // .withSlotOption("standby-connection", true) // 强制备机解码
                        .withSlotOption("decode-style", "t") // 解码格式
                        .withSlotOption("sending-batch", 0) // 批量发送解码结果
                        .withSlotOption("max-txn-in-memory", 100) // 单个解码事务落盘内存阈值为100MB
                        .withSlotOption("max-reorderbuffer-in-memory", 2) // 正在处理的解码事务落盘内存阈值为2GB
                        .withSlotOption("exclude-users", "userA") // 不返回用户userA执行事务的逻辑日志
                        .withSlotOption("include-user", false) // 事务BEGIN逻辑日志不携带用户名
                        .withSlotOption("enable-heartbeat", true) // 开启心跳日志
                        .start();
                while (true) {
                    ByteBuffer byteBuffer = stream.readPending();

                    if (byteBuffer == null) {
                        TimeUnit.MILLISECONDS.sleep(10L);
                        continue;
                    }

                    int offset = byteBuffer.arrayOffset();
                    byte[] source = byteBuffer.array();
                    int length = source.length - offset;
                    System.out.println(new String(source, offset, length));

                    // 如果需要flush lsn,根据业务实际情况调用以下接口,该接口会触发数据库复制槽落盘,对服务端解码性能有一定影响,建议调用间隔大于10s。
                    // LogSequenceNumber lastRecv = stream.getLastReceiveLSN();
                    // stream.setFlushedLSN(lastRecv);
                    // stream.forceUpdateStatus();

                }
            }else if(testMode == TEST_MODE_DROP_SLOT){
                conn.getReplicationAPI()
                        .dropReplicationSlot("replication_slot");
            }
        } catch (Exception e) {
            e.printStackTrace();
            return;
        } finally {
            try {
                conn.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
text格式(即't'格式)解码结果示例如下:
BEGIN  CS N: 2014 first_lsn: 0/2816A28
table public t1 INSERT: a[integer]:1 b[integer]:2 c[text]:'hello'
COMMIT XID: 15504
BEGIN CSN: 2015 first_lsn: 0/2816C20
table public t1 UPDATE: old-key: a[integer]:1 b[integer]:2 c[text]:'hello' new-tuple: a[integer]:1 b[integer]:5 c[text]:'hello'
COMMIT XID: 15505
BEGIN CSN: 2016 first_lsn: 0/2816D60
table public t1 DELETE: a[integer]:1 b[integer]:5 c[text]:'hello'
COMMIT XID: 15506
json格式(即'j'格式)解码结果示例如下:
BEGIN CSN: 2014 first_lsn: 0/2816A28
{"table_name":"public.t1","op_type":"INSERT","columns_name":["a","b","c"],"columns_type":["integer","integer","text"],"columns_val":["1","2","'hello'"],"old_keys_name":[],"old_keys_type":[],"old_keys_val":[]}
COMMIT XID: 15504
BEGIN CSN: 2015 first_lsn: 0/2816C20
{"table_name":"public.t1","op_type":"UPDATE","columns_name":["a","b","c"],"columns_type":["integer","integer","text"],"columns_val":["1","5","'hello'"],"old_keys_name":["a","b","c"],"old_keys_type":["integer","integer","text"],"old_keys_val":["1","2","'hello'"]}
COMMIT XID: 15505
BEGIN CSN: 2016 first_lsn: 0/2816D60
{"table_name":"public.t1","op_type":"DELETE","columns_name":[],"columns_type":[],"columns_val":[],"old_keys_name":["a","b","c"],"old_keys_type":["integer","integer","text"],"old_keys_val":["1","5","'hello'"]}
COMMIT XID: 15506
support.huaweicloud.com/centralized-devg-v3-gaussdb/gaussdb-42-0079.html