分布式消息服务ROCKETMQ版-收发普通消息:订阅普通消息

时间:2025-01-02 16:36:49

订阅普通消息

参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	c, _ := rocketmq.NewPushConsumer(
		consumer.WithGroupName("testGroup"),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})),
	)
	err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for i := range msgs {
			fmt.Printf("subscribe callback: %v \n", msgs[i])
		}

		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		fmt.Println(err.Error())
	}
	// Note: start after subscribe
	err = c.Start()
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(-1)
	}
	time.Sleep(time.Hour)
	err = c.Shutdown()
	if err != nil {
		fmt.Printf("shutdown Consumer error: %s", err.Error())
	}
}

示例代码中的参数说明如下,请参考收集连接信息获取参数值。

  • testGroup:表示消费组名称。
  • 192.168.0.1:8100:表示实例连接地址和端口。
  • test:表示Topic名称。
support.huaweicloud.com/devg-hrm/hrm-devg-013.html