数据仓库服务 GAUSSDB(DWS)-使用Python第三方库psycopg2连接集群:psycopg2连接集群不支持CN Retry特性的问题说明

时间:2024-11-06 16:28:36

psycopg2连接集群不支持CN Retry特性的问题说明

GaussDB (DWS)支持在SQL语句执行出错时的自动重试功能(简称CN Retry)。CN Retry对于客户端和驱动发送的SQL语句在执行失败时可以自动识别错误类型,并进行重试,详情请参见SQL语句出错自动重试。但使用psycopg2默认连接方式创建的连接在语句执行失败时没有自动重试,会直接报错退出。如常见的主备切换场景下,未自动重试会报如下错误,但在自动重试期间完成主备切换,则会返回正确结果。

1
psycopg2.errors.ConnectionFailure: pooler: failed to create 1 connections, Error Message: remote node dn_6003_6004, detail: could not connect to server: Operation now in progress

报错原因:

  1. psycopg2在发送SQL语句前先发送了BEGIN语句开启事务。
  2. CN Retry不支持事务块中的语句是特性约束。

解决方案:

  • 在同步方式连接时,可以通过主动结束驱动开启的事务。
    1
    2
    3
    4
    cursor = conn.cursor()
    # 增加end语句主动结束驱动开启的事务
    cursor.execute("end; select * from test order by 1;") 
    rows = cursor.fetchall()
    
  • 使用异步连接方式主动开启事务,异步连接介绍具体请参见pyscopg官网:https://www.psycopg.org/docs/advanced.html?highlight=async
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    #!/usr/bin/env python3
    # _*_ encoding=utf-8 _*_
     
    import psycopg2
    import select
     
    # psycopg2官方提供的异步连接方式时的wait函数
    # 详见https://www.psycopg.org/docs/advanced.html?highlight=async
    def wait(conn):
        while True:
            state = conn.poll()
            if state == psycopg2.extensions.POLL_OK:
                break
            elif state == psycopg2.extensions.POLL_WRITE:
                select.select([], [conn.fileno()], [])
            elif state == psycopg2.extensions.POLL_READ:
                select.select([conn.fileno()], [], [])
            else:
                raise psycopg2.OperationalError("poll() returned %s" % state)
     
    def psycopg2_cnretry_sync():
        # 创建连接
        conn = psycopg2.connect(host='10.154.70.231',
                                    port='8000',
                                    database='gaussdb',  # 需要连接的database
                                    user='dbadmin',
                                    password='password',  # 数据库用户密码
                                    async=1) # 使用异步方式连接
        wait(conn)
     
        # 执行查询
        cursor = conn.cursor()
        cursor.execute("select * from test order by 1;")
        wait(conn)
        rows = cursor.fetchall()
        for row in rows:
            print(row[0], row[1])
     
        # 关闭连接
        conn.close()
     
    if __name__ == '__main__':
        psycopg2_cnretry_async()
    
support.huaweicloud.com/mgtg-dws/dws_01_0120.html