分布式消息服务ROCKETMQ版-发送定时消息:取消定时消息

时间:2024-08-02 18:22:21

取消定时消息

取消定时消息的示例代码如下:

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

public class ProducerDelayMessageExample {
    private static final Logger log = LoggerFactory.getLogger(ProducerDelayMessageExample.class);

    public static void main(String[] args) throws ClientException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        String topic = "yourDelayTopic";
        // 填入grpc连接地址/grpc公网连接地址
        String endpoints = "yourEndpoints";
        // 填入用户名/密钥,用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。创建实例时,如果开启了ACL才需要添加以下代码。
        String accessKey = System.getenv("ROCKETMQ_AK");
        String secretKey = System.getenv("ROCKETMQ_SK");
        SessionCredentialsProvider sessionCredentialsProvider =
                new StaticSessionCredentialsProvider(accessKey, secretKey);

        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                // .enableSsl(false)  // 创建实例时,如果开启了SSL,请增加此行代码。
                // .setCredentialProvider(sessionCredentialsProvider)  // 创建实例时,如果开启了ACL,请添加此行代码。
                .build();
        final Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setTopics(topic)
                .build();

        try {
            // ====== 发送定时消息逻辑 ======
            byte[] body = "This is a delay message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
            String tag = "yourMessageTagA";
            Duration messageDelayTime = Duration.ofSeconds(10);
            final Message message = provider.newMessageBuilder()
                    .setTopic(topic)
                    .setTag(tag)
                    .setKeys("yourMessageKey")
                    // 设置定时消息投递时间戳
                    .setDeliveryTimestamp(System.currentTimeMillis() + messageDelayTime.toMillis())
                    .setBody(body)
                    .build();
            final SendReceipt sendReceipt = producer.send(message);
            log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());

            // ====== 发送取消消息逻辑 ======
            // 创建取消消息对象
            Message cancle = provider.newMessageBuilder()
                    .setTopic(topic)
                    .setBody("cancel".getBytes(StandardCharsets.UTF_8))
                    // 设置取消消息的时间戳,该时间戳必须与要取消的定时消息的定时时间戳一致。
                    .setDeliveryTimestamp(message.getDeliveryTimestamp().get())
                    // 设置要取消消息的ID,为发送消息的唯一ID(UNIQUE_KEY),可以从发送消息的结果中获取。
                    .addProperty("__CANCEL_SCHEDULED_MSG", sendReceipt.getMessageId().toString())
                    .build();
            // 发送取消消息,必须在定时消息被投递之前发送才可以取消。
            final SendReceipt cancelSendReceipt = producer.send(cancle);
            log.info("Send cancel message successfully, messageId={}", cancelSendReceipt.getMessageId());
        } catch (Throwable t) {
            log.error("Failed to send message", t);
        }

        // 不再使用后,手动关闭producer。
        producer.close();
    }
}
support.huaweicloud.com/devg-hrm/hrm-devg-031.html