给大家推荐一款好用的CSDN云服务,新人首购折扣哦,点击下图跳转:
CSDN开发云


文章包括两个部分:
​ 1.消息队列能解决的问题(消息队列的用处?什么场景下可以用到消息队列?)
​ 2.kafka消费低延迟解决方案



一、消息队列解决的问题

引入消息队列一般能解决一下五种场景:

  • 异步处理
  • 应用解耦
  • 流量削锋
  • 日志采集
  • 消息通讯

1.1、异步处理

场景说明:用户注册后,需要发注册邮件和注册短信。

传统的做法有两种 :

  • 1.串行的方式
  • 2.并行方式

(1)串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端

img

(2)并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间

img

假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。

因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)。

小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?

来看一下,使用消息队列的情景:

img

按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因为写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍。

1.2、应用解耦

场景说明:用户下单后,订单系统需要通知库存系统。

传统的做法是,订单系统调用库存系统的接口。如下图:

img

传统模式的缺点:

  • 假如库存系统无法访问,则订单减库存将失败,从而导致订单失败
  • 订单系统与库存系统耦合

如何解决以上问题呢?引入应用消息队列后的方案,如下图:

img

  • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
  • 库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
  • 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦。

1.3、流量削锋

流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。

应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。

  • 可以控制活动的人数
  • 可以缓解短时间内高流量压垮应用

img

  • 用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面
  • 秒杀业务根据消息队列中的请求信息,再做后续处理

1.4、日志采集

日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。

架构简化如下:

img

  • 日志采集客户端,负责日志数据采集,定时写受写入Kafka队列
  • Kafka消息队列,负责日志数据的接收,存储和转发
  • 日志处理应用:订阅并消费kafka队列中的日志数据

以下是一个kafka日志处理应用案例:

img

(1)Kafka:接收用户日志的消息队列

(2)Logstash:做日志解析,统一成JSON输出给Elasticsearch

(3)Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能

(4)Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因

1.5、消息通讯

消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。

  • 点对点通讯:
    img
    客户端A和客户端B使用同一队列,进行消息通讯。

  • 聊天室通讯:
    img

以上举例了消息队列的使用场景,对于一些场景要求需要低延迟的,那么怎么才能是消息消费低延迟呢?

二、kafka消费低延迟解决方案

2.1、场景

现在我们的电商系统中,当用户购买商品支付成功之后,我们就会异步的发送一个优惠券给用户,一般用户在自己完成订单之后,过了几分钟或者十几分钟收到优惠券,这个时间范围内其实用户都是能接受的,但如果过了好几个小时都没收到的话,用户就很有可能回来投诉说没收到优惠券。

上面的是一个真实的业务场景,从内部代码运行角度看,出现kafka消费延迟的情况,必然是这样的情况:在某个时刻,producer突然在短时间内产生大量的数据丢进kafka的broker里面(假设平均1s中内丢入了5w条需要消费的消息,这个情况会持续几分钟),而consumer消费一条数据平均需要的时间没有变化,还是200ms。这样消息队列中就堆积了很多消息。

在这种情况下,kafka的consumer的行为会是:

  • kafka的consumer会从broker里面取出一批数据,给消费线程进行消费。
  • 由于取出的一批消息数量太大,consumer在session.timeout.ms时间之内没有消费完成。consumer coordinator 会由于没有接受到心跳而挂掉,并且出现一些日志,日志的意思大概是coordinator挂掉了,然后自动提交offset失败,然后重新分配partition给客户端。
  • 由于自动提交offset失败,导致重新分配了partition的客户端又重新消费之前的一批数据。
  • 接着consumer重新消费,又出现了消费超时,无限循环下去。

所以,这个时候我们就需要关注我们消息队列消息延迟情况,即我们该怎么去提升消费性能,以达到更短的消息延迟?

首先,我们要对消息队列中数据进行监控,只有有了这些监控数据才能对比分析消息是否延迟以及预测会不会延迟,下面我们就来看看如何监控消息延迟。

2.2、监控消息队列中的数据

消息延迟如何监控?

我们可以通过两种方式进行监控消息:

  • 1、消息队列提供的工具,通过监控消息的堆积来完成。
  • 2、通过生产监控消息来对消息延时的监控。

那下面我们就来详细看看。

2.2.1、消息队列提供的工具

首先,我们得从原理角度看:消息延时是怎么去理解。我们上面案例中消息队列如果堆积了很多消息,我们得要知道它的消费进度是多少,这样就能很方便计算消息延迟多少。假设目前生产 1000 条消息,然后一个消费者消费 900 条,那么我们就知道了这个消费者消息延迟 100 条。

在 Kafka 中,不同的版本消费者的消费进度是不一样的。这个消费速度就是offset。

在 Kafka0.9 之前的版本中,消费进度是存储在 ZooKeeper 中的,消费者在消费消息的时候先要从 ZooKeeper 中获取最新的消费进度,再从这个进度的基础上消费后面的消息。

在 Kafka0.9 版本之后,消费进度被迁入到 Kakfa 的一个专门的 topic 叫“__consumer_offsets”里面。

当然,作为一个成熟的组件,Kafka 也提供了一些工具来获取这个消费进度的信息帮助我们实现自己的监控,这个工具主要有两个:

