云服务器内容精选
-
准备环境 执行以下命令,检查是否已安装Go。 go version 返回如下回显时,说明Go已经安装。 go version go1.16.5 linux/amd64 如果未安装Go,请下载并安装。 进入Go脚本所在的bin目录下。 执行“touch go.mod”命令新建一个“go.mod”,并增加以下代码,添加依赖。 module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-client-go/v2 v2.1.2 ) 执行如下命令增加代理。 export GOPROXY=https://goproxy.cn,direct 执行如下命令下载依赖。 go mod tidy
-
发送定时消息 发送定时消息的示例代码如下(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 package main import ( "context" "fmt" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" "os" ) func main() { p, _ := rocketmq.NewProducer( producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), producer.WithRetry(2), ) err := p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) os.Exit(1) } msg := primitive.NewMessage("test", []byte("Hello RocketMQ Go Client!")) msg.WithProperty("__STARTDELIVERTIME", strconv.FormatInt(time.Now().UnixMilli()+3000, 10)) res, err := p.SendSync(context.Background(), msg) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", res.String()) } err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } } 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 192.168.0.1:8100:表示实例连接地址和端口。 test:表示Topic名称。
-
注意事项 定时消息的最大延迟时间为1年,延迟超过1年的消息将会发送失败。 定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。 在理想情况下,定时消息设定的时间与实际发送时间的误差在0.1s以内。但在定时消息投递压力过大时,会触发定时消息投递流控机制,精度会变差。 在0.1s的精度内,不保证消息投递的顺序性。即如果两条定时消息的定时时间差距小于0.1s,他们投递的顺序与他们发送的顺序无法确保一致。 无法确保定时消息仅投递一次,定时消息可能会重复投递。 定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。 设置定时消息的投递时间后,依然受消息老化时间限制,默认消息老化时间为2天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第7天被删除。 定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。
-
订阅顺序消息 只需要在订阅普通消息的代码基础上增加orderly=True,参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 import time from rocketmq.client import PushConsumer, ConsumeStatus def callback(msg): print(msg.id, msg.body, msg.get_property('property')) return ConsumeStatus.CONSUME_SUC CES S def start_consume_message(): consumer = PushConsumer('consumer_group', orderly=True) consumer.set_name_server_address('192.168.0.1:8100') consumer.subscribe('TopicTest', callback) print('start consume message') consumer.start() while True: time.sleep(3600) if __name__ == '__main__': start_consume_message() 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 consumer_group:表示消费组名称。 192.168.0.1:8100:表示实例连接地址和端口。 TopicTest:表示Topic名称。
-
发送顺序消息 参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 from rocketmq.client import Producer, Message topic = 'TopicTest' gid = 'test' name_srv = '192.168.0.1:8100' def create_message(): msg = Message(topic) msg.set_keys('XXX') msg.set_tags('XXX') msg.set_property('property', 'test') msg.set_body('message body') return msg def send_orderly_with_sharding_key(): producer = Producer(gid, True) producer.set_name_server_address(name_srv) producer.start() msg = create_message() ret = producer.send_orderly_with_sharding_key(msg, 'orderId') print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) producer.shutdown() if __name__ == '__main__': send_orderly_with_sharding_key()
-
发送事务消息 参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 import time from rocketmq.client import Message, TransactionMQProducer, TransactionStatus topic = 'TopicTest' gid = 'test' name_srv = '192.168.0.1:8100' def create_message(): msg = Message(topic) msg.set_keys('XXX') msg.set_tags('XXX') msg.set_property('property', 'test') msg.set_body('message body') return msg def check_callback(msg): print('check: ' + msg.body.decode('utf-8')) return TransactionStatus.COMMIT def local_execute(msg, user_args): print('local: ' + msg.body.decode('utf-8')) return TransactionStatus.UNKNOWN def send_transaction_message(count): producer = TransactionMQProducer(gid, check_callback) producer.set_name_server_address(name_srv) producer.start() for n in range(count): msg = create_message() ret = producer.send_message_in_transaction(msg, local_execute, None) print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) print('send transaction message done') while True: time.sleep(3600) if __name__ == '__main__': send_transaction_message(10)
-
准备环境 执行以下命令,检查是否已安装Go。 go version 返回如下回显时,说明Go已经安装。 go version go1.16.5 linux/amd64 如果未安装Go,请下载并安装。 进入Go脚本所在的bin目录下。 执行“touch go.mod”命令新建一个“go.mod”,并增加以下代码,添加依赖。 module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-client-go/v2 v2.1.2 ) 执行如下命令增加代理。 export GOPROXY=https://goproxy.cn,direct 执行如下命令下载依赖。 go mod tidy
-
订阅顺序消息 只需要在订阅普通消息的代码基础上增加consumer.WithConsumerOrder(true),参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 package main import ( "context" "fmt" "os" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" ) func main() { c, _ := rocketmq.NewPushConsumer( consumer.WithGroupName("testGroup"), consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), consumer.WithConsumerModel(consumer.Clustering), consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset), consumer.WithConsumerOrder(true), ) err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { orderlyCtx, _ := primitive.GetOrderlyCtx(ctx) fmt.Printf("orderly context: %v\n", orderlyCtx) fmt.Printf("subscribe orderly callback: %v \n", msgs) return consumer.ConsumeSuccess, nil }) if err != nil { fmt.Println(err.Error()) } // Note: start after subscribe err = c.Start() if err != nil { fmt.Println(err.Error()) os.Exit(-1) } time.Sleep(time.Hour) err = c.Shutdown() if err != nil { fmt.Printf("Shutdown Consumer error: %s", err.Error()) } } 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 testGroup:表示消费组名称。 192.168.0.1:8100:表示实例连接地址和端口。 test:表示Topic名称。
-
订阅普通消息 参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 import time from rocketmq.client import PushConsumer, ConsumeStatus def callback(msg): print(msg.id, msg.body, msg.get_property('property')) return ConsumeStatus.CONSUME_SUCCESS def start_consume_message(): consumer = PushConsumer('consumer_group') consumer.set_name_server_address('192.168.0.1:8100') consumer.subscribe('TopicTest', callback) print('start consume message') consumer.start() while True: time.sleep(3600) if __name__ == '__main__': start_consume_message() 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 consumer_group:表示消费组名称。 192.168.0.1:8100:表示实例连接地址和端口。 TopicTest:表示Topic名称。
-
同步发送 同步发送是指消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息的通讯方式。 参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 from rocketmq.client import Producer, Message topic = 'TopicTest' gid = 'test' name_srv = '192.168.0.1:8100' def create_message(): msg = Message(topic) msg.set_keys('XXX') msg.set_tags('XXX') msg.set_property('property', 'test') msg.set_body('message body') return msg def send_message_sync(): producer = Producer(gid) producer.set_name_server_address(name_srv) producer.start() msg = create_message() ret = producer.send_sync(msg) print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) producer.shutdown() if __name__ == '__main__': send_message_sync() 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 topic:表示Topic名称。 gid:表示生产者组名称,请根据业务实际情况输入生产者组名称。 name_srv:表示实例连接地址和端口。
-
同步发送 同步发送是指消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息的通讯方式。 参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 package main import ( "context" "fmt" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" "os" ) // implements a simple producer to send message. func main() { p, _ := rocketmq.NewProducer( producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), producer.WithRetry(2), ) err := p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) os.Exit(1) } msg := &primitive.Message{ Topic: "topic1", Body: []byte("Hello RocketMQ Go Client!"), } msg.WithTag("TagA") msg.WithKeys([]string{"KeyA"}) res, err := p.SendSync(context.Background(), msg) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", res.String()) } err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } } 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 192.168.0.1:8100:表示实例连接地址和端口。 topic1:表示Topic名称。
-
订阅普通消息 参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 package main import ( "context" "fmt" "os" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" ) func main() { c, _ := rocketmq.NewPushConsumer( consumer.WithGroupName("testGroup"), consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), ) err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { for i := range msgs { fmt.Printf("subscribe callback: %v \n", msgs[i]) } return consumer.ConsumeSuccess, nil }) if err != nil { fmt.Println(err.Error()) } // Note: start after subscribe err = c.Start() if err != nil { fmt.Println(err.Error()) os.Exit(-1) } time.Sleep(time.Hour) err = c.Shutdown() if err != nil { fmt.Printf("shutdown Consumer error: %s", err.Error()) } } 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 testGroup:表示消费组名称。 192.168.0.1:8100:表示实例连接地址和端口。 test:表示Topic名称。
-
准备环境 执行以下命令,检查是否已安装Go。 go version 返回如下回显时,说明Go已经安装。 go version go1.16.5 linux/amd64 如果未安装Go,请下载并安装。 进入Go脚本所在的bin目录下。 执行“touch go.mod”命令新建一个“go.mod”,并增加以下代码,添加依赖。 module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-client-go/v2 v2.1.2 ) 执行如下命令增加代理。 export GOPROXY=https://goproxy.cn,direct 执行如下命令下载依赖。 go mod tidy
-
异步发送 异步发送是指消息发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。 使用异步发送需要客户端实现异步发送回调接口(SendCallback)。即消息发送方在发送了一条消息后,不需要等待服务端响应接着发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。 参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 package main import ( "context" "fmt" "os" "sync" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" ) // implements a async producer to send message. func main() { p, _ := rocketmq.NewProducer( producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), producer.WithRetry(2)) err := p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) os.Exit(1) } var wg sync.WaitGroup wg.Add(1) callback := func(ctx context.Context, result *primitive.SendResult, e error) { if e != nil { fmt.Printf("receive message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", result.String()) } wg.Done() } message := primitive.NewMessage("test", []byte("Hello RocketMQ Go Client!")) err = p.SendAsync(context.Background(), callback, message) if err != nil { fmt.Printf("send message error: %s\n", err) wg.Done() } wg.Wait() err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } } 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 192.168.0.1:8100:表示实例连接地址和端口。 test:表示Topic名称。
-
发送定时消息 发送定时消息的示例代码如下(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 import time from rocketmq.client import Producer, Message topic = 'TopicTest' gid = 'test' name_srv = '192.168.0.1:8100' def create_message(): msg = Message(topic) msg.set_keys('XXX') msg.set_tags('XXX') msg.set_property('property', 'test') msg.set_body('message body') return msg def send_delay_message(): producer = Producer(gid) producer.set_name_server_address(name_srv) producer.start() msg = create_message() msg.set_property('__STARTDELIVERTIME', str(int(round((time.time() + 3) * 1000)))) ret = producer.send_sync(msg) print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) producer.shutdown() if __name__ == '__main__': send_delay_message() 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 topic:表示Topic名称。 gid:表示生产者组名称,请根据业务实际情况输入生产者组名称。 name_srv:表示实例连接地址和端口。
更多精彩内容
CDN加速
GaussDB
文字转换成语音
免费的服务器
如何创建网站
域名网站购买
私有云桌面
云主机哪个好
域名怎么备案
手机云电脑
SSL证书申请
云点播服务器
免费OCR是什么
电脑云桌面
域名备案怎么弄
语音转文字
文字图片识别
云桌面是什么
网址安全检测
网站建设搭建
国外CDN加速
SSL免费证书申请
短信批量发送
图片OCR识别
云数据库MySQL
个人域名购买
录音转文字
扫描图片识别文字
OCR图片识别
行驶证识别
虚拟电话号码
电话呼叫中心软件
怎么制作一个网站
Email注册网站
华为VNC
图像文字识别
企业网站制作
个人网站搭建
华为云计算
免费租用云托管
云桌面云服务器
ocr文字识别免费版
HTTPS证书申请
图片文字识别转换
国外域名注册商
使用免费虚拟主机
云电脑主机多少钱
鲲鹏云手机
短信验证码平台
OCR图片文字识别
SSL证书是什么
申请企业邮箱步骤
免费的企业用邮箱
云免流搭建教程
域名价格