RabbitMQ五种模式的基本实现


前置工作

pom.xml

1
2
3
4
<!-- Spring Boot starter that brings in AMQP (RabbitMQ) support. -->
<!-- NOTE(review): no <version> is given here; presumably it is managed by the Spring Boot parent/BOM - confirm. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application-dev.yml

1
2
3
4
5
6
7
8
9
10
# RabbitMQ connection settings for the "dev" profile.
# The keys below must be nested under spring.rabbitmq; YAML nesting is
# expressed by indentation, so the indentation is significant.
spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /mall
    username: xxxx
    password: xxxx
    # Publisher confirms: acknowledge that a message reached the exchange.
    publisher-confirm-type: correlated
    # Publisher returns: report messages that could not be delivered to a queue.
    publisher-returns: true

简单模式

包括一个生产者、一个消费者和一个队列。

SimpleRabbitConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Bean definitions for the "simple" pattern demo: one queue, one sender and
 * one receiver.
 */
@Configuration
public class SimpleRabbitConfig {

    /** Name of the queue declared by {@link #simpleQueue()}. */
    private static final String SIMPLE_QUEUE_NAME = "simple.hello";

    /**
     * Declares the queue used by the simple pattern.
     *
     * @return a queue named {@code simple.hello}
     */
    @Bean
    public Queue simpleQueue() {
        Queue queue = new Queue(SIMPLE_QUEUE_NAME);
        return queue;
    }

    /**
     * Registers the sender as a bean.
     *
     * @return a new {@link SimpleSender}
     */
    @Bean
    public SimpleSender simpleSender() {
        SimpleSender sender = new SimpleSender();
        return sender;
    }

    /**
     * Registers the receiver as a bean.
     *
     * @return a new {@link SimpleReceiver}
     */
    @Bean
    public SimpleReceiver simpleReceiver() {
        SimpleReceiver receiver = new SimpleReceiver();
        return receiver;
    }
}

SimpleSender.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Producer for the "simple" pattern: publishes a fixed greeting using the
 * queue name {@code simple.hello} as the routing key.
 */
public class SimpleSender {

    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleSender.class);

    /** Routing key passed to the template; equals the queue name. */
    private static final String QUEUE_NAME = "simple.hello";

    /** Fixed payload sent on every call. */
    private static final String GREETING = "Hello World!";

    @Autowired
    private RabbitTemplate template;

    /**
     * Sends the fixed greeting and then logs what was sent.
     */
    public void send() {
        template.convertAndSend(QUEUE_NAME, GREETING);
        LOGGER.info(" [x] Sent '{}'", GREETING);
    }
}

SimpleReceiver.java

1
2
3
4
5
6
7
8
9
10
11
/**
 * Consumer for the "simple" pattern; listens on the {@code simple.hello}
 * queue and logs every message it is handed.
 */
@RabbitListener(queues = "simple.hello")
public class SimpleReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleReceiver.class);

    /**
     * Handles one text message by writing it to the log.
     *
     * @param in the received message body
     */
    @RabbitHandler
    public void receive(String in) {
        final String payload = in;
        LOGGER.info(" [x] Received '{}'", payload);
    }
}

工作模式

包括一个生产者、一个队列和两个消费者。消费者绑定同一个队列,通过竞争的方式消费消息。当一个消费者处理任务时,空闲的消费者从队列中消费消息。

WorkRabbitConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Bean definitions for the "work queue" pattern demo: one queue, one sender
 * and two receiver instances that listen on the same queue.
 */
@Configuration
public class WorkRabbitConfig {

    /** Name of the queue declared by {@link #workQueue()}. */
    private static final String WORK_QUEUE_NAME = "work.hello";

    /**
     * Declares the shared work queue.
     *
     * @return a queue named {@code work.hello}
     */
    @Bean
    public Queue workQueue() {
        Queue queue = new Queue(WORK_QUEUE_NAME);
        return queue;
    }

    /**
     * Registers the sender as a bean.
     *
     * @return a new {@link WorkSender}
     */
    @Bean
    public WorkSender workSender() {
        WorkSender sender = new WorkSender();
        return sender;
    }