(1)Kafka 提供了工具叫做“kafka-consumer-groups.sh”(它在 Kafka 安装包的 bin 目录下)。

  • 前两列是队列的基本信息,包括topic名和分区名;
  • 第三列是当前消费者的消费进度;
  • 第四列是当前生产消息的总数;
  • 第五列就是消费消息的堆积数(也就是第四列与第三列的差值)。

(2)第二个工具是JMX:Kafka 通过 JMX 暴露了消息堆积的数据,然后我们就可以通过写代码将这个堆积数据发送到我们的监控系统中去。

2.2.2、自己生成消息监控

首先,我们可以自定义一种特殊的消息,然后启动一个监控程序将消息定时循环的写入到消息队列中,这个消息可以是生成一个时间戳。同时这个消息是可以被消费者消费的,当业务消费到的时候就将其丢弃,而监控程序消费这个消息是,就将其生成时间和消费时间进行对比,如果超过了我们预设的一定阈值就像我们报警。

生产建议: 上面两种方式都是可以监控消息延迟的,但是在实际生产中,这里推荐将他们两者进行结合来使用,比如,我们先可以在监控程序中通过JMX获取消息堆积数据,然后发送到我们的dashboard 中;同时起一个探测进程确认消息的延迟情况是怎样的。

通过上面我们都已经了解了消息延迟怎么进行监控,接下来我们再来看看怎么来提升消息的写入和消费性能,这样才能将异步消息更快的处理掉。

2.3、降低消费延迟的正确姿势

我们可以通过在消费端和消息队列这两块来减少消息的延时。

2.3.1、消费端

那我们在消费端该怎么处理呢?我们消费端处理的目标应该就是尽量的提升它的处理能力,可以这么做:

  • 1、通过优化消费代码来提升性能(开多线程增加消费速率)。
  • 2、增加消费者的数量(对kafka来说是增加分区的数量)。
  • 3、可以根据消费者的消费速度对session.timeout.ms的时间进行设置,适当延长(使消费者活的更长)。
  • 4、可以减少每次从partition里面捞取的数据分片的大小,提高消费者的消费速度。

不过第二种方式并不是对于所有的消费队列有效的,它是受消费队列限制的,比如Kafka 是不能通过增加消费者数量来提升消费性能的。

因为, 在 Kafka 中,一个 Topic 可以配置多个 Partition,数据会被平均或者按照生产者指定的方式写入到多个分区中,那么在消费的时候,Kafka 约定一个分区只能被一个消费者消费,为什么要这么设计呢?如果有多个 consumer(消费者)可以消费一个分区的数据,那么在操作这个消费进度的时候就需要加锁,可能会对性能有一定的影响。

所以说, 话题的分区数量决定了消费的并行度,增加多余的消费者也是没有用处的,你可以通过增加分区来提高消费者的处理能力。

既然如此,那我们在不增加分区的情况下该怎么去提升消费性能呢?

我们虽然不能增加消费者,但是我们可以在消费者使用并行处理。所以我们就可以考虑使用多线程的方式来增加处理能力:

  • 预先创建一个或者多个线程池。
  • 拉取到消息丢到线程池中进行异步处理,将串行的消息消费变成了并行的。
  • 不仅提高了吞吐量,还可以一次消费多拉取一些消息,分配给多个线程来处理。

2.3.2、消息队列自身

如上我们学习到了怎么通过消费端来提升消息消费能力,那接下来我们要来看看消息队列自身在读取性能上该做些什么优化,其实有两个关键点:

消息存储,应当使用本地磁盘作为存储介质。Page Cache 的存在就可以提升消息的读取速度,即使要读取磁盘中的数据,由于消息的读取是顺序的并且不需要跨网络读取数据,所以读取消息的 QPS 肯定是比普通数据库高很多很多。

零拷贝技术,说是零拷贝,其实我们不可能消灭数据的拷贝,只是尽量减少拷贝的次数。在读取消息队列的数据的时候,其实就是把磁盘中的数据通过网络发送给消费客户端,在实现上会有四次数据拷贝的步骤:

  1. 数据从磁盘拷贝到内核缓冲区;
  2. 系统调用将内核缓存区的数据拷贝到用户缓冲区;
  3. 用户缓冲区的数据被写入到 Socket 缓冲区中;
  4. 操作系统再将 Socket 缓冲区的数据拷贝到网卡的缓冲区中。

    操作系统提供了 Sendfile 函数可以减少数据被拷贝的次数。使用了 Sendfile 之后,在内核缓冲区的数据不会被拷贝到用户缓冲区而是直接被拷贝到 Socket 缓冲区,节省了一次拷贝的过程提升了消息发送的性能。高级语言中对于 Sendfile 函数有封装,比如说在 Java 里面的
    java.nio.channels.FileChannel 类就提供了 transferTo 方法提供了 Sendfile 的功能。

2.4、总结

总结,我们通过提升消息队列的性能来减少消息消费的延迟,主要说了:

  • 通过消息队列工具监控消息堆积数据以及通过监控生成消息方式进行监控消息延迟情况,
  • 通过横向扩展消费者来增加处理能力,这是关键的一个点,所有的操作最终的目标就是增加消费者的消费能力。
  • 采取高性能的数据存储然后配合零拷贝技术,来提升消息消费性能。

所以, 队列经常会用在我们项目当中,做好数据堆积监控是关键。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