0%

springboot中rabbitmq的集成与使用

RabbitMQ是一个基于AMQP协议的轻量级,可靠,可扩展且可移植的消息代理。Spring的一个springAMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发。SpringBoot则是将springAMQP包装了一层,提供了pring-boot-starter-amqp“Starter”来为通过RabbitMQ使用AMQP提供了便利。

rabbitmq的安装可参考《ubuntu下安装rabbitmq》

springboot集成rabbitmq

引入依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

spring-boot-starter-amqp

在springboot中,spring-boot-starter-amqp为我们提供了与rabbtmq的交互方式,其主要引入了如下依赖:

  • org.springframework.amqp:spring-rabbit
    • org.springframework.amqp:spring-amqp
    • com.rabbitmq:amqp-client

我们在使用的时候主要是spring-amqp和spring-rabbit里面的类及方法。

org.springframework.amqp:spring-amqp提供了amqp协议的交换器(exchange)、绑定(bind)、queue(队列)、message(消息)、template(模板)等的定义与封装,如下:

此处仅列出部分类的类名,有关类详情和更多类信息可自行查看源码

  • exchange(交换器)
    • AbstractExchange 交换器的抽象
    • DirectExchange direct交换器的定义
    • FanoutExchange fanout交换器的定义
    • HeadersExchange headers交换器的定义
    • TopicExchange topic交换器的定义
    • CustomExchange 自定义交换器的定义
    • ExchangeBuilder 交换器的builder构造器
  • bind(绑定)
    • Binding 绑定的定义
    • BindingBuilder 绑定的builder构造器
  • message(消息)
    • Message 消息的定义
    • MessageBuilder 消息的builder构造器
  • queue(队列)
    • Queue 队列的定义
    • QueueBuilder 队列构造器
  • template(模板)
    • AmqpTemplate amqp协议消息同步发送/接收等操作模板接口
    • AmqpAdmin amqp协议的绑定、交换器、队列等申明/删除等操作的接口
    • AsyncAmqpTemplate amqp协议消息异步发送/接收等操作模板接口

org.springframework.amqp:spring-rabbit主要提供了注解,方便我们基于注解使用,如下:

  • @EnableRabbit 用于配置rabbitmq信息时使用
  • @Exchange 交换器定义
  • @Queue 队列定义
  • @QueueBinding 队列绑定定义
  • @RabbitHandler rabbitmq默认消息处理器
  • @RabbitListeners、@RabbitListeners rabbitmq消息监听定义

同时还实现了AmqpTemplatej接口的BatchingRabbitTemplate、RabbitTemplate,实现了RabbitAdmin接口的RabbitAdmin,实现了AsyncAmqpTemplate的AsyncRabbitTemplate。

springboot中rabbitmq的使用

创建一个springboot项目,引入“spring-boot-starter-amqp”依赖,在application.yml文件中添加rabbitmq信息,如下所示:

1
2
3
4
5
6
7
spring:
rabbitmq:
virtual-host: sbac-rabbitmq
host: 127.0.0.1
port: 5672
username: admin
password: admin

这里没有用默认的“/”vhost,而使用了新创建的vhost “sbac-rabbitmq”,并设置账号admin对该vhost的权限信息,如下:

1
2
3
sudo rabbitmqctl add_vhost sbac-rabbitmq
sudo rabbitmqctl list_vhosts
sudo rabbitmqctl set_permissions -p sbac-rabbitmq admin ".*" ".*" ".*"

下面将来看一下springboot项目中,rabbitmq的,direct、fanout、topic、headers等四种交换器模式的使用。在此之前,先定义一个junit测试类,用于测试我们的消息发送,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.lazycece.sbac.rabbitmq.producer;

import com.lazycece.sbac.rabbitmq.entity.Message;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;
import java.util.UUID;

/**
* @author lazycece
* @date 2019/04/04
*/
@SpringBootTest
@RunWith(SpringRunner.class)
public class RabbitMqProducer {

@Resource
private RabbitTemplate rabbitTemplate;
private Message<String> message;

@Before
public void buildMessage() {
message = new Message<>();
message.setId(UUID.randomUUID().toString());
message.setContent("Hello, springboot-ac-rabbitmq !");
}

/**
* 测试方法暂时省略
*/
}

这里在测试类中只先给出了消息的定义,而至于几种模式的测试方法,将在介绍交换器模式使用的时候详细说明。

Meesage类的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.lazycece.sbac.rabbitmq.entity;

import lombok.Data;

import java.io.Serializable;

/**
* @author lazycece
* @date 2019/04/04
*/
@Data
public class Message<T> implements Serializable {
private String id;
private T content;
}

direct模式

direct为直连模式,通过direct交换器下发的消息是严格发送到按照指定路由键绑定的队列上。

下面是direct交换器模式的配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.lazycece.sbac.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author lazycece
* @date 2019/04/04
*/
@Configuration
public class DirectConfig {

@Bean
public Queue directQueue() {
return QueueBuilder.durable("direct.queue").build();
}

@Bean
public DirectExchange directExchange() {
return (DirectExchange) ExchangeBuilder.directExchange("direct.exchange").build();
}

@Bean
public Binding directBinding() {
return BindingBuilder.bind(directQueue()).to(directExchange()).withQueueName();
}
}

