分布式消息服务ROCKETMQ版-使用ACL权限访问:消费者增加用户认证信息

时间:2024-08-29 16:36:38

消费者增加用户认证信息

无论是普通消息、顺序消息、定时消息,还是事务消息,都参考如下代码。以下加粗内容需要替换为实例自有信息,请根据实际情况替换。

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	c, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName("testGroup"),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})),
		consumer.WithCredentials(primitive.Credentials{
			AccessKey: os.Getenv("ROCKETMQ_AK"), //用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。
			SecretKey: os.Getenv("ROCKETMQ_SK"),
		}),
	)
	if err != nil {
		fmt.Println("init consumer error: " + err.Error())
		os.Exit(0)
	}

	err = c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		fmt.Printf("subscribe callback: %v \n", msgs)
		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		fmt.Println(err.Error())
	}
	// Note: start after subscribe
	err = c.Start()
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(-1)
	}
	time.Sleep(time.Hour)
	err = c.Shutdown()
	if err != nil {
		fmt.Printf("Shutdown Consumer error: %s", err.Error())
	}
}

示例代码中的参数说明如下,请参考收集连接信息获取参数值。

  • testGroup:表示消费组名称。
  • 192.168.0.1:8100:表示实例连接地址和端口。
  • AccessKey:表示用户名。创建用户的步骤,请参见创建用户
  • SecretKey:表示用户的密钥。
  • test:表示Topic名称。
support.huaweicloud.com/devg-hrm/hrm-devg-017.html