加载中...

RabbitMQ之消息应答与发布确认


消息应答

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费者的消息,因为它无法接收到。

为了保证消息在发送过程中不丢失,引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以 某种速率能够处理这些消息的情况下使用。

手动应答

手动应答的好处是可以批量应答并且减少网络拥堵

  • true 代表批量应答 channel 上未应答的消息比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答
  • false 同上面相比只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

image

消息自动重新入队:如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

image

  • 创建生产者
/**
 * @author shaoshao
 * @Date 2022/11/20 19:43
 * @Description: 手动应答时不丢失,放回队列中重新消费
 */
public class Task02 {

    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("消息发送完毕!:" + message);
        }
    }
}
  • 创建两个消费者
/**
 * @author shaoshao
 * @Date 2022/11/20 20:12
 * @Description: 手动应答时不丢失,放回队列中重新消费
 */
public class Work02 {
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("c1等待处理消息等待1s");
        // 声明接收消息
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            // 睡眠1s
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("接收到的消息" + new String(message.getBody()));

            /**
             * 手动应答
             * 1. 消息的标记 tag
             * 2. 是否批量应答 false:不批量应答信道中的消息,true:批量
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);

        };
        // 取消消息时回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println(consumerTag + "消费者取消 消息消费被中断接口回调逻辑");
        };
        // 手动应答
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

    }
}
/**
 * @author shaoshao
 * @Date 2022/11/20 20:12
 * @Description: 手动应答时不丢失,放回队列中重新消费
 */
public class Work03 {
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("c2等待处理消息等待20s");
        // 声明接收消息
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            // 睡眠1s
            try {
                Thread.sleep(20000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("接收到的消息" + new String(message.getBody()));

            /**
             * 手动应答
             * 1. 消息的标记 tag
             * 2. 是否批量应答 false:不批量应答信道中的消息,true:批量
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);

        };
        // 取消消息时回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println(consumerTag + "消费者取消 消息消费被中断接口回调逻辑");
        };
        // 手动应答
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

    }
}

image-20221120203827476

启动生产者,输入AAA,此时会被快的接收,再输入BBB,此时根据轮询会被慢的接收,但需要等20s,再次输入CCC,此时又被快的接收,再输入DDD,此时这个应该是被慢的接收,但需要20s时间,那如果此时给慢的程序关闭,那这个DDD会被快的接收

RabbitMQ持久化

当 RabbitMQ 服务停掉以后,消息生产者发送过来的消息不丢失要如何保障?默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化

队列持久化

之前我们创建的队列都是非持久化的,RabbitMQ 如果重启的化,该队列就会被删除掉,如果要队列实现持久化需要在声明队列的时候把 durable 参数设置为true,代表开启持久化

如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列

队列持久化

消息持久化

需要在消息生产者发布消息的时候,开启消息的持久化

在 basicPublish 方法的第二个参数添加这个属性: MessageProperties.PERSISTENT_TEXT_PLAIN

public class Task02 {

    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        boolean durable = true; // 队列持久化
        channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            // 设置生产者发送消息为持久化消息(保存在磁盘中,而不是在内存中)
            channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
            System.out.println("消息发送完毕!:" + message);
        }
    }
}

改动代码

不公平分发

默认是轮询,不公平分发即能者多劳,防止慢的有许多消息不能及时分发卡住,为了避免这种情况,在消费者中消费消息之前,设置参数 channel.basicQos(1);

改动代码

public class Work02 {
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("c1等待处理消息等待1s");
        // 声明接收消息
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            // 睡眠1s
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("接收到的消息" + new String(message.getBody()));

            /**
             * 手动应答
             * 1. 消息的标记 tag
             * 2. 是否批量应答 false:不批量应答信道中的消息,true:批量
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);

        };
        // 取消消息时回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println(consumerTag + "消费者取消 消息消费被中断接口回调逻辑");
        };
        // 改成不公平分发
        int prefetchSize = 1;
        channel.basicQos(prefetchSize);
        // 手动应答
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

    }
}

运行测试

预取值

不公平分发和预取值分发都用到 basic.qos 方法,如果取值为 1,代表不公平分发,取值不为1,代表预取值分发

  • 设置预取值也就是预定,先给这个channel预定几个值,收到消息后发给他,预定完后再按照轮询发送。
public class Work02 {
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("c1等待处理消息等待1s");
        // 声明接收消息
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            // 睡眠1s
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("接收到的消息" + new String(message.getBody()));

            /**
             * 手动应答
             * 1. 消息的标记 tag
             * 2. 是否批量应答 false:不批量应答信道中的消息,true:批量
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);

        };
        // 取消消息时回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println(consumerTag + "消费者取消 消息消费被中断接口回调逻辑");
        };
        // 改成不公平分发
//        int prefetchSize = 1;
        // 值不等于 1,则代表预取值 预取值改成5
        int prefetchSize = 2;
        channel.basicQos(prefetchSize);
        // 手动应答
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

    }
}

预取值

发布确认

生产者发布消息到 RabbitMQ 后,需要 RabbitMQ 返回「ACK(已收到)」给生产者,这样生产者才知道自己生产的消息成功发布出去。

开启发布确认

发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法

channel.confirmSelect();

单个发布确认

  • 顾名思义,发一个确认一个。它是一种同步确认发布的方式
/**
 * @Description: 单个发布消息确认
 * @Date: 2022/11/21 20:03
 * @Param: []
 * @Return: void
 */
public static void publishMessageIndividually() throws Exception {
    String queueName = UUID.randomUUID().toString();
    Channel channel = RabbitMqUtils.getChannel();
    channel.confirmSelect(); // 开启发布确认
    long start = System.currentTimeMillis();

    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = i + "";
        channel.basicPublish("", queueName, null, message.getBytes());
        boolean flag = channel.waitForConfirms();
        if (flag) {
            System.out.println("消息发布成功");
        }
    }
    long end = System.currentTimeMillis();
    System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时:" + (end - start) + "ms");
}

批量确认发布

单个确认发布方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

/**
 * @Description: 批量确认
 * @Date: 2022/11/21 20:07
 * @Param: []
 * @Return: void
 */
public static void publishMessageBatch() throws Exception {
    String queueName = UUID.randomUUID().toString();
    Channel channel = RabbitMqUtils.getChannel();
    channel.confirmSelect(); // 开启发布确认
    long start = System.currentTimeMillis();
    //批量确认消息大小
    int batchSize = 100;
    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = i + "";
        channel.basicPublish("", queueName, null, message.getBytes());
        if ((i + 1) % 100 == 0){
            boolean flag = channel.waitForConfirms();
            if (flag) {
                System.out.println("消息发布成功");
            }
        }

    }
    long end = System.currentTimeMillis();
    System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时:" + (end - start) + "ms");
}

异步发布确认

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都很好,利用了回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,下面详细讲解异步确认是怎么实现的。

image

添加回调函数,在回调函数里进行确认发布

// 消息确认成功 回调函数
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
    System.out.println("确认的消息" + deliveryTag);
};
// 消息确认失败 回调函数
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
    System.out.println("未确认的消息:"+message+"未确认的消息tag" + deliveryTag);
};
// 消息监听器 监听哪些消息成功了,哪些消息失败了
channel.addConfirmListener(ackCallback, nackCallback);

