云服务器内容精选

  • 死信 死信是RabbitMQ中的一种消息机制,在消费消息时,如果队列里的消息满足以下任意一种情况,那么该消息将成为“死信”。 “requeue”被设置为“false”,消费者使用“basic.reject”或“basic.nack”否定应答(NACK)消息。 消息在队列的存活时间超过设置的TTL时间。 队列的消息数量已经超过最大队列长度。 死信消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,该消息将会被存储到死信队列中,如果没有配置,该消息将会被丢弃。 更多关于死信的说明,请参考Dead Letter Exchanges。
  • 操作步骤(Go) 在客户端开启消息轨迹的方法如下: 执行以下命令,检查是否已安装Go。 go version 返回如下回显时,说明Go已经安装。 [root@ecs-test sarama]# go version go version go1.16.5 linux/amd64 如果未安装Go,请下载并安装。 新建一个“go.mod”,并增加以下代码,添加依赖。 module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-client-go/v2 v2.1.0 ) 生产者开启消息轨迹(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 package main import ( "context" "fmt" "os" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" ) func main() { namesrvs := []string{"192.168.0.1:8100"} traceCfg := &primitive.TraceConfig{ Access: primitive.Local, Resolver: primitive.NewPassthroughResolver(namesrvs), } p, _ := rocketmq.NewProducer( producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), producer.WithRetry(2), producer.WithTrace(traceCfg)) err := p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) os.Exit(1) } res, err := p.SendSync(context.Background(), primitive.NewMessage("topic1", []byte("Hello RocketMQ Go Client!"))) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", res.String()) } time.Sleep(10 * time.Second) err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } } 消费者开启消息轨迹(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 package main import ( "context" "fmt" "os" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" ) func main() { namesrvs := []string{"192.168.0.1:8100"} traceCfg := &primitive.TraceConfig{ Access: primitive.Local, Resolver: primitive.NewPassthroughResolver(namesrvs), } c, _ := rocketmq.NewPushConsumer( consumer.WithGroupName("testGroup"), consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), consumer.WithTrace(traceCfg), ) err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { fmt.Printf("subscribe callback: %v \n", msgs) return consumer.ConsumeSuccess, nil }) if err != nil { fmt.Println(err.Error()) } // Note: start after subscribe err = c.Start() if err != nil { fmt.Println(err.Error()) os.Exit(-1) } time.Sleep(time.Hour) err = c.Shutdown() if err != nil { fmt.Printf("shutdown Consumer error: %s", err.Error()) } }
  • 操作步骤(Java) 在客户端开启消息轨迹的方法如下: 生产者开启消息轨迹(除事务消息以外的消息类型) 构造函数的“enableMsgTrace”参数传入“true”,例如: DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", true); 生产者开启消息轨迹(事务消息) 构造函数的“enableMsgTrace”参数传入“true”,例如: TransactionMQProducer producer = new TransactionMQProducer(null, "ProducerGroupName", null, true, null); 消费者开启消息轨迹 构造函数的“enableMsgTrace”参数传入“true”,例如: DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName", true);
  • 什么是死信队列? 死信队列用于处理无法被正常消费的消息。 一条消息初次消费失败会被重试消费,若重试次数达到最大值(默认16次,在客户端可配置)时,依然消费失败,则其将被投递到该消费者对应的特殊队列(即死信队列)中,这种消息被称为死信消息。 死信消息具有如下特性: 不会再被消费者正常消费。 死信消息默认保留时间为48小时,超时后,会被自动删除。如果想要修改死信消息保留时间,请参考修改RocketMQ消息保留时间。
  • 操作场景 本章节主要介绍如何查询、导出和重新投递死信消息。 分布式消息服务RocketMQ版提供三种死信消息查询的方法:按Group查询、按Message ID查询和按Message Key查询。 按Group查询:查询某时间段内指定消费组下所有的死信消息。此方法属于范围查询,查询到的死信消息可能比较多。 按Message ID查询:查询指定Message ID的消息。此方法属于精确查找,可以快速查询到某一条死信消息。 按Message Key查询:查询指定Message Key的消息。此方法属于精确查找,可以快速查询到某一条死信消息。