    /**
     * Registers the first competing consumer.
     *
     * @return a {@link WorkReceiver} tagged with instance number 1
     */
    @Bean
    public WorkReceiver workReceiver1() {
        final int instanceNumber = 1;
        return new WorkReceiver(instanceNumber);
    }

    /**
     * Registers the second competing consumer.
     *
     * @return a {@link WorkReceiver} tagged with instance number 2
     */
    @Bean
    public WorkReceiver workReceiver2() {
        final int instanceNumber = 2;
        return new WorkReceiver(instanceNumber);
    }
}

WorkSender.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * Producer for the "work queue" pattern: publishes messages of the form
 * {@code Hello<dots><n>} using {@code work.hello} as the routing key.
 */
public class WorkSender {

    private static final Logger LOGGER = LoggerFactory.getLogger(WorkSender.class);

    /** Routing key passed to the template; equals the queue name. */
    private static final String QUEUE_NAME = "work.hello";

    @Autowired
    private RabbitTemplate template;

    /**
     * Builds the message for the given index, sends it and logs it.
     *
     * @param index zero-based sequence number of the message
     */
    public void send(int index) {
        String payload = buildMessage(index);
        template.convertAndSend(QUEUE_NAME, payload);
        LOGGER.info(" [x] Sent '{}'", payload);
    }

    /**
     * Creates "Hello", followed by {@code index % 3 + 1} dots, followed by
     * {@code index + 1}.
     */
    private static String buildMessage(int index) {
        int dotCount = index % 3 + 1;
        StringBuilder text = new StringBuilder("Hello");
        int appended = 0;
        while (appended < dotCount) {
            text.append('.');
            appended++;
        }
        text.append(index + 1);
        return text.toString();
    }
}

WorkReceiver.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * Consumer for the "work queue" pattern; listens on {@code work.hello}.
 * Each instance carries a number that is included in its log lines so the
 * output of several instances can be told apart.
 */
@RabbitListener(queues = "work.hello")
public class WorkReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(WorkReceiver.class);

    /** Number identifying this receiver in the log output. */
    private final int instance;

    /**
     * @param i number used to identify this receiver in log lines
     */
    public WorkReceiver(int i) {
        instance = i;
    }

    /**
     * Logs the message, runs the simulated work and logs the elapsed time.
     *
     * @param in the received message body
     */
    @RabbitHandler
    public void receive(String in) {
        StopWatch timer = new StopWatch();
        timer.start();
        LOGGER.info("instance {} [x] Received '{}'", instance, in);
        doWork(in);
        timer.stop();
        LOGGER.info("instance {} [x] Done in {}s", instance, timer.getTotalTimeSeconds());
    }

    /**
     * Simulates work by calling {@code ThreadUtil.sleep(1000L)} once for
     * every '.' in the text (presumably a one-second pause each; confirm
     * against ThreadUtil).
     */
    private void doWork(String text) {
        for (int pos = 0; pos < text.length(); pos++) {
            if (text.charAt(pos) != '.') {
                continue;
            }
            ThreadUtil.sleep(1000L);
        }
    }
}

发布/订阅模式

包含一个生产者、一个交换机、两个队列和两个消费者。两个队列都绑定到同一个交换机,两个消费者各监听其中一个队列;生产者把消息发送到交换机,交换机将每条消息广播到所有绑定的队列,消费者再从各自的队列中消费消息。

FanoutRabbitConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
 * Bean definitions for the publish/subscribe (fanout) pattern demo: one
 * fanout exchange, two anonymous queues bound to it, a receiver and a sender.
 */
@Configuration
public class FanoutRabbitConfig {

    /** Name of the exchange declared by {@link #fanout()}. */
    private static final String FANOUT_EXCHANGE_NAME = "exchange.fanout";

    /**
     * Declares the fanout exchange.
     *
     * @return an exchange named {@code exchange.fanout}
     */
    @Bean
    public FanoutExchange fanout() {
        FanoutExchange exchange = new FanoutExchange(FANOUT_EXCHANGE_NAME);
        return exchange;
    }

    /**
     * Declares the first subscriber queue.
     *
     * @return a new {@link AnonymousQueue}
     */
    @Bean
    public Queue fanoutQueue1() {
        Queue queue = new AnonymousQueue();
        return queue;
    }

