云服务器内容精选

  • 发送定时消息 发送定时消息的示例代码如下(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 package main import ( "context" "fmt" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" "os" ) func main() { p, _ := rocketmq.NewProducer( producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), producer.WithRetry(2), ) err := p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) os.Exit(1) } msg := primitive.NewMessage("test", []byte("Hello RocketMQ Go Client!")) msg.WithProperty("__STARTDELIVERTIME", strconv.FormatInt(time.Now().UnixMilli()+3000, 10)) res, err := p.SendSync(context.Background(), msg) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", res.String()) } err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } } 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 192.168.0.1:8100:表示实例连接地址和端口。 test:表示Topic名称。
  • 准备环境 执行以下命令,检查是否已安装Go。 go version 返回如下回显时,说明Go已经安装。 go version go1.16.5 linux/amd64 如果未安装Go,请下载并安装。 进入Go脚本所在的bin目录下。 执行“touch go.mod”命令新建一个“go.mod”,并增加以下代码,添加依赖。 module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-client-go/v2 v2.1.2 ) 执行如下命令增加代理。 export GOPROXY=https://goproxy.cn,direct 执行如下命令下载依赖。 go mod tidy
  • 注意事项 定时消息的最大延迟时间为1年,延迟超过1年的消息将会发送失败。 定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。 在理想情况下,定时消息设定的时间与实际发送时间的误差在0.1s以内。但在定时消息投递压力过大时,会触发定时消息投递流控机制,精度会变差。 在0.1s的精度内,不保证消息投递的顺序性。即如果两条定时消息的定时时间差距小于0.1s,他们投递的顺序与他们发送的顺序无法确保一致。 无法确保定时消息仅投递一次,定时消息可能会重复投递。 定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。 设置定时消息的投递时间后,依然受消息老化时间限制,默认消息老化时间为2天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第7天被删除。 定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。
  • 订阅顺序消息 只需要在订阅普通消息的代码基础上增加orderly=True,参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 import time from rocketmq.client import PushConsumer, ConsumeStatus def callback(msg): print(msg.id, msg.body, msg.get_property('property')) return ConsumeStatus.CONSUME_SUC CES S def start_consume_message(): consumer = PushConsumer('consumer_group', orderly=True) consumer.set_name_server_address('192.168.0.1:8100') consumer.subscribe('TopicTest', callback) print('start consume message') consumer.start() while True: time.sleep(3600) if __name__ == '__main__': start_consume_message() 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 consumer_group:表示消费组名称。 192.168.0.1:8100:表示实例连接地址和端口。 TopicTest:表示Topic名称。
  • 发送顺序消息 参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 from rocketmq.client import Producer, Message topic = 'TopicTest' gid = 'test' name_srv = '192.168.0.1:8100' def create_message(): msg = Message(topic) msg.set_keys('XXX') msg.set_tags('XXX') msg.set_property('property', 'test') msg.set_body('message body') return msg def send_orderly_with_sharding_key(): producer = Producer(gid, True) producer.set_name_server_address(name_srv) producer.start() msg = create_message() ret = producer.send_orderly_with_sharding_key(msg, 'orderId') print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) producer.shutdown() if __name__ == '__main__': send_orderly_with_sharding_key()
  • 发送事务消息 参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 import time from rocketmq.client import Message, TransactionMQProducer, TransactionStatus topic = 'TopicTest' gid = 'test' name_srv = '192.168.0.1:8100' def create_message(): msg = Message(topic) msg.set_keys('XXX') msg.set_tags('XXX') msg.set_property('property', 'test') msg.set_body('message body') return msg def check_callback(msg): print('check: ' + msg.body.decode('utf-8')) return TransactionStatus.COMMIT def local_execute(msg, user_args): print('local: ' + msg.body.decode('utf-8')) return TransactionStatus.UNKNOWN def send_transaction_message(count): producer = TransactionMQProducer(gid, check_callback) producer.set_name_server_address(name_srv) producer.start() for n in range(count): msg = create_message() ret = producer.send_message_in_transaction(msg, local_execute, None) print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) print('send transaction message done') while True: time.sleep(3600) if __name__ == '__main__': send_transaction_message(10)
  • 准备环境 执行以下命令,检查是否已安装Go。 go version 返回如下回显时,说明Go已经安装。 go version go1.16.5 linux/amd64 如果未安装Go,请下载并安装。 进入Go脚本所在的bin目录下。 执行“touch go.mod”命令新建一个“go.mod”,并增加以下代码,添加依赖。 module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-client-go/v2 v2.1.2 ) 执行如下命令增加代理。 export GOPROXY=https://goproxy.cn,direct 执行如下命令下载依赖。 go mod tidy
  • 订阅顺序消息 只需要在订阅普通消息的代码基础上增加consumer.WithConsumerOrder(true),参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 package main import ( "context" "fmt" "os" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" ) func main() { c, _ := rocketmq.NewPushConsumer( consumer.WithGroupName("testGroup"), consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), consumer.WithConsumerModel(consumer.Clustering), consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset), consumer.WithConsumerOrder(true), ) err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { orderlyCtx, _ := primitive.GetOrderlyCtx(ctx) fmt.Printf("orderly context: %v\n", orderlyCtx) fmt.Printf("subscribe orderly callback: %v \n", msgs) return consumer.ConsumeSuccess, nil }) if err != nil { fmt.Println(err.Error()) } // Note: start after subscribe err = c.Start() if err != nil { fmt.Println(err.Error()) os.Exit(-1) } time.Sleep(time.Hour) err = c.Shutdown() if err != nil { fmt.Printf("Shutdown Consumer error: %s", err.Error()) } } 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 testGroup:表示消费组名称。 192.168.0.1:8100:表示实例连接地址和端口。 test:表示Topic名称。
  • 订阅普通消息 参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 import time from rocketmq.client import PushConsumer, ConsumeStatus def callback(msg): print(msg.id, msg.body, msg.get_property('property')) return ConsumeStatus.CONSUME_SUCCESS def start_consume_message(): consumer = PushConsumer('consumer_group') consumer.set_name_server_address('192.168.0.1:8100') consumer.subscribe('TopicTest', callback) print('start consume message') consumer.start() while True: time.sleep(3600) if __name__ == '__main__': start_consume_message() 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 consumer_group:表示消费组名称。 192.168.0.1:8100:表示实例连接地址和端口。 TopicTest:表示Topic名称。
  • 同步发送 同步发送是指消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息的通讯方式。 参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 from rocketmq.client import Producer, Message topic = 'TopicTest' gid = 'test' name_srv = '192.168.0.1:8100' def create_message(): msg = Message(topic) msg.set_keys('XXX') msg.set_tags('XXX') msg.set_property('property', 'test') msg.set_body('message body') return msg def send_message_sync(): producer = Producer(gid) producer.set_name_server_address(name_srv) producer.start() msg = create_message() ret = producer.send_sync(msg) print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) producer.shutdown() if __name__ == '__main__': send_message_sync() 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 topic:表示Topic名称。 gid:表示生产者组名称,请根据业务实际情况输入生产者组名称。 name_srv:表示实例连接地址和端口。
  • 订阅普通消息 参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 package main import ( "context" "fmt" "os" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" ) func main() { c, _ := rocketmq.NewPushConsumer( consumer.WithGroupName("testGroup"), consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), ) err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { for i := range msgs { fmt.Printf("subscribe callback: %v \n", msgs[i]) } return consumer.ConsumeSuccess, nil }) if err != nil { fmt.Println(err.Error()) } // Note: start after subscribe err = c.Start() if err != nil { fmt.Println(err.Error()) os.Exit(-1) } time.Sleep(time.Hour) err = c.Shutdown() if err != nil { fmt.Printf("shutdown Consumer error: %s", err.Error()) } } 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 testGroup:表示消费组名称。 192.168.0.1:8100:表示实例连接地址和端口。 test:表示Topic名称。
  • 异步发送 异步发送是指消息发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。 使用异步发送需要客户端实现异步发送回调接口(SendCallback)。即消息发送方在发送了一条消息后,不需要等待服务端响应接着发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。 参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 package main import ( "context" "fmt" "os" "sync" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" ) // implements a async producer to send message. func main() { p, _ := rocketmq.NewProducer( producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), producer.WithRetry(2)) err := p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) os.Exit(1) } var wg sync.WaitGroup wg.Add(1) callback := func(ctx context.Context, result *primitive.SendResult, e error) { if e != nil { fmt.Printf("receive message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", result.String()) } wg.Done() } message := primitive.NewMessage("test", []byte("Hello RocketMQ Go Client!")) err = p.SendAsync(context.Background(), callback, message) if err != nil { fmt.Printf("send message error: %s\n", err) wg.Done() } wg.Wait() err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } } 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 192.168.0.1:8100:表示实例连接地址和端口。 test:表示Topic名称。
  • 同步发送 同步发送是指消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息的通讯方式。 参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 package main import ( "context" "fmt" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" "os" ) // implements a simple producer to send message. func main() { p, _ := rocketmq.NewProducer( producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), producer.WithRetry(2), ) err := p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) os.Exit(1) } msg := &primitive.Message{ Topic: "topic1", Body: []byte("Hello RocketMQ Go Client!"), } msg.WithTag("TagA") msg.WithKeys([]string{"KeyA"}) res, err := p.SendSync(context.Background(), msg) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", res.String()) } err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } } 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 192.168.0.1:8100:表示实例连接地址和端口。 topic1:表示Topic名称。
  • 准备环境 执行以下命令,检查是否已安装Go。 go version 返回如下回显时,说明Go已经安装。 go version go1.16.5 linux/amd64 如果未安装Go,请下载并安装。 进入Go脚本所在的bin目录下。 执行“touch go.mod”命令新建一个“go.mod”,并增加以下代码,添加依赖。 module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-client-go/v2 v2.1.2 ) 执行如下命令增加代理。 export GOPROXY=https://goproxy.cn,direct 执行如下命令下载依赖。 go mod tidy
  • 发送定时消息 发送定时消息的示例代码如下(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 import time from rocketmq.client import Producer, Message topic = 'TopicTest' gid = 'test' name_srv = '192.168.0.1:8100' def create_message(): msg = Message(topic) msg.set_keys('XXX') msg.set_tags('XXX') msg.set_property('property', 'test') msg.set_body('message body') return msg def send_delay_message(): producer = Producer(gid) producer.set_name_server_address(name_srv) producer.start() msg = create_message() msg.set_property('__STARTDELIVERTIME', str(int(round((time.time() + 3) * 1000)))) ret = producer.send_sync(msg) print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) producer.shutdown() if __name__ == '__main__': send_delay_message() 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 topic:表示Topic名称。 gid:表示生产者组名称,请根据业务实际情况输入生产者组名称。 name_srv:表示实例连接地址和端口。