分布式消息服务ROCKETMQ版-收发事务消息:发送事务消息

时间:2024-12-11 10:00:29

发送事务消息

参考如下示例代码,或者通过TransactionProducer.java获取更多示例代码。

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;

public class Main {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
        TransactionListener transactionListener = new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                System.out.println("开始执行本地事务: " + message);
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("收到回查,重新查询事务状态: " + messageExt);
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        };

        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        //填入连接地址
        producer.setNamesrvAddr("192.168.0.1:8100");
        //producer.setUseTLS(true);    //创建实例时,如果开启了SSL,请增加此行代码。
        producer.setTransactionListener(transactionListener);
        producer.start();

        Message msg =
            new Message("TopicTest", "TagA", "KEY",
                "Hello RocketMQ ".getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = producer.sendMessageInTransaction(msg, null);
        System.out.printf("%s%n", sendResult);

        producer.shutdown();
    }}
support.huaweicloud.com/devg-hrm/hrm-devg-004.html