什么是消息队列?

  • 消息(Message):传输的数据。

  • 队列(Queue):队列是一种先进先出的数据结构。

  • 消息队列从字面的含义来看就是一个存放消息的容器。

  • 消息队列可以简单理解为:把要传输的数据放在队列中。

  • 把数据放到消息队列叫做生产者

  • 从消息队列里边取数据叫做消费者

消息队列是一种异步的服务间通信方式,是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。使用较多的消息队列有RocketMQ、RabbitMQ、Kafka等。

为什么使用消息队列?

使用消息队列有三个好处:

解耦

耦合性:后台各个系统相互依赖,如果一个系统挂掉了,其他也会导致无法运行

于是消息队列就进行解耦,加入了消息队列之后,不同的后台只需要将自己的数据写进消息队列即可,一个系统挂掉了,他在消息队列中的数据依旧存在,不用担心出现整体无法运行的情况

异步

异步提速:比如说我们原本有3个后台系统要向前端输出数据,每个后台都需要300ms,还要加上访问数据库的时间,如果一个用户访问的后台较多,那么访问的时间也会变得很久,用户体验较差

但如果使用了消息队列的话,不管要访问多少个后台数据,所有的后台只需要把数据都压进消息队列里面就可行了,如何用户再根据自己的需求从消息队列拿,大大减少所需时间

削峰

如果我们有一段时间的请求量非常大,就好比双11的时候,我们的后台只能接受1000个,但一下发过来3000个,这时候后台扛不住,就会崩溃

但是如果使用了消息队列,消息队列会把加载不了的信息丢到消息队列里面去,等后台持续性的加载,这样就不会出现系统崩溃的问题,顶多也只是慢一点

消息队列有什么优点和缺点?

优点就是上面所说的三个: 解耦,异步,削峰

缺点:

系统的可用性降低:我系统引入的外部依赖越多,消息队列就越容易挂掉,不加消息队列之前,我们虽然有耦合的问题,2个系统可能互相关联,一方挂掉导致其他一个也无法正常使用,但是如果加了消息队列,消息队列一旦挂掉,那么所有的系统都会挂掉

系统复杂性变高:硬生生加个 MQ 进来,你怎么保证消息没有重复消费?怎么[处理消息丢失的情况]?怎么保证消息传递的顺序性?头大头大,问题一大堆,痛苦不已

Kafka ,ActiveMQ ,RabbitMQ ,RocketMQ 都有什么优点和缺点?

 

如何保证消息队列的高可用?

先了解Rabbit的三种工作模式

rabbitmq有3种模式,但集群模式是2种

  • 单一模式:即单机情况不做集群,就单独运行一个rabbitmq而已。

  • 普通模式:默认模式,以两个节点(rabbit01、rabbit02)为例来进行说明。对于Queue来说,消息实体只存在于其中一个节点rabbit01(或者rabbit02),rabbit01和rabbit02两个节点仅有相同的元数据,即队列的结构。当消息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会临时在rabbit01、rabbit02间进行消息传输,把A中的消息实体取出并经过B发送给consumer。所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连rabbit01或rabbit02,出口总在rabbit01,会产生瓶颈。当rabbit01节点故障后,rabbit02节点无法取到rabbit01节点中还未消费的消息实体。如果做了消息持久化,那么得等rabbit01节点恢复,然后才可被消费;如果没有持久化的话,就会产生消息丢失的现象。

  • 普通模式的优点:提高消费的吞吐量

  • 镜像模式:把需要的队列做成镜像队列,存在与多个节点属于[RabbitMQ的HA方案]该模式解决了普通模式中的问题,其实质和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在客户端取数据时临时拉取。该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。所以在对可靠性要求较高的场合中适用。

  • 镜像集群的特点:

  • 性能开销非常大,因为要同步消息到对应的节点,这个会造成网络之间的数据量的频繁交互,对于网络带宽的消耗和压力都是比较重的

    没有扩展可言,rabbitMQ是集群,不是分布式的,所以当某个Queue负载过重,我们并不能通过新增节点来缓解压力,因为所以节点上的数据都是相同的,这样就没办法进行扩展了

