RabbitMQ
一 应用
文档地址https://www.rabbitmq.com/networking.html
- 异步处理
- 应用解耦
- 流量控制(流量削峰)
二 概述
大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力
消息服务中两个重要概念
消息代理(message broker)和目的地(destination)
消息队列主要有两种形式的目的地
队列(queue): 点对点消息 通信
主题(topic):发布(publish)/订阅(subscribe)消息通信
点对点方式:
消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列 。
消息只有唯一的发送者和接受者,但并不是说只能有一个接收者
发布订阅式
发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息
JMS(Java Message Service)JAVA消息服务
基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现
AMQP(Advanced Message Queuing Protocol)
高级消息队列协议,也是一个消息代理的规范,兼容JMS
RabbitMQ是AMQP的实现
Spring支持
- spring-jms提供了对JMS的支持
- pring-rabbit提供了对AMQP的支持
- 需要ConnectionFactory的实现来连接消息代理
- 提供JmsTemplate、RabbitTemplate来发送消息
- @JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息
- 代理发布的消息
- @EnableJms、@EnableRabbit开启支持
SringBoot支持
- JmsAutoConfiguration
- RabbitAutoConfiguration
市面上的MQ产品
ActiveMQ RabbitMQ RocketMQ Kafka
三 RabbitMQ概念
1. RabbitMQ简介
RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现
2. 核心概念
1. Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成, 这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
2.Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序
3.Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别
4.Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
5. Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Exchange 和Queue的绑定可以是多对多的关系。
6.Connection
网络连接,比如一个TCP连接。
7.Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接
8. Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
9.Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 /
10.Broker
表示消息队列服务器实体
3. RbbitMQ流程图
四 RabbitMQ安装
1. docker安转rabbitmq
执行命令
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
如果没有rabbitmq:management镜像,则会自动下载
2. 绑定端口描述
- 4369, 25672 (Erlang发现&集群端口)
- 5672, 5671 (AMQP端口)
- 15672 (web管理后台端口)
- 61613, 61614 (STOMP协议端口)
- 1883, 8883 (MQTT协议端口)
3. 将rabbitmq启动设置为随docker启动而启动
docker update rabbitmq --restart=always
五 RabbitMQ几种连接模型
官网https://www.rabbitmq.com/getstarted.html
1. 直连(hello word)
2. 工作队列(Work queues)
3. 发布/订阅(Publish/Subscribe)
也可叫做fanout 扇出,一种广播模型
4. 路由(Routing)
5. 主题(Topics)
使用最多,相当于路由的升级版。
官网描述https://www.rabbitmq.com/tutorials/tutorial-five-java.html
发送到主题交换的消息不能有任意的 routing_key - 它必须是单词列表,由点分隔。这些词可以是任何东西,但通常它们指定与消息相关的一些特征。一些有效的路由键示例:“ stock.usd.nyse ”、“ nyse.vmw ”、“ quick.orange.rabbit ”。路由键中可以有任意多的单词,最多为 255 个字节。
绑定键也必须采用相同的格式。主题交换背后的逻辑 类似于直接交换- 使用特定路由键发送的消息将被传递到与匹配绑定键绑定的所有队列。但是,绑定键有两个重要的特殊情况
- *(星号)可以只替换一个单词
- #(hash)可以代替零个或多个单词
下图示例:
图示解释:
队列Q1绑定了.orange. ,只要路由键满足了xx.orange.xx,或xx.xx.orange.xx.xx 都能匹配到Q1队列
队列Q2绑定了..rabbit 只要满足了xx.xx.rabbit 或 xx.xx.xx.rabbit 都能匹配到该队列
lazy.# 只要满足了lazy. ,lazy.xx ,lazy.xx.xx... 都能匹配该队列
6. RPC
7. Publisher Confirms
只用publisher确认消息,可靠发布
六 springboot整合使用rabbitMQ
1. 引入依赖
不用写版本号,springboot有对应的版本
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. 配置基础连接
其他配置连接参考RabbitAutoConfiguration中的RabbitProperties
#rabbitmq地址
spring.rabbitmq.host=192.168.101.164
#端口
spring.rabbitmq.port=5672
#虚拟主机配置
spring.rabbitmq.virtual-host=/
#账号密码配置
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
3. 开启rabbitmq功能
//启动类处添加@EnableRabbit springboot整合的各类框架,都使用enableXXX开启对应的功能
@EnableRabbit
4. AmqpAdmin使用(交换机队列的操作)
引入amqpAdmin
@Resource
private AmqpAdmin amqpAdmin;
1. 创建交换机
交换机一共有五种类型
- DirectExchange 直连交换机
- FanoutExchange 扇出(广播)交换机
- CustomExchange 自定义交换机
- TopicExchange 主题交换机
- HeadersExchange 头交换机
@Test
public void creatExchange(){
//全参DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
//name=交换机名称 durable=是否持久化(重启时该交换机不会被删除) autoDelete=没有连接的时候自动删除 arguments=其他参数 Alternate exchange
DirectExchange directExchange = new DirectExchange("my-direct-exchange",true,false);
amqpAdmin.declareExchange(directExchange);
}
2. 创建队列
@Test
public void creatQueue(){
//全参Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
//name=队列名称 durable=是否持久化(重启时该队列不会被删除) exclusive=是否声明独占队列,如果为true,则只能由申报人连接
//autoDelete=没有连接的时候自动删除 arguments=其他参数
Queue queue = new Queue("my-queue",true,false,false);
amqpAdmin.declareQueue(queue);
}
3. 创建交换机和队列绑定
@Test
public void creatBinding(){
//全参Binding(String destination, DestinationType destinationType, String exchange, String routingKey,Map<String, Object> arguments)
//destination=目标(被绑定的是交换机还是队列) DestinationType=目标类型 exchange=交换机 routingKey=路由键 arguments=其他参数
Binding binding = new Binding("my-queue", Binding.DestinationType.QUEUE,"my-direct-exchange","test.connect",null);
amqpAdmin.declareBinding(binding);
}
4. 发送消息
@Test
public void sendMessage(){
//convertAndSend(String exchange, String routingKey, Object message)
//exchange=交换机名称 routingKey=路由键 message=消息内容
OmsOrderEntity omsOrderEntity = new OmsOrderEntity();
omsOrderEntity.setId(1L);
omsOrderEntity.setAutoConfirmDay(11);
omsOrderEntity.setBillReceiverPhone("18380453");
omsOrderEntity.setBillContent("大哥你好");
rabbitTemplate.convertAndSend("my-direct-exchange","test.connect",omsOrderEntity);
}
//消息可以是对象,也可以是字符串,如果是对象,必须实现序列化方法
5. 修改rabbitTemplate发送消息时的序列化方式
从自动配置代码(RabbitAutoConfiguration.class)中可以看到,rabbitTemplate的消息转换器是从容器中获取的,如果容器中不存在,则使用默认的消息转换器
public RabbitTemplateConfiguration(RabbitProperties properties,
ObjectProvider<MessageConverter> messageConverter,
ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) {
this.properties = properties;
//从容器中获取消息转换器
this.messageConverter = messageConverter;
this.retryTemplateCustomizers = retryTemplateCustomizers;
}
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
PropertyMapper map = PropertyMapper.get();
RabbitTemplate template = new RabbitTemplate(connectionFactory);
MessageConverter messageConverter = this.messageConverter.getIfUnique();
//如果消息转换器不为空,则使用该消息转换器。
if (messageConverter != null) {
template.setMessageConverter(messageConverter);
}
template.setMandatory(determineMandatoryFlag());
RabbitProperties.Template properties = this.properties.getTemplate();
if (properties.getRetry().isEnabled()) {
template.setRetryTemplate(new RetryTemplateFactory(
this.retryTemplateCustomizers.orderedStream().collect(Collectors.toList())).createRetryTemplate(
properties.getRetry(), RabbitRetryTemplateCustomizer.Target.SENDER));
}
map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis)
.to(template::setReceiveTimeout);
map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis).to(template::setReplyTimeout);
map.from(properties::getExchange).to(template::setExchange);
map.from(properties::getRoutingKey).to(template::setRoutingKey);
map.from(properties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue);
return template;
}
private MessageConverter messageConverter = new SimpleMessageConverter();//来自RabbitTemplate.class 默认的消息转换器
因此,只需要在容器中配置一个需要的消息转换器即可
6. 接收消息
1. rabbitListener和rabbithander描述
- @RabbitListener 可以写在类和方法上
- @RabbitHandler 标识在方法上
2. RabbitListener 参数绑定
queues是一个string数组,表示可以监听多个指定的消息队列
3. 消息消费
- 当存在多个监听客户端时,多个客户端同时消费消息,但是每一条消息只能被消费一次
- 一个客户端只有在消息消费完成后,才能接收下一条消息
- 消息接收参数,可以直接传发送消息的类型(方法重载)
4. @RabbitListener写在方法上
@RabbitListener(queues = {"my-queue"})
public void receiveMessage(Message message){
//消息类型是class org.springframework.amqp.core.Message
System.out.println("消息类型=》"+message.getClass());
}
5. @RabbitListener写在类上,配合@RabbitHandler 使用
//通过方法重载,获取不同的消息数据
@RabbitHandler
public void receiveMsg(OmsOrderEntity message){
System.out.println("获取到的消息类型="+message.getClass());
}
@RabbitHandler
public void receiveMsg(OmsOrderItemEntity message){
System.out.println("获取到的消息类型="+message.getClass());
}
7. 消息确认机制-可靠抵达
官网描述https://www.rabbitmq.com/reliability.html
基础:使用消息代理(如 RabbitMQ)的系统按照分布式定义的。由于发送的协议方法(消息)不能保证到达对等方或被对等方成功处理,因此发布者和消费者都需要一种用于传递和处理确认的机制
1. 发布者消息确认
文档https://www.rabbitmq.com/publishers.html
避免在消息投递中,因为一些网络波动,客户端宕机,路由键不存等情况导致消息投递失败,从而丢失消息。在发布者端进行消息投递成功确认,投递失败采取对应的处理措施。
Streaming Confirms介绍
大多数客户端库通常为开发人员提供一种方法来处理来自服务器的单个确认。确认将异步到达。由于在 AMQP 0-9-1 中发布本质上也是异步的,因此此选项允许以非常少的开销进行安全发布。该算法通常与此类似:
在频道上启用发布者确认
对于每个发布的消息,添加一个映射条目,将当前序列号映射到消息
当一个肯定的 ack 到达时,删除该条目
当否定 ack 到达时,删除条目并安排其消息以重新发布(或其他合适的东西)
在 RabbitMQ Java 客户端中,确认处理程序通过ConfirmCallback和ConfirmListener接口公开。必须将一个或多个侦听器添加到通道。
2. 使用
配置文件开启发布端消息确认
#将发布端消息确认改为true
spring.rabbitmq.publisher-confirms=true
#启动强制消息
spring.rabbitmq.template.mandatory=true
配置类中重写RabbitTemplate的ConfirmCallback(发布者到服务器回调)和ReturnCallback(交换机到路由器回调)两个方法
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@Configuration
public class MyMessageConverter {
@Resource
RabbitTemplate rabbitTemplate;
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
//完成依赖注入后执行初始化方法。
@PostConstruct
public void publisherConfirmInit(){
//消息由发布者传递给服务器的确认回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* correlationData:关联消息的唯一id
* ack:应答,true为成功,false为失败
* cause:消息失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("关联消息id="+correlationData+"--是否成功="+ack+"错误原因="+cause);
}
});
//消息由交换机到队列的回调,发生错误才会回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* message:发送的消息
* replyCode:回复码
* replyText:回复内容
* exchange:交换机
* routingKey:路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
//如果出错,可以进行相应的处理
System.out.println("消息体="+message+"回复码="+replyCode+"回复内容="+replyText+"交换机="+exchange+"路由键="+routingKey);
}
});
}
}
3. 消费端消费消息确认
文档https://www.rabbitmq.com/consumers.html
默认情况下,消费端的消息是自动回复的,即消费端在接收到消息后,不管有没有处理成功,都会自动回复成功,然后服务端就会删除消息。
弊端:当消费端在处理过程中业务出错,或者客户端宕机,就会导致消息丢失,所以必须等待消息成功消费,采用手动回复确认消息
配置文件修改为手动确认
#手动ack确认消息。一定要使用simple spring.rabbitmq.listener.simple.acknowledge-mode=manual #如果使用direct,需要指定监听器的类型(一定注意) spring.rabbitmq.listener.direct.acknowledge-mode=manual spring.rabbitmq.listener.type=direct
使用channel.basicAck进行消息确认
@RabbitHandler public void receiveMsg(Message message, OmsOrderEntity content, Channel channel) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); /** * 确认收货(确认消息已经消费) * void basicAck(long deliveryTag, boolean multiple) * deliveryTag=交货标签 * multiple=是否批量确认,为true,则会确认之前的所有消息,不可取。应该是单条消息单条确认 */ try { channel.basicAck(deliveryTag, false); System.out.println("消费了消息===" + deliveryTag+"--"+content); } catch (Exception e) { channel.basicNack(deliveryTag,false,true); log.error("出错了", e); } }
当处理消息的过程中出现错误时,使用channel.basicNack 或者basicReject拒收消息
deliveryTag=交货标签 multiple=是否批量拒绝,为true的话会拒绝所有的消息。一般只能为false requeue=是否重新将消息放回队列 basicNack(long deliveryTag消息 , boolean multiple, boolean requeue) basicReject(long deliveryTag, boolean requeue)
- 本文标签: Java RabbitMQ
- 本文链接: https://www.tianyajuanke.top/article/31
- 版权声明: 本文由吴沛芙原创发布,转载请遵循《署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)》许可协议授权