前置工作
pom.xml
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
application-dev.yml
1 2 3 4 5 6 7 8 9 10
spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /mall
    username: xxxx
    password: xxxx
    publisher-confirm-type: correlated # 消息发送到交换器确认
    publisher-returns: true # 消息发送到队列确认
|
简单模式
包括一个生产者、一个消费者和一个队列。
SimpleRabbitConfig.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Configuration public class SimpleRabbitConfig {
@Bean public Queue simpleQueue() { return new Queue("simple.hello"); }
@Bean public SimpleSender simpleSender() { return new SimpleSender(); }
@Bean public SimpleReceiver simpleReceiver() { return new SimpleReceiver(); } }
|
SimpleSender.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class SimpleSender {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleSender.class);
@Autowired private RabbitTemplate template;
private static final String queueName = "simple.hello";
public void send() { String message = "Hello World!"; this.template.convertAndSend(queueName, message); LOGGER.info(" [x] Sent '{}'", message); } }
|
SimpleReceiver.java
1 2 3 4 5 6 7 8 9 10 11
| @RabbitListener(queues = "simple.hello") public class SimpleReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleReceiver.class);
@RabbitHandler public void receive(String in) { LOGGER.info(" [x] Received '{}'", in); }
}
|
工作模式
包括一个生产者、一个队列和两个消费者。消费者绑定同一个队列,通过竞争的方式消费消息。当一个消费者处理任务时,空闲的消费者从队列中消费消息。
WorkRabbitConfig.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Configuration public class WorkRabbitConfig { @Bean public Queue workQueue() { return new Queue("work.hello"); }
@Bean public WorkSender workSender() { return new WorkSender(); } @Bean public WorkReceiver workReceiver1() { return new WorkReceiver(1); }
@Bean public WorkReceiver workReceiver2() { return new WorkReceiver(2); } }
|
WorkSender.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class WorkSender {
private static final Logger LOGGER = LoggerFactory.getLogger(WorkSender.class);
@Autowired private RabbitTemplate template;
private static final String queueName = "work.hello";
public void send(int index) { StringBuilder builder = new StringBuilder("Hello"); int limitIndex = index % 3 + 1; for (int i = 0; i < limitIndex; i++) { builder.append('.'); } builder.append(index + 1); String message = builder.toString(); template.convertAndSend(queueName, message); LOGGER.info(" [x] Sent '{}'", message); } }
|
WorkReceiver.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| @RabbitListener(queues = "work.hello") public class WorkReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(WorkReceiver.class);
private final int instance;
public WorkReceiver(int i) { this.instance = i; }
@RabbitHandler public void receive(String in) { StopWatch watch = new StopWatch(); watch.start(); LOGGER.info("instance {} [x] Received '{}'", this.instance, in); doWork(in); watch.stop(); LOGGER.info("instance {} [x] Done in {}s", this.instance, watch.getTotalTimeSeconds()); }
private void doWork(String in) { for (char ch : in.toCharArray()) { if (ch == '.') { ThreadUtil.sleep(1000L); } } } }
|
发布/订阅模式
包含一个生产者、一个交换机、两个队列、两个消费者。两个消费者绑定两个队列,两个队列绑定一个交换机,生产者通过交换机向队列发送消息,消费者从队列消费消息。
FanoutRabbitConfig.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| @Configuration public class FanoutRabbitConfig {
@Bean public FanoutExchange fanout() { return new FanoutExchange("exchange.fanout"); }
@Bean public Queue fanoutQueue1() { return new AnonymousQueue(); }
@Bean public Queue fanoutQueue2() { return new AnonymousQueue(); }
@Bean public Binding fanoutBinding1(FanoutExchange fanout, Queue fanoutQueue1) { return BindingBuilder.bind(fanoutQueue1).to(fanout); }
@Bean public Binding fanoutBinding2(FanoutExchange fanout, Queue fanoutQueue2) { return BindingBuilder.bind(fanoutQueue2).to(fanout); }
@Bean public FanoutReceiver fanoutReceiver() { return new FanoutReceiver(); }
@Bean public FanoutSender fanoutSender() { return new FanoutSender(); } }
|
FanoutSender.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class FanoutSender { private static final Logger LOGGER = LoggerFactory.getLogger(FanoutSender.class); private static final String exchangeName = "exchange.fanout"; @Autowired private RabbitTemplate template;
public void send(int index) { StringBuilder builder = new StringBuilder("Hello"); int limitIndex = index % 3 + 1; for (int i = 0; i < limitIndex; i++) { builder.append('.'); } builder.append(index + 1); String message = builder.toString(); template.convertAndSend(exchangeName, "", message); LOGGER.info(" [x] Sent '{}'", message); } }
|
FanoutReceiver.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class FanoutReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(FanoutReceiver.class);
@RabbitListener(queues = "#{fanoutQueue1.name}") public void receive1(String in) { receive(in, 1); }
@RabbitListener(queues = "#{fanoutQueue2.name}") public void receive2(String in) { receive(in, 2); }
private void receive(String in, int receiver) { StopWatch watch = new StopWatch(); watch.start(); LOGGER.info("instance {} [x] Received '{}'", receiver, in); doWork(in); watch.stop(); LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds()); }
private void doWork(String in) { for (char ch : in.toCharArray()) { if (ch == '.') { ThreadUtil.sleep(1000L); } } } }
|
路由模式
路由模式是可以根据路由键选择性给多个消费者发送消息的模式,包含一个生产者、两个消费者、两个队列和一个交换机。两个消费者绑定两个队列,两个队列通过路由键绑定到交换机上去,生产者发送消息到交换机,交换机通过路由键转发到不同队列,消费者从绑定队列消费消息。
DirectRabbitConfig.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| @Configuration public class DirectRabbitConfig {
@Bean public DirectExchange direct() { return new DirectExchange("exchange.direct"); }
@Bean public Queue directQueue1() { return new AnonymousQueue(); }
@Bean public Queue directQueue2() { return new AnonymousQueue(); }
@Bean public Binding directBinding1a(DirectExchange direct, Queue directQueue1) { return BindingBuilder.bind(directQueue1).to(direct).with("orange"); }
@Bean public Binding directBinding1b(DirectExchange direct, Queue directQueue1) { return BindingBuilder.bind(directQueue1).to(direct).with("black"); }
@Bean public Binding directBinding2a(DirectExchange direct, Queue directQueue2) { return BindingBuilder.bind(directQueue2).to(direct).with("green"); }
@Bean public Binding directBinding2b(DirectExchange direct, Queue directQueue2) { return BindingBuilder.bind(directQueue2).to(direct).with("black"); }
@Bean public DirectReceiver receiver() { return new DirectReceiver(); } @Bean public DirectSender directSender() { return new DirectSender(); } }
|
DirectSender.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class DirectSender {
@Autowired private RabbitTemplate template;
private static final String exchangeName = "exchange.direct";
private final String[] keys = {"orange", "black", "green"};
private static final Logger LOGGER = LoggerFactory.getLogger(DirectSender.class);
public void send(int index) { int limitIndex = index % 3; String key = keys[limitIndex]; String message = "Hello to " + key + ' ' + (index + 1); template.convertAndSend(exchangeName, key, message); LOGGER.info(" [x] Sent '{}'", message); } }
|
DirectReceiver.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class DirectReceiver { private static final Logger LOGGER = LoggerFactory.getLogger(DirectReceiver.class);
@RabbitListener(queues = "#{directQueue1.name}") public void receive1(String in) { receive(in, 1); }
@RabbitListener(queues = "#{directQueue2.name}") public void receive2(String in) { receive(in, 2); }
private void receive(String in, int receiver) { StopWatch watch = new StopWatch(); watch.start(); LOGGER.info("instance {} [x] Received '{}'", receiver, in); doWork(in); watch.stop(); LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds()); }
private void doWork(String in) { for (char ch : in.toCharArray()) { if (ch == '.') { ThreadUtil.sleep(1000L); } } } }
|
通配符模式
跟路由模式类似,只是把固定的路由键换成带通配符的绑定键:`*` 匹配一个单词,`#` 匹配零个或多个单词。
TopicRabbitConfig.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| @Configuration public class TopicRabbitConfig {
@Bean public TopicExchange topic() { return new TopicExchange("exchange.topic"); }
@Bean public Queue topicQueue1() { return new AnonymousQueue(); }
@Bean public Queue topicQueue2() { return new AnonymousQueue(); }
@Bean public Binding topicBinding1a(TopicExchange topic, Queue topicQueue1) { return BindingBuilder.bind(topicQueue1).to(topic).with("*.orange.*"); }
@Bean public Binding topicBinding1b(TopicExchange topic, Queue topicQueue1) { return BindingBuilder.bind(topicQueue1).to(topic).with("*.*.rabbit"); }
@Bean public Binding topicBinding2a(TopicExchange topic, Queue topicQueue2) { return BindingBuilder.bind(topicQueue2).to(topic).with("lazy.#"); }
@Bean public TopicReceiver topicReceiver() { return new TopicReceiver(); }
@Bean public TopicSender topicSender() { return new TopicSender(); } }
|
TopicSender.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class TopicSender { private static final String exchangeName = "exchange.topic"; private static final Logger LOGGER = LoggerFactory.getLogger(TopicSender.class); private final String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox", "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"}; @Autowired private RabbitTemplate template;
public void send(int index) { int limitIndex = index % keys.length; String key = keys[limitIndex]; String message = "Hello to " + key + ' ' + (index + 1); template.convertAndSend(exchangeName, key, message); LOGGER.info(" [x] Sent '{}'", message); System.out.println(" [x] Sent '" + message + "'"); } }
|
TopicReceiver.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class TopicReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(TopicReceiver.class);
@RabbitListener(queues = "#{topicQueue1.name}") public void receive1(String in) { receive(in, 1); }
@RabbitListener(queues = "#{topicQueue2.name}") public void receive2(String in) { receive(in, 2); }
public void receive(String in, int receiver) { StopWatch watch = new StopWatch(); watch.start(); LOGGER.info("instance {} [x] Received '{}'", receiver, in); doWork(in); watch.stop(); LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds()); }
private void doWork(String in) { for (char ch : in.toCharArray()) { if (ch == '.') { ThreadUtil.sleep(1000L); } } } }
|
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| @Api(tags = "RabbitController", description = "RabbitMQ功能测试") @RestController @RequestMapping("/rabbit") public class RabbitController {
@Autowired private SimpleSender simpleSender;
@Autowired private WorkSender workSender;
@Autowired private FanoutSender fanoutSender;
@Autowired private DirectSender directSender;
@Autowired private TopicSender topicSender;
@ApiOperation("简单模式") @RequestMapping(value = "/simple", method = RequestMethod.GET) public CommonResult simpleTest() { for (int i = 0; i < 10; i++) { simpleSender.send(); ThreadUtil.sleep(1000L); } return CommonResult.success(null); }
@ApiOperation("工作模式") @RequestMapping(value = "/work", method = RequestMethod.GET) @ResponseBody public CommonResult workTest() { for (int i = 0; i < 10; i++) { workSender.send(i); ThreadUtil.sleep(1000L); } return CommonResult.success(null); }
@ApiOperation("发布/订阅模式") @RequestMapping(value = "/fanout", method = RequestMethod.GET) @ResponseBody public CommonResult fanoutTest() { for (int i = 0; i < 10; i++) { fanoutSender.send(i); ThreadUtil.sleep(1000L); } return CommonResult.success(null); }
@ApiOperation("路由模式") @RequestMapping(value = "/direct", method = RequestMethod.GET) @ResponseBody public CommonResult directTest() { for (int i = 0; i < 10; i++) { directSender.send(i); ThreadUtil.sleep(1000L); } return CommonResult.success(null); }
@ApiOperation("通配符模式") @RequestMapping(value = "/topic", method = RequestMethod.GET) @ResponseBody public CommonResult topicTest() { for (int i = 0; i < 10; i++) { topicSender.send(i); ThreadUtil.sleep(1000L); } return CommonResult.success(null); } }
|