云服务器内容精选

  • 前提条件 删除消息前,请先在消费代码中设置“auto.offset.reset”参数。“auto.offset.reset”用来指定当Kafka中没有初始偏移量或者当前偏移量不存在(例如当前偏移量已被删除)时,消费者的消费策略。取值如下: latest:偏移量自动被重置到最晚偏移量。 earliest:偏移量自动被重置到最早偏移量。 none:向消费者抛出异常。 如果将此配置设置为latest,新增分区时,生产者可能会在消费者重置初始偏移量之前开始向新增加的分区发送消息,从而导致部分消息丢失。
  • 预取值设置建议 如果您只有一个或很少几个消费者在处理消息,建议一次预取多条消息,尽量让客户端保持忙碌。如果您的处理时间和网络状态稳定,则只需将总往返时间除以每条消息在客户端的处理时间即可获得估计的预取值。 在消费者多且处理时间短的情况下,建议使用较低的预取值。过低的预取值会使消费者闲置,因为消费者在处理完消息后需要等待下一批的消息到达。过高的值可能会使单个消费者忙碌,其他消费者处于空闲状态。 在消费者多且处理时间很长的情况下,建议您将预取值设置为1,以便消息在所有消费者间均匀分布。
  • 设置预取值 以下示例演示在Java客户端为单个消费者设置预取值为10。 ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //设置预取值为10。 channel.basicQos(10, false); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume("my_queue", false, consumer); 在Java客户端中,global的默认值为false,因此以上示例可以简单地写为channel.basicQos(10)。 global取值的含义如下: false:分别作用于通道上的每个新消费者。 true:在通道上的所有消费者之间所共享。
  • 前提条件 已创建RocketMQ实例和Topic。 如果通过按Message ID查询,需要提前获取消息所在的Topic名称和消息的Message ID。 Message ID为生产消息后返回的MsgId,如6中返回的内容,也可先通过Topic查询消息,记录Message ID。 如果通过按Message Key查询,需要提前获取消息所在的Topic名称和消息的Message Key。 Message Key为7中配置的消息Key,也可先通过Topic查询消息,记录Message Key。
  • 消息清理机制 无论消息是否被消费,RocketMQ的消息默认保留时间为48小时,最长保留时间为720小时,修改保留时间的操作请参见修改RocketMQ消息保留时间。RocketMQ消息存储在CommitLog文件中,CommitLog文件大小为1GB,当一个CommitLog文件写满后,会生成一个新的CommitLog文件。RocketMQ删除消息是删除CommitLog文件,而不是删除一条消息。CommitLog文件为顺序写入,当最后写入的一条消息过期时,表示CommitLog文件过期。满足如下任意一个条件,CommitLog文件将会被清理: 每天凌晨4点会清理过期的文件,部分老实例由于未设置时区,清理时间为每天中午12点。 当磁盘使用率达到70%(针对4.8.0版本)或75%(针对5.x版本)时,会立刻清理过期的文件。 当磁盘使用率达到85%时,会从最早创建的文件开始清理,不管文件是否已过期,直到磁盘空间充足。
  • 开启RocketMQ消息轨迹(Go) 在客户端开启消息轨迹的方法如下: 执行以下命令,检查是否已安装Go。 go version 返回如下回显时,说明Go已经安装。 [root@ecs-test sarama]# go version go version go1.16.5 linux/amd64 如果未安装Go,请下载并安装。 进入Go脚本所在的bin目录下。 执行“touch go.mod”命令新建一个“go.mod”,并增加以下代码,添加依赖。 module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-client-go/v2 v2.1.0 ) 生产者开启消息轨迹(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 package main import ( "context" "fmt" "os" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" ) func main() { namesrvs := []string{"192.168.0.1:8100"} traceCfg := &primitive.TraceConfig{ Access: primitive.Local, Resolver: primitive.NewPassthroughResolver(namesrvs), } p, _ := rocketmq.NewProducer( producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), producer.WithRetry(2), producer.WithTrace(traceCfg)) // 增加此行代码表示开启了消息轨迹。 err := p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) os.Exit(1) } res, err := p.SendSync(context.Background(), primitive.NewMessage("topic1", []byte("Hello RocketMQ Go Client!"))) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", res.String()) } time.Sleep(10 * time.Second) err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } } 消费者开启消息轨迹(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 package main import ( "context" "fmt" "os" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" ) func main() { namesrvs := []string{"192.168.0.1:8100"} traceCfg := &primitive.TraceConfig{ Access: primitive.Local, Resolver: primitive.NewPassthroughResolver(namesrvs), } c, _ := rocketmq.NewPushConsumer( consumer.WithGroupName("testGroup"), consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), consumer.WithTrace(traceCfg), // 增加此行代码表示开启了消息轨迹。 ) err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { fmt.Printf("subscribe callback: %v \n", msgs) return consumer.ConsumeSuccess, nil }) if err != nil { fmt.Println(err.Error()) } // Note: start after subscribe err = c.Start() if err != nil { fmt.Println(err.Error()) os.Exit(-1) } time.Sleep(time.Hour) err = c.Shutdown() if err != nil { fmt.Printf("shutdown Consumer error: %s", err.Error()) } }
  • 开启RocketMQ消息轨迹(Java) 在客户端开启消息轨迹的方法如下: 生产者开启消息轨迹(除事务消息以外的消息类型) 构造函数的“enableMsgTrace”参数传入“true”,例如: DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", true); 生产者开启消息轨迹(事务消息) 构造函数的“enableMsgTrace”参数传入“true”,例如: TransactionMQProducer producer = new TransactionMQProducer(null, "ProducerGroupName", null, true, null); 消费者开启消息轨迹 构造函数的“enableMsgTrace”参数传入“true”,例如: DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName", true);
  • 生产者确认 生产者确认,即服务端在收到来自生产者的消息时进行确认。 以下示例演示在Java客户端配置生产者确认: try { channel.confirmSelect() ; //将信道置为publisher confirm模式 //之后正常发送消息 channel.basicPublish("exchange", "routingKey" , null , "publisher confirm test" .getBytes()); if (!channel.waitForConfirms()) { System.out.println( "send message failed " ) ; // do something else.... } } catch (InterruptedException e) { e.printStackTrace() ; } 调用channel .waitForConfirms方法之后,会等待服务端确认,这是一种同步等待的方式,会对性能产生影响。如果生产者要满足at least once,就必须使用同步等待方式。
  • 消费者确认 消费者确认是指服务端通过确认消息是否成功被消费者接收,来判断是否删除队列中的此消息。 消费者确认对数据可靠性十分重要,接收重要消息的消费应用程序在未处理完消息前不应确认消息,以便消费者有足够的时间处理消息,无需担心消息处理过程中由于消费者进程异常(如工作程序崩溃、重启等)导致消息丢失。 消费者确认在客户端上配置,通过配置basicConsume方法启用确认。在channel中启用消费者确认适用于大多数场景。 以下示例演示在Java客户端配置消费者确认(使用Channel#basicAck设置basic.ack为肯定): // this example assumes an existing channel instance boolean autoAck = false; channel.basicConsume(queueName, autoAck, "a-consumer-tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); // positively acknowledge a single delivery, the message will // be discarded channel.basicAck(deliveryTag, false); } }); 未确认的消息缓存在内存中,如果未确认的消息过多,会导致内存使用率过高,此时可以在客户端配置预取值来限制消费者预取的消息数量,具体方法请参见配置RabbitMQ消息预取值。
  • 前提条件 已创建RocketMQ实例和消费组。 如果通过按Message ID查询,需要提前获取消息所在的消费组名称和消息的Message ID。 Message ID为生产消息后返回的MsgId,如6中返回的内容,也可先通过Topic查询消息,记录Message ID。 如果通过按Message Key查询,需要提前获取消息所在的消费组名称和消息的Message Key。 Message Key为7中配置的消息Key,也可先通过Topic查询消息,记录Message Key。
  • 死信 死信是RabbitMQ中的一种消息机制,在消费消息时,如果队列里的消息满足以下任意一种情况,那么该消息将成为“死信”。 “requeue”被设置为“false”,消费者使用“basic.reject”或“basic.nack”否定应答(NACK)消息。 消息在队列的存活时间超过设置的TTL时间。 队列的消息数量已经超过最大队列长度。 死信消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,该消息将会被存储到死信队列中,如果没有配置,该消息将会被丢弃。 更多关于死信的说明,请参考Dead Letter Exchanges。
  • 操作场景 本章节主要介绍如何查询、导出和重新投递死信消息。 分布式消息服务RocketMQ版提供三种死信消息查询的方法:按Group查询、按Message ID查询和按Message Key查询。 按Group查询:查询某时间段内指定消费组下所有的死信消息。此方法属于范围查询,查询到的死信消息可能比较多。 按Message ID查询:查询指定Message ID的消息。此方法属于精确查找,可以快速查询到某一条死信消息。 按Message Key查询:查询指定Message Key的消息。此方法属于精确查找,可以快速查询到某一条死信消息。