长大后想做什么?做回小孩!

0%

SpringBoot与RabbitMQ集成

学习SpringBoot整合各种框架、中间件都要从自动配置开始看起,整合RabbitMQ也不例外。

打开RabbitAutoConfiguration,发现配置类中给容器注册了一个ConnectionFactory

KKl176.png

该类需要传入RabbitProperties类对象,RabbitProperties类中封装了中间件的配置。

配置类还会给容器注册一个RabbitTemplate和一个AmqpAdminRabbitTemplate就是用来给RabbitMQ发送和接收消息的AmqpAdmin就是RabbitMQ系统管理功能组件主要用来创建队列、交换器。。。

RabbitTemplate有两个常用的方法:

1
2
3
4
5
Message需要自己构造一个;可以定义Message消息体内容和消息头
rabbitTemplate.send(exchange,routingkey,message);

不用定义消息头信息,只需要传入要发送的数据对象,object默认当成消息体,自动序列化发给RabbitMQ
rabbitTemplate.convertAndSend(exchange,routingkey,object);

手动去RabbitMQ创建好exchange并绑定好queue,写个测试:

1
2
3
4
5
6
7
8
@Test
// 点对点单播测试
void demo1(){
Map<String,Object> map =new HashMap<>();
map.put("msg","这是第一个消息");
map.put("data", Arrays.asList("这是data的数据",123,true));
rabbitTemplate.convertAndSend("exchange.direct","nynu.news",map);
}

成功发送消息到nynu.news:KKUQuF.png

再测试一下接收消息:

1
2
3
4
5
6
7
8
9
10
@Test
void receiveDemo(){
//rabbitTemplate提供了返回值为Message对象、返回值为object对象等等的接收方法
Object o = rabbitTemplate.receiveAndConvert("nynu.news");//获取消息之后,队列中相应的消息也被移出
System.out.println(o.getClass());
System.out.println(o);
}
//运行结果:
//class java.util.HashMap
//{msg=这是第一个消息, data=[这是data的数据, 123, true]}

发送消息时,可以不使用默认的java序列化策略,而是使用json格式的序列化策略来处理消息:

在RabbitTemplate中有一个消息转换器,默认值SimpleMessageConverter使用的是java的序列化策略:

1
private MessageConverter messageConverter = new SimpleMessageConverter();

如果希望使用json格式的序列化策略,可以自定义配置类,给容器注册一个我们需要的MessageConverter,MessageConverter接口的众多实现:

KKrFxI.png

选择使用Jackson2JsonMessageConverter实现:

1
2
3
4
5
6
7
@Configuration
public class AMQPConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}

自定义配置类如果有设置好的MessageConverter,RabbitAutoConfiguration会自动将其配置进来。完成上述操作就可以在发送消息时序列化为json格式了。

获取消息时,反序列化转换的java对象必须包含无参构造方法,不然会抛MessageConversionException。receiveAndConvert()方法返回值虽然是Object类型对象,但是该对象用getClass()方法获取其类型时返回结果为定义的java对象,所以可以放心的将返回的Object对象强转为定义的的java对象。

上面只演示了单播,广播和可选择播报和上述方式一样,区别只是参数中Exchange是什么类型的。


在SpringBoot开发中可以使用注解的方式来使用RabbitMQ:

在service方法上加上@RabbitListener注解,此注解需要一个数组属性queue来指定方法监听的队列,然后还需要在启动类上@EnableRabbit启动对注解式RabbitMQ的支持:

1
2
3
4
@RabbitListener(queues = "nynu.news")
public void receive(Shoes shoes){
System.out.println("有鞋子订单"+shoes);
}

完成上述操作启动SpringBoot应用后,执行一个向nynu.news队列发送Shoes类型消息的方法,SpringBoot应用的控制台即可打印出数据,说明service方法执行。但是,上述方法只要指定的队列有消息,就会被获取,如果队列中消息的数据类型和接收方法需要获取的类型不一致,就会导致获取到的数据和预期有出入。

如果需要接收到消息的头信息,只需要使用Message类型对象来接受消息:

1
2
3
4
5
@RabbitListener(queues = "nynu.news")
public void msgAndHeader(Message msg){
System.out.println(msg.getBody());
System.out.println(msg.getMessageProperties());
}

有一个问题,上述所有操作都是建立在交换器和队列都创建并绑定好的情况下进行的,那么怎样在SpringBoot应用中创建和绑定这些组件呢?

可以用AmqpAdmin系统管理组件完成Exchange、Queue、Binding的创建和删除,AmqpAdmin会由配置类自动注册到容器中去,AmqpAdmin提供了很多declearXXX()、deleteXXX()、removeXXX()等等方法用来创建或者删除组件。

1
2
3
4
5
6
7
8
9
10
11
12
@Test
void creat(){
//Exchange接口有五个实现类FanoutExchange、DirectExchange、TopicExchange、HeadersExchange、CustomerEXchange
//创建Exchange实现类对象时,可以指定名字、是否持久化、是否自动删除、参数头arguments
amqpAdmin.declareExchange(new DirectExchange("amqpAdmin.directExchange"));

//Queue创建时可以指定:String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
amqpAdmin.declareQueue(new Queue("amqpAdmin.queue"));

//Binding创建时必须指定:String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments
amqpAdmin.declareBinding(new Binding("amqpAdmin.queue",Binding.DestinationType.QUEUE,"amqpAdmin.directExchange","amqpAdmin.q",null));
}

删除组件方法指定被删除组件的名字即可,不再赘述。