Spring封装RabbitMQ的时候,它做了什么事情?
- 管理对象(队列,交换机,绑定)
- 封装方法(发送消息,接收消息)
Spring AMQP 是对 Spring基于 AMQP 的消息收发解决方案,它是一个抽象层,不依赖于特定的AMQP Broker 实现和客户端的抽象,所以可以很方便地替换。比如我们可以使用spring-rabbit来实现。
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>
包括3个jar包:Amqp-client-3.3.4.jar,Spring-amqp.jar,Spring.rabbit.jar
下面我们就来看看 Spring AMQP 的核心组件有哪些…
1.ConnectionFactory(管理连接)
Spring AMQP 的连接工厂接口,用于创建连接。
CachingConnectionFactory 是ConnectionFactory 的一个实现类。
2.RabbitAdmin(管理交换机、队列等)
RabbitAdmin是AmqpAdmin的实现,封装了对RabbitMQ的基础管理操作,比如对交换机、队列、绑定的声明和删除等。
public static void main(String[] args) {
// 在IOC容器中拿到RabbitAdmin的bean实例
AnnotationConfigApplicationContext context = new
AnnotationConfigApplicationContext(AdminTest.class);
RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);
// 声明一个交换机
rabbitAdmin.declareExchange(new DirectExchange("GP_ADMIN_EXCHANGE", false, false));
// 声明一个队列
rabbitAdmin.declareQueue(new Queue("GP_ADMIN_QUEUE", false, false, false));
// 声明一个绑定
rabbitAdmin.declareBinding( new Binding("GP_ADMIN_QUEUE", Binding.DestinationType.QUEUE,
"GP_ADMIN_EXCHANGE", "admin", null));
}
为什么我们在配置文件(Spring)或者配置类(SpringBoot)里面定义了交换机、队列、绑定关系,并没有直接调用Channel的declare的方法,Spring在启动的时候就可以帮我们创建这些元数据?这些事情就是由RabbitAdmin完成的。
RabbitAdmin 实现了 InitializingBean 接口 , 里面有唯一的一 个方法afterPropertiesSet(),这个方法会在 RabbitAdmin的属性值设置完的时候被调用。在 afterPropertiesSet ()方法中,调用了一个 initialize()方法。这里面创建了三个Collection,用来盛放交换机、队列、绑定关系。
最后依次声明返回类型为 Exchange、Queue 和 Binding 这些 Bean,底层还是调用了Channel的declare的方法。
declareExchanges(channel,exchanges.toArray(new Exchange[exchanges.size()]));
declareQueues(channel,queues.toArray(new Queue[queues.size()]));
declareBindings(channel,bindings.toArray(new Binding[bindings.size()]));
3.Message(消息体)
org.springframework.amqp.core.Message是Spring AMQP对消息的封装,有两个重要的属性:
- body:消息内容。
- messageProperties:消息属性。
4.MessageConvertor(消息转换器)
MessageConvertor的作用?
- RabbitMQ的消息在网络传输中需要转换成byte[](字节数组)进行发送,消费者需要对字节数组进行解析
- 在SpringAMQP中,消息会被封装为org.springframework.amqp.core.Message对象。消息的序列化和反序列化,就是处理Message的消息体body对象
- 如果消息已经是byte[]格式,就不需要转换
- 如果是String,会转换成byte[]
- 如果是Java对象,会使用JDK序列化将对象转换为byte[](体积大,效率差)
在调用RabbitTemplate的converAndSend()方法发送消息时,会使用MessageConvertor进行消息的序列化,默认使用SimpleMessageConverter。
在某些情况下,我们需要选择其他的高效的序列化工具。如果我们不想在每次发送消息时自己处理消息,就可以直接定义一个MessageConvertor
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
MessageConvertor如何工作?
调用了RabbitTemplate 的 convertAndSend() 方法时会使用对应的MessageConvertor进行消息的序列化和反序列化。
- 序列化:Object —— Json —— Message(body) —— byte[]
- 反序列化:byte[] ——Message —— Json —— Object
有哪些 MessageConvertor?
在Spring中提供了一个默认的转换器:SimpleMessageConverter。
Jackson2JsonMessageConverter(RbbitMQ 自带):将对象转换为 json,然后再转换成字节数组进行传递。
如何自定义 MessageConverter?
创建一个类,实现MessageConverter 接口,重写toMessage()和fromMessage()方法。
- toMessage(): Java对象转换为Message
- fromMessage(): Message对象转换为Java对象
5.RabbitTemplate(消息模板<-Producer)
RabbitTemplate是AmqpTemplate的一个实现(目前为止也是唯一的实现),用来简化消息的收发,支持消息的确认(Confirm)与返回(Return)。
跟JDBCTemplate一 样 , 它封装了创建连接,创建消息信道,收发消息,消息格式转(ConvertAndSend→Message)、关闭信道、关闭连接等等操作。在RabbitTemplate中,它主要封装了channel的相关操作,即原来JavaAPI需要在channel中设置的,现在都要在构建RabbitTemplate的Bean时设置
针对于多个服务器连接,可以定义多个Template。可以注入到任何需要收发消息的地方使用。
// 构建一个RabbitTemplate的Bean
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
// 当消息成功到达exchange,且无法被路由时触发回调,进行消息回发
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){
public void returnedMessage(Message message,
int replyCode,
String replyText,
String exchange,
String routingKey){
System.out.println("回发的消息:");
System.out.println("replyCode: "+replyCode);
System.out.println("replyText: "+replyText);
System.out.println("exchange: "+exchange);
System.out.println("routingKey: "+routingKey);
}
});
rabbitTemplate.setChannelTransacted(true);
// 当消息成功到达exchange的时候触发的ack回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
System.out.println("发送消息失败:" + cause);
throw new RuntimeException("发送异常:" + cause);
}
}
});
return rabbitTemplate;
}
6.MessageListener (消息侦听<-Consumer)
MessageListener 是SpringAMQP 异步消息投递的监听器接口,它只有一个方法onMessage,用于处理消息队列推送来的消息,作用类似于JavaAPI中的Consumer。
MessageListenerContainer
MessageListenerContainer 可以理解为MessageListener的容器,一个Container只有一个 Listener,但是可以生成多个线程使用相同的 MessageListener 同时消费消息。
Container可以管理Listener的生命周期,可以用于对于消费者进行配置。例如:动态添加移除队列、对消费者进行设置,例如ConsumerTag、Arguments、并发、消费者数量、消息确认模式等
@Bean // 构建一个MessageListenerContainer的Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new
SimpleMessageListenerContainer(connectionFactory);
container.setQueues(getSecondQueue(),getThirdQueue()); // 监听的队列
container.setConcurrentConsumers(1); // 最小消费者数
container.setMaxConcurrentConsumers(5); // 最大的消费者数量
container.setDefaultRequeueRejected(false); // 是否重回队列
container.setAcknowledgeMode(AcknowledgeMode.AUTO); // 签收模式
container.setExposeListenerChannel(true);
container.setConsumerTagStrategy(new ConsumerTagStrategy() { // 消费端的标签策略
public String createConsumerTag(String queue) {
return null;
}
});
return container;
}
MessageListenerContainerFactory
在SpringBoot2.0中新增了一个DirectMessageListenerContainer。Spring去整合IBM MQ、JMS、Kafka也是这么做的。
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory
connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.NONE);
factory.setAutoStartup(true);
return factory;
}
可以在消费者上指定,当我们需要监听多个RabbitMQ的服务器的时候,指定不同的MessageListenerContainerFactory。
@Component
@PropertySource("classpath:mymq.properties")
@RabbitListener(queues = "${com.mymq.firstqueue}",
containerFactory="rabbitListenerContainerFactory") // 指定监听类的Factory
public class FirstConsumer {
@RabbitHandler
public void process(@Payload Merchant merchant){
System.out.println("First Queue received msg : " + merchant.getName());
}
}
产生关系与继承关系:MessageListenerContainerFactory——MessageListenerContainer——MessageListener
==> 整合演示
public class ContainerSender {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new CachingConnectionFactory(
new URI("amqp://guest:guest@localhost:5672"));
// 1.MessageListenerContainerFactory
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 2.MessageListenerContainer
SimpleMessageListenerContainer container = factory.createListenerContainer();
/**
* 注:不用工厂模式也可以创建
* SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
*/
container.setConcurrentConsumers(1);
container.setQueueNames("MY_BASIC_SECOND_QUEUE");
// 3.MessageListener
container.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println("收到消息:"+message);
}
});
container.start();
// RabbitTemplate
AmqpTemplate template = new RabbitTemplate(connectionFactory);
template.convertAndSend("MY_BASIC_SECOND_QUEUE", "msg 1"); // 发送消息
template.convertAndSend("MY_BASIC_SECOND_QUEUE", "msg 2");
template.convertAndSend("MY_BASIC_SECOND_QUEUE", "msg 3");
}
}










还没有评论,来说两句吧...