一、kafka架构:

 

1、结构名词解释:

1)Producer :消息生产者,就是向kafka broker发消息的客户端;

2)Consumer :消息消费者,向kafka broker取消息的客户端;

3)Topic :可以理解为一个队列;

4) Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic;

5)Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic;

6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序;

7)Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。

 

2、在使用kafka时需要注意的地方 : 

消费者在消费的时候, 是读取 partition 中的数据的, 一个 topic 可以有多个 partition , 每个消费者组会为每个 partition 生成唯一一个 offset 。

如果一个 partition 被同一个消费者组中的多个消费者在消费,那就意味着有多个消费者在修改这个 offset 。

我们的消费者工作模式一般是这样的 :

  1. 读取当前 offset
  2. 根据当前 offset 获取消息进行消费
  3. 消费成功后修改当前 offset

如果存在多个消费者, 消费者A和消费者B同一时刻很可能读到相同的 offset , 进而消费同一条数据。 然后都去改 offset , 更严重的是 消费者A消费速度远大于消费者B, 消费者A消费了很多条数据之后,读取到了消费者B修改的offset, 导致消费者A也去重复消费自己消费过的消息,所以kafka的消费者应该尽量保证消费的幂等性。

 

3、Kafka特点 :

1、依赖zookeeper 。

2、功能相对简单,他的消息是以 日志文件 的形式 保存的, 定期或达到阈值清理。

3、每个消费者在消费消息的时候, 只是简单的自己维护自己的 offset, 如果代码中对一个消费者启动了两个程序, 就会出现严重的重复消费问题。 所以需要自己保证消费的幂等性。

4、消息类型比较单一, 核心思想就是将队列保存到文件中

5、Kafka的索引是稀疏存储消息的(隔多少个消息将消息的某些信息存到索引文件中)。在查找消息的时候先通过二分法查到离这个索引最近的一个消息的索引(小于要查找的消息的id), 然后采用顺序遍历的方式定位消息。如果给所有消息创建索引, 开销比较大, 这里相当于做了个取舍。

 

二、rabbitMQ架构:

1、结构名词解释:

Broker:简单来说就是消息队列服务器实体。

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

Queue:消息队列载体,每个消息都会被投入到一个或多个队列。

Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。

Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。

producer:消息生产者,就是投递消息的程序。

consumer:消息消费者,就是接受消息的程序。

channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

 

2、一些特点:

rabbitMQ在使用之前, 须先做一系列的初始化。

  1. 创建 exchange (生产者的消息要发送到exchange)
  2. 创建 queue(消费者只能提供queue获取消息)
  3. 绑定exchange和queue(消息通过exchange按规则发送给queue,相当于路由)

消息的大概流程 :

生产者  -->  exchange  -->  queue  -->  消费者

一个Message有两个部分:payload(有效载荷)和label(标签)。payload顾名思义就是传输的数据。label是exchange的名字或者说是一个tag,它描述了payload,而且RabbitMQ也是通过这个label来决定把这个Message发给哪个Consumer

不同的 exchange 数据的路由方式不同, 未持久化的数据存储在内存中, 需要持久化的数据则存储在磁盘上。

 

3、Rabbitmq中的exchange :

Rabbitmq总共支持四种exchange: direct,topic,fanout,header。

  1. Direct

exchange在和queue进行binding时会设置routingkey,我们在将消息发送到exchange时会设置对应的routingkey,在direct类型的exchange中,只有这两个routingkey完全相同,exchange才会选择对应的binging进行消息路由。

