RabbitMQ
# RabbitMQ架构
(1)生产者Publisher:生产消息,就是投递消息的一方。消息一般包含两个部分:消息体(payload)和标签(Label)
(2)消费者Consumer:消费消息,也就是接收消息的一方。消费者连接到RabbitMQ服务器,并订阅到队列上。消费消息时只消费消息体,丢弃标签。
(3)Broker服务节点:表示消息队列服务器实体。一般情况下一个Broker可以看做一个RabbitMQ服务器。
(4)Queue:消息队列,用来存放消息。一个消息可投入一个或多个队列,多个消费者可以订阅同一队列,这时队列中的消息会被平摊(轮询)给多个消费者进行处理。
(5)Exchange:交换器,接受生产者发送的消息,根据路由键将消息路由到绑定的队列上。
(6)Routing Key: 路由关键字,用于指定这个消息的路由规则,需要与交换器类型和绑定键(Binding Key)联合使用才能最终生效。
(7)Binding:绑定,通过绑定将交换器和队列关联起来,一般会指定一个BindingKey,通过BindingKey,交换器就知道将消息路由给哪个队列了。
(8)Connection :网络连接,比如一个TCP连接,用于连接到具体broker
(9)Channel: 信道,AMQP 命令都是在信道中进行的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接,一个TCP连接可以用多个信道。客户端可以建立多个channel,每个channel表示一个会话任务。
(10)Message:消息,由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
(11)Virtual host:虚拟主机,用于逻辑隔离,表示一批独立的交换器、消息队列和相关对象。一个Virtual host可以有若干个Exchange和Queue,同一个Virtual host不能有同名的Exchange或Queue。最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段
# vhost的作用
每一个rabbitmq服务器都能创建虚拟的消息服务器,我们称之为虚拟主机(virtual host)。简称vhost
特性:
每一个vhost本质上是一个小型的独立的rabbitmq服务器,拥有自己独立的完整的一套队列、绑定关系、交换器等。同一个服务器上的多个vhost是完全隔离的。队列及交换器等不互通。
所以一个broker可以开设多个vhost,用于不同用户的权限分离
如何创建vhost?
1)通过前台页面的admin中创建
2)使用rabbitmqctl add_vhost vhost名称 命令
如何删除vhost?
1)前台删除
2)rabbitmqctl delete_vhost vhost_name
# 什么是AMQP协议
AMQP协议,所谓的高级消息队列协议,可以把它理解成一种公认的协议规范,就像http协议一样,只是这个AMQP协议针对的是消息队列。这个协议使得遵从了它的规范的客户端应用和消息中间件服务器的全功能互操作成为可能。
Broker:用于接受信息和分发信息的应用。
Virtual hosts:在一个Broker上面划分出多个隔离的环境,这多个环境就可以理解成是Virtual hosts,就像使用虚拟机一样,每个虚拟机之间都有完整的组件,各Virtual hosts下的用户、交换器以及队列等互不影响,这样方便不同的业务团队在使用同一个Rabbit server提供的服务时,能够划清界限。
Connection:消息生产者和消息消费者还有Broker之间的TCP连接,如果要断开连接,只会在客户端断开,而
Broker不会断开连接,除非网络出现了故障或者Broker服务出了问题。
Channel:通道,如果每一次访问消息队列中间件都建立一个TCP连接的话,那么系统资源会被大量的占用,效率也会降低,所以AMQP提供了Channel机制,共享同一个TCP连接,而一个TCP连接里可以有大量的Channel。假设如果有多个线程访问消息队列中间件服务,每个线程通常都会有自己单独的Channel来做通信,而每个Channel会有自己的Channel id,这样客户端和Broker就能够互相识别Channel,所以Channel之间是完全隔离的。
Exchange:交换机,这是消息到达Broker的第一站,由于Exchange和Queues之间有绑定键来确定双方发送消息的匹配规则,所以这时Exchange会根据消息的路由键和自己的类型,来匹配绑定规则,将消息分发到对应的Queues上。
Queue:队列,消息所到达的最终站,消费者从这里拿消息做消费。
Binding:可以把它理解成一个虚拟的连接,定义了Exchange和Queues之间的匹配规则,只有匹配这个规则的交换机里的消息才会被发送到这个队列里,不过如果消息没法找到匹配的队列的话,那么,根据该条消息的属性,这个消息要么被丢弃,要么返回生产者那里。
Exchange的类型
这里把Exchange和Queues之间的匹配规则称之为绑定键。
类型 说明
direct(直连) 消息的路由键要和绑定键一模一样才能分发到队列
fanout(广播) 无需绑定键,只要有和这个交换机做绑定的队列,都会收到消息,有点类似发布-订阅
topic(主题) 这个类型的交换机要求消息路由键和绑定键要模糊匹配才能分发,以.号来分割每个词, #号代表匹配多个词,*号代表匹配只一个词
headers(header属性) 这种类型的交换机不再是基于路由键了,而是基于消息中的header属性,只有消息中header属性的值与绑定键相同时,消息才会被分发到相应的队列中
默认交换机 如果不指定上述的交换机类型,就会使用默认的direct类型,同时绑定键默认是队列名,所以消息会分发到与路由键同名的队列里
# 消息是如何路由的
消息提供方->路由->一至多个队列消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。通过队列绑定键,可以把队列绑定到交换器上。消息到达交换器后,RabbitMQ 会将消息的路由键与队列的绑定键进行匹配(针对不同的交换器有不同的路由规则);
常用的交换器主要分为一下三种(广播模式):
fanout:如果交换器收到消息,将会广播到所有绑定的队列上
direct:如果路由键完全匹配,消息就被投递到相应的队列
topic:可以使来自不同源头的消息能够到达同一个队列。 使用 topic 交换器时,可以使用通配符
# Rabbitmq里的交换机类型有哪些,都有什么区别?什么是交换机?
1、什么是Exchange
在RabbitMQ中,生产者发送消息不会直接将消息投递到队列中,而是先将消息投递到交换机中, 在由交换机转发到具体的队列, 队列再将消息以推送或者拉取方式给消费者进行消费.
2、路由键 ( RoutingKey)
生产者将消息发送给交换机的时候, 会指定RoutingKey指定路由规则。
3、绑定键 ( BindingKey)
通过绑定键将交换机与队列关联起来, 这样RabbitMQ就知道如何正确地将消息路由到队列。
4、关系
生产者将消息发送给哪个Exchange是需要由RoutingKey决定的,生产者需要将Exchange与哪个队列绑定时需要由 BindingKey决定的。
二、交换机类型和区别
1、直连交换机: Direct exchange
直连交换机的路由算法非常简单: 将消息推送到binding key与该消息的routing key相同的队列。
直连交换机X上绑定了两个队列。第一个队列绑定了绑定o键range, 第二个队列有两个绑定键: black和green。
在这种场景下,一 个消息在布时指定了路由键为orange将会只被路由到队列Q1 I 路由键为black 和green的消息都将被路由到队列Q2。其他的消息都将被丢失。
同一个绑定键可以绑定到不同的队列上去, 可以增加一个交换机X与队列Q2的绑定键,在这种清况下,直连交换机将会和广播交换机有着相同的行为, 将消息推送到所有匹配的队列。一个路由键为black的消息将会同时被推送到队列Q1和Q2。
2、 主题交换机: Topic exchange
直连交换机的缺点:
直连交换机的 routing_key方案非常简单 ,如果我们希望一 条消息发送给多个队列 ,那么这个交换机需 要绑定上非常多的 routing_key.
假设每个交换机上都绑定一堆的 routing_key连接到各个队列上。那么消息的管理 就会异常地困难。
主题交换机的特点:
发送到主题交换机的 消息不能有任意的 routing key, 必须是由点号分开的一串单词,这些单词可以是任意的,但通常是与消息相关的一些特征。
如以下是几个有效的routing key:
“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabb 代”, routing key的单词可以 有很多,最大限制是255 bytes。
Topic 交换机的 逻辑与 direct 交换机有点 相似 使用特定路由键发送的消息 将被发送到所有使用匹配绑定键绑定的队列 ,然而 ,绑定键有两个特殊的情况:
*表示匹配任意一个单词
#表示匹配任意—个或多个单词
如:
routing key quick.orange.rabbit-> queue Ql, Q2
routing key lazy.orange.elephant-> queue Ql,Q2
延申:
当一个队列的绑定键是"#",它将会接收所有的消息,而不再考虑所接收消息的路由键。
当一个队列的绑定键没有用到"#"和’*"时,它又像 direct 交换一样工作。
2、扇形交换机: Fanout exchange
扇形交换机是最基本的交换机类型,它所能做的事清非常简单广播消息。
扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要'思考”,所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。
3、首部交换机: Headers exchange
类似主题交换机,但是头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
此交换机有个重要参数:”x-match”
当”x-match”为“any”时,消息头的任意一个值被匹配就可以满足条件
当”x-match”设置为“all”的时候,就需要消息头的所有值都匹配成功
4、默认交换机
实际上是— 个由 RabbitMQ预先声明好的名字为空字符串的直连交换机 (direct exchange) 。
它有一个特殊的属性使得它对于简单应用特别有用处 :那就是每个新建队列 (queue) 都会自动绑定到默认交换机上,绑定的 路由键(routing key) 名称与队列名称相同。
当你声明了一个名为“hello”的队列,RabbitMQ会自动将其绑定到默认交换机上,绑定(binding)的路由键名称也是为“hello”。
当携带着名为“hello”的路由键的信息被发送到默认交换机的时候,此消息会被默认交换机路由至名为“hello”的队列中
类似amq.*的名称的交换机:这些是RabbitMQ默认创建的交换机。
这些队列名称被预留做RabbitMQ内部使用,不能被应用使用,否则抛出403错误
5、Dead Letter Exchange(死信交换机)
RabbitMQ作为一个高级消息中间件,提出了死信交换器的概念。
这种交互器专门处理死了的信息(被拒绝可以重新投递的信息不能算死的)。
消息变成死信一般是以下三种情况:
①、消息被拒绝,并且设置requeue参数为false。
②、消息过期(默认情况下Rabbit中的消息不过期,但是可以设置队列的过期时间和信息的过期的效果)
③、队列达到最大长度(一般当设置了最大队列长度或大小并达到最大值时)
当满足上面三种情况时,消息会变成死信消息,并通过死信交换机投递到相应的队列中。
我们只需要监听相应队列,就可以对死信消息进行最后的处理。
订单超时处理:
生产者生产一条1分钟后超时的订单信息到正常交换机exchange-a中,消息匹配到队列queue-a,但一分钟后仍未消费。 消息会被投递到死信交换机dlx-exchange中,并发送到私信队列中。
死信队列dlx-queue的消费者拿到信息后,根据消息去查询订单的状态,如果仍然是未支付状态,将订单状态更新为超时状态。
6.交换机的属性
Name:交换机名称
Type:交换机类型,direct,topic,fanout,headers
Durability:是否需要持久化,如果持久性,则RabbitMQ重启后,交换机还存在
Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange
Internal:当前Exchange是否用于RabbitMQ内部使用,默认为false。
Arguments:扩展参数,用于扩展AMQP协议定制使用
# 如何保证消息的顺序消费
消息在投入到queue的时候是有顺序,如果只是单个消费者来处理对应的单个queue,是不会出现消息错乱的问题。但是在消费的时候有可能多个消费者消费同一个queue,由于各个消费者处理消息的时间不同,导致消息未能按照预期的顺序处理。其实根本的问题就是如何保证消息按照预期的顺序处理完成。
出现消费顺序错乱的情况
一个queue只存在一个consumer,但是为了提高处理效率,consumer中使用了多线程进行处理
保证消息顺序性的方法
将原来的一个queue拆分成多个queue,每个queue都有一个自己的consumer。该种方案的核心是生产者在投递消息的时候根据业务数据关键值(例如订单ID哈希值对订单队列数取模)来将需要保证先后顺序的同一类数据(同一个订单的数据) 发送到同一个queue当中。
一个queue就一个consumer,在consumer中维护多个内存队列,根据业务数据关键值(例如订单ID哈希值对内存队列数取模)将消息加入到不同的内存队列中,然后多个真正负责处理消息的线程去各自对应的内存队列当中获取消息进行消费。
RabbitMQ保证消息顺序性总结: 核心思路就是根据业务数据关键值划分成多个消息集合,而且每个消息集合中的消息数据都是有序的,每个消息集合有自己独立的一个consumer。多个消息集合的存在保证了消息消费的效率,每个有序的消息集合对应单个的consumer也保证了消息消费时的有序性。
# 消息队列如何保证消息的可靠性传输
消息的可靠性传输分为两个问题,一个是保证消息不被重复消费,另一个是保证消息不丢失. 保证消息不重复被消费,就是保证消息的幂等性问题,消息的幂等性是指一个操作执行任意多次所产生的影响均与一次执行的影响相同,在mq里,也就是消息只能被消费一次,不能被重复消费.
来看看消息丢失的场景:
发送方丢失,可能发送方在发送消息的过程中,出现网络问题等导致mq接收不到消息,导致了消息丢失. 要解决这个问题,首先可以采用事务机制,在发送消息的时候实现事务机制,若是出现发送失败的情况,可以进行回滚,而让消息重新被发送.但是开启了事务,发送方就必须同步等待事务执行完毕或者回滚,导致消息一多,性能会下降. 但是,还有一个更好的办法:可以采用确认机制,发送方在发送消息的时候必须要保证要收到一个确认消息,如果没有收到或者收到失败的确认消息,就说明消息发送失败,要重新进行发送,确认机制是可以采用异步进行的,这样就极大地保证了在保留效率的基础上又能保证消息的不丢失问题. 第二个丢失问题可能是在mq方发生的,如果mq没有进行持久化,出现了宕机关机等情况,消息就会丢失,解决办法无非就是将消息进行持久化,这样在出现问题的时候可以及时对消息进行恢复. 第三个丢失问题可能在消费方发生,这和发送方丢失问题类似,解决这个问题也是采用确认机制,这样一来就可以实现效率上的保证和消息不丢失的保证. 但是解决了这些问题,就会产生下面的幂等性问题: 我们都知道mq是可以进行重发的,且只有在它认为失败的情况会进行重发.什么时候mq会认为它发送给消费者的消息是失败的呢?也就是超出了它等待消费者响应的时间,这是一个超时时间,若是过了这个时间消费者仍然没有响应,说明mq发送失败,就会进行重试,而其实这个时候消费者可能是没有失败的,它只是因为某个原因导致消费超出了mq的等待时间而已,这个时候mq再发送一次消息,消费者就会重复消费. 实现幂等性消费:
① 通过数据库:比如处理订单时,记录订单ID,在消费前,去数据库中进行查询该记录是否存在,如果存在则直接返回。
② 使用全局唯一ID,再配合第三组主键做消费记录,比如使用 redis 的 set 结构,生产者发送消息时给消息分配一个全局ID,在每次消费者开始消费前,先去redis中查询有没有消费记录,如果消费过则不进行处理,如果没消费过,则进行处理,消费完之后,就将这个ID以k-v的形式存入redis中(过期时间根据具体情况设置)。
# 如何处理消息堆积情况?
场景题:几千万条数据在MQ里积压了七八个小时。
1、出现该问题的原因:
消息堆积往往是生产者的生产速度与消费者的消费速度不匹配导致的。有可能就是消费者消费能力弱,渐渐地消息就积压了,也有可能是因为消息消费失败反复复重试造成的,也有可能是消费端出了问题,导致不消费了或者消费极其慢。比如,消费端每次消费之后要写mysql,结果mysql挂了,消费端hang住了不动了,或者消费者本地依赖的一个东西挂了,导致消费者挂了。
所以如果是 bug 则处理 bug;如果是因为本身消费能力较弱,则优化消费逻辑,比如优化前是一条一条消息消费处理的,那么就可以批量处理进行优化。
2、临时扩容,快速处理积压的消息:
(1)先修复 consumer 的问题,确保其恢复消费速度,然后将现有的 consumer 都停掉;
(2)临时创建原先 N 倍数量的 queue ,然后写一个临时分发数据的消费者程序,将该程序部署上去消费队列中积压的数据,消费之后不做任何耗时处理,直接均匀轮询写入临时建立好的 N 倍数量的 queue 中;
(3)接着,临时征用 N 倍的机器来部署 consumer,每个 consumer 消费一个临时 queue 的数据
(4)等快速消费完积压数据之后,恢复原先部署架构 ,重新用原先的 consumer 机器消费消息。
这种做法相当于临时将 queue 资源和 consumer 资源扩大 N 倍,以正常 N 倍速度消费。
3、MQ长时间未处理导致MQ写满的情况如何处理:
如果消息积压在MQ里,并且长时间都没处理掉,导致MQ都快写满了,这种情况肯定是临时扩容方案执行太慢,这种时候只好采用 “丢弃+批量重导” 的方式来解决了。首先,临时写个程序,连接到mq里面消费数据,消费一个丢弃一个,快速消费掉积压的消息,降低MQ的压力,然后在流量低峰期时去手动查询重导丢失的这部分数据
- 短时间内无法扩容或者扩容无法完全解决问题
可以尝试降低一些非核心业务的消息处理,其次可以通过监控排查,优化消费者端的业务代码或者查看是否存在一些消息被重复消息的情况.
# 集群模式
- 普通集群模式:
普通集群模式用于提高系统的吞吐量,通过添加节点来线性扩展消息队列的吞吐量。也就是在多台机器上启动多个 RabbitMQ 实例,而队列 queue 的消息只会存放在其中一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。消费的时候,如果连接到了另外的实例,那么该实例就会从数据实际所在的实例上的queue拉取消息过来,就是说让集群中多个节点来服务某个 queue 的读写操作
但普通集群模式的缺点在于:无高可用性,queue所在的节点宕机了,其他实例就无法从那个实例拉取数据;RabbitMQ 内部也会产生大量的数据传输。
- 镜像队列集群模式:
镜像队列集群是RabbitMQ 真正的高可用模式,集群中一般会包含一个主节点master和若干个从节点slave,如果master由于某种原因失效,那么按照slave加入的时间排序,"资历最老"的slave会被提升为新的master。
镜像队列下,所有的消息只会向master发送,再由master将命令的执行结果广播给slave,所以master与slave节点的状态是相同的。比如,每次写消息到 queue 时,master会自动将消息同步到各个slave实例的queue;如果消费者与slave建立连接并进行订阅消费,其实质上也是从master上获取消息,只不过看似是从slave上消费而已,比如消费者与slave建立了TCP连接并执行Basic.Get的操作,那么也是由slave将Basic.Get请求发往master,再由master准备好数据返回给slave,最后由slave投递给消费者。
从上面可以看出,队列的元数据和消息会存在于多个实例上,也就是说每个 RabbitMQ 节点都有这个 queue 的完整镜像,任何一个机器宕机了,其它机器节点还包含了这个 queue 的完整数据,其他消费者都可以到其它节点上去消费数据。
(1)缺点:
① 性能开销大,消息需要同步到所有机器上,导致网络带宽压力和消耗很重
② 非分布式,没有扩展性,如果 queue 的数据量大到这个机器上的容量无法容纳了,此时该方案就会出现问题了
(2)如何开启镜像集群模式呢?
在RabbitMQ 的管理控制台Admin页面下,新增一个镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。