原创

RabbitMQ

温馨提示:
本文最后更新于 2022年10月26日,已超过 919 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我

一 应用

文档地址https://www.rabbitmq.com/networking.html

  1. 异步处理
  2. 应用解耦
  3. 流量控制(流量削峰)

二 概述

  1. 大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力

  2. 消息服务中两个重要概念

    消息代理(message broker)和目的地(destination)

  3. 消息队列主要有两种形式的目的地

    队列(queue): 点对点消息 通信

    主题(topic):发布(publish)/订阅(subscribe)消息通信

  4. 点对点方式:

    消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列 。

    消息只有唯一的发送者和接受者,但并不是说只能有一个接收者

  5. 发布订阅式

    发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息

  6. JMS(Java Message Service)JAVA消息服务

    基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现

  7. AMQP(Advanced Message Queuing Protocol)

    高级消息队列协议,也是一个消息代理的规范,兼容JMS

    RabbitMQ是AMQP的实现

image-20220518145510434

  1. Spring支持

    • spring-jms提供了对JMS的支持
    • pring-rabbit提供了对AMQP的支持
    • 需要ConnectionFactory的实现来连接消息代理
    • 提供JmsTemplate、RabbitTemplate来发送消息
    • @JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息
    • 代理发布的消息
    • @EnableJms、@EnableRabbit开启支持
  2. SringBoot支持

    • JmsAutoConfiguration
    • RabbitAutoConfiguration
  3. 市面上的MQ产品

    ActiveMQ RabbitMQ RocketMQ Kafka

三 RabbitMQ概念

1. RabbitMQ简介

RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现

2. 核心概念

1. Message

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成, 这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

2.Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序

3.Exchange

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别

4.Queue

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

5. Binding

绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

Exchange 和Queue的绑定可以是多对多的关系。

6.Connection

网络连接,比如一个TCP连接。

7.Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接

8. Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

9.Virtual Host

虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 /

10.Broker

表示消息队列服务器实体

3. RbbitMQ流程图

image-20220518152401246

四 RabbitMQ安装

1. docker安转rabbitmq

执行命令

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

如果没有rabbitmq:management镜像,则会自动下载

2. 绑定端口描述

  • 4369, 25672 (Erlang发现&集群端口)
  • 5672, 5671 (AMQP端口)
  • 15672 (web管理后台端口)
  • 61613, 61614 (STOMP协议端口)
  • 1883, 8883 (MQTT协议端口)

3. 将rabbitmq启动设置为随docker启动而启动

docker update rabbitmq --restart=always

五 RabbitMQ几种连接模型

官网https://www.rabbitmq.com/getstarted.html

1. 直连(hello word)

2. 工作队列(Work queues)

3. 发布/订阅(Publish/Subscribe)

也可叫做fanout 扇出,一种广播模型

4. 路由(Routing)

5. 主题(Topics)

使用最多,相当于路由的升级版。

官网描述https://www.rabbitmq.com/tutorials/tutorial-five-java.html

发送到主题交换的消息不能有任意的 routing_key - 它必须是单词列表,由点分隔。这些词可以是任何东西,但通常它们指定与消息相关的一些特征。一些有效的路由键示例:“ stock.usd.nyse ”、“ nyse.vmw ”、“ quick.orange.rabbit ”。路由键中可以有任意多的单词,最多为 255 个字节。

绑定键也必须采用相同的格式。主题交换背后的逻辑 类似于直接交换- 使用特定路由键发送的消息将被传递到与匹配绑定键绑定的所有队列。但是,绑定键有两个重要的特殊情况

  • *(星号)可以只替换一个单词
  • #(hash)可以代替零个或多个单词

下图示例:

image-20220519101235705

图示解释:

队列Q1绑定了.orange. ,只要路由键满足了xx.orange.xx,或xx.xx.orange.xx.xx 都能匹配到Q1队列

队列Q2绑定了..rabbit 只要满足了xx.xx.rabbit 或 xx.xx.xx.rabbit 都能匹配到该队列

lazy.# 只要满足了lazy. ,lazy.xx ,lazy.xx.xx... 都能匹配该队列

6. RPC

7. Publisher Confirms

只用publisher确认消息,可靠发布

六 springboot整合使用rabbitMQ

1. 引入依赖

不用写版本号,springboot有对应的版本

<dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>

2. 配置基础连接

其他配置连接参考RabbitAutoConfiguration中的RabbitProperties

#rabbitmq地址
spring.rabbitmq.host=192.168.101.164
#端口
spring.rabbitmq.port=5672
#虚拟主机配置
spring.rabbitmq.virtual-host=/
#账号密码配置
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

3. 开启rabbitmq功能

//启动类处添加@EnableRabbit springboot整合的各类框架,都使用enableXXX开启对应的功能
@EnableRabbit

4. AmqpAdmin使用(交换机队列的操作)

引入amqpAdmin

@Resource
private AmqpAdmin amqpAdmin;

1. 创建交换机

