RabbitMQ的高级特性
消费端限流
消息限流为了进行自保,进行的一种救急措施。因为巨大的流量代表着非常多的消息,这些消息如果多到服务器处理不过来就会造成服务器瘫痪,影响用户体验,造成不良影响。基本上任何一个消息队列都有限流的功能,我们就来看看在RabbitMQ之中进行限流具体应该怎么做?
RabbitMQ提供了一种 QOS
(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息还未被消费确认,则不进行新消息的消费。
代码实现
application.yaml
spring:
rabbitmq:
host: 192.168.230.150 #主机ip
port: 5672 #端口
username: ralph
password: ralph
virtual-host: /ralphCode
# 设置手动签收消息
listener:
simple:
acknowledge-mode: manual
# 一次拉取2条消息
prefetch: 2
在rabbit:listener-container 中配置 prefetch
属性设置消费端一次拉取多少消息
消费端的确认模式一定为手动确认。acknowledge="manual"
生产者端Code:
@GetMapping("/api/sendMessage")
public void send(){
for (int i=0;i<10;i++){
// 循环发送10条消息 同时配置yaml 中配置了一次拉取2条
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"springboot.ralph","sb_qos_test_message"+i);
log.info("send message success!");
}
}
消费者端Code:
@RabbitListener(queues = RabbitMqConfig.QUEUE_NAME)
public void listenerQueue(Message message, Channel channel) throws IOException {
//获取消息
System.out.println("message:"+ new String(message.getBody()));
//业务处理
}
如果不手动确认签收ACK 那么就会如下所示 只接收2条数据
@RabbitListener(queues = RabbitMqConfig.QUEUE_NAME)
public void listenerQueue(Message message, Channel channel) throws IOException {
//获取消息
System.out.println("message:"+ new String(message.getBody()));
//业务处理
// 进行消息的签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
TTL
TTL 全称 Time To Live
(存活时间/过期时间)。当消息到达存活时间后,还没有被消费,会被自动清除。RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。这与 Redis 中的过期时间概念类似。我们应该合理使用 TTL 技术,可以有效的处理过期垃圾消息,从而降低服务器的负载,最大化的发挥服务器的性能。
RabbitMQ allows you to set TTL (time to live) for both messages and queues. This can be done using optional queue arguments or policies (the latter option is recommended). Message TTL can be enforced for a single queue, a group of queues or applied for individual messages.
RabbitMQ允许您为消息和队列设置TTL(生存时间)。 这可以使用可选的队列参数或策略来完成(建议使用后一个选项)。 可以对单个队列,一组队列强制执行消息TTL,也可以为单个消息应用消息TTL。
设置队列过期时间使用参数:x-message-ttl
,单位:ms(毫秒),会对整个队列消息统一过期。
设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。
如果两者都进行了设置,以时间短的为准。
应用场景
以用户订单支付为场景。在各大电商平台上,订单的都有待支付时间,通常为30min。当用户超过30min未支付订单,该订单的状态应该会变成“超时取消”,或类似的状态值的改变。
代码实现
- 给指定的消息设置过期时间
注意:
RabbitMQ只会对队列头部的消息进行过期淘汰。如果单独给消息设置TTL,先入队列的消息过期时间如果设置比较长,后入队列的设置时间比较短。会造成消息不会及时地过期淘汰,导致消息的堆积。(比如一个队列里面先后分别进入A B C 三条消息 他们的存活时间分布是10H、5H、1H 但是由于没有人消费 10H过后 A死亡了 之后BC也就跟着死亡了,这说明ABC都是要等到10H后才能消费)
@GetMapping("/api/ttlSendMessage")
@ApiOperation(value="指定消息的过期时间")
public String ttlSendMessage(){
MessageProperties messageProperties = new MessageProperties();
//设置过期时间 单位为ms
messageProperties.setExpiration("5000");
byte[] msgBytes = "测试消息自动过期".getBytes();
Message message = new Message(msgBytes, messageProperties);
rabbitTemplate.convertAndSend("springboot_topic_exchange", "springboot.TTL", message);
return "ok";
}
- 给队列设置消息过期时间,队列中的所有消息都有同样的过期时间
@Bean
public Queue TTLQueue() {
Map<String, Object> map = new HashMap<>();
// 队列中的消息未被消费则30秒后过期
map.put("x-message-ttl", 30000);
return new Queue("TTL_QUEUE", true, false, false, map);
}
@Bean
public DirectExchange TTLExchange() {
return new DirectExchange("TTL_EXCHANGE", true, false);
}
@Bean
public Binding bindingTTLDirect() {
return BindingBuilder.bind(TTLQueue()).to(TTLExchange()).with("TTL");
}
设置一个产生消息的消费者 并通过swagger发送10条消息后,查看rabbitmq的对应队列看板 发现经过30s后 消息消息:
@GetMapping("/api/ttlSendMessage")
@ApiOperation(value="指定消息的过期时间")
public String ttlSendMessage(){
MessageProperties messageProperties = new MessageProperties();
//设置过期时间 单位为ms 如果两者都进行了设置,以时间短的为准
messageProperties.setExpiration("5000");
byte[] msgBytes = "测试消息自动过期".getBytes();
Message message = new Message(msgBytes, messageProperties);
rabbitTemplate.convertAndSend("TTL_EXCHANGE", "TTL", message);
return "ok";
}
死信队列
死信队列,英文缩写:DLX 。Dead Letter Exchange
(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
那么什么情况下消息会成为死信呢?
- 队列消息长度到达限制
- 消费者拒接消费消息,
basicNack/basicReject
,并且不把消息重新放入原目标队列,requeue=false
- 原队列存在消息过期**TTL(time-to-live)**设置,消息到达超时时间未被消费
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
死信交换机和死信队列和普通的没有区别,只不过是多了一个参数属性,当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
代码实现
相关配置信息application.yaml:
define:
order:
exchange: code_order_exchange
queue: code_order_queue
routingKey: code.order
dlx:
exchange: code_dlx_exchange
queue: code_dlx_queue
routingKey: code.dlx
相关配置信息config
将需要的交换机,队列,绑定都声明成SpringBean。Spring会自动创建这些到RabbitMQ服务中
/**
* @author ralph
* @create 2022-04-06 22:56
*/
@Configuration
public class DlxConfig {
@Value("${define.order.exchange}")
private String orderExchange;
@Value("${define.order.queue}")
private String orderQueue;
@Value("${define.order.routingKey}")
private String orderRoutingKey;
@Value("${define.dlx.exchange}")
private String dlxExchange;
@Value("${define.dlx.queue}")
private String dlxQueue;
@Value("${define.dlx.routingKey}")
private String dlxRoutingKey;
/**
* declare dlx exchange
* @return DirectExchange
*/
@Bean
public DirectExchange dlxExchange(){
return new DirectExchange(dlxExchange);
}
/**
* declare business exchange
* @return DirectExchange
*/
@Bean
public DirectExchange orderExchange(){
return new DirectExchange(orderExchange);
}
/**
* declare dlx queue
* @return DirectExchange
*/
@Bean
public Queue dlxQueue(){
return new Queue(dlxQueue);
}
/**
* declare business queue
* @return DirectExchange
*/
@Bean
public Queue orderQueue(){
Hashtable<String, Object> mqArguments = new Hashtable<>(2);
// queue binding dlxExchange
mqArguments.put("x-dead-letter-exchange",dlxExchange);
mqArguments.put("x-dead-letter-routing-key",dlxRoutingKey);
return new Queue(orderQueue,true,false,false,mqArguments);
}
@Bean
public Binding binding(){
return BindingBuilder.bind(dlxQueue())
.to(dlxExchange())
.with(dlxRoutingKey);
}
/**
* business queue binding business exchange
* @return Binding
*/
@Bean
public Binding orderBinding(){
return BindingBuilder.bind(orderQueue())
.to(orderExchange())
.with(orderRoutingKey);
}
}
并且业务队列上有DLX标记,可见当前队列已经绑定了一个死信队列。DLK表示的路由键。
生产者:
通过swagger 连续点击 :我们可以发现控制面板 code_order_queue
队列开始有消息了 但是由于没有消费者 ,一段时间后 这些消息就会到code_dlx_queue
队中去。
@GetMapping("/api/dlxSendMessage")
@ApiOperation(value="[DLX]指定消息的过期时间")
public String sendDlx(){
Order order = new Order();
order.setItemId(1);
order.setStatus(1);
rabbitTemplate.convertAndSend(orderExchange,orderRoutingKey, JSON.toJSONString(order),message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setExpiration("10000");
return message;
});
return "message send success!";
}
消费者:设置死信队列监听
通过设置对死信队列的监听,可以发现,在Springboot启动之后,创建了对RabbitMQ的监听,死信队列的消息也立刻被消费了。
因此,我们可以监听死信队列,对未被消费的消息进行下一步操作。如场景分析中的更改订单状态。
@RabbitListener(queues="code_dlx_queue")
public void dlxListener(Message message, Channel channel) throws IOException, InterruptedException {
//获取消息
System.out.println("message:"+ new String(message.getBody()));
//业务处理
// 进行消息的签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
延迟队列
其实就是TTL + 私信队列的结合产物, 在RabbitMQ中并未提供延迟队列功能
应用场景
- 用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单。
- 用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用
实现方式和说明
1.方法一:利用到RabbitMQ的两个特性:
- Time To Live (TTL)
- Dead Letter Exchanges (DLX)
2.方法二:利用RabbitMQ中的插件x-delay-message
注意:不建议使用TTL这种方式来实现延迟队列,过期的消息变为死信,进入死信接收队列,而这个队列就是普通的队列,如果这个队列拥塞了很多死信,那么死信出队列的顺序就是其进入死信接收队列的顺序。
延迟消费的引入
:
延迟消费
是延迟队列最为常用的使用模式。如下图所示,生产者产生的消息首先会进入缓冲队列(图中红色队列)。通过RabbitMQ提供的TTL扩展,这些消息会被设置过期时间,也就是延迟消费的时间。等消息过期之后,这些消息会通过配置好的DLX转发到实际消费队列(图中蓝色队列),以此达到延迟消费的效果。
延迟重试
本质上也是延迟消费的一种,但是这种模式的结构与普通的延迟消费的流程图较为不同,所以单独拎出来介绍。如下图所示,消费者发现该消息处理出现了异常,比如是因为网络波动引起的异常。那么如果不等待一段时间,直接就重试的话,很可能会导致在这期间内一直无法成功,造成一定的资源浪费。那么我们可以将其先放在缓冲队列中(图中红色队列),等消息经过一段的延迟时间后再次进入实际消费队列中(图中蓝色队列),此时由于已经过了“较长”的时间了,异常的一些波动通常已经恢复,这些消息可以被正常地消费。
代码实现
1.基于TTL和DLX
config配置:
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitmqConfig {
/**
* 死信交换机
*/
@Bean
public DirectExchange userOrderDelayExchange() {
return new DirectExchange("user.order.delay_exchange", true, false);
}
/**
* 死信队列
*/
@Bean
public Queue userOrderDelayQueue() {
Map<String, Object> map = new HashMap<>(3);
// 设置15分钟过期时间
map.put("x-message-ttl", 900000);
map.put("x-dead-letter-exchange", "user.order.receive_exchange");
map.put("x-dead-letter-routing-key", "user.order.receive_key");
return new Queue("user.order.delay_queue", true, false, false, map);
}
/**
* 给死信队列绑定交换机
*/
@Bean
public Binding userOrderDelayBinding() {
return BindingBuilder.bind(userOrderDelayQueue()).to(userOrderDelayExchange()).with("user.order.delay_key");
}
/**
* 死信接收交换机
*/
@Bean
public DirectExchange userOrderReceiveExchange() {
return new DirectExchange("user.order.receive_exchange", true, false);
}
/**
* 死信接收队列,用于接收死信,该队列为正常队列,进入该队列的消息会被立即消费
*/
@Bean
public Queue userOrderReceiveQueue() {
return new Queue("user.order.receive_queue");
}
/**
* 给死信交换机绑定消费队列
*/
@Bean
public Binding userOrderReceiveBinding() {
return BindingBuilder.bind(userOrderReceiveQueue()).to(userOrderReceiveExchange()).with("user.order.receive_key");
}
}
生产者Controller:
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/api/delay")
public String createOrderTest() {
OrderMaster orderMaster = new OrderMaster();
// 未支付
orderMaster.setOrderStatus(0);
// 未支付
orderMaster.setPayStatus(0);
orderMaster.setBuyerName("张三");
orderMaster.setBuyerAddress("湖南长沙");
orderMaster.setBuyerPhone("1388888888");
orderMaster.setOrderAmount(BigDecimal.ZERO);
orderMaster.setCreateTime(DateUtils.getCurrentDate());
orderMaster.setOrderId(UUID.randomUUID().toString().replaceAll("-", ""));
orderMasterService.insert(orderMaster);
// TODO:设置超时,用mq处理已超时的下单记录(一旦记录超时,则处理为无效)
rabbitTemplate.convertAndSend("user.order.delay_exchange", "user.order.delay_key", orderMaster, message -> {
message.getMessageProperties().setExpiration("300000");
return message;
});
return "创建订单成功";
}
消费者Listener:
// 监听消息队列
@RabbitListener(queues = "user.order.receive_queue")
public void consumeMessage(OrderMaster order) throws IOException {
try {
// 如果订单状态不是0 说明订单已经被其他消费队列改动过了 加一个状态用来判断集群状态的情况
if (Objects.equals(0,order.getOrderStatus())) {
// 设置订单过去状态
order.setOrderStatus(-1);
System.out.println(order.getBuyerName());
// 自己写的一个修改的方法
orderMasterService.updateByPrimaryKeySelective(order);
}
} catch (Exception e) {
e.printStackTrace()
}
}
}
2.基于x-delay-message插件
在rabbitmq 3.5.7及以上的版本提供了一个插件
rabbitmq-delayed-message-exchange 来实现延迟队列功能。同时插件依赖 Erlang/OPT 18.0 及以上
截止22年4月 GITHUB官方插件仅支持3.8.X以上的版本:
# 进入插件安装目录(可以查看一下当前已存在的插件)
cd {rabbitmq-server}/plugins/
# 下载插件 rabbitmq_delayed_message_exchange
wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez
# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 关闭插件
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
config:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitmqConfig {
/**
* 延时队列交换机
* 注意这里的交换机类型:CustomExchange
*/
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
// 属性参数 交换机名称 交换机类型 是否持久化 是否自动删除 配置参数
return new CustomExchange("delay_exchange", "x-delayed-message", true, false, args);
}
/**
* 延时队列
*/
@Bean
public Queue delayQueue() {
// 属性参数 队列名称 是否持久化
return new Queue("delay_queue", true);
}
/**
* 给延时队列绑定交换机
*/
@Bean
public Binding cfgDelayBinding() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay_key").noargs();
}
}
生产者:
@GetMapping("test/{time}/{name}")
public String createOrderTest(@PathVariable("time") Integer time, @PathVariable("name") String name) {
OrderMaster orderMaster = new OrderMaster();
// 订单未完成
orderMaster.setOrderStatus(0);
// 未付款
orderMaster.setPayStatus(0);
orderMaster.setBuyerName(name);
orderMaster.setBuyerAddress("浙江温州");
orderMaster.setBuyerPhone("13888888888");
orderMaster.setOrderAmount(BigDecimal.ZERO);
orderMaster.setCreateTime(DateUtils.getCurrentDate());
orderMaster.setOrderId(UUID.randomUUID().toString().replaceAll("-", ""));
orderMasterService.insert(orderMaster);
// 第一个参数是前面RabbitMqConfig的交换机名称 第二个参数的路由名称 第三个参数是传递的参数 第四个参数是配置属性
this.rabbitTemplate.convertAndSend(
"delay_exchange",
"delay_key",
orderMaster,
message -> {
// 配置消息的过期时间
message.getMessageProperties().setDelay(time);
return message;
}
);
return "创建订单成功";
}
消费者:
import com.bean.springcloudcommon.model.OrderMaster;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Objects;
@Component
public class OrderReceiver {
@Autowired
private OrderMasterService orderMasterService;
// 监听消息队列
@RabbitListener(queues = "delay_queue")
public void consumeMessage(OrderMaster order) throws IOException {
try {
// 如果订单状态不是0 说明订单已经被其他消费队列改动过了 加一个状态用来判断集群状态的情况
if (Objects.equals(0,order.getOrderStatus())) {
// 设置订单过去状态
order.setOrderStatus(-1);
System.out.println(order.getBuyerName());
orderMasterService.updateByPrimaryKeySelective(order);
}
} catch (Exception e) {
e.printStackTrace()
}
}
}
消息可靠性投递
1.消息幂等性保障
幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果
一般由消费端/业务代码来保证
2.消息持久化
- 消息在网络发送、网络传输、存盘等都有可能出现意外而导致消息丢失
- 例如如果队列、交换器、消息其中一个没有开启持久化,在broker重启后消息丢失
- 所以需要在消息发送前进行存盘,然后根据状态区分不同的消息种类,可以用来做重试等
消息积压
为什么会消息积压?
- 消费者宕机积压
- 消费者消费能力不足积压
- 发送者发流量太大
解决方案:上线更多的消费者,进行正常消费上线专门的队列消费服务,将消息先批量取出来,记录数据库,再慢慢处理
评论区