    /**
     * Declares the second subscriber queue.
     *
     * @return a new {@link AnonymousQueue}
     */
    @Bean
    public Queue fanoutQueue2() {
        Queue queue = new AnonymousQueue();
        return queue;
    }

    /**
     * Binds the first queue to the fanout exchange.
     *
     * @param fanout the exchange to bind to
     * @param fanoutQueue1 the queue being bound
     * @return the resulting binding
     */
    @Bean
    public Binding fanoutBinding1(FanoutExchange fanout, Queue fanoutQueue1) {
        Binding binding = BindingBuilder.bind(fanoutQueue1).to(fanout);
        return binding;
    }

    /**
     * Binds the second queue to the fanout exchange.
     *
     * @param fanout the exchange to bind to
     * @param fanoutQueue2 the queue being bound
     * @return the resulting binding
     */
    @Bean
    public Binding fanoutBinding2(FanoutExchange fanout, Queue fanoutQueue2) {
        Binding binding = BindingBuilder.bind(fanoutQueue2).to(fanout);
        return binding;
    }

    /**
     * Registers the receiver as a bean.
     *
     * @return a new {@link FanoutReceiver}
     */
    @Bean
    public FanoutReceiver fanoutReceiver() {
        FanoutReceiver receiver = new FanoutReceiver();
        return receiver;
    }

    /**
     * Registers the sender as a bean.
     *
     * @return a new {@link FanoutSender}
     */
    @Bean
    public FanoutSender fanoutSender() {
        FanoutSender sender = new FanoutSender();
        return sender;
    }
}

FanoutSender.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Producer for the publish/subscribe pattern: publishes messages of the form
 * {@code Hello<dots><n>} to the {@code exchange.fanout} exchange with an
 * empty routing key.
 */
public class FanoutSender {

    private static final Logger LOGGER = LoggerFactory.getLogger(FanoutSender.class);

    /** Exchange that messages are published to. */
    private static final String EXCHANGE_NAME = "exchange.fanout";

    /** Routing key passed along with every message. */
    private static final String ROUTING_KEY = "";

    @Autowired
    private RabbitTemplate template;

    /**
     * Builds the message for the given index, publishes it and logs it.
     *
     * @param index zero-based sequence number of the message
     */
    public void send(int index) {
        String payload = buildMessage(index);
        template.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, payload);
        LOGGER.info(" [x] Sent '{}'", payload);
    }

    /**
     * Creates "Hello", followed by {@code index % 3 + 1} dots, followed by
     * {@code index + 1}.
     */
    private static String buildMessage(int index) {
        int dotCount = index % 3 + 1;
        StringBuilder text = new StringBuilder("Hello");
        int appended = 0;
        while (appended < dotCount) {
            text.append('.');
            appended++;
        }
        text.append(index + 1);
        return text.toString();
    }
}

FanoutReceiver.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 * Consumer for the publish/subscribe pattern. It exposes two listener
 * methods, one per queue bean ({@code fanoutQueue1} and {@code fanoutQueue2}),
 * and tags log output with 1 or 2 accordingly.
 */
public class FanoutReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(FanoutReceiver.class);

    /**
     * Listener for the queue held by the {@code fanoutQueue1} bean.
     *
     * @param in the received message body
     */
    @RabbitListener(queues = "#{fanoutQueue1.name}")
    public void receive1(String in) {
        final int receiverNumber = 1;
        receive(in, receiverNumber);
    }

    /**
     * Listener for the queue held by the {@code fanoutQueue2} bean.
     *
     * @param in the received message body
     */
    @RabbitListener(queues = "#{fanoutQueue2.name}")
    public void receive2(String in) {
        final int receiverNumber = 2;
        receive(in, receiverNumber);
    }

    /** Logs the message, runs the simulated work and logs the elapsed time. */
    private void receive(String in, int receiver) {
        StopWatch timer = new StopWatch();
        timer.start();
        LOGGER.info("instance {} [x] Received '{}'", receiver, in);
        doWork(in);
        timer.stop();
        LOGGER.info("instance {} [x] Done in {}s", receiver, timer.getTotalTimeSeconds());
    }

    /**
     * Simulates work by calling {@code ThreadUtil.sleep(1000L)} once for
     * every '.' in the text (presumably a one-second pause each; confirm
     * against ThreadUtil).
     */
    private void doWork(String text) {
        for (int pos = 0; pos < text.length(); pos++) {
            if (text.charAt(pos) != '.') {
                continue;
            }
            ThreadUtil.sleep(1000L);
        }
    }
}

