分布式消息服务RocketMQ版-收发普通消息:异步发送

时间:2025-02-12 15:02:58

异步发送

异步发送是指消息发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。

使用异步发送需要客户端实现异步发送回调接口(SendCallback)。即消息发送方在发送了一条消息后,不需要等待服务端响应接着发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。

参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。

package mainimport ("context""fmt""os""sync""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer")// implements a async producer to send message.func main() {p, _ := rocketmq.NewProducer(producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})),producer.WithRetry(2))err := p.Start()if err != nil {fmt.Printf("start producer error: %s", err.Error())os.Exit(1)}var wg sync.WaitGroupwg.Add(1)callback := func(ctx context.Context, result *primitive.SendResult, e error) {if e != nil {fmt.Printf("receive message error: %s\n", err)} else {fmt.Printf("send message success: result=%s\n", result.String())}wg.Done()}message := primitive.NewMessage("test", []byte("Hello RocketMQ Go Client!"))err = p.SendAsync(context.Background(), callback, message)if err != nil {fmt.Printf("send message error: %s\n", err)wg.Done()}wg.Wait()err = p.Shutdown()if err != nil {fmt.Printf("shutdown producer error: %s", err.Error())}}

示例代码中的参数说明如下,请参考收集连接信息获取参数值。

  • 192.168.0.1:8100:表示实例连接地址和端口。
  • test:表示Topic名称。
support.huaweicloud.com/devg-hrm/hrm-devg-013.html