默认交换机也是这种类型的,实际上是一个由RabbitMQ【预先声明好的名字为空字符串的直连交换机】(direct exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。如:当你声明了一个名为”hello”的队列,RabbitMQ会自动将其绑定到默认交换机上,绑定(binding)的路由键名称也是为”hello”。因此,当携带着名为”hello”的路由键的消息被发送到默认交换机的时候,此消息会被默认交换机路由至名为”hello”的队列中。即默认交换机看起来貌似能够直接将消息投递给队列。

我在web界面找到默认交换机, 发送一个消息,消息的 routing key 和我定义的队列名字相同, 我在队列就能收到消息。我在web界面的queue中能看到他们的绑定关系。

2、Topic

此类型exchange和上面的direct类型差不多,但direct类型要求routingkey完全相等,这里的routingkey可以有通配符:'*','#'.

其中'*'表示匹配一个单词, '#'则表示匹配没有或者多个单词

3、Fanout

此exchange的路由规则很简单,直接将消息路由到所有绑定的队列中,无须对消息的routingkey进行匹配操作。

4、Header

此类型的exchange和以上三个都不一样,其路由的规则是根据header来判断,其中的header就是以下方法的arguments参数:

Dictionary<string, object> aHeader = new Dictionary<string, object>();

aHeader.Add("format", "pdf");

aHeader.Add("type", "report");

aHeader.Add("x-match", "all");

channel.QueueBind(queue: "queue.A",

                    exchange: "agreements",

                    routingKey: string.Empty,

                    arguments: aHeader);

其中的x-match为特殊的header,可以为all则表示要匹配所有的header,如果为any则表示只要匹配其中的一个header即可。在发布消息的时候就需要传入header值:

var properties = channel.CreateBasicProperties();

properties.Persistent = true;

Dictionary<string, object> mHeader1 = new Dictionary<string, object>();

mHeader1.Add("format", "pdf");

mHeader1.Add("type", "report");

properties.Headers = mHeader1;

一般来说direct和topic用来具体的路由消息,如果要用广播的消息一般用fanout的exchange。

header类型用的比较少。

 

4、Rabbitmq消息的存储机制 :

内存足够的情况下持久化的消息会在内存中存储一份以提高性能, 内存不足的情况下, 非持久化的消息会被写入磁盘。

持久层是一个逻辑上的概念,实际包含两个部分:队列索引(rabbit_queue_index)和消息存储(rabbit_msg_store)。

  rabbit_queue_index负责维护队列中落盘消息的信息,包括消息的存储地点、是否已经交互给消费者、是否已经被消费者ack等。每个队列都有与之对应的一个rabbit_queue_index。

  rabbit_msg_store以键值对的形式存储消息,它被所有队列共享,在每个节点中有且只有一个。从技术层面上来说,rabbit_msg_store具体还可以分为msg_store_persistent和msg_store_transient,

  msg_store_persistent负责持久化消息的持久化,重启后消息不会丢失;

  msg_store_transient负责非持久化消息的持久化,重启后消息会丢失。

通常情况下,习惯性的将msg_store_persistent和msg_store_transient看成rabbit_msg_store这样一个整体。

消息(包括消息体、属性和headers)可以直接存储在rabbit_queue_index中,也可以被保存在rabbit_msg_store中。默认在$RABBITMQ_HOME/var/lib/mnesia/rabbit@$HOSTNAME/ 路径下包含queues、msg_store_persistent、msg_store_transient这三个文件夹下,其分别存储对应的信息。

消息的删除只是从ETS表中删除指定消息的相关信息,同时更新消息对应的存储文件的相关信息。执行消息删除操作时,并不立即对在文件中的消息进行删除,也就是说消息依然在文件中,仅仅是标记为垃圾数据而已。当一个文件中都是垃圾数据时可以将这个文件删除。当检测到前后两个文件中的有效数据可以合并在一个文件中,并且所有的垃圾数据的大小和所有文件(至少有3个文件存在的情况下)的数据大小的比值超过设置的阈值GARBAGE_FACTORION(默认值为0.5)时才会触发垃圾回收将两个文件合并。

执行合并的两个文件一定是逻辑上相邻的两个文件。执行合并时首先锁定两个文件,并先对前面文件中的有效数据进行整理,再将后面的文件的有效数据写入到前面的文件,同时更新消息在ETS表中的记录,最后删除后面的文件。

 

三、Kafka 与 rabbitmq 对比 :

https://blog.51cto.com/u_15127596/2744812

1、功能方面

1)Kafka可以认为是简单的用文件实现了队列的功能, 消息按顺序存储在文件中(写入磁盘), 需要消费者自己去记录当前读取到了哪个位置。

2)Rabbitmq则实现了多种类型的exchange, 多种类型的消息,既可以实现持久化存储(写入磁盘), 又可以实现非持久化存储(写入内存),还有ack机制,支持事务, 还提供了web界面直观的看到各种exchange、queue的情况,及其读写、并发等实时数据。

3)Kafka主要使用pull的方式进行消费,而rabbitmq则pull和push都实现了。

4)正因为kafka功能简单, 而rabbitmq功能复杂强大, 所以kafka更容易上手。

2、性能方面

Kafka的吞吐量更高, 主要有

1)Zero Copy机制,内核copy数据直接copy到网络设备,不必经过内核到用户再到内核的copy,减小了copy次数和上下文切换次数,大大提高了效率。

2)磁盘顺序读写,减少了寻道等等的时间。

3)批量处理机制,服务端批量存储,客户端主动批量pull数据,消息处理效率高。

4)存储具有O(1)的复杂度,读物因为分区和segment,是O(log(n))的复杂度。

3、可靠性方面

Rabbitmq有ack、事务等, 可靠性更高, 而kafka仅仅在消费者端保存了一个offset记录消费情况, 很容易出现多次消费的情况, 需要在逻辑层保证消费的幂等性

4、使用场景

Kafka本身具有较高的吞吐量, 可以对数据进行存储, 适合在对消息可靠性要求不高的场景下做缓冲(比如日志系统)。

Rabbitmq则功能更强大,但是并发和缓冲能力不如kafka, 适合在对可靠性和实时性要求高的情况下使用。另外目前还有不少rpc框架支持rabbitmq(比如直播、转账等) 。

 

 

 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