云服务器内容精选

  • 操作场景 您可以通过明文接入RocketMQ实例,也可以通过密文接入RocketMQ实例,本章节指导您在控制台修改接入方式。 RocketMQ实例支持的接入方式如下: SSL:服务端与客户端之间通过密文传输,安全性较高,性能较低。 PLAINTEXT:服务端与客户端之间通过明文传输,安全性较低,性能较高。 PERMISSIVE:服务端与客户端之间即能通过明文传输又能通过密文传输,传输方式由客户端决定。
  • 支持多少个Topic? 不同实例规格支持的Topic个数不同,具体参见表1、表2、表3、表4和表5。 表1 实例规格说明(分布式消息服务RocketMQ版4.8.0) 资源规格 代理(个) 存储容量(GB/代理) 单个代理TPS 单个代理Topic数上限 单个代理消费组数上限 rocketmq.4u8g.cluster.small 1 ~ 10 300 ~ 30000 15000 2000 2000 rocketmq.4u8g.cluster 1 ~ 10 300 ~ 60000 20000 4000 4000 rocketmq.8u16g.cluster 1 ~ 10 300 ~ 90000 25000 8000 8000 rocketmq.12u24g.cluster 1 ~ 10 300 ~ 90000 28000 12000 12000 rocketmq.16u32g.cluster 1 ~ 10 300 ~ 90000 30000 16000 16000 表2 实例规格说明(分布式消息服务RocketMQ版5.x基础版单机) 资源规格 存储容量(GB) 实例TPS 实例Topic数上限 实例消费组数上限 rocketmq.b1.large.1 100 ~ 30000 500 50 500 表3 实例规格说明(分布式消息服务RocketMQ版5.x基础版集群) 资源规格 存储容量(GB) 实例TPS 实例Topic数上限 实例消费组数上限 实例连接数上限 rocketmq.b2.large.4 200 ~ 60000 2000 100 1000 4000 rocketmq.b2.large.8 200 ~ 60000 4000 100 1000 4000 rocketmq.b2.large.12 200 ~ 60000 6000 100 1000 6000 表4 实例规格说明(分布式消息服务RocketMQ版5.x专业版单机) 资源规格 存储容量(GB) 实例TPS 实例Topic数上限 实例消费组数上限 rocketmq.p1.large.1 100 ~ 30000 500 200 500 表5 实例规格说明(分布式消息服务RocketMQ版5.x专业版集群) 资源规格 存储容量(GB) 实例TPS 弹性TPS 实例Topic数上限 实例消费组数上限 rocketmq.p2.large.8 200 ~ 60000 4000 2000 400 1500 rocketmq.p2.large.12 200 ~ 60000 6000 3000 400 1500 rocketmq.p2.large.20 200 ~ 60000 10000 5000 800 1500 rocketmq.p2.large.40 400 ~ 120000 20000 10000 800 1500 rocketmq.p2.large.100 400 ~ 120000 50000 25000 1000 2000 rocketmq.p2.large.150 400 ~ 120000 75000 37500 1000 2000 rocketmq.p2.large.200 800 ~ 240000 100000 50000 1500 4000 rocketmq.p2.large.300 800 ~ 240000 150000 50000 1500 4000 父主题: Topic问题
  • 支持多少个消费组个数? 不同实例规格支持的消费组个数不同,具体参见表1、表2、表3、表4和表5。 表1 实例规格说明(分布式消息服务RocketMQ版4.8.0) 资源规格 代理(个) 存储容量(GB/代理) 单个代理TPS 单个代理Topic数上限 单个代理消费组数上限 rocketmq.4u8g.cluster.small 1 ~ 10 300 ~ 30000 15000 2000 2000 rocketmq.4u8g.cluster 1 ~ 10 300 ~ 60000 20000 4000 4000 rocketmq.8u16g.cluster 1 ~ 10 300 ~ 90000 25000 8000 8000 rocketmq.12u24g.cluster 1 ~ 10 300 ~ 90000 28000 12000 12000 rocketmq.16u32g.cluster 1 ~ 10 300 ~ 90000 30000 16000 16000 表2 实例规格说明(分布式消息服务RocketMQ版5.x基础版单机) 资源规格 存储容量(GB) 实例TPS 实例Topic数上限 实例消费组数上限 rocketmq.b1.large.1 100 ~ 30000 500 50 500 表3 实例规格说明(分布式消息服务RocketMQ版5.x基础版集群) 资源规格 存储容量(GB) 实例TPS 实例Topic数上限 实例消费组数上限 实例连接数上限 rocketmq.b2.large.4 200 ~ 60000 2000 100 1000 4000 rocketmq.b2.large.8 200 ~ 60000 4000 100 1000 4000 rocketmq.b2.large.12 200 ~ 60000 6000 100 1000 6000 表4 实例规格说明(分布式消息服务RocketMQ版5.x专业版单机) 资源规格 存储容量(GB) 实例TPS 实例Topic数上限 实例消费组数上限 rocketmq.p1.large.1 100 ~ 30000 500 200 500 表5 实例规格说明(分布式消息服务RocketMQ版5.x专业版集群) 资源规格 存储容量(GB) 实例TPS 弹性TPS 实例Topic数上限 实例消费组数上限 rocketmq.p2.large.8 200 ~ 60000 4000 2000 400 1500 rocketmq.p2.large.12 200 ~ 60000 6000 3000 400 1500 rocketmq.p2.large.20 200 ~ 60000 10000 5000 800 1500 rocketmq.p2.large.40 400 ~ 120000 20000 10000 800 1500 rocketmq.p2.large.100 400 ~ 120000 50000 25000 1000 2000 rocketmq.p2.large.150 400 ~ 120000 75000 37500 1000 2000 rocketmq.p2.large.200 800 ~ 240000 100000 50000 1500 4000 rocketmq.p2.large.300 800 ~ 240000 150000 50000 1500 4000 父主题: 消费组问题
  • RocketMQ是否支持流控? RocketMQ 4.8.0版本不支持流控,5.x版本支持流控。 出现如下两种情况会触发流控: 短时间内生产、消费消息请求数量过多。 秒级突发流量过大时,可能出现该秒请求被流控。 监控数据采集的是一分钟内平均TPS,不是秒级监控。 当请求被流控时,会导致生产消息请求失败,且服务端返回215错误码。为避免出现流控情况,请确保生产、消费TPS在给定实例规格内。 父主题: 监控告警问题
  • 准备环境 执行以下命令,检查是否已安装Go。 go version 返回如下回显时,说明Go已经安装。 go version go1.16.5 linux/amd64 如果未安装Go,请下载并安装。 在“go.mod”中增加以下代码,添加依赖。 module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-clients/golang/v5 )
  • 订阅顺序消息 只需要在订阅普通消息的代码基础上增加orderly=True,参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 import time from rocketmq.client import PushConsumer, ConsumeStatus def callback(msg): print(msg.id, msg.body, msg.get_property('property')) return ConsumeStatus.CONSUME_SUC CES S def start_consume_message(): consumer = PushConsumer('consumer_group', orderly=True) consumer.set_name_server_address('192.168.0.1:8100') consumer.subscribe('TopicTest', callback) print('start consume message') consumer.start() while True: time.sleep(3600) if __name__ == '__main__': start_consume_message() 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 consumer_group:表示消费组名称。 192.168.0.1:8100:表示实例连接地址和端口。 TopicTest:表示Topic名称。
  • 发送顺序消息 参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 from rocketmq.client import Producer, Message topic = 'TopicTest' gid = 'test' name_srv = '192.168.0.1:8100' def create_message(): msg = Message(topic) msg.set_keys('XXX') msg.set_tags('XXX') msg.set_property('property', 'test') msg.set_body('message body') return msg def send_orderly_with_sharding_key(): producer = Producer(gid, True) producer.set_name_server_address(name_srv) producer.start() msg = create_message() ret = producer.send_orderly_with_sharding_key(msg, 'orderId') print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) producer.shutdown() if __name__ == '__main__': send_orderly_with_sharding_key()
  • 注意事项 定时消息的最大延迟时间为1年,延迟超过1年的消息将会发送失败。 定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。 在理想情况下,定时消息设定的时间与实际发送时间的误差在0.1s以内。但在定时消息投递压力过大时,会触发定时消息投递流控机制,精度会变差。 在0.1s的精度内,不保证消息投递的顺序性。即如果两条定时消息的定时时间差距小于0.1s,他们投递的顺序与他们发送的顺序无法确保一致。 无法确保定时消息仅投递一次,定时消息可能会重复投递。 定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。 设置定时消息的投递时间后,依然受消息老化时间限制,默认消息老化时间为2天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第7天被删除。 定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。
  • 消费者增加用户认证信息 无论是普通消息、顺序消息、定时消息,还是事务消息,都参考如下代码。以下加粗内容需要替换为实例自有信息,请根据实际情况替换。 package main import ( "context" "fmt" "os" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" ) func main() { c, err := rocketmq.NewPushConsumer( consumer.WithGroupName("testGroup"), consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), consumer.WithCredentials(primitive.Credentials{ AccessKey: os.Getenv("ROCKETMQ_AK"), //用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。 SecretKey: os.Getenv("ROCKETMQ_SK"), }), ) if err != nil { fmt.Println("init consumer error: " + err.Error()) os.Exit(0) } err = c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { fmt.Printf("subscribe callback: %v \n", msgs) return consumer.ConsumeSuccess, nil }) if err != nil { fmt.Println(err.Error()) } // Note: start after subscribe err = c.Start() if err != nil { fmt.Println(err.Error()) os.Exit(-1) } time.Sleep(time.Hour) err = c.Shutdown() if err != nil { fmt.Printf("Shutdown Consumer error: %s", err.Error()) } } 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 testGroup:表示消费组名称。 192.168.0.1:8100:表示实例连接地址和端口。 AccessKey:表示用户名。创建用户的步骤,请参见创建用户。 SecretKey:表示用户的密钥。 test:表示Topic名称。
  • 注意事项 定时消息的最大延迟时间为1年,延迟超过1年的消息将会发送失败。 定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。 在理想情况下,定时消息设定的时间与实际发送时间的误差在0.1s以内。但在定时消息投递压力过大时,会触发定时消息投递流控机制,精度会变差。 在0.1s的精度内,不保证消息投递的顺序性。即如果两条定时消息的定时时间差距小于0.1s,他们投递的顺序与他们发送的顺序无法确保一致。 无法确保定时消息仅投递一次,定时消息可能会重复投递。 定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。 设置定时消息的投递时间后,依然受消息老化时间限制,默认消息老化时间为2天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第7天被删除。 定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。
  • 发送定时消息 发送定时消息的示例代码如下,或者通过ProducerDelayMessageExample.java获取更多示例代码。 import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.SessionCredentialsProvider; import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; public class ProducerDelayMessageExample { private static final Logger log = LoggerFactory.getLogger(ProducerDelayMessageExample.class); private ProducerDelayMessageExample() { } public static void main(String[] args) throws ClientException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); String topic = "yourDelayTopic"; // 填入grpc连接地址/grpc公网连接地址 String endpoints = "yourEndpoints"; // 填入用户名/密钥,用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。创建实例时,如果开启了ACL才需要添加以下代码。 String accessKey = System.getenv("ROCKETMQ_AK"); String secretKey = System.getenv("ROCKETMQ_SK"); SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) // .enableSsl(false) // 创建实例时,如果将“SSL”配置为“PLAINTEXT”,则请增加此行代码。如果将“SSL”配置为“PERMISSIVE”,则请根据实际情况选择是否增加此行代码。 // .setCredentialProvider(sessionCredentialsProvider) // 创建实例时,如果开启了ACL,请添加此行代码。 .build(); final Producer producer = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(topic) .build(); byte[] body = "This is a delay message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8); String tag = "yourMessageTagA"; Duration messageDelayTime = Duration.ofSeconds(10); final Message message = provider.newMessageBuilder() .setTopic(topic) .setTag(tag) .setKeys("yourMessageKey") // 设置定时消息投递时间戳 .setDeliveryTimestamp(System.currentTimeMillis() + messageDelayTime.toMillis()) .setBody(body) .build(); try { final SendReceipt sendReceipt = producer.send(message); log.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); } catch (Throwable t) { log.error("Failed to send message", t); } // 不再使用后,手动关闭producer。 producer.close(); } }
  • 发送定时消息 发送定时消息的示例代码如下(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 import time from rocketmq.client import Producer, Message topic = 'TopicTest' gid = 'test' name_srv = '192.168.0.1:8100' def create_message(): msg = Message(topic) msg.set_keys('XXX') msg.set_tags('XXX') msg.set_property('property', 'test') msg.set_body('message body') return msg def send_delay_message(): producer = Producer(gid) producer.set_name_server_address(name_srv) producer.start() msg = create_message() msg.set_property('__STARTDELIVERTIME', str(int(round((time.time() + 3) * 1000)))) ret = producer.send_sync(msg) print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) producer.shutdown() if __name__ == '__main__': send_delay_message() 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 topic:表示Topic名称。 gid:表示生产者组名称,请根据业务实际情况输入生产者组名称。 name_srv:表示实例连接地址和端口。
  • 注意事项 定时消息的最大延迟时间为1年,延迟超过1年的消息将会发送失败。 定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。 在理想情况下,定时消息设定的时间与实际发送时间的误差在0.1s以内。但在定时消息投递压力过大时,会触发定时消息投递流控机制,精度会变差。 在0.1s的精度内,不保证消息投递的顺序性。即如果两条定时消息的定时时间差距小于0.1s,他们投递的顺序与他们发送的顺序无法确保一致。 无法确保定时消息仅投递一次,定时消息可能会重复投递。 定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。 设置定时消息的投递时间后,依然受消息老化时间限制,默认消息老化时间为2天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第7天被删除。 定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。
  • 订阅普通消息 参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 package main import ( "context" "fmt" "log" "os" "time" "github.com/apache/rocketmq-clients/golang" "github.com/apache/rocketmq-clients/golang/credentials" ) const ( Topic = "topic01" GroupName = "groupname" Endpoint = "192.168.xx.xx:8080" AccessKey = os.Getenv("ROCKETMQ_AK") //用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。 SecretKey = os.Getenv("ROCKETMQ_SK") ) var ( // 接收消息请求的最大等待时间 awaitDuration = time.Second * 5 // 每次能接收的最大消息数 maxMessageNum int32 = 16 // 消息不可见时间,在消息被接收后对其他消费者不可见,直到超时。 invisibleDuration = time.Second * 20 ) func main() { os.Setenv("mq.consoleAppender.enabled", "true") golang.ResetLogger() simpleConsumer, err := golang.NewSimpleConsumer(&golang.Config{ Endpoint: Endpoint, Group: GroupName, Credentials: &credentials.SessionCredentials{ AccessKey: AccessKey, AccessSecret: SecretKey, }, }, golang.WithAwaitDuration(awaitDuration), golang.WithSubscriptionExpressions(map[string]*golang.FilterExpression{ Topic: golang.SUB_ALL, }), ) if err != nil { log.Fatal(err) } err = simpleConsumer.Start() if err != nil { log.Fatal(err) } defer simpleConsumer.GracefulStop() go func() { for { fmt.Println("start recevie message") mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration) if err != nil { fmt.Println(err) } for _, mv := range mvs { simpleConsumer.Ack(context.TODO(), mv) fmt.Println(mv) } fmt.Println("wait a moment") fmt.Println() time.Sleep(time.Second * 3) } }() time.Sleep(time.Minute) } 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 Topic:输入Topic名称。 GroupName:输入消费组名称。 Endpoint:输入grpc连接地址/grpc公网连接地址。 AccessKey:创建实例时,如果开启了ACL,需要输入用户名。 SecretKey:创建实例时,如果开启了ACL,需要输入用户密钥。
  • 准备环境 执行以下命令,检查是否已安装Go。 go version 返回如下回显时,说明Go已经安装。 go version go1.16.5 linux/amd64 如果未安装Go,请下载并安装。 在“go.mod”中增加以下代码,添加依赖。 module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-clients/golang/v5 )