分布式消息服务RabbitMQ版-通过消息幂等实现消息去重:实施方法

时间:2025-01-26 10:37:18

实施方法

对于消息重复的场景,一般可以使用全局唯一ID来判断该消息是否已消费过。如果已经消费过,则直接返回处理结果,否则进行消息处理,并将全局ID记录下来。

  • 生产者为每一条消息设置唯一的messageID,示例代码如下:
    //持久化消息,并且生成随机的全局唯一messageIDAMQP.BasicProperties.Builder builder = new AMQP.BasicProperties().builder();builder.deliveryMode(2);builder.messageId(UUID.randomUUID().toString());//自定义发送的消息String message = "message content";//生产消息,exchangeNameroutingKey根据实际填写Queue所属的Exchange名称和Routing Keychannel.basicPublish("exchangeName", "routingKey", false, builder.build(), message.getBytes(StandardCharsets.UTF_8));String messageId = builder.build().getMessageId();System.out.println("messageID: " + messageId);System.out.println("Send message success!");//关闭信道channel.close();//关闭连接connection.close();
  • 消费者根据messageID对消息进行幂等处理,示例代码如下:
    //创建一个以messageID为主键的数据库表,利用数据库主键去重的方式来处理RabbitMQ幂等。//在消费者消费前先去数据库查询这条消息是否存在,如果存在表示消息已被消费,无需处理;如果不存在表示消息未被消费,执行消费操作//queueName根据实际填写要消费的Queue名称channel.basicConsume("queueName", false, new DefaultConsumer(channel) {    @Override    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {            //获取messageID,并判断是否为空            String messageId = properties.getMessageId();            if (StringUtils.isBlank(messageId)){                logger.info("messageId is null");                return;            }            //查询数据库中是否存在主键为messageID的记录,如果存在,说明这条消息已经被消费,无需处理,否则消费消息,并且在消费完成后将消息记录入库            //数据库查询逻辑省略            //todo            //如果数据库中没有messageID的记录,则执行消费,否则提示消息已消费            if (null == "{数据库查出来的结果记录}"){                //获取消息                String message = new String(body,StandardCharsets.UTF_8);                //手动响应                channel.basicAck(envelope.getDeliveryTag(),false);                logger.info("[x] received message: "  + message + "," + "messageId:" + messageId);                //存入数据库表中,标识该消息已消费                //数据库插入操作省略                //todo            } else {                //如果根据messageID查询到消息已消费,则不进行消费                logger.error("该消息已消费,无需重复消费");            }    }});
support.huaweicloud.com/bestpractice-rabbitmq/bp-0015.html