路由模式

路由模式是可以根据路由键选择性给多个消费者发送消息的模式,包含一个生产者、两个消费者、两个队列和一个交换机。两个消费者绑定两个队列,两个队列通过路由键绑定到交换机上去,生产者发送消息到交换机,交换机通过路由键转发到不同队列,消费者从绑定队列消费消息。

DirectRabbitConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
 * Bean definitions for the routing (direct exchange) pattern demo: one
 * direct exchange and two anonymous queues. Queue 1 is bound with the keys
 * "orange" and "black"; queue 2 is bound with "green" and "black".
 */
@Configuration
public class DirectRabbitConfig {

    /** Name of the exchange declared by {@link #direct()}. */
    private static final String DIRECT_EXCHANGE_NAME = "exchange.direct";

    /**
     * Declares the direct exchange.
     *
     * @return an exchange named {@code exchange.direct}
     */
    @Bean
    public DirectExchange direct() {
        DirectExchange exchange = new DirectExchange(DIRECT_EXCHANGE_NAME);
        return exchange;
    }

    /**
     * Declares the first consumer queue.
     *
     * @return a new {@link AnonymousQueue}
     */
    @Bean
    public Queue directQueue1() {
        Queue queue = new AnonymousQueue();
        return queue;
    }

    /**
     * Declares the second consumer queue.
     *
     * @return a new {@link AnonymousQueue}
     */
    @Bean
    public Queue directQueue2() {
        Queue queue = new AnonymousQueue();
        return queue;
    }

    /**
     * Binds queue 1 to the exchange with routing key "orange".
     *
     * @param direct the exchange to bind to
     * @param directQueue1 the queue being bound
     * @return the resulting binding
     */
    @Bean
    public Binding directBinding1a(DirectExchange direct, Queue directQueue1) {
        return bindWithKey(directQueue1, direct, "orange");
    }

    /**
     * Binds queue 1 to the exchange with routing key "black".
     *
     * @param direct the exchange to bind to
     * @param directQueue1 the queue being bound
     * @return the resulting binding
     */
    @Bean
    public Binding directBinding1b(DirectExchange direct, Queue directQueue1) {
        return bindWithKey(directQueue1, direct, "black");
    }

    /**
     * Binds queue 2 to the exchange with routing key "green".
     *
     * @param direct the exchange to bind to
     * @param directQueue2 the queue being bound
     * @return the resulting binding
     */
    @Bean
    public Binding directBinding2a(DirectExchange direct, Queue directQueue2) {
        return bindWithKey(directQueue2, direct, "green");
    }

    /**
     * Binds queue 2 to the exchange with routing key "black".
     *
     * @param direct the exchange to bind to
     * @param directQueue2 the queue being bound
     * @return the resulting binding
     */
    @Bean
    public Binding directBinding2b(DirectExchange direct, Queue directQueue2) {
        return bindWithKey(directQueue2, direct, "black");
    }

    /**
     * Registers the receiver as a bean.
     *
     * @return a new {@link DirectReceiver}
     */
    @Bean
    public DirectReceiver receiver() {
        DirectReceiver directReceiver = new DirectReceiver();
        return directReceiver;
    }

    /**
     * Registers the sender as a bean.
     *
     * @return a new {@link DirectSender}
     */
    @Bean
    public DirectSender directSender() {
        DirectSender sender = new DirectSender();
        return sender;
    }

    /** Builds a binding of the queue to the exchange under one routing key. */
    private static Binding bindWithKey(Queue queue, DirectExchange exchange, String routingKey) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }
}

DirectSender.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Producer for the routing pattern: publishes messages to the
 * {@code exchange.direct} exchange, cycling through the routing keys
 * "orange", "black" and "green".
 */
public class DirectSender {