如何保证消息的可靠性传输?(如何处理消息丢失的问题)

生产者没有成功把消息发送给消息队列

a、丢失的原因:因为网络传输的不稳定性,当生产者在向MQ发送消息的过程中,MQ没有成功接收到消息,但是生产者却以为MQ成功接收到了消息,不会再次重复发送该消息,从而导致消息的丢失。

b、解决办法: 有两个解决办法:事务机制和confirm机制,最常用的是confirm机制。

事务机制

RabbitMQ 提供了事务功能,生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit

confirm机制

RabbitMQ可以开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,生产者每次写的消息都会分配一个唯一的 id,如果消息成功写入 RabbitMQ 中,RabbitMQ 会给生产者回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,生产者可以发送。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么可以重发。

注意:RabbitMQ的事务机制是同步的,很耗型能,会降低RabbitMQ的吞吐量。confirm机制是异步的,生成者发送完一个消息之后,不需要等待RabbitMQ的回调,就可以发送下一个消息,当RabbitMQ成功接收到消息之后会自动异步的回调生产者的一个接口返回成功与否的消息。

RabbitMQ接收到消息之后丢失了消息

a、丢失的原因:RabbitMQ接收到生产者发送过来的消息,是存在内存中的,如果没有被消费完,此时RabbitMQ宕机了,那么再次启动的时候,原来内存中的那些消息都丢失了。

b、解决办法:开启RabbitMQ的持久化。当生产者把消息成功写入RabbitMQ之后,RabbitMQ就把消息持久化到磁盘。结合上面的说到的confirm机制,只有当消息成功持久化磁盘之后,才会回调生产者的接口返回ack消息,否则都算失败,生产者会重新发送。存入磁盘的消息不会丢失,就算RabbitMQ挂掉了,重启之后,他会读取磁盘中的消息,不会导致消息的丢失。

c、持久化的配置

  • 第一点是创建 queue 的时候将其设置为持久化,这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。

  • 第二个是发送消息的时候将消息的 deliveryMode 设置为 2,就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。

注意:持久化要起作用必须同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。

消费者弄丢消息

a、丢失的原因:如果RabbitMQ成功的把消息发送给了消费者,那么RabbitMQ的ack机制会自动的返回成功,表明发送消息成功,下次就不会发送这个消息。但如果就在此时,消费者还没处理完该消息,然后宕机了,那么这个消息就丢失了。

b、解决的办法:简单来说,就是必须关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次在自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

如何保证消息不被重复消费? (如何保证消息消费的幂等性)

先说为什么会重复消费:正常情况下,消费者在消费消息的时候,消费完毕后,会发送一个确认消息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除;但是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将消息分发给其他的消费者。

解决思路是:保证消息的唯一性,就算是多次传输,不要让消息的多次消费带来影响;保证消息等幂性;

  • 在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列;

  • 在消息消费时,要求消息体中必须要有一个bizId(对于同一业务全局唯一,如支付ID、订单ID、帖子ID等)作为去重和幂等的依据,避免同一条消息被重复消费。

这个问题针对业务场景来答分以下几点:

    1. 如果消息是做数据库的insert操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。

    2. 如果消息是做redis的set的操作,不用解决,因为无论set几次结果都是一样的,set操作本来就算幂等操作。

    3. 如果以上两种情况还不行,可以准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

如何保证消息的顺序性?

mysql的binlog同步。你再mysql里增删改3条binlog。接着这三条binlog发送到MQ里面。到消费出来依次执行。起码要保证人家是按照顺序来的吧。不然本来是增加、修改、删除。你愣是给更改了顺序,换成了删除、修改、增加。这就乱了。