交换机一共有五种类型

  • DirectExchange 直连交换机
  • FanoutExchange 扇出(广播)交换机
  • CustomExchange 自定义交换机
  • TopicExchange 主题交换机
  • HeadersExchange 头交换机
@Test
public void creatExchange(){
  //全参DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
  //name=交换机名称 durable=是否持久化(重启时该交换机不会被删除) autoDelete=没有连接的时候自动删除 arguments=其他参数 Alternate exchange
  DirectExchange directExchange = new DirectExchange("my-direct-exchange",true,false);
  amqpAdmin.declareExchange(directExchange);
}

2. 创建队列

@Test
public void creatQueue(){
  //全参Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
  //name=队列名称 durable=是否持久化(重启时该队列不会被删除) exclusive=是否声明独占队列,如果为true,则只能由申报人连接
  //autoDelete=没有连接的时候自动删除 arguments=其他参数
  Queue queue = new Queue("my-queue",true,false,false);
  amqpAdmin.declareQueue(queue);
}

3. 创建交换机和队列绑定

@Test
public void creatBinding(){
  //全参Binding(String destination, DestinationType destinationType, String exchange, String routingKey,Map<String, Object> arguments)
  //destination=目标(被绑定的是交换机还是队列) DestinationType=目标类型 exchange=交换机 routingKey=路由键 arguments=其他参数
  Binding binding = new Binding("my-queue", Binding.DestinationType.QUEUE,"my-direct-exchange","test.connect",null);
  amqpAdmin.declareBinding(binding);
}

4. 发送消息

@Test
public void sendMessage(){
  //convertAndSend(String exchange, String routingKey, Object message)
  //exchange=交换机名称 routingKey=路由键 message=消息内容
  OmsOrderEntity omsOrderEntity = new OmsOrderEntity();
  omsOrderEntity.setId(1L);
  omsOrderEntity.setAutoConfirmDay(11);
  omsOrderEntity.setBillReceiverPhone("18380453");
  omsOrderEntity.setBillContent("大哥你好");
  rabbitTemplate.convertAndSend("my-direct-exchange","test.connect",omsOrderEntity);
}
//消息可以是对象,也可以是字符串,如果是对象,必须实现序列化方法

5. 修改rabbitTemplate发送消息时的序列化方式

从自动配置代码(RabbitAutoConfiguration.class)中可以看到,rabbitTemplate的消息转换器是从容器中获取的,如果容器中不存在,则使用默认的消息转换器

public RabbitTemplateConfiguration(RabbitProperties properties,
    ObjectProvider<MessageConverter> messageConverter,
    ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) {
  this.properties = properties;
  //从容器中获取消息转换器
  this.messageConverter = messageConverter;
  this.retryTemplateCustomizers = retryTemplateCustomizers;
}

@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  PropertyMapper map = PropertyMapper.get();
  RabbitTemplate template = new RabbitTemplate(connectionFactory);
  MessageConverter messageConverter = this.messageConverter.getIfUnique();
  //如果消息转换器不为空,则使用该消息转换器。
  if (messageConverter != null) {
    template.setMessageConverter(messageConverter);
  }
  template.setMandatory(determineMandatoryFlag());
  RabbitProperties.Template properties = this.properties.getTemplate();
  if (properties.getRetry().isEnabled()) {
    template.setRetryTemplate(new RetryTemplateFactory(
          this.retryTemplateCustomizers.orderedStream().collect(Collectors.toList())).createRetryTemplate(
                properties.getRetry(), RabbitRetryTemplateCustomizer.Target.SENDER));
  }
  map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis)
        .to(template::setReceiveTimeout);
  map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis).to(template::setReplyTimeout);
  map.from(properties::getExchange).to(template::setExchange);
  map.from(properties::getRoutingKey).to(template::setRoutingKey);
  map.from(properties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue);
  return template;
}
private MessageConverter messageConverter = new SimpleMessageConverter();//来自RabbitTemplate.class 默认的消息转换器

因此,只需要在容器中配置一个需要的消息转换器即可

6. 接收消息

1. rabbitListener和rabbithander描述
  • @RabbitListener 可以写在类和方法上
  • @RabbitHandler 标识在方法上
2. RabbitListener 参数绑定

queues是一个string数组,表示可以监听多个指定的消息队列

3. 消息消费
  • 当存在多个监听客户端时,多个客户端同时消费消息,但是每一条消息只能被消费一次
  • 一个客户端只有在消息消费完成后,才能接收下一条消息
  • 消息接收参数,可以直接传发送消息的类型(方法重载)
4. @RabbitListener写在方法上
@RabbitListener(queues = {"my-queue"})
public void receiveMessage(Message message){
  //消息类型是class org.springframework.amqp.core.Message
  System.out.println("消息类型=》"+message.getClass());
}
5. @RabbitListener写在类上,配合@RabbitHandler 使用
//通过方法重载,获取不同的消息数据
@RabbitHandler
public void receiveMsg(OmsOrderEntity message){
  System.out.println("获取到的消息类型="+message.getClass());
}

