一、背景
最近开发一个活动功能,需要在活动结束后给榜单 Top10 用户发放奖励。由于活动的榜单是通过 RabbitMQ 进行异步统计分值排名的,因此在活动结束时队列中可能还存在消息未消费完全,排名不准确,此时发放活动奖励必然会出错。
那么,如果解决这个问题呢? 与产品经理协商,允许延迟 10 分钟发放奖励。目前有 2 个方案:
1 2 3 4 5
| 使用定时器:判断当前时间与活动结束时间的时间差,如果 >= 10分钟就发放奖励 -- 缺点:除了需要调度线程,还需要定期访问数据库获取活动结束时间来判断,这样既浪费资源也不优雅
使用 RabbitMQ 延迟队列 -- 优点:既能满足需求也规避定时器实现方案的缺点
|
因此,最终选定 RabbitMQ 的延迟队列实现方案。但是,RabbitMQ 没有直接提供延迟队列我们该如何实现呢?请继续阅读下文。
二、TTL
TTL 全称 Time To Live(存活时间/过期时间),当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ 提供 2 种方式给消息设置过期时间:
1 2 3 4 5
| 设置队列过期时间参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期
调用消息过期时间方法,设置过期时间:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期
如果两者都进行了设置,以时间短的为准
|
TTL实战
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Bean public Queue ttlQueue() { Map<String,Object> map = new HashMap<>(1); // 设置队列过期参数 map.put("x-message-ttl", 10000); return new Queue("ttl.queue", true, false, false, map); }
@Bean public DirectExchange ttlDirectExchange() { return new DirectExchange("ttl.direct.exchange", true, false); }
@Bean public Binding ttlDirectBinding() { return BindingBuilder .bind(ttlQueue()) .to(ttlDirectExchange()) .with("ttl"); }
|
测试类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqTestApplicationTests {
@Autowired private RabbitTemplate rabbitTemplate; @Test public void test() throws Exception { for (int i = 0; i < 10; i++) { this.rabbitTemplate.convertAndSend("ttl.direct.exchange", "ttl", "hello ttl", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 调用消息过期方法 message.getMessageProperties().setExpiration("5000"); return message; } }); } } }
|
测试结果如下图:
ttl.queue 队列创建后,可以看出 ttl 属性为 10000(10秒),里边的的 10 条消息在 5 秒后被清除了。
三、死信队列
DLX 全称 Dead Letter Exchange(死信交换机),当消息成为死信 (Dead Message) 后,可以被重新发送到另一个交换机,这个交换机就是死信交换机,由于交换机是 RabbitMQ 特有的,通常我们把死信交换机也成为死信队列。
原理图如下:
要实现上边的流程,我们需要解决 2 个问题:
1 2 3 4 5 6 7
| 队列中的消息怎么成为死信: --原队列消息长度到达限制 --原队列存在消息过期设置,消息到达超时时间未被消费 --消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false
消息成为死信后如何投递到死信队列中: --给原队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
|
死信队列实战
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| @Bean public Queue ttlQueue() { Map<String,Object> map = new HashMap<>(1); map.put("x-message-ttl", 10000); // 设置消息成为死信后,传发到的路由器 map.put("x-dead-letter-exchange","dead.direct.exchange"); map.put("x-dead-letter-routing-key", "dead"); return new Queue("ttl.queue", true, false, false, map); }
@Bean public DirectExchange ttlDirectExchange() { return new DirectExchange("ttl.direct.exchange", true, false); }
@Bean public Binding ttlDirectBinding() { return BindingBuilder .bind(ttlQueue()) .to(ttlDirectExchange()) .with("ttl"); } // =======================以下为死信队列相关配置========================= @Bean public Queue deadQueue() { return new Queue("dead.queue", true); }
@Bean public DirectExchange deadDirectExchange() { return new DirectExchange("dead.direct.exchange", true, false); }
@Bean public Binding deadDirectBinding() { return BindingBuilder .bind(deadQueue()) .to(deadDirectExchange()) .with("dead"); }
|
测试类代码不变,为了方便测试,我们这里就不写消费者代码。我们需要先把 ttl.queue 队列删除再执行代码。结果如下图:
由图可知,ttl.queue 队列里的消息在 5 秒后转移 dead.queue 队列中了,其实这样就已经实现了延迟队列。
我们把需要实现的功能套用到上边的案例中:活动结束后我们发送一条有过期时间的消息(10分钟)到 ttl.queue 队列中,该队列不需要消费者。10分钟后由于消息没被消费被转发到死信队列 dead.queue 队列中,dead.queue 设置消费者,消费者用于执行发放活动奖励。
四、参考资料
官方文档 ttl
官方文档 dlx