如交换器的绑定设置所示,这里设置队列名为绑定路由键。下面给队列监听代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.lazycece.sbac.rabbitmq.consumer;

import com.lazycece.sbac.rabbitmq.entity.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @author lazycece
* @date 2019/04/04
*/
@Component
@Slf4j
public class DirectConsumer {

@RabbitListener(queues = {"direct.queue"})
public void directQueueConsumer(Message message) {
log.info("direct.queue -> {} ", message.toString());
}
}

现在来测试一下direct交换器模式的消息发送,测试方法代码如下:

1
2
3
4
@Test
public void directProducer() {
rabbitTemplate.convertAndSend("direct.exchange", "direct.queue", message);
}

此处向命名“direct.exchange”的交换器中发送消息,并设置路由键为“direct.queue”。方法运行之后,便可以看见队列“direct.queue”监听到了消息,并打印出了如下信息:

1
direct.queue -> Message(id=25b23bac-6c89-4350-8743-bbb274da89e4, content=Hello, springboot-ac-rabbitmq !)

如果发送消息的时候换一个路由键,比如“direct.queue.one”,那么就不会收到消息。

fanout模式

fanout交换器为分发交换器,或者叫广播模式更为合适,因为,其不会根据路由键去区分消息到底该下发到哪儿一个队列,绑定在该交换器上的队列都会收到下发致fanout交换器的消息。

fanout交换器的配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.lazycece.sbac.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author lazycece
* @date 2019/04/04
*/
@Configuration
public class FanoutConfig {

@Bean
public Queue fanoutQueueOne() {
return QueueBuilder.durable("fanout.queue.one").build();
}

@Bean
public Queue fanoutQueueTwo() {
return QueueBuilder.durable("fanout.queue.two").build();
}

@Bean
public Queue fanoutQueueThree() {
return QueueBuilder.durable("fanout.queue.three").build();
}

@Bean
public FanoutExchange fanoutExchange() {
return (FanoutExchange) ExchangeBuilder.fanoutExchange("fanout.exchange").build();
}

@Bean
public Binding fanoutBindingOne() {
return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
}

@Bean
public Binding fanoutBindingTwo() {
return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
}

@Bean
public Binding fanoutBindingThree() {
return BindingBuilder.bind(fanoutQueueThree()).to(fanoutExchange());
}
}

这里为此fanout交换器取名为“fanout.exchange”,并为其绑定了三个队列。下面给出三个队列的监听:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.lazycece.sbac.rabbitmq.consumer;

import com.lazycece.sbac.rabbitmq.entity.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @author lazycece
* @date 2019/04/04
*/
@Component
@Slf4j
public class FanoutConsumer {

@RabbitListener(queues = {"fanout.queue.one"})
public void fanoutQueueOneConsumer(Message message) {
log.info("fanout.queue.one -> {} ", message.toString());
}

@RabbitListener(queues = {"fanout.queue.two"})
public void fanoutQueueTwoConsumer(Message message) {
log.info("fanout.queue.two-> {} ", message.toString());
}

@RabbitListener(queues = {"fanout.queue.three"})
public void fanoutQueueThreeConsumer(Message message) {
log.info("fanout.queue.three -> {} ", message.toString());
}
}

现在来测试一下fanout交换器模式的消息发送,测试方法代码如下:

1
2
3
4
@Test
public void fanoutProducer() {
rabbitTemplate.convertAndSend("fanout.exchange", "", message);
}

此处向命名“fanout.exchange”的交换器中发送消息,方法运行之后,便可以看见其三个监听队列均收到了消息,并打印出了如下信息:

1
2
3
fanout.queue.three -> Message(id=66c08bda-654a-4c16-a66b-8537a96ff2ee, content=Hello, springboot-ac-rabbitmq !) 
fanout.queue.one -> Message(id=66c08bda-654a-4c16-a66b-8537a96ff2ee, content=Hello, springboot-ac-rabbitmq !)
fanout.queue.two-> Message(id=66c08bda-654a-4c16-a66b-8537a96ff2ee, content=Hello, springboot-ac-rabbitmq !)

topic模式

topic交换器是一个灵活的交换器,其可以根据路由键的规则,灵活的将消息发送到想要发送的队列中去。

topic交换器的配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.lazycece.sbac.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author lazycece
* @date 2019/04/04
*/
@Configuration
public class TopicConfig {

@Bean
public Queue topicQueueOne() {
return QueueBuilder.durable("topic.queue.one").build();
}

@Bean
public Queue topicQueueTwo() {
return QueueBuilder.durable("topic.queue.two").build();
}

@Bean
public TopicExchange topicExchange() {
return (TopicExchange) ExchangeBuilder.topicExchange("topic.exchange").build();
}

@Bean
public Binding topicBindingOne() {
return BindingBuilder.bind(topicQueueOne()).to(topicExchange()).with("routing-key");
}

@Bean
public Binding topicBindingTwo() {
return BindingBuilder.bind(topicQueueTwo()).to(topicExchange()).with("#");
}
}

