RabbitMQ通过TTL和DLX实现延时队列(动态定时任务)


TTL (Time To Live)

TTL是MQ中一个消息或者队列的属性,表明一条消息或者队列中所有消息或者队列的最大存活时间,单位是毫秒。如果一条消息设置了TTL属性,或者进入了设置TTL的队列,这条消息在TTL内的时间未被消费则该条消息则变成死信。

  • Tips: 创建队列的时候设置队列的“x-message-ttl”属性,消息过期则会立即进入死信队列

DLX (Dead Letter Exchange)

Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列(即延时队列)中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

  • Tips: 队列中的消息TTL不同,也要按照顺序进入死信队列,也就是说第一个数据的TTL设置为十分钟时,后面的消息的TTL无论是多少都得至少等十分钟才能被消费。所以要创建定时周期不同的定时任务的话,就得创建多个TTL不同的延时队列。

实现思路

  1. 生产者通过延时交换机发送消息给指定路由key的延时队列
  2. 延时队列中的消息过期之后自动通过死信交换机发送给指定路由key的死信队列

代码

DelayRabbitConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@Configuration
public class DelayRabbitConfig {
@Bean
public DelaySender delaySender() {
return new DelaySender();
}

@Bean
public DelayReceiver delayReceiver() {
return new DelayReceiver();
}

@Bean("delayExchange")
public DirectExchange delayExchange() {
return new DirectExchange("delayExchange");
}

@Bean("delayQueueA")
public Queue delayQueueA() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", "deadLetterExchange");
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", "deadLetterQueueARoutingKey");
// x-message-ttl 声明队列的TTL 单位是毫秒 1000*6
args.put("x-message-ttl", 6000);
return QueueBuilder.durable("delayQueueA").withArguments(args).build();
}

@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange() {
return new DirectExchange("deadLetterExchange");
}

@Bean("deadLetterQueueA")
public Queue deadLetterQueueA() {
return new Queue("deadLetterQueueA");
}

/**
* 将延时队列A与延时交换机绑定 并指定延时队列路由
*
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("delayQueueARoutingKey");
}

/**
* 将死信队列 与 死信交换机绑定 指定死信队列路由
*
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("deadLetterQueueARoutingKey");
}

}

DelaySender.java

1
2
3
4
5
6
7
8
9
10
11
12
13
public class DelaySender {
private static final Logger LOGGER = LoggerFactory.getLogger(DelaySender.class);

@Autowired
private RabbitTemplate template;

public void send() {
String message = LocalDateTime.now();
this.template.convertAndSend("delayExchange","delayQueueARoutingKey", message);
LOGGER.info(" [x] Sent '{}'", message);
}

}

DelayReceiver.java

1
2
3
4
5
6
7
8
9
10
public class DelayReceiver {

private static final Logger LOGGER = LoggerFactory.getLogger(DelayReceiver.class);

@RabbitListener(queues = "deadLetterQueueA")
public void receiveA(Message message) {
String msg = new String(message.getBody());
LOGGER.info("当前时间:{},死信队列A收到消息:{}", LocalDateTime.now(), msg);
}
}

← Prev 96.不同的二叉搜索树 | SpringDataJPA之Base抽象类+范型快速实现增删查改 Next →