    private static final Logger LOGGER = LoggerFactory.getLogger(DirectSender.class);

    /** Exchange that messages are published to. */
    private static final String EXCHANGE_NAME = "exchange.direct";

    /** Routing keys, selected in rotation by message index. */
    private static final String[] ROUTING_KEYS = {"orange", "black", "green"};

    @Autowired
    private RabbitTemplate template;

    /**
     * Publishes "Hello to &lt;key&gt; &lt;index + 1&gt;" with the routing key
     * selected by {@code index}, then logs the message.
     *
     * @param index sequence number of the message; any int is accepted
     */
    public void send(int index) {
        // Derive the modulus from the array so adding or removing a key cannot
        // desynchronise the two; floorMod keeps the slot non-negative even
        // for a negative index, where the % operator would yield a negative
        // value and an out-of-bounds access.
        int slot = Math.floorMod(index, ROUTING_KEYS.length);
        String key = ROUTING_KEYS[slot];
        String message = "Hello to " + key + ' ' + (index + 1);
        template.convertAndSend(EXCHANGE_NAME, key, message);
        LOGGER.info(" [x] Sent '{}'", message);
    }
}

DirectReceiver.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * Consumer for the routing pattern. It exposes two listener methods, one per
 * queue bean ({@code directQueue1} and {@code directQueue2}), and tags log
 * output with 1 or 2 accordingly.
 */
public class DirectReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(DirectReceiver.class);

    /**
     * Listener for the queue held by the {@code directQueue1} bean.
     *
     * @param in the received message body
     */
    @RabbitListener(queues = "#{directQueue1.name}")
    public void receive1(String in) {
        final int receiverNumber = 1;
        receive(in, receiverNumber);
    }

    /**
     * Listener for the queue held by the {@code directQueue2} bean.
     *
     * @param in the received message body
     */
    @RabbitListener(queues = "#{directQueue2.name}")
    public void receive2(String in) {
        final int receiverNumber = 2;
        receive(in, receiverNumber);
    }

    /** Logs the message, runs the simulated work and logs the elapsed time. */
    private void receive(String in, int receiver) {
        StopWatch timer = new StopWatch();
        timer.start();
        LOGGER.info("instance {} [x] Received '{}'", receiver, in);
        doWork(in);
        timer.stop();
        LOGGER.info("instance {} [x] Done in {}s", receiver, timer.getTotalTimeSeconds());
    }

    /**
     * Simulates work by calling {@code ThreadUtil.sleep(1000L)} once for
     * every '.' in the text (presumably a one-second pause each; confirm
     * against ThreadUtil).
     */
    private void doWork(String text) {
        for (int pos = 0; pos < text.length(); pos++) {
            if (text.charAt(pos) != '.') {
                continue;
            }
            ThreadUtil.sleep(1000L);
        }
    }
}

通配符模式

通配符模式与路由模式类似,只是把交换机换成 TopicExchange,并把绑定时使用的路由键换成通配符表达式:`*` 匹配恰好一个单词,`#` 匹配零个或多个单词(单词之间以 `.` 分隔)。

TopicRabbitConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
 * Bean definitions for the topic (wildcard) pattern demo: one topic exchange
 * and two anonymous queues. Queue 1 is bound with the patterns
 * {@code *.orange.*} and {@code *.*.rabbit}; queue 2 is bound with
 * {@code lazy.#}.
 */
@Configuration
public class TopicRabbitConfig {

    /** Name of the exchange declared by {@link #topic()}. */
    private static final String TOPIC_EXCHANGE_NAME = "exchange.topic";

    /**
     * Declares the topic exchange.
     *
     * @return an exchange named {@code exchange.topic}
     */
    @Bean
    public TopicExchange topic() {
        TopicExchange exchange = new TopicExchange(TOPIC_EXCHANGE_NAME);
        return exchange;
    }

    /**
     * Declares the first consumer queue.
     *
     * @return a new {@link AnonymousQueue}
     */
    @Bean
    public Queue topicQueue1() {
        Queue queue = new AnonymousQueue();
        return queue;
    }

    /**
     * Declares the second consumer queue.
     *
     * @return a new {@link AnonymousQueue}
     */
    @Bean
    public Queue topicQueue2() {
        Queue queue = new AnonymousQueue();
        return queue;
    }

    /**
     * Binds queue 1 to the exchange with the pattern {@code *.orange.*}.
     *
     * @param topic the exchange to bind to
     * @param topicQueue1 the queue being bound
     * @return the resulting binding
     */
    @Bean
    public Binding topicBinding1a(TopicExchange topic, Queue topicQueue1) {
        return bindWithPattern(topicQueue1, topic, "*.orange.*");
    }

    /**
     * Binds queue 1 to the exchange with the pattern {@code *.*.rabbit}.
     *
     * @param topic the exchange to bind to
     * @param topicQueue1 the queue being bound
     * @return the resulting binding
     */
    @Bean
    public Binding topicBinding1b(TopicExchange topic, Queue topicQueue1) {
        return bindWithPattern(topicQueue1, topic, "*.*.rabbit");
    }

    /**
     * Binds queue 2 to the exchange with the pattern {@code lazy.#}.
     *
     * @param topic the exchange to bind to
     * @param topicQueue2 the queue being bound
     * @return the resulting binding
     */
    @Bean
    public Binding topicBinding2a(TopicExchange topic, Queue topicQueue2) {
        return bindWithPattern(topicQueue2, topic, "lazy.#");
    }

    /**
     * Registers the receiver as a bean.
     *
     * @return a new {@link TopicReceiver}
     */
    @Bean
    public TopicReceiver topicReceiver() {
        TopicReceiver receiver = new TopicReceiver();
        return receiver;
    }

    /**
     * Registers the sender as a bean.
     *
     * @return a new {@link TopicSender}
     */
    @Bean
    public TopicSender topicSender() {
        TopicSender sender = new TopicSender();
        return sender;
    }

    /** Builds a binding of the queue to the exchange under one key pattern. */
    private static Binding bindWithPattern(Queue queue, TopicExchange exchange, String pattern) {
        return BindingBuilder.bind(queue).to(exchange).with(pattern);
    }
}

TopicSender.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Producer for the topic pattern: publishes messages to the
 * {@code exchange.topic} exchange, cycling through a fixed list of
 * dot-separated routing keys.
 */
public class TopicSender {

    private static final Logger LOGGER = LoggerFactory.getLogger(TopicSender.class);

    /** Exchange that messages are published to. */
    private static final String EXCHANGE_NAME = "exchange.topic";

    /** Routing keys, selected in rotation by message index. */
    private static final String[] ROUTING_KEYS = {
        "quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
        "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"
    };

    @Autowired
    private RabbitTemplate template;

    /**
     * Publishes "Hello to &lt;key&gt; &lt;index + 1&gt;" with the routing key
     * selected by {@code index}, then logs the message.
     *
     * @param index sequence number of the message; any int is accepted
     */
    public void send(int index) {
        // floorMod keeps the slot non-negative even for a negative index,
        // where the % operator would yield a negative value and an
        // out-of-bounds access.
        int slot = Math.floorMod(index, ROUTING_KEYS.length);
        String key = ROUTING_KEYS[slot];
        String message = "Hello to " + key + ' ' + (index + 1);
        template.convertAndSend(EXCHANGE_NAME, key, message);
        // Report through the logger only; the extra System.out.println that
        // used to follow printed the same line a second time.
        LOGGER.info(" [x] Sent '{}'", message);
    }
}

TopicReceiver.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 * Consumer for the topic pattern. It exposes two listener methods, one per
 * queue bean ({@code topicQueue1} and {@code topicQueue2}), and tags log
 * output with 1 or 2 accordingly.
 */
public class TopicReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(TopicReceiver.class);

    /**
     * Listener for the queue held by the {@code topicQueue1} bean.
     *
     * @param in the received message body
     */
    @RabbitListener(queues = "#{topicQueue1.name}")
    public void receive1(String in) {
        final int receiverNumber = 1;
        receive(in, receiverNumber);
    }

    /**
     * Listener for the queue held by the {@code topicQueue2} bean.
     *
     * @param in the received message body
     */
    @RabbitListener(queues = "#{topicQueue2.name}")
    public void receive2(String in) {
        final int receiverNumber = 2;
        receive(in, receiverNumber);
    }

    /**
     * Logs the message, runs the simulated work and logs the elapsed time.
     *
     * @param in the received message body
     * @param receiver number identifying the listener in the log output
     */
    public void receive(String in, int receiver) {
        StopWatch timer = new StopWatch();
        timer.start();
        LOGGER.info("instance {} [x] Received '{}'", receiver, in);
        doWork(in);
        timer.stop();
        LOGGER.info("instance {} [x] Done in {}s", receiver, timer.getTotalTimeSeconds());
    }

    /**
     * Simulates work by calling {@code ThreadUtil.sleep(1000L)} once for
     * every '.' in the text (presumably a one-second pause each; confirm
     * against ThreadUtil).
     */
    private void doWork(String text) {
        for (int pos = 0; pos < text.length(); pos++) {
            if (text.charAt(pos) != '.') {
                continue;
            }
            ThreadUtil.sleep(1000L);
        }
    }
}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/**
 * REST endpoints under {@code /rabbit} for exercising each messaging pattern
 * by hand. Every endpoint calls the matching sender ten times, invoking
 * {@code ThreadUtil.sleep(1000L)} after each send, and then returns a
 * success result with a null payload.
 */