这里为此topic交换器取名为“topic.exchange”,并为其绑定了两个队列,一个路由规则为“routing-key”,另一个为“#”。下面给出两个队列的监听:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.lazycece.sbac.rabbitmq.consumer;

import com.lazycece.sbac.rabbitmq.entity.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @author lazycece
* @date 2019/04/04
*/
@Component
@Slf4j
public class TopicConsumer {

@RabbitListener(queues = {"topic.queue.one"})
public void topicQueueOneConsumer(Message message) {
log.info("topic.queue.one -> {} ", message.toString());
}

@RabbitListener(queues = {"topic.queue.two"})
public void topicQueueTwoConsumer(Message message) {
log.info("topic.queue.two -> {} ", message.toString());
}
}

现在来测试一下topic交换器模式的消息发送,测试方法代码如下:

1
2
3
4
@Test
public void topicProducer() {
rabbitTemplate.convertAndSend("topic.exchange", "routing-key", message);
}

此处向命名“topic.exchange”的交换器中发送消息,并指定路由规则为“routing-key”,方法运行之后,便可以看见其了两个监听队列均收到了消息,并打印出了如下信息:

1
2
topic.queue.two -> Message(id=dc79c754-2ea5-4941-b8e0-c41511d7b328, content=Hello, springboot-ac-rabbitmq !) 
topic.queue.one -> Message(id=dc79c754-2ea5-4941-b8e0-c41511d7b328, content=Hello, springboot-ac-rabbitmq !)

为什么呢?因为“#”为通配符,可以匹配任意路由键。如果在发送消息的时候换路由规则为“routing”时,就会发现只有“topic.queue.two”队列收到消息了,因为“routing-key”无法和“routing”匹配,而“#”可以。

headers模式

headers交换器亦可以说是灵活的交换器,因为其是根据消息的headers中的信息来判断是否分发消息致某一个队列的。

headers交换器的配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.lazycece.sbac.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author lazycece
* @date 2019/04/04
*/
@Configuration
public class HeadersConfig {

@Bean
public Queue headersQueue() {
return QueueBuilder.durable("headers.queue").build();
}

@Bean
public HeadersExchange headersExchange() {
return (HeadersExchange) ExchangeBuilder.headersExchange("headers.exchange").build();
}

@Bean
public Binding headersBinding() {
return BindingBuilder.bind(headersQueue()).to(headersExchange()).where("headers-key").exists();
}
}

这里为此headers交换器取名为“headers.exchange”,并为其绑定了一个队列“headers.queue”,然后设置只要headers中存在一个key名为“headers-key”时,便可以被分发消息。下面给出队列的监听:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.lazycece.sbac.rabbitmq.consumer;

import com.lazycece.sbac.rabbitmq.entity.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @author lazycece
* @date 2019/04/04
*/
@Component
@Slf4j
public class HeadersConsumer {

@RabbitListener(queues = {"headers.queue"})
public void headersQueueConsumer(Message message) {
log.info("headers.queue.one -> {} ", message.toString());
}
}

现在来测试一下headers交换器模式的消息发送,测试方法代码如下:

1
2
3
4
5
6
7
8
@Test
public void headersProducer() {
rabbitTemplate.convertAndSend("headers.exchange", "", message,
m -> {
m.getMessageProperties().getHeaders().put("headers-key", null);
return m;
});
}

此处向命名“headers.exchange”的交换器中发送消息,并为消息的headers中加入“headers-key”键,方法运行之后,便可以看见其监听队列收到了消息,并打印出了如下信息:

1
headers.queue.one -> Message(id=02d3d652-6fcc-4d3c-a18e-7fe90abe066e, content=Hello, springboot-ac-rabbitmq !)

注解使用

上面都是先给出rabbitmq的exchange、queue、binding相关的配置信息,再来监听消息,这里来看一下直接使用注解@RabbitListener来绑定和简单的用法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.lazycece.sbac.rabbitmq.consumer;

import com.lazycece.sbac.rabbitmq.entity.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @author lazycece
* @date 2019/04/04
*/
@Component
@Slf4j
public class AnnotationConsumer {

@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue(name = "topic.queue.annotation"),
exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
key = {"routing"}
)
}
)
public void topicQueueOneConsumer(Message message) {
log.info("topic.queue.annotation -> {} ", message.toString());
}

}

这里仍然是创建topic类型的交换器“topic.exchange”,然后创建队列“topic.queue.annotation”,再进行绑定路由键“routing”。这里用上文提到的topic模式的测试方法,毫无意外,“topic.queue.two”和“topic.queue.annotation”收到了消息,而“topic.queue.one”没有收到消息。

1
2
topic.queue.two -> Message(id=7fe62e14-8a78-43bf-a18a-422880e30c99, content=Hello, springboot-ac-rabbitmq !) 
topic.queue.annotation -> Message(id=7fe62e14-8a78-43bf-a18a-422880e30c99, content=Hello, springboot-ac-rabbitmq !)

案例源码

案例源码地址:https://github.com/lazycece/springboot-actual-combat/tree/master/springboot-ac-rabbitmq

参考文档