RabbitMQ 消息确认机制

RabbitMq在传递消息的过程中充当代理人Broker的角色,那么生产者到底是怎么样知道消息是被正确的投递到Broker类呢?

生产者端

rabbitmq 整个消息投递的路径

16

消息从 producer 到 exchange 则会返回一个 confirmCallback

消息从 exchange-->queue 投递失败则会返回一个 returnCallback

RabbitMq提供类监听器Listener 来接收消息投递的状态,主要涉及两种状态 ConfirmReturn.

以下两个状态 只和生产者和Borker之间有关 和消费者无关系

Confirm 确认模式

该状态表示生产者将消息发送到Broker时产生的状态, 后续会出现两种情况:

    • ack 表示broker已经收到了消息
    • nack 表示broker拒收消息 [IO、网络所导致的原因]

设置ConnectionFactory的publisher-confirms="true" 开启 确认模式。使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理

//定义回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    /**
     *
     * @param correlationData 相关配置信息
     * @param ack   exchange交换机 是否成功收到了消息。true 成功,false代表失败
     * @param cause 失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("confirm方法被执行了...."+correlationData.getId());

         //ack 为  true表示 消息已经到达交换机
        if (ack) {
            //接收成功
            System.out.println("接收成功消息" + cause);
        } else {
            //接收失败
            System.out.println("接收失败消息" + cause);
            //做一些处理,让消息再次发送。
        }
    }
});

Return 退回模式

该状态表示 消息被broker 正常接收(ack)后,但是Broker没有对应的队列进行投递时产生的状态,消息被退回给生产者

设置ConnectionFactory的publisher-returns="true" 开启 退回模式。使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage

//设置交换机处理失败消息的模式为true的时候,消息达到不了队列时,会将消息重新返回给生产者
rabbitTemplate.setMandatory(true);

//定义回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    /**
     *
     * @param message   消息对象
     * @param replyCode 错误码
     * @param replyText 错误信息
     * @param exchange  交换机
     * @param routingKey 路由键
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("return 执行了....");

        System.out.println("message:"+message);
        System.out.println("replyCode:"+replyCode);
        System.out.println("replyText:"+replyText);
        System.out.println("exchange:"+exchange);
        System.out.println("routingKey:"+routingKey);

        //处理某些业务
    }
});

消费者端

消费者端这里 只有一个ack Acknowledge 表示收到消费者真的收到类消息,确认方式有一下三种:

  1. 自动确认:acknowledge="none"
  2. 手动确认:acknowledge="manual"
  3. 根据异常情况确认:acknowledge="auto"

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。