分布式消息服务RocketMQ版-消息堆积处理建议:方案概述
方案概述
在RocketMQ的实际业务中,消息堆积是比较常见的问题。在消息处理过程中,如果客户端的消费速度跟不上服务端的发送速度,未处理的消息会越来越多,这部分消息就被称为堆积消息。消息没有被及时消费而产生消息堆积,从而会造成消息消费延迟。对于消息消费实时性要求较高的业务系统,即使是消息堆积造成的短暂消息延迟也无法接受。造成消息堆积的原因有以下两个:
- 消息没有及时被消费,生产者生产消息的速度快于消费者消费消息的速度,从而产生消息积压且无法自行恢复。
- 业务系统本身逻辑耗费时间较长,导致消息消费效率较低。
消息消费过程

一个完整的消息消费过程主要分为2个阶段:
- 消息拉取
客户端通过批量拉取的方式从服务端获取消息,将拉取到的消息缓存到本地缓存队列中。对于拉取式消费,在内网环境下的吞吐量很高,因此消息拉取阶段一般不会引起消息堆积。
- 消息消费
客户端将本地缓存的消息提交到消费线程中,提供给业务消费逻辑进行消息处理,待消息处理完成后获取处理结果。此阶段的消费能力依赖于消息的消费耗时和消费并发度。如果由于业务处理逻辑复杂等原因,导致处理单条消息的耗时较长,就会影响整体的消息吞吐量。而消息吞吐量低会导致客户端本地缓存队列达到上限,从而停止从服务端拉取消息,引起消息堆积。
所以,消息堆积的主要瓶颈在于客户端的消费能力,而消费能力由消费耗时和消费并发度决定。消费耗时的优先级要高于消费并发度,应在保证消费耗时合理性的前提下,再考虑消费并发度问题。
消费耗时
影响消息处理时长的主要因素是业务处理的代码逻辑,而代码逻辑中会影响处理时长的主要有两种代码类型:CPU内部计算型代码和外部I/O操作型代码。如果代码中没有复杂的递归和循环处理,CPU内部计算耗时相对于外部I/O操作耗时来说几乎可以忽略,因此应关注外部I/O操作型代码的消息处理效率。
外部IO操作型代码主要有以下业务操作:
- 读写外部数据库,例如对远程MySQL数据库读写。
- 读写外部缓存系统,例如对远程Redis读写。
- 下游系统调用,例如Dubbo的RPC远程调用,Spring Cloud对下游系统的HTTP接口调用。
提前梳理下游系统的调用逻辑,掌握每个调用操作的预期耗时,有助于判断业务逻辑中I/O操作的耗时是否合理。通常消息堆积都是由于下游系统出现了服务异常或容量限制,从而导致消费耗时增加。而服务异常,并不仅仅是系统出现报错,也可能是更加隐蔽的问题,比如网络带宽问题。
消费并发度
客户端的消费并发度由单客户端线程数和客户端数量决定。单客户端线程数是指单个客户端所包含的线程数量,客户端数量是指消费组所包含的客户端(消费者)数量。对于普通消息、定时/延时消息、事务消息及顺序消息的消费并发度计算方法如下:
消息类型 |
消费并发度 |
---|---|
普通消息 |
单客户端线程数 * 客户端数 |
定时/延时消息 |
|
事务消息 |
|
顺序消息 |
Min(单客户端线程数 * 客户端数,队列数) |
单客户端线程数的调整需谨慎,不能盲目调大线程数,如果设置过大的线程数反而会带来大量的线程切换开销。
理想环境下单客户端的最优线程数计算模型为:C *(T1+T2)/T1。
其中,C为单机vCPU核数,T1为业务逻辑的CPU计算耗时,T2为外部I/O操作耗时,另外线程切换耗时忽略不计,I/O操作不消耗CPU,线程需有足够消息等待处理且内存充足。
此处最大线程数的计算模型是在理想环境下得到的,在实际应用中建议逐步调大线程数,在观察效果后再进行调整。
- RocketMQ是什么_RocketMQ介绍_分布式消息服务RocketMQ版
- 分布式消息服务Kafka版与分布式消息服务RocketMQ版的区别_Kafka与RocketMQ对比_Kafka与RocketMQ差异
- 分布式消息服务有哪些_分布式消息服务哪个好_分布式消息-华为云
- 分布式消息服务优势_分布式消息服务_消息队列-华为云
- 分布式消息中间件的作用_分布式消息中间件_分布式消息-华为云
- 分布式消息中间件实战_分布式消息实战_分布式消息-华为云
- 分布式消息队列是什么_分布式消息队列好处_消息队列-华为云
- RocketMQ的架构_RocketMQ的消息模式_分布式消息RocketMQ-华为云
- kafka是什么_kafka介绍_分布式消息服务Kafka版
- Kafka与其他消息队列的对比_什么是Kafka_分布式消息系统-华为云