@Api(tags = "RabbitController", description = "RabbitMQ功能测试")
@RestController
@RequestMapping("/rabbit")
public class RabbitController {

    /** Number of messages each endpoint sends. */
    private static final int SEND_COUNT = 10;

    /** Argument passed to {@code ThreadUtil.sleep} after each send. */
    private static final long PAUSE_AFTER_SEND = 1000L;

    @Autowired
    private SimpleSender simpleSender;

    @Autowired
    private WorkSender workSender;

    @Autowired
    private FanoutSender fanoutSender;

    @Autowired
    private DirectSender directSender;

    @Autowired
    private TopicSender topicSender;

    /**
     * Triggers the simple pattern.
     *
     * @return a success result with a null payload
     */
    @ApiOperation("简单模式")
    @RequestMapping(value = "/simple", method = RequestMethod.GET)
    public CommonResult simpleTest() {
        int round = 0;
        while (round < SEND_COUNT) {
            simpleSender.send();
            ThreadUtil.sleep(PAUSE_AFTER_SEND);
            round++;
        }
        return CommonResult.success(null);
    }

    /**
     * Triggers the work-queue pattern with indexes 0 through 9.
     *
     * @return a success result with a null payload
     */
    @ApiOperation("工作模式")
    @RequestMapping(value = "/work", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult workTest() {
        int round = 0;
        while (round < SEND_COUNT) {
            workSender.send(round);
            ThreadUtil.sleep(PAUSE_AFTER_SEND);
            round++;
        }
        return CommonResult.success(null);
    }

    /**
     * Triggers the publish/subscribe pattern with indexes 0 through 9.
     *
     * @return a success result with a null payload
     */
    @ApiOperation("发布/订阅模式")
    @RequestMapping(value = "/fanout", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult fanoutTest() {
        int round = 0;
        while (round < SEND_COUNT) {
            fanoutSender.send(round);
            ThreadUtil.sleep(PAUSE_AFTER_SEND);
            round++;
        }
        return CommonResult.success(null);
    }

    /**
     * Triggers the routing pattern with indexes 0 through 9.
     *
     * @return a success result with a null payload
     */
    @ApiOperation("路由模式")
    @RequestMapping(value = "/direct", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult directTest() {
        int round = 0;
        while (round < SEND_COUNT) {
            directSender.send(round);
            ThreadUtil.sleep(PAUSE_AFTER_SEND);
            round++;
        }
        return CommonResult.success(null);
    }

    /**
     * Triggers the topic pattern with indexes 0 through 9.
     *
     * @return a success result with a null payload
     */
    @ApiOperation("通配符模式")
    @RequestMapping(value = "/topic", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult topicTest() {
        int round = 0;
        while (round < SEND_COUNT) {
            topicSender.send(round);
            ThreadUtil.sleep(PAUSE_AFTER_SEND);
            round++;
        }
        return CommonResult.success(null);
    }
}

← Prev Nginx配置跨域 | RedisTemplate操作Redis Next →