TTL (Time To Live)
TTL是MQ中一个消息或者队列的属性,表明一条消息或者队列中所有消息或者队列的最大存活时间,单位是毫秒。如果一条消息设置了TTL属性,或者进入了设置TTL的队列,这条消息在TTL内的时间未被消费则该条消息则变成死信。
- Tips: 创建队列的时候设置队列的“x-message-ttl”属性,消息过期则会立即进入死信队列
DLX (Dead Letter Exchange)
Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列(即延时队列)中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
- Tips: 队列中的消息TTL不同,也要按照顺序进入死信队列,也就是说第一个数据的TTL设置为十分钟时,后面的消息的TTL无论是多少都得至少等十分钟才能被消费。所以要创建定时周期不同的定时任务的话,就得创建多个TTL不同的延时队列。
实现思路
- 生产者通过延时交换机发送消息给指定路由key的延时队列
- 延时队列中的消息过期之后自动通过死信交换机发送给指定路由key的死信队列
代码
DelayRabbitConfig.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| @Configuration public class DelayRabbitConfig { @Bean public DelaySender delaySender() { return new DelaySender(); }
@Bean public DelayReceiver delayReceiver() { return new DelayReceiver(); }
@Bean("delayExchange") public DirectExchange delayExchange() { return new DirectExchange("delayExchange"); }
@Bean("delayQueueA") public Queue delayQueueA() { Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", "deadLetterExchange"); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key", "deadLetterQueueARoutingKey"); // x-message-ttl 声明队列的TTL 单位是毫秒 1000*6 args.put("x-message-ttl", 6000); return QueueBuilder.durable("delayQueueA").withArguments(args).build(); }
@Bean("deadLetterExchange") public DirectExchange deadLetterExchange() { return new DirectExchange("deadLetterExchange"); }
@Bean("deadLetterQueueA") public Queue deadLetterQueueA() { return new Queue("deadLetterQueueA"); } /** * 将延时队列A与延时交换机绑定 并指定延时队列路由 * * @param queue * @param exchange * @return */ @Bean public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue, @Qualifier("delayExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("delayQueueARoutingKey"); }
/** * 将死信队列 与 死信交换机绑定 指定死信队列路由 * * @param queue * @param exchange * @return */ @Bean public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("deadLetterQueueARoutingKey"); }
}
|
DelaySender.java
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class DelaySender { private static final Logger LOGGER = LoggerFactory.getLogger(DelaySender.class);
@Autowired private RabbitTemplate template;
public void send() { String message = LocalDateTime.now(); this.template.convertAndSend("delayExchange","delayQueueARoutingKey", message); LOGGER.info(" [x] Sent '{}'", message); }
}
|
DelayReceiver.java
1 2 3 4 5 6 7 8 9 10
| public class DelayReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(DelayReceiver.class);
@RabbitListener(queues = "deadLetterQueueA") public void receiveA(Message message) { String msg = new String(message.getBody()); LOGGER.info("当前时间:{},死信队列A收到消息:{}", LocalDateTime.now(), msg); } }
|