云服务器内容精选

  • 消费消息 import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;import java.util.Date;@Componentpublic class ReceiveMsgService { Logger LOG = LoggerFactory.getLogger(ReceiveMsgService.class); @RabbitListener(queues = "test") public void receive(String message) { SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); LOG.info("receive message: {}", message + " 接收时间:" + simpleDateFormat.format(new Date())); }}
  • (可选)在application.properties文件中填写配置 如果RabbitMQ实例已开启SSL,在“application.properties”文件中填写如下配置。 #开启SSL认证spring.rabbitmq.ssl.enabled=true#SSL使用的算法spring.rabbitmq.ssl.algorithm=TLSv1.2#是否启用主机验证spring.rabbitmq.ssl.verify-hostname=false#是否启用服务端证书验证spring.rabbitmq.ssl.validate-server-certificate=false
  • 客户端网络环境说明 客户端可以通过以下方式访问RabbitMQ实例: VPC内子网地址访问 如果客户端是云上E CS ,与RabbitMQ实例处于同region同VPC,则可以直接访问RabbitMQ实例提供的VPC内子网地址。 VPC对等连接方式访问 如果客户端是云上ECS,与RabbitMQ实例处于相同region但不同VPC,则可以通过建立VPC对等连接后,访问RabbitMQ实例提供的VPC内子网地址。 关于创建和使用VPC对等连接,可参考VPC对等连接说明。 公网访问 客户端在其他网络环境,或者与RabbitMQ实例处于不同region,则访问实例的公网地址。 不同网络环境,对于客户端配置来说,只是连接地址的差异,其他都一样。因此,本手册以同一VPC内子网地址的方式,介绍客户端开发环境搭建。 遇到连接超时或失败时,请注意确认网络是否连通。可使用telnet方式,检测实例连接地址与端口。
  • 开源SDK列表 分布式消息服务RabbitMQ版支持所有开源版本的SDK,常见的开源SDK如表1所示。 表1 开源SDK列表 编程语言 SDK Java rabbitmq-java-client Spring Framework SpringAMQP .Net rabbitmq-dotnet-client Python pika PHP php-amqplib C rabbitmq-c Go amqp091-go 推荐使用最新Release版本的SDK。
  • 生产消息 以下加粗内容需要替换为实例自有信息,请根据实际情况替换。 SSL认证方式 import pikaimport ssl# 连接信息conf = { 'host': 'ip', 'port': 5671, 'queue_name': 'queue-test', 'username': 'root', 'password': 'password'}context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)credentials = pika.PlainCredentials(conf['username'], conf['password'])parameters = pika.ConnectionParameters(conf['host'], conf['port'], '/', credentials, ssl_options=pika.SSLOptions(context))connection = pika.BlockingConnection(parameters)channel = connection.channel()channel.queue_declare(conf['queue_name'])data = bytes('Hello World!', encoding='utf-8')channel.basic_publish(exchange='', routing_key=conf['queue_name'], body=data)print(" [x] Sent 'Hello World!'")connection.close() 非SSL认证方式 import pika# 连接信息conf = { 'host': 'ip', 'port': 5672, 'queue_name': 'queue-test', 'username': 'root', 'password': 'password'}credentials = pika.PlainCredentials(conf['username'], conf['password'])parameters = pika.ConnectionParameters(conf['host'], conf['port'], '/', credentials)connection = pika.BlockingConnection(parameters)channel = connection.channel()channel.queue_declare(conf['queue_name'])data = bytes("Hello World!", encoding="utf-8")channel.basic_publish(exchange='', routing_key=conf['queue_name'], body=data)print(" [x] Sent 'Hello World!'")connection.close()
  • 消费消息 以下加粗内容需要替换为实例自有信息,请根据实际情况替换。 SSL认证方式 import pikaimport ssl# 连接信息conf = { 'host': 'ip', 'port': 5671, 'queue_name': 'queue-test', 'username': 'root', 'password': 'password'}context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)credentials = pika.PlainCredentials(conf['username'], conf['password'])parameters = pika.ConnectionParameters(conf['host'], conf['port'], '/', credentials, ssl_options=pika.SSLOptions(context))connection = pika.BlockingConnection(parameters)channel = connection.channel()channel.queue_declare(conf['queue_name'])def callback(ch, method, properties, body): print(" [x] Received %r" % body.decode('utf-8'))channel.basic_consume(queue=conf['queue_name'], on_message_callback=callback, auto_ack=True)print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming() 非SSL认证方式 import pika# 连接信息conf = { 'host': 'ip', 'port': 5672, 'queue_name': 'queue-test', 'username': 'root', 'password': 'password'}credentials = pika.PlainCredentials(conf['username'], conf['password'])parameters = pika.ConnectionParameters(conf['host'], conf['port'], '/', credentials)connection = pika.BlockingConnection(parameters)channel = connection.channel()channel.queue_declare(conf['queue_name'])def callback(ch, method, properties, body): print(" [x] Received %r" % body.decode('utf-8'))channel.basic_consume(queue=conf['queue_name'], on_message_callback=callback, auto_ack=True)print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()
  • SDK列表 在开始使用之前,请确保您安装的是最新版本的SDK。使用过时的版本可能会导致兼容性问题或无法使用最新功能。您可以在 SDK中心 查询版本信息。 表1提供了分布式消息服务RabbitMQ版支持的SDK列表,您可以在GitHub仓库查看SDK更新历史、获取安装包以及查看指导文档。 表1 SDK列表 编程语言 Github地址 参考文档 Java huaweicloud-sdk-java-v3 Java SDK使用指导 Python huaweicloud-sdk-python-v3 Python SDK使用指导 Go huaweicloud-sdk-go-v3 Go SDK使用指导 NodeJs huaweicloud-sdk-nodejs-v3 NodeJs SDK使用指导 .NET huaweicloud-sdk-net-v3 .Net SDK使用指导 PHP huaweicloud-sdk-php-v3 PHP SDK使用指导 表1提供的SDK为管理面API的SDK,并非生产消费消息的SDK。如果需要生产消费消息的SDK,请参考开发指南。
  • 计费说明 分布式消息服务RabbitMQ版的计费项由实例费用和存储空间费用组成。具体内容如表1所示。 如您需要快速了解分布式消息服务RabbitMQ版的具体价格,请参见分布式消息服务RabbitMQ版价格详情。 表1 分布式消息服务RabbitMQ版计费项 计费项 计费项说明 适用的计费模式 计费公式 实例费用 计费因子:代理规格和代理数量 说明: 实例费用在账单中显示的产品名称为“分布式消息服务专享版”。 包年/包月、按需计费 实例规格单价 * 购买时长 实例规格单价请参见分布式消息服务RabbitMQ版价格详情中的“实例价格”。 存储空间费用 计费因子:云硬盘类型、容量 说明: 存储空间费用在账单中显示的产品名称为“分布式消息服务物理多租存储”。 包年/包月、按需计费 云硬盘单价 * 购买时长 云硬盘单价请参见分布式消息服务RabbitMQ版价格详情中的“存储价格”。
  • 计费示例 以包年/包月计费模式为例,假设您于2023/03/08 15:50:04在“华北-北京四”区域购买了一个包年/包月RabbitMQ实例(规格:rabbitmq.2u4g.cluster*3,总存储空间:超高I/O 300GB),计费资源包括实例费用(代理规格和代理数量),以及存储空间费用(超高I/O 300GB)。购买时长为一个月,并在到期前手动续费1个月,则: 第一个计费周期为:2023/03/08 15:50:04 ~ 2023/04/08 23:59:59 第二个计费周期为:2023/04/08 23:59:59 ~ 2023/05/08 23:59:59 图1给出了上述示例配置的费用计算过程。图中价格仅供参考,实际计算请以分布式消息服务RabbitMQ版价格详情中的价格为准。 图1 包年/包月RabbitMQ实例费用计算示例(华北-北京四区域) 按需计费模式下,各计费项的计费示例请参见计费示例。
  • 维度 Key Value rabbitmq_instance_id RabbitMQ实例 rabbitmq_node RabbitMQ实例节点 rabbitmq_queue RabbitMQ实例队列 rabbitmq_vhost RabbitMQ实例Vhost rabbitmq_vhost_exchange RabbitMQ实例Exchange rabbitmq_vhost_queue RabbitMQ实例Queue
  • 变更实例规格的影响 表3 变更实例规格的影响 变更配置类型 影响 代理数量 变更规格过程中会有秒级业务中断,客户端需要支持自动重连,建议在业务低峰时进行变更。 预计总变更时长为10分钟左右。 存储空间 扩容存储空间不会影响业务。 可用存储空间=实际存储空间-用于存储日志的存储空间-格式化磁盘的损耗。例如,实际扩容存储空间到700GB,用于存储日志的数据的存储空间为100GB,格式化磁盘损耗7GB,那么扩容后的可用存储空间为593GB。 预计总变更时长为5分钟内。 代理规格/实例规格 RabbitMQ 3.x.x版本单机实例和没有配置镜像/仲裁队列的集群实例在变更规格过程中会有分钟级业务中断,客户端需要支持自动重连,建议在业务低峰时进行变更。 RabbitMQ 3.x.x版本配置了镜像/仲裁队列的集群实例在变更规格过程中会有秒级业务中断,客户端需要支持自动重连,建议在业务低峰时进行变更。 RabbitMQ AMQP-0-9-1版本单机实例和集群实例在变更规格过程中会有秒级连接闪断,客户端需要支持自动重连,建议在业务低峰时进行变更。 代理采用滚动重启的方式进行实例变更,变更时长和代理数量有关,单个代理的变更时长一般在5~10分钟,总变更时长为(5~10分钟)*代理数量。
  • 测试步骤 登录客户端服务器,进入“rabbitmq-perf-test-2.18.0/bin”目录下。 运行以下脚本,测试并记录不同实例规格下的生产速率和消费速率。 ./runjava com.rabbitmq.perf.PerfTest -h amqp://${实例用户名}:${实例密码}@${内网连接地址} -e ${Exchange名称} -s 1024 -u ${队列名称} -x 3 -y 3 -z 300 示例如下: ./runjava com.rabbitmq.perf.PerfTest -h amqp://test:******@192.168.0.150:5672 -e exchange-direct -s 1024 -u queue-1 -x 3 -y 3 -z 300 运行以下脚本,测试并记录rabbitmq-ssl实例在不同生产者数量、消费者数量、队列数量的生产速率和消费速率。 单队列测试脚本: ./runjava com.rabbitmq.perf.PerfTest -h amqps://${实例用户名}:${实例密码}@${内网连接地址} -e ${Exchange名称} -s 1024 -u ${队列名称} -x ${生产者个数} -y ${消费者个数} -z 300 示例如下: ./runjava com.rabbitmq.perf.PerfTest -h amqps://test:******@192.168.0.150:5671 -e exchange-direct -s 1024 -u queue-1 -x 1 -y 1 -z 300 多队列测试脚本: ./runjava com.rabbitmq.perf.PerfTest -h amqps://${实例用户名}:${实例密码}@${内网连接地址} -e ${Exchange名称} -s 1024 -x 3 -y 3 -z 300 --queue-pattern 'queue-%d' --queue-pattern-from 1 --queue-pattern-to x 示例如下: ./runjava com.rabbitmq.perf.PerfTest -h amqps://test:******@192.168.0.150:5671 -e exchange-direct -s 1024 -x 3 -y 3 -z 300 --queue-pattern 'queue-%d' --queue-pattern-from 1 --queue-pattern-to 3 运行以下脚本,测试并记录rabbitmq-2u4g实例在不同生产者数量、消费者数量、队列数量的生产速率和消费速率。 单队列测试脚本: ./runjava com.rabbitmq.perf.PerfTest -h amqp://${实例用户名}:${实例密码}@${内网连接地址} -e ${Exchange名称} -s 1024 -u ${队列名称} -x ${生产者个数} -y ${消费者个数} -z 300 示例如下: ./runjava com.rabbitmq.perf.PerfTest -h amqp://test:******@192.168.0.150:5672 -e exchange-direct -s 1024 -u queue-1 -x 1 -y 1 -z 300 多队列测试脚本: ./runjava com.rabbitmq.perf.PerfTest -h amqp://${实例用户名}:${实例密码}@${内网连接地址} -e ${Exchange名称} -s 1024 -x ${生产者个数} -y ${消费者个数} -z 300 --queue-pattern 'queue-%d' --queue-pattern-from 1 --queue-pattern-to x 示例如下: ./runjava com.rabbitmq.perf.PerfTest -h amqp://test:******@192.168.0.150:5672 -e exchange-direct -s 1024 -x 3 -y 3 -z 300 --queue-pattern 'queue-%d' --queue-pattern-from 1 --queue-pattern-to 3 运行以下脚本,测试并记录rabbitmq-2u4g实例在不同生产者数量、消费者数量、队列数量、队列类型的生产速率和消费速率,其中队列类型分别为“惰性”、“镜像”和“仲裁”。 单队列测试脚本(不包括仲裁队列): ./runjava com.rabbitmq.perf.PerfTest -h amqp://${实例用户名}:${实例密码}@${内网连接地址} -e ${Exchange名称} -s 1024 -u ${队列名称} -x ${生产者个数} -y ${消费者个数} -z 300 示例如下: ./runjava com.rabbitmq.perf.PerfTest -h amqp://test:******@192.168.0.150:5672 -e exchange-direct -s 1024 -u queue-1 -x 1 -y 1 -z 300 多队列测试脚本(不包括仲裁队列): ./runjava com.rabbitmq.perf.PerfTest -h amqp://${实例用户名}:${实例密码}@${内网连接地址} -e ${Exchange名称} -s 1024 -x ${生产者个数} -y ${消费者个数} -z 300 --queue-pattern 'queue-%d' --queue-pattern-from 1 --queue-pattern-to x 示例如下: ./runjava com.rabbitmq.perf.PerfTest -h amqp://test:******@192.168.0.150:5672 -e exchange-direct -s 1024 -x 3 -y 3 -z 300 --queue-pattern 'queue-%d' --queue-pattern-from 1 --queue-pattern-to 3 单队列测试脚本(仲裁队列): ./runjava com.rabbitmq.perf.PerfTest -h amqp://${实例用户名}:${实例密码}@${内网连接地址} -e ${Exchange名称} -s 1024 -u ${队列名称} -x ${生产者个数} -y ${消费者个数} -z 300 --predeclared 示例如下: ./runjava com.rabbitmq.perf.PerfTest -h amqp://test:******@192.168.0.150:5672 -e exchange-direct -s 1024 -u queue-1 -x 1 -y 1 -z 300 --predeclared 多队列测试脚本(仲裁队列): ./runjava com.rabbitmq.perf.PerfTest -h amqp://${实例用户名}:${实例密码}@${内网连接地址} -e ${Exchange名称} -s 1024 -x ${生产者个数} -y ${消费者个数} -z 300 --queue-pattern 'queue-%d' --queue-pattern-from 1 --queue-pattern-to x --predeclared 示例如下: ./runjava com.rabbitmq.perf.PerfTest -h amqp://test:******@192.168.0.150:5672 -e exchange-direct -s 1024 -x 3 -y 3 -z 300 --queue-pattern 'queue-%d' --queue-pattern-from 1 --queue-pattern-to 3 --predeclared 运行以下脚本,测试并记录rabbitmq-2u4g实例的Fanout Exchange在不同消费者数量、队列数量的生产速率和消费速率。 单队列测试脚本: ./runjava com.rabbitmq.perf.PerfTest -h amqp://${实例用户名}:${实例密码}@${内网连接地址} -e ${Exchange名称} -s 1024 -u ${队列名称} -x ${生产者个数} -y ${消费者个数} -z 300 --predeclared 示例如下: ./runjava com.rabbitmq.perf.PerfTest -h amqp://test:******@192.168.0.150:5672 -e exchange-fanout -s 1024 -u queue-1 -x 1 -y 3 -z 300 --predeclared
  • 测试结果 测试场景一(实例规格):相同Exchange、队列、生产者数量、消费者数量、不同的实例规格 测试参数如下: Exchange:类型为“direct”,非持久化,不会自动删除。 队列:类型“经典队列”,数量为“3”,非持久化,会自动删除。 生产者:数量为“3”。 消费者:数量为“3”。 表2 测试结果 实例规格 磁盘类型 代理数量 生产速率 消费速率 rabbitmq.2u4g.cluster 超高I/O 3 32052 25219 rabbitmq.4u8g.cluster 超高I/O 3 53774 47370 rabbitmq.8u16g.cluster 超高I/O 3 54727 45730 rabbitmq.16u32g.cluster 超高I/O 3 66896 51061 通过上表的测试结果,得出以下结论,仅供参考:实例规格越大,实例性能越高。
  • 测试脚本 测试脚本自动创建的Exchange类型为“direct”,队列特性为非持久化、自动删除,在测试fanout类型Exchange和仲裁队列时,需要在脚本后增加“--predeclared”,表示使用自己定义的Exchange和队列参数。 测试开启SSL的实例时,需要将“amqp://”修改为“amqps://”,表示以加密形式传输数据。 单队列测试脚本: ./runjava com.rabbitmq.perf.PerfTest -h amqp://${实例用户名}:${实例密码}@${内网连接地址} -e ${Exchange名称} -s 1024 -u ${队列名称} -x ${生产者个数} -y ${消费者个数} -z ${运行时间} 参数说明如下: 实例用户名:购买实例时设置的用户名。 实例密码:购买实例时设置的密码。 内网连接地址:购买实例后获取的内网连接地址。 队列名称:队列的名称。 生产者个数:生产者的数量。 消费者个数:消费者的数量。 运行时间:脚本的运行时间,单位为秒。 多队列测试脚本: ./runjava com.rabbitmq.perf.PerfTest -h amqp://${实例用户名}:${实例密码}@${内网连接地址} -e ${Exchange名称} -s 1024 -x ${生产者个数} -y ${消费者个数} -z ${运行时间} --queue-pattern 'queue-%d' --queue-pattern-from 1 --queue-pattern-to x 参数说明如下: 实例用户名:购买实例时设置的用户名。 实例密码:购买实例时设置的密码。 内网连接地址:购买实例后获取的内网连接地址。 生产者个数:生产者的数量。 消费者个数:消费者的数量。 运行时间:脚本的运行时间,单位为秒。 queue-%d:表示多个队列,队列名称前缀为queue-,%d表示变量,取值为从--queue-pattern-from数值到--queue-pattern-to数值的连续整数。例如 --queue-pattern 'queue-%d' --queue-pattern-from 1 --queue-pattern-to 3,表示3个队列,队列名称为queue-1、queue-2、queue-3。
  • 测试环境 进行性能测试前,您需要先构建如下的测试环境: 购买如表1所示RabbitMQ 3.8.35版本的集群实例,购买方法请参考购买RabbitMQ实例。 设置连接RabbitMQ密码时,建议不要使用特殊字符。如果使用特殊字符,在使用测试脚本时,需要对特殊字符进行转义处理,否则会报错。 购买“rabbitmq-2u4g”实例时,请开启公网访问,并在安全组中入方向规则中放通15672端口,以便在浏览器中访问WebUI界面。 表1 实例参数 名称 代理数量 规格 是否开启SSL 磁盘类型 rabbitmq-ssl 3 rabbitmq.2u4g.cluster 是 超高I/O rabbitmq-2u4g 3 rabbitmq.2u4g.cluster 否 超高I/O rabbitmq-4u8g 3 rabbitmq.4u8g.cluster 否 超高I/O rabbitmq-8u16g 3 rabbitmq.8u16g.cluster 否 超高I/O rabbitmq-16u32g 3 rabbitmq.16u32g.cluster 否 超高I/O 购买完成后,在实例详情页获取RabbitMQ实例的内网连接地址,并记录购买实例时设置的用户名和密码。如果是“rabbitmq-2u4g”实例,除了内网连接地址、用户名和密码外,还需要记录Web界面UI地址,此地址在后续登录WebUI界面设置镜像队列和惰性队列时需要使用。 在“rabbitmq-2u4g”实例中,登录WebUI,并设置镜像队列、惰性队列和仲裁队列。 在“rabbitmq-2u4g”实例的“/”Vhost下,创建“fanout”类型的Exchange,具体步骤请参考创建RabbitMQ Exchange。 获取测试工具rabbitmq-perf-test-2.18.0-bin.tar.gz。 购买客户端服务器。 购买区域、可用区、虚拟私有云、子网、安全组与RabbitMQ实例保持一致,规格为16U32G,Linux系统的ECS服务器,具体步骤请参考购买弹性云服务器。 购买完成ECS后,需要在ECS中完成以下配置: 安装Java JDK,并配置JAVA_HOME与PATH环境变量。 export JAVA_HOME=/root/jdk1.8.0_231 export PATH=$JAVA_HOME/bin:$PATH 下载rabbitmq-perf-test-2.18.0-bin.tar.gz,并解压。 tar -zxvf rabbitmq-perf-test-2.18.0-bin.tar.gz