@RabbitHandler
public void receiveMsg(OmsOrderItemEntity message){
  System.out.println("获取到的消息类型="+message.getClass());
}

7. 消息确认机制-可靠抵达

官网描述https://www.rabbitmq.com/reliability.html

基础:使用消息代理(如 RabbitMQ)的系统按照分布式定义的。由于发送的协议方法(消息)不能保证到达对等方或被对等方成功处理,因此发布者和消费者都需要一种用于传递和处理确认的机制

1. 发布者消息确认

文档https://www.rabbitmq.com/publishers.html

避免在消息投递中,因为一些网络波动,客户端宕机,路由键不存等情况导致消息投递失败,从而丢失消息。在发布者端进行消息投递成功确认,投递失败采取对应的处理措施。

Streaming Confirms介绍

大多数客户端库通常为开发人员提供一种方法来处理来自服务器的单个确认。确认将异步到达。由于在 AMQP 0-9-1 中发布本质上也是异步的,因此此选项允许以非常少的开销进行安全发布。该算法通常与此类似:
  在频道上启用发布者确认
  对于每个发布的消息,添加一个映射条目,将当前序列号映射到消息
  当一个肯定的 ack 到达时,删除该条目
  当否定 ack 到达时,删除条目并安排其消息以重新发布(或其他合适的东西)
在 RabbitMQ Java 客户端中,确认处理程序通过ConfirmCallback和ConfirmListener接口公开。必须将一个或多个侦听器添加到通道。
2. 使用

配置文件开启发布端消息确认

#将发布端消息确认改为true
spring.rabbitmq.publisher-confirms=true
#启动强制消息
spring.rabbitmq.template.mandatory=true

配置类中重写RabbitTemplate的ConfirmCallback(发布者到服务器回调)和ReturnCallback(交换机到路由器回调)两个方法

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;


@Configuration
public class MyMessageConverter {

  @Resource
  RabbitTemplate rabbitTemplate;


  @Bean
  public MessageConverter messageConverter(){
      return new Jackson2JsonMessageConverter();
  }

  //完成依赖注入后执行初始化方法。
  @PostConstruct
  public void publisherConfirmInit(){
      //消息由发布者传递给服务器的确认回调
      rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
          /**
            * correlationData:关联消息的唯一id
            * ack:应答,true为成功,false为失败
            * cause:消息失败原因
            */
          @Override
          public void confirm(CorrelationData correlationData, boolean ack, String cause) {
              System.out.println("关联消息id="+correlationData+"--是否成功="+ack+"错误原因="+cause);
          }
      });
      //消息由交换机到队列的回调,发生错误才会回调
      rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
          /**
            * message:发送的消息
            * replyCode:回复码
            * replyText:回复内容
            * exchange:交换机
            * routingKey:路由键
            */
          @Override
          public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
              //如果出错,可以进行相应的处理
              System.out.println("消息体="+message+"回复码="+replyCode+"回复内容="+replyText+"交换机="+exchange+"路由键="+routingKey);
          }
      });


  }


}
3. 消费端消费消息确认

文档https://www.rabbitmq.com/consumers.html

默认情况下,消费端的消息是自动回复的,即消费端在接收到消息后,不管有没有处理成功,都会自动回复成功,然后服务端就会删除消息。

弊端:当消费端在处理过程中业务出错,或者客户端宕机,就会导致消息丢失,所以必须等待消息成功消费,采用手动回复确认消息

  1. 配置文件修改为手动确认

    #手动ack确认消息。一定要使用simple
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    
    #如果使用direct,需要指定监听器的类型(一定注意)
    spring.rabbitmq.listener.direct.acknowledge-mode=manual
    spring.rabbitmq.listener.type=direct
    
  2. 使用channel.basicAck进行消息确认

    @RabbitHandler
    public void receiveMsg(Message message, OmsOrderEntity content, Channel channel) throws IOException {
      long deliveryTag = message.getMessageProperties().getDeliveryTag();
      /**
        * 确认收货(确认消息已经消费)
        * void basicAck(long deliveryTag, boolean multiple)
        * deliveryTag=交货标签
        * multiple=是否批量确认,为true,则会确认之前的所有消息,不可取。应该是单条消息单条确认
        */
      try {
          channel.basicAck(deliveryTag, false);
          System.out.println("消费了消息===" + deliveryTag+"--"+content);
      } catch (Exception e) {
      channel.basicNack(deliveryTag,false,true);
          log.error("出错了", e);
      }
    }
    
  3. 当处理消息的过程中出现错误时,使用channel.basicNack 或者basicReject拒收消息

    deliveryTag=交货标签 multiple=是否批量拒绝,为true的话会拒绝所有的消息。一般只能为false   requeue=是否重新将消息放回队列
    basicNack(long deliveryTag消息 , boolean multiple, boolean requeue)
    
    basicReject(long deliveryTag, boolean requeue)
    
正文到此结束