云服务器内容精选

  • 网络异常时RabbitMQ客户端重试连接示例代码 客户端和服务端的初始连接失败,不会触发自动恢复,可在客户端编写对应的应用程序代码,通过重试连接来解决初始连接失败的问题。 以下示例演示了使用Java客户端通过重试连接解决初始连接失败的问题。 ConnectionFactory factory = new ConnectionFactory(); // 对于4.0.0版本之前的RabbitMQ Java客户端,开启自动恢复功能 factory.setAutomaticRecoveryEnabled(true); // 配置连接设置 try { Connection conn = factory.newConnection(); } catch (java.net.ConnectException e) { Thread.sleep(5000); // apply retry logic }
  • 方案概述 由于服务端重启、网络抖动等原因造成客户端网络连接断开时,将导致客户端无法正常生产和消费消息。 通过在客户端侧设置重连机制,使客户端在网络连接断开时自动恢复连接,降低网络故障对业务的影响。以下场景会触发网络自动恢复: 在连接的I/O循环中抛出未处理的异常 检测到Socket读取超时 检测到服务端心跳丢失 4.0.0及以上版本的Java客户端默认支持网络自动恢复,无需设置。 如果应用程序使用Connection.Close方法关闭连接,则不会启用或触发网络自动恢复。
  • 与Kafka、RocketMQ的差异 表1 功能差异 功能项 RocketMQ Kafka RabbitMQ 优先级队列 不支持 不支持 3.8.35版本:支持。建议优先级大小设置在0-10之间。 AMQP-0-9-1版本:支持。优先级大小设置在1-9之间。 延迟队列 支持 不支持 不支持 死信队列 支持 不支持 支持 消息重试 支持 不支持 3.8.35版本:不支持。 AMQP-0-9-1版本:支持。 消费模式 支持客户端主动拉取和服务端推送两种方式。 客户端主动拉取。 支持客户端主动拉取和服务端推送两种模式。 广播消费 支持 支持 支持 消息回溯 支持 支持。Kafka支持按照offset和timestamp两种维度进行消息回溯。 3.8.35版本:不支持。RabbitMQ中消息一旦被确认消费就会被标记删除。 AMQP-0-9-1版本:支持。 消息堆积 支持 支持。考虑吞吐因素,Kafka的堆积效率比RabbitMQ总体上要高。 支持 持久化 支持 支持 支持 消息追踪 支持 不支持 3.8.35版本:不支持。 AMQP-0-9-1版本:支持。 消息过滤 支持 支持 3.8.35版本:不支持,但可以自行封装。 AMQP-0-9-1版本:支持。 多租户 支持 支持 支持 多协议支持 兼容RocketMQ协议。 只支持Kafka自定义协议。 RabbitMQ基于AMQP协议实现。 跨语言支持 支持多语言的客户端。 采用Scala和Java编写,支持多种语言的客户端。 支持多种语言的客户端。 流量控制 RocketMQ 5.x支持基于实例规格的流量控制。 支持client、user和Topic级别,通过主动设置可将流控作用于生产者或消费者。 RabbitMQ的流控基于Credit-Based算法,是内部被动触发的保护机制,作用于生产者层面。 消息顺序性 单队列(queue)内有序。 支持单分区(partition)级别的顺序性。 单线程发送、单线程消费并且不采用延迟队列、优先级队列等一些高级功能时,才能实现消息有序。 安全机制 支持SSL认证。 支持SSL、SASL身份认证和读写权限控制。 3.8.35版本:支持SSL认证。 AMQP-0-9-1版本:支持ACL访问控制。 事务性消息 支持 支持 支持
  • 配置RabbitMQ镜像队列 登录RabbitMQ实例的Web UI。 在菜单栏,选择“Admin”。 图1 选择Admin菜单 (可选)如果您需要设置指定Vhost,请执行本步骤;如果不需要,请直接执行4。 选择右侧导航栏“Virtual Hosts”,然后输入“Name”,单击“Add virtual host”,创建Vhost。 图2 创建Vhost 选择右侧导航栏“Policies”,为Vhost设置策略。 图3 设置Vhost策略 表1 策略参数说明 参数 说明 Virtual Host 设置策略所应用的Vhost。如果为指定的Vhost设置,请在“Virtual Host”选择3创建的Vhost;如果没有,则默认为“/”。 Name 策略的名称,用户自定义。 Pattern Queue的匹配模式(正则表达式)。 Apply to 策略所适用的目标。 Priority 策略的优先级,数字越大,优先级越高。 Definition 镜像定义,包括三个部分ha-sync-mode、ha-mode、ha-params。 ha-sync-mode: 表示镜像队列中消息的同步方式,有效取值范围为:automatic和manual。 automatic:表示自动向master同步数据。 manual:表示手动向master同步数据。 ha-mode: 指明镜像队列的模式,有效取值范围为:all、exactly和nodes。 all:表示在集群所有的节点上进行镜像。 exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定。 nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定。 ha-params: ha-mode模式需要用到的参数。 说明: 如果将队列镜像到集群所有节点上,可能会导致集群承受不必要的网络及磁盘I/O流量。因此,推荐您使用以下参数配置: ha-sync-mode: automatic ha-mode: exactly ha-params: n/2+1,其中n表示集群中总节点数量。 例如集群总节点数为3,ha-params设置为3/2+1=2,此时队列将镜像到一个主节点和一个从节点上。这样配置既保证了数据的高可用性,又避免了不必要的资源开销。 单击“Add policy”。 策略添加成功后如下图所示。 图4 Vhost策略
  • LVS的心跳超时时间 RabbitMQ集群实例使用LVS进行负载均衡,如图1所示,单节点实例不涉及LVS。 图1 集群实例的负载均衡 LVS对客户端连接设置了心跳超时时间,默认为90s。如果客户端在90s内没有向LVS发送心跳(AMQP心跳帧或消息收发),LVS会主动断开与客户端的连接,此时客户端需要重新连接。 如果存在消息收发时间间隔大于90s的场景,请在客户端开启心跳并设置小于90s的心跳超时时间。推荐设置心跳超时时间为10s。
  • 什么是心跳 RabbitMQ实例提供了心跳功能,以确保应用程序层及时发现中断的连接和完全无响应的对端。心跳还可以防止某些网络设备在一段时间内由于没有活动而中断TCP连接。开启心跳的方法为在连接上指定心跳超时时间。 心跳超时时间定义了对等TCP连接在多长时间后被服务端和客户端视为关闭。服务端和客户端会对配置的心跳超时时间进行协商,客户端必须配置该值来发送心跳。RabbitMQ官方团队维护的3个客户端(Java、.NET、Erlang语言)的心跳超时时间协商逻辑如下: 服务端和客户端设置的心跳超时时间都不为0时,两者间较小的值生效。 服务端和客户端任意一端设置的心跳超时时间为0,另一端不为0时,非0的值生效。 服务端和客户端的心跳超时时间都设置为0时,表示禁用心跳。 配置心跳超时时间后,RabbitMQ服务端和客户端都会向对方发送AMQP心跳帧作为心跳,发送的时间间隔为心跳超时时间的一半。客户端在两次错过心跳后,会被认为是不可达的,TCP连接将被关闭。当客户端检测到服务端由于心跳而无法访问时,需要重新连接。更多关于心跳的说明,请参考Detecting Dead TCP Connections with Heartbeats and TCP Keepalives。 一些客户端(如C语言客户端)没有发送心跳的逻辑,即使配置了心跳超时时间,开启了心跳,仍然无法发送心跳。此时需要额外启动一个线程,编写发送心跳的逻辑。
  • Exchange 表3 Exchange约束与限制 限制项 约束和限制 默认Exchange RabbitMQ 3.8.35版本在创建Vhost后会创建7个默认Exchange:(AMQP default)、amq.direct、amq.fanout、amq.headers、amq.match、amq.rabbitmq.trace、amq.topic。 绑定Exchange RabbitMQ 3.8.35版本中,名为“(AMQP default)”的Exchange不能绑定任何Exchange。 “Internal”为“是”的Exchange只能绑定Exchange,不能绑定Queue。 RabbitMQ AMQP-0-9-1版本的Exchange不支持绑定Exchange,只支持绑定Queue。 删除Exchange RabbitMQ 3.8.35版本中,默认Exchange不支持删除。
  • Queue 表4 Queue约束与限制 限制项 约束和限制 绑定Queue RabbitMQ 3.8.35版本中,名为“(AMQP default)”的Exchange不能绑定任何Queue。 “Internal”为“是”的Exchange只能绑定Exchange,不能绑定Queue。 惰性队列 仅RabbitMQ 3.8.35版本支持惰性队列。 仲裁队列 仅RabbitMQ 3.8.35版本支持仲裁队列。 单一活跃消费者 仅RabbitMQ 3.8.35版本支持单一活跃消费者特性。
  • 稳定性 表2 稳定性差异 功能项 AMQP-0-9-1版本 开源RabbitMQ 消息堆积 海量消息堆积能力,高性能不受消息堆积影响。 抗堆积能力差,容易引发内存问题而导致宕机。 弹性能力 集群分布式无主架构,能够横向快速地扩容集群规模。 通过变更机器规格来扩容、缩容。 服务可用性 99.95%集群分布式高可用架构,多可用区高可用。 使用Erlang语言开发,运维靠经验摸索,且无法避免开源架构的稳定性痛点。 数据可靠性 数据三副本,不会影响TPS性能。 配置副本数变多会导致TPS下降。 巡检系统 自动发现并修复死锁、宕机等问题。 无。
  • Queue 表4 Queue差异 功能项 AMQP-0-9-1版本 开源RabbitMQ 队列类型 无需配置,分布式高可用集群。 需要配置,支持以下类型: Classic:经典镜像队列。 Quorum:仲裁队列。 节点 无需配置,服务免运维。 需要配置,可选择节点。 持久化 默认持久化。 支持持久化和非持久化。 Max length 无需配置,支持海量消息堆积。 需要配置,防止消息堆积过多而引起的内存问题导致宕机。 Max length bytes 无需配置,支持海量消息堆积。 需要配置,防止消息堆积过多而引起的内存问题导致宕机。 Max in memory length 无需配置,支持海量消息堆积。 需要配置,防止消息堆积过多而引起的内存问题导致宕机。 Max in memory bytes 无需配置,支持海量消息堆积。 需要配置,防止消息堆积过多而引起的内存问题导致宕机。 Delivery limit 无需配置,固定值,默认16次。 需要配置。 Dead letter exchange 支持。 支持。 Dead letter routing key 支持。 支持。 Single active consumer 不支持。 支持。
  • Exchange 表3 Exchange差异 功能项 AMQP-0-9-1版本 开源RabbitMQ Exchange类型 支持的类型:direct、fanout、headers、topic、x-delayed-message、x-consistent-hash。 支持的类型:direct、fanout、headers、topic、x-delayed-message、x-consistent-hash。 持久化 默认持久化。 支持配置持久化与非持久化。 自动删除 支持。 支持。 Internal 不支持。 支持。 Alternate exchange 不支持。 支持。 Consistent hash exchange 支持。 支持。
  • 功能 表1 功能差异 功能项 AMQP-0-9-1版本 开源RabbitMQ 客户端SDK 支持开源所有语言和所有版本的SDK。 支持开源SDK。 定时消息 支持任意定时时间,秒级精确度,海量堆积。 通过插件或使用消息存活时间过期转移方式实现。 事务消息 不支持。 支持。 顺序消息 不支持。 支持。 消息优先级 支持。 支持。 消息重试机制 支持。消息消费超过一定时间未响应会重新投递。重试间隔时间为1分钟,最多重试16次,超过则会丢弃或发送至死信Exchange。 不支持。 监控指标 指标丰富,维度可精确到Vhost、Exchange和Queue,便于您快速发现和定位问题。 支持以下两种方案: 方案一:通过Management UI能够获取丰富的指标,但需要自建指标存储及展示的系统。 方案二:通过Prometheus+Grafana实现,该方案获取的指标较简单,维度不够精确,不利于快速定位业务问题。 消息轨迹 轨迹数据白屏化展示,消息完整的生命周期清晰可见,一目了然。提供强大的索引能力,可根据Queue、消息ID、消息处理耗时等完成不同维度的查询。 消息轨迹信息以文本格式存储在服务器的log文件中,查询和定位问题效率较低。
  • 命令行模式连接实例 登录客户端所在服务器。 下载RabbitMQ-Tutorial.zip示例工程代码。 wget https://dms-demo.obs.cn-north-1.myhuaweicloud.com/RabbitMQ-Tutorial.zip 解压RabbitMQ-Tutorial.zip压缩包。 unzip RabbitMQ-Tutorial.zip 进入RabbitMQ-Tutorial目录,该目录下包含预编译好的jar文件。 cd RabbitMQ-Tutorial 运行生产消息示例。 java -cp .:rabbitmq-tutorial.jar Send {host} {port} {user} {password} 参数说明如下: {host}:从前提条件中获取的连接地址。 {port}:RabbitMQ实例的连接端口,输入5672。 {user}:从前提条件中获取的用户名。 {password}:从前提条件中获取的密码。 生产消息示例如下: [root@ecs-test RabbitMQ-Tutorial]# java -cp .:rabbitmq-tutorial.jar Send 192.168.xx.40 5672 test Zxxxxxxs [x] Sent 'Hello World!' [root@ecs-test RabbitMQ-Tutorial]# java -cp .:rabbitmq-tutorial.jar Send 192.168.xx.40 5672 test Zxxxxxxs [x] Sent 'Hello World!' 运行消费消息示例。 java -cp .:rabbitmq-tutorial.jar Recv {host} {port} {user} {password} 参数说明如下: {host}:从前提条件中获取的连接地址。 {port}:RabbitMQ实例的连接端口,输入5672。 {user}:从前提条件中获取的用户名。 {password}:从前提条件中获取的密码。 消费消息示例如下: [root@ecs-test RabbitMQ-Tutorial]# java -cp .:rabbitmq-tutorial.jar Recv 192.168.xx.40 5672 test Zxxxxxxs [*] Waiting for messages. To exit press CTRL+C [x] Received 'Hello World!' [x] Received 'Hello World!' 如需停止消费使用Ctrl+C命令退出。
  • 示例代码(Java) 连接实例并生产消息示例代码: VHOST_NAME:消息要发送的Queue所在的Vhost名称。 QUEUE_NAME:消息要发送的Queue名称。 Hello World!:要发送的消息,根据实际需要修改。 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setPort(port); factory.setVirtualHost("VHOST_NAME"); factory.setUsername(user); factory.setPassword(password); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); 连接实例并消费消息示例代码: VHOST_NAME:要消费消息的Queue所在的Vhost名称。 QUEUE_NAME:要消费消息的Queue名称。 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setPort(port); factory.setVirtualHost("VHOST_NAME"); factory.setUsername(user); factory.setPassword(password); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer);
  • 前提条件 已购买RabbitMQ实例,并记录创建时输入的用户名和密码,实例未开启SSL。 在实例详情中查看并记录“内网连接地址/公网连接地址”。 客户端所在服务器和RabbitMQ实例之间网络已互通,具体网络要求参见连接RabbitMQ网络要求。 客户端所在服务器已安装Java Development Kit 1.8.111或以上版本,并配置JAVA_HOME与PATH环境变量,环境变量配置方法如下: 使用执行用户在用户家目录下修改“.bash_profile”,添加如下行。其中“/opt/java/jdk1.8.0_151”为JDK的安装路径,请根据实际情况修改。 export JAVA_HOME=/opt/java/jdk1.8.0_151 export PATH=$JAVA_HOME/bin:$PATH 执行source .bash_profile命令使修改生效。 RabbitMQ实例中已创建Vhost、Exchange和Queue,且配置Exchange和Queue的绑定。