应用与数据集成平台 ROMA Connect-Python客户端使用说明:生产消息

时间:2025-02-12 14:55:44

生产消息

  • SASL认证方式

    注意,加粗内容需要替换为实例自有信息。

    from kafka import KafkaProducerimport ssl##连接信息conf = {    'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],    'topic_name': 'topic_name',    'sasl_plain_username': 'username',    'sasl_plain_password': 'password'}context = ssl.create_default_context()context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)context.verify_mode = ssl.CERT_REQUIRED##证书文件context.load_verify_locations("phy_ca.crt")print('start producer')producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'],                        sasl_mechanism="PLAIN",                        ssl_context=context,                        security_protocol='SASL_SSL',                        sasl_plain_username=conf['sasl_plain_username'],                        sasl_plain_password=conf['sasl_plain_password'])data = bytes("hello kafka!", encoding="utf-8")producer.send(conf['topic_name'], data)producer.close()print('end producer')

    示例代码中的参数说明,可参考获取MQS连接信息获取参数值。

    • bootstrap_servers:MQS连接地址和端口。
    • topic_name:要生产消息的Topic名称。
    • sasl_plain_username和sasl_plain_password:开启SASL_SSL认证时所使用的用户名和密码。
    • context.load_verify_locations:开启SASL_SSL认证时所使用的客户端证书。
  • 非SASL认证方式

    注意,加粗内容需要替换为实例自有信息。

    from kafka import KafkaProducerconf = {    'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],    'topic_name': 'topic_name',}print('start producer')producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'])data = bytes("hello kafka!", encoding="utf-8")producer.send(conf['topic_name'], data)producer.close()print('end producer')

    示例代码中的参数说明,可参考获取MQS连接信息获取参数值。

    • bootstrap_servers:MQS连接地址和端口。
    • topic_name:要生产消息的Topic名称。
support.huaweicloud.com/devg-roma/roma_04_3008.html