云服务器内容精选

  • 准备环境 执行以下命令,检查是否已安装Go。 go version 返回如下回显时,说明Go已经安装。 go version go1.16.5 linux/amd64 如果未安装Go,请下载并安装。 进入Go脚本所在的bin目录下。 执行“touch go.mod”命令新建一个“go.mod”,并增加以下代码,添加依赖。 module rocketmq-example-gogo 1.13require (github.com/apache/rocketmq-client-go/v2 v2.1.2) 执行如下命令增加代理。 export GOPROXY=https://goproxy.cn,direct 执行如下命令下载依赖。 go mod tidy
  • 订阅普通消息 参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 package mainimport ("context""fmt""os""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/consumer""github.com/apache/rocketmq-client-go/v2/primitive")func main() {c, _ := rocketmq.NewPushConsumer(consumer.WithGroupName("testGroup"),consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})),)err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {for i := range msgs {fmt.Printf("subscribe callback: %v \n", msgs[i])}return consumer.ConsumeSuccess, nil})if err != nil {fmt.Println(err.Error())}// Note: start after subscribeerr = c.Start()if err != nil {fmt.Println(err.Error())os.Exit(-1)}time.Sleep(time.Hour)err = c.Shutdown()if err != nil {fmt.Printf("shutdown Consumer error: %s", err.Error())}} 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 testGroup:表示消费组名称。 192.168.0.1:8100:表示实例连接地址和端口。 test:表示Topic名称。
  • 同步发送 同步发送是指消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息的通讯方式。 参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 package mainimport ("context""fmt""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer""os")// implements a simple producer to send message.func main() {p, _ := rocketmq.NewProducer(producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})),producer.WithRetry(2),)err := p.Start()if err != nil {fmt.Printf("start producer error: %s", err.Error())os.Exit(1)}msg := &primitive.Message{Topic: "topic1",Body: []byte("Hello RocketMQ Go Client!"),}msg.WithTag("TagA")msg.WithKeys([]string{"KeyA"})res, err := p.SendSync(context.Background(), msg)if err != nil {fmt.Printf("send message error: %s\n", err)} else {fmt.Printf("send message success: result=%s\n", res.String())}err = p.Shutdown()if err != nil {fmt.Printf("shutdown producer error: %s", err.Error())}} 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 192.168.0.1:8100:表示实例连接地址和端口。 topic1:表示Topic名称。
  • 异步发送 异步发送是指消息发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。 使用异步发送需要客户端实现异步发送回调接口(SendCallback)。即消息发送方在发送了一条消息后,不需要等待服务端响应接着发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。 参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 package mainimport ("context""fmt""os""sync""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer")// implements a async producer to send message.func main() {p, _ := rocketmq.NewProducer(producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})),producer.WithRetry(2))err := p.Start()if err != nil {fmt.Printf("start producer error: %s", err.Error())os.Exit(1)}var wg sync.WaitGroupwg.Add(1)callback := func(ctx context.Context, result *primitive.SendResult, e error) {if e != nil {fmt.Printf("receive message error: %s\n", err)} else {fmt.Printf("send message success: result=%s\n", result.String())}wg.Done()}message := primitive.NewMessage("test", []byte("Hello RocketMQ Go Client!"))err = p.SendAsync(context.Background(), callback, message)if err != nil {fmt.Printf("send message error: %s\n", err)wg.Done()}wg.Wait()err = p.Shutdown()if err != nil {fmt.Printf("shutdown producer error: %s", err.Error())}} 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 192.168.0.1:8100:表示实例连接地址和端口。 test:表示Topic名称。