如何处理异步未确认消息?

最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

/**
 * @author shaoshao
 * @Date 2022/11/21 19:44
 * @Description: 发布确认模式:
 * - 单个确认
 * - 批量确认
 * - 异步批量确认
 */
public class ConfirmMessage {
    public static final int MESSAGE_COUNT = 100;

    public static void main(String[] args) throws Exception {
//        ConfirmMessage.publishMessageIndividually();
//        ConfirmMessage.publishMessageBatch();
        ConfirmMessage.publishMessageAsync();
    }

    /**
     * @Description: 单个发布消息确认
     * @Date: 2022/11/21 20:03
     * @Param: []
     * @Return: void
     */
    public static void publishMessageIndividually() throws Exception {
        String queueName = UUID.randomUUID().toString();
        Channel channel = RabbitMqUtils.getChannel();
        channel.confirmSelect(); // 开启发布确认
        long start = System.currentTimeMillis();

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());
            boolean flag = channel.waitForConfirms();
            if (flag) {
                System.out.println("消息发布成功");
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时:" + (end - start) + "ms");
    }

    /**
     * @Description: 批量确认
     * @Date: 2022/11/21 20:07
     * @Param: []
     * @Return: void
     */
    public static void publishMessageBatch() throws Exception {
        String queueName = UUID.randomUUID().toString();
        Channel channel = RabbitMqUtils.getChannel();
        channel.confirmSelect(); // 开启发布确认
        long start = System.currentTimeMillis();
        //批量确认消息大小
        int batchSize = 100;
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());
            if ((i + 1) % 100 == 0) {
                boolean flag = channel.waitForConfirms();
                if (flag) {
                    System.out.println("消息发布成功");
                }
            }

        }
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时:" + (end - start) + "ms");
    }

    /**
     * @Description: 异步批量确认
     * @Date: 2022/11/21 20:07
     * @Param: []
     * @Return: void
     */
    public static void publishMessageAsync() throws Exception {
        String queueName = UUID.randomUUID().toString();
        Channel channel = RabbitMqUtils.getChannel();
        channel.confirmSelect(); // 开启发布确认
        long start = System.currentTimeMillis();
        /**
         * 线程安全有序的一个哈希表,适用于高并发的情况
         * 1.轻松的将序号与消息进行关联
         * 2.支持批量删除条目,只要给到序号
         * 3.支持高并发(多线程)
         */
        ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
        // 消息确认成功 回调函数
        ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
            // 删除确认的消息,剩下的是未确认的消息
            if (multiple) {
                ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag);
                confirmed.clear();
            } else {
                outstandingConfirms.remove(deliveryTag);
            }
            System.out.println("确认的消息" + deliveryTag);
        };
        // 消息确认失败 回调函数
        ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
            String message = outstandingConfirms.get(deliveryTag);
            System.out.println("未确认的消息:"+message+"未确认的消息tag" + deliveryTag);
        };
        // 消息监听器 监听哪些消息成功了,哪些消息失败了
        channel.addConfirmListener(ackCallback, nackCallback);
        //异步批量确认消息大小
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());
            outstandingConfirms.put(channel.getNextPublishSeqNo(), message);

        }
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "个异步发布确认消息,耗时:" + (end - start) + "ms");
    }
}
速度对比
  • 单独发布消息

    同步等待确认,简单,但吞吐量非常有限。

  • 批量发布消息

    批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条消息出现了问题。

  • 异步处理

    最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些


文章作者: shaoshaossm
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 shaoshaossm !
评论
  目录