搞3个Queue,每个消费者就消费其中的一个Queue,把需要保证顺序的数据发到1个Queue里去

如何解决消息队列的延迟以及过期失效的问题?

过期失效就是TTL。如果消息在Queue中积压超过一定的时间就会被RabbitMQ给清理掉。这个数据就没了。这就不是数据积压MQ中了,而是大量的数据会直接搞丢。

在这种情况下,增加consume消费积压就不起作用了。此时,只能将丢失的那批数据,写个临时的程序,一点一点查出来,然后再灌入MQ中,把白天丢失的数据补回来。

消息队列满了之后该如何解决?

如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。

有几百万消息持续积压几小时,说说该怎么解决?

在日常工作中使用RabbitMQ偶尔会遇不可预料的情况导致的消息积压,一般出现消息积压基本上分为几种情况:

  1. 消费者消费消息的速度赶不上生产速度,这总问题主要是业务逻辑没设计好消费者和生产者之间的平衡,需要改业务流程或逻辑已保证消费度跟上生产消息的速,譬如增加消费者的数量等。

  2. 消费者出现异常,导致一直无法接收新的消息,这种问题需要排查消费的逻辑是不是又问题,需要优化程序。

 

几千万条数据在MQ里,积压了七八个小时。这个时候就是恢复consumer的问题。让它恢复消费速度,然后傻傻地等待几个小时消费完毕。这个肯定不能再面试的时候说。1个消费者1秒时1000条,1秒3个消费者是3000条。1分钟是18万条。1个小时是1000多万条。如果积压了上万条数据,即使消费者恢复了,也大概需要1个多小时才能恢复过来。

原来3个消费者1个小时。现在30个消费者,需要10分钟搞定。

一般情况下,这个时候只能做临时扩容了。具体操作步骤和思路如下:

① 先修改consumer的问题,确保其恢复消费速度,然后将现有consumer都停掉。

② 新建1个topic,partition是原来的10倍,临时建立好原来10倍或者20倍的Queue。

③ 然后写一个临时的分发数据的consumer程序,这个程序部署上去,消费积压的数据。消费之后,不做耗时的处理。直接均匀轮训写入临时建立好的10倍数量的Queue。

④ 接着征用10倍的机器来部署consume。每一批consumer消费1个临时的queue。

⑤ 这种做法,相当于将queue资源和consume资源扩大10倍,以10倍的速度来消费数据。

⑥ 等快速消费完积压数据之后,恢复原来的部署架构,重新用原先的consumer来消费消息。

如果让你写一个消息队列,该如何进行架构设计?说说你的思路?

1、首先MQ得支持可伸缩性吧。就是需要的时候增加吞吐量和容量?

2、其次,需要考虑一下MQ的数据是不是要持久化到磁盘

3、再次,考虑一下MQ的可用性。

4、最后,考虑一下能不能支持数据零丢失

首先这个mq得支持可伸缩性吧,就是需要的时候快速扩容,就可以增加吞吐量和容量,那怎么搞?设计个分布式的系统呗,参照一下kafka的设计理念,broker -> topic -> partition,每个partition放一个机器,就存一部分数据。如果现在资源不够了,简单啊,给topic增加partition,然后做数据迁移,增加机器,不就可以存放更多数据,提供更高的吞吐量了? 其次你得考虑一下这个mq的数据要不要落地磁盘吧?那肯定要了,落磁盘,才能保证别进程挂了数据就丢了。那落磁盘的时候怎么落啊?顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能是很高的,这就是kafka的思路。 (3)其次你考虑一下你的mq的可用性啊?这个事儿,具体参考我们之前可用性那个环节讲解的kafka的高可用保障机制。多副本 -> leader & follower -> broker挂了重新选举leader即可对外服务。 能不能支持数据0丢失啊?可以的,参考我们之前说的那个kafka数据零丢失方案

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