kafka消息队列
kafka介绍英文文档:kafka官方文档中文学习文档:kafka中文文档简介kafka写操作快的原因:kafka的特点磁盘随机IO和顺序IO的对比:随机和顺序I/O简介:kafka是一个事件流平台,专门为分布式高吞吐量系统而设计的消息传递系统,相对其他消息系统有更好的吞吐量、内置分区、复制和固有的容错能留,使其更适合处理大规模的消息程序。kafka的两种模式点对点:消息被保留在队列中。一个或者多
kafka消息组件
kafka介绍
- 英文文档:kafka官方文档
- 中文学习文档:kafka中文文档简介
- kafka写操作快的原因:kafka的特点
- 磁盘随机IO和顺序IO的对比:随机和顺序I/O
简介:kafka是一个事件流平台,专门为分布式高吞吐量系统而设计的消息传递系统,相对其他消息系统有更好的吞吐量、内置分区、复制和固有的容错能留,使其更适合处理大规模的消息程序。
kafka的两种模式
- 点对点:消息被保留在队列中。一个或者多个消费者可以消耗队列中的消息,但是特定消息只能由最多一个消费者消费。消息被消费后将会从队列消失。
- 发布-订阅(pub-sub)消息系统:不同于点对点的是消息被保留在主题中,消费者可以订阅一个或者多个主题并使用该主题中的所有消息。这个和redis的发布订阅模式一样。
kafka的特点
- 可靠性-kafka是分布式、分区、复制和容错的。
- 可扩展性-kafka消息传递系统轻松缩放,无需停机。
- 耐用性-kafka使用‘分布式提交日志’,这意味着消息会尽可能快的保留在磁盘上,因此他是持久的
- 性能-kafka对于发布订阅消息都具有高吞吐量。即使存储了许多TB的消息,它也保持稳定的性能。保证零停机和零数据丢失。
kafka是一个分布式消息队列,具有高性能、持久化、多副本备份、横向扩展能力。在架构中起到解偶、削峰、异步处理的作用
kafka关键术语
1、消息的生产者叫Producer,消息的接受者叫Consumer,生产者将数据保存到kafka集群中,消费者从中获取消息进行处理
2、broker:英文含义中间人。在kafka中存储消息。
3、主题(topic):一个topic保存的同一类消息,生产者要发送消息必须制定topic。
4、分区(partition):每个topic都可以分成多个partition,每一个partition在存储层面都是append log文件。分区的根本原因是:kafka基于文件存储,当文件达到一定程度,磁盘空间告急,因此采用分区的办法,一个分区一个文件,这样就可以将数据存储到不同的server,另外这样做也可以负载均衡。
5、偏移量(Offset):一个分区对应一个磁盘上的文件,而消息在文件中的位置称为offset(偏移量),offset为一个long型数字,它可以唯一标记一条消息。由于kafka并没有提供其他的索引机制来存储offset,文件只能顺序的读写,所以在kafka中几乎不允许对消息进行‘随机读写’。
kafka分布式、分区和备份数据流图
kafka命令行使用
- 启动zookeeper
Kafka使用ZooKeeper的,所以你需要先启动ZooKeeper的服务器,如果你还没有,
您可以使用Kafka包装里的方便脚本来得到一个快速和污染的单节点的ZooKeeper实例。
tiger@tigerdeMacBook-Pro kafka % zookeeper-server-start zookeeper.properties
- 启动kafka
tiger@tigerdeMacBook-Pro kafka % kafka-server-start server.properties
- 创建话题
让我们创建一个名为“test”主题,只有一个分区,只有一个副本:
tiger@tigerdeMacBook-Pro kafka % kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
Created topic test1.
- 查看话题
tiger@tigerdeMacBook-Pro kafka % kafka-topics --list --zookeeper localhost:2181
__consumer_offsets
discovery-funding-test
funture_test
howtousekafka
howtousekafka2
my-topic
test
test1
web_log
- 查看单节点代理的信息
tiger@tigerdeMacBook-Pro test1-0 % kafka-topics --describe --zookeeper localhost:2181 --topic test1
Topic: test1 TopicId: Je3z0ScdROyYRcpFMd3puA PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: test1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic:主题名称
TopicId:主题ID
PartitionCount:分区个数
ReplicationFactor:副本个数
Partition:当前节点分区
Leader:当前leader
Replicas:副本个数
Isr:分区的ISR动态选举队列
- 发送消息
tiger@tigerdeMacBook-Pro ~ % kafka-console-producer --broker-list localhost:9092 --topic test1
>this is T
>this is G
>this is H
>this is P
>
- 接收消息(从头开始消费)
tiger@bogon ~ % kafka-console-consumer --bootstrap-server localhost:9092 --topic test1 --from-beginning
this is T
this is G
this is H
this is P
安装部署kafka
系统:mac10.15.7 (19H2)
brew install kafka
brew install zookeeper
修改 /usr/local/etc/kafka/server.properties, 找到 listeners=PLAINTEXT://:9092 那一行,把注释取消掉。
然后修改为:
listeners=PLAINTEXT://localhost:9092
以服务的方式启动
brew services start zookeeper
brew services start kafka
数据传输的定义(即事务):
数据传输的定义有以下三种级别:
最多一次(at most once):消息不会被重复发送,最多被传输一次,但也有可能一次不传输。更精确的来说就是发出数据就可以,不关心broker的写入状态
最少一次(at least once):消息不会被漏发送,最少被传输一次,但也有可能被重复传输。
精确的一次(exactly once):不会漏传输也不会重复传输,每个消息都被传输一次而且仅仅被传输一次,这是大家期望的
站在producer分析三种语义
at most once意味着producer发送完一条消息后,不会确认消息是否成功。那么就存在丢失的可能。
at least once意味着producer发送完一条消息后,会议确认消息是否成功,如果producer没有收到broker的ack确认消息,那么将不断尝试重新发送。那就存在消息重复的可能性。
exactly once意味着producer的发送是幂等的。意味着消息无论发送多少遍,最终broker上的记录只有一天不重复的数据
producer at least once配置
kafka默认消息语义就是至少一次,意味着不用配置
{
'acks': 1,
}
可能会造成的问题:
producer发送数据给broker等待ack,partition的leader写入了数据回复了ack,
但是在isr动态队列中的follower同步数据的时候leader挂掉了,那么会造成数据丢失
producer at most once配置
{
'acks': 0
}
producer只管发送一次数据,不等待broker的ack状态回复。此时如果broker挂掉之后,就会造成数据丢失。
acks=0表示期望的broker的确认数。0:producer发完消息后不会等待任何broker确认;1表示会等待broker集群中的leader的确认写入消息;设置为all表示等待broker集群的leader和其所有的follower的确认写入消息
retires表示发送失败重新发送的次数。配置了retires后,如果没有将max_in_flight_requests_per_connection配置为1,有可能在造成乱序的结果。max_in_flight_requests_per_connection的配置代表着一个producer同时可以发送的未收到确认的消息数量。如果值大于1,那么可能发送了msg1后,在没有收到确认就发送了msg2,此时msg1失败后重新发送,而msg2发送成功,就造成了borker上消息的乱序。这个配置默认值为5
producer Exactly once配置
{
'acks': 'all'
}
producer发送数据的等待ack,isr动态队列中的follower同步leader的数据,
但是此时的isr队列只有partition-leader那么就会丢失数据;
producer发送数据等待ack,isr动态队列中的follower同步leader的数据,
但是在leader返回ack之前,leader挂了,那么producer会重新发送,导致数据重复
再让我们来看看kafka-python的producer的源码,我这里只分析配置:
DEFAULT_CONFIG = {
'bootstrap_servers': 'localhost', # kafka的serverip
'client_id': None, # 客户端版本号,默认值为kafka-python-producer-#(唯一数字)
'key_serializer': None, # 将发送的参数key序列化为bytes
'value_serializer': None, # 将发送的参数value序列化为bytes
'acks': 1, # 包含(0, 1, 'all')上面已经解释了
'bootstrap_topics_filter': set(), # 对topic去重
'compression_type': None, # 数据的压缩类型,支持'gzip', 'snappy', 'lz4', 'zstd' or None
'retries': 0, # 重试次数
'batch_size': 16384, # 发送的批量处理的大小
'linger_ms': 0,
'partitioner': DefaultPartitioner(),
'buffer_memory': 33554432,
'connections_max_idle_ms': 9 * 60 * 1000,
'max_block_ms': 60000,
'max_request_size': 1048576,
'metadata_max_age_ms': 300000,
'retry_backoff_ms': 100,
'request_timeout_ms': 30000,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
'sock_chunk_bytes': 4096, # undocumented experimental option
'sock_chunk_buffer_count': 1000, # undocumented experimental option
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
'max_in_flight_requests_per_connection': 5,
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
'ssl_check_hostname': True,
'ssl_cafile': None,
'ssl_certfile': None,
'ssl_keyfile': None,
'ssl_crlfile': None,
'ssl_password': None,
'ssl_ciphers': None,
'api_version': None,
'api_version_auto_timeout_ms': 2000,
'metric_reporters': [],
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
'selector': selectors.DefaultSelector,
'sasl_mechanism': None,
'sasl_plain_username': None,
'sasl_plain_password': None,
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None
}
kafka如何实现幂等发送
kafka实现幂等的关键就是要实现broker的去重。为了实现消息发送的幂等性,kafka引入了两个概念:
-
pid。每个新的producer在初始化的时候会被分配一个唯一的PID,这个PID对于用户是不可见的。
-
Sequence Number。对于每个PID,该Producer发送数据的每个<Topic,Partition>都对应一个从0开始单调递增的Sequence Number。而borker端会对<PID,Topic,Partition>做缓存,当具有相同主键的消息提交的时,broker只会持久化一条。
但是如果PID发生变化,同时不同的partition也具有不同的主键,那么幂等性无法保证跨分区跨会话的exactly once
也就是说幂等性的成立需要保证 单次会话同一个分区
站在Consumer分析三种语义
consumer at least once
意味着consumer对一条消息可能多次消费。下面的情况:consumer先读取消息,然后处理这条消息,最后提交offset。在处理消息时成功后,consumer宕机了,此时offset还未提交,下一次读取消息时依旧是这条消息,那么处理消息的逻辑又将被执行一遍,就是at least once消费。
- 配置enable_auto_commit=false。禁止后台自动提交offset
- 手动调用consumer.commit_async()来提交offset。手动保证了offset即使更新。
通过手动提交offset,就可以实现at least once语义
consumer at most once
意味着consumer对一条消息最多消费一次,因此存在消息消费失败依旧提交offset的情况。考虑下面的情况:consumer首先读取消息。然后提交offset,最后处理这条消息。在处理消息时,consumer宕机了,此时offset已提交,下次读取消息时就是下一条消息,这就是at most once。
- 配置enable_auto_commit=True。后台定时提交offset。
- auto_commit_interval_ms配置一个很小的数值。auto_commit_interval_ms表示后台提交offset的时间间隔
通过自动提交offset,并且将定时提交时间间隔设置的很小,就可以实现consumer at most once语义
exactly once
exactly once意味着消息的消费处理逻辑和offset的提交是原子性的,即消息消费成功后offset改变,消息消费失败offset也能回滚
- isolation.level=read_committed。此参数表示何种类型的message对consumer可见。
一个常见的exactly once的使用场景是:但我们订阅了一个topic,然后往另一个topic里写数据时,我们希望连个操作是原子性的,即如果写入消息失败了,那么我们希望读取消息的offset可以回滚。
这个特性可以通过kafka的transaction特性实现。kafka是在0.11版本之后开始提供事务特性的。我们可以将consumer读取数据和producer写入数据放在同一个事务中,在事务没有成功结束前,所有这个事务包含的消息都被标记为uncommitted。只有事务成功后,所有的消息才会被标记为committed。offset消息是以消息的方式存储在broker的__consumer_offsets topic中的。因此在事务开始后,consumer读取消息后,所有的offset消息都是uncommitted状态,所有的producer写入的消息也都是uncommitted状态。
consumer通过配置isolation.level来决定uncommitted状态的message是否对consumer可见。
isolation.level有两个可选值:read_committed–>表示所有事务未提交的数据都对consumer不可见,read_uncommitted相反
再来看看kafka的消息存储
server.properties配置文件
这里文件夹下存储这每个主题,例如我这设置的topic就是howtousekafka-0,里面的东西包括消息文件和索引文件。
其中的index文件记录offset的位置和消息的消息的消息的长度。log文件记录消息。
kafka之生产者
分区的原则
1、指明partition的情况下,直接将指明的值作为partition
2、没有指明partition但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值
3、即没有partition值又没有key值的情况下,第一次使用时随机生成一个整数后(后面每次调用在这个整数上自增),将这个值与topic可用的partition总数取余得到partition值,也就是常说的round-robin算法。
producer事务
为了实现跨分区跨会话的事务,需要引入一个全剧唯一的Transaction ID(TID),
并将Producer获得的PID和TID绑定。这一当producer重启后就可以通过正在进行的TID获得原来的PID。
为了管理Transaction,kafka引入了一个新的组件Transaction Coordinator。producer就是通过
和Transaction Coordinator交互获得TID对应的任务状态。Transaction Coordinate还将负责将所有
事务写入卡夫卡的一个内部Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到
恢复,从而继续进行。
kafka之消费者
分区策略
kafka的分区策略的核心实现有两种,一种是range范围策略,一种是roundRobin轮询策略。
- range策略,分区按照顺序平铺,消费者按照顺序平铺:分区数量除以消费者数量,这里是分区数量8除以消费者数量3取整等于2(N),分区数量对消费者数量出3取余等于2(M)。kafka的range算法是前M个消费者能得到N+1个分区,剩余的消费者分配N个分区。
假设现在有p0-p7 8个分区, 消费者数量有3个,上图解释
那么N = pcount /ccount = 8/3 =2
那么M = pcount/ccount = 8%3=2
最后的分区结果就是前2个消费者得到2+1个分区,剩余的消费者分配到2个分区,如下图
- roundRobin轮询策略
一样按照上面的8个分区和3个消费者。构建消费者环顺序平铺,结果如下图
c0p0,c1p1,c2p2,c0p3,c1p4,c2p5,c0p6,c1p7
kafka的消息推送模式
kafka消息推送模式是pull模式。即producer将消息推送到集群broker,consumer从broker里pull消息,进行消费处理。
push模式的问题,kafka将消息推送至消费者,如果消费者处理因为性能问题会导致消费者崩溃。即无法适应于所有消费质量的消费者。消息系统都致力于让 consumer 以最大的速率最快速的消费消息;而pull模式完全取决于消费者的消费速度,从broker拉取消息消费。
著名的 零拷贝
DMA技术
没有DMA技术之前,I/O过程是这样的
- cpu发出响应的指令给磁盘控制器,然后返回;
- 磁盘控制器收到指令后,于是就开始准备数据,会把数据放入到磁盘控制器的内部缓冲区,然后产生一个 中断;
- cpu收到中断信号后,停下手头的工作,接着把磁盘控制器的缓冲区的数据一次一个字节的读进自己的寄存器,然后再把寄存器里的数据写入到内存,而在数据传输的期间cpu是无法执行其他任务的
可以看到,整个数据的传输过程,都需要cpu亲自参与搬运,而且这个过程cpu无法去做其他事情。那么当我们遇到大数据传输时如千兆网卡或者硬盘,那么cpu可能就忙碌不过来。相信你也碰到过这情况,当使用硬盘拷贝数据时,电脑会比之前卡顿
再后来就出现了DMA技术,全称:Direct Memory Access,中文 直接内存访问,==简单理解就是将 IO设备和内存的数据传输时搬运数据全部交由DMA搬运而不是CPU。
那么网络传输文件到底经历了多少次IO呢?
首先,期间发生了4次用户态与内核态的上下文切换,因为了发生了两次系统调用read(),write(),每次调用都会先从用户态切换为内核态,完胜任务后从内核态切换为用户态,虽然上下文切换仅需单位微秒级,但是高并发情况下,就容易被放大n倍。
那么如何提高IO性能呢?
linux 内核版本2.1中,提供了一个专门发送文件的系统调用函数sendfile();他会直接把内核缓冲区里的数据拷贝到socket缓冲区,不在拷贝到用户态,这样就只有两次上下文切换和三次数据拷贝
如果网卡支持SG-DMA(The Scatter-Gather Direct Memory Access)技术,我们可以进一步减少通过cpu吧内核缓冲区里的数据copy到socket缓冲区的过程
[root@tototo ~]# ethtool -k eth0 | grep scatter-gather
scatter-gather: on
tx-scatter-gather: on
tx-scatter-gather-fraglist: off [fixed]
从Linux内核2.4版本开始,对于网卡支持SG-DMA技术情况下,sendfile()发生了变化
- 1、通过DMA将磁盘上的数据拷贝到内核缓冲区
- 2、缓冲区描述符和数据长度传输到socket缓冲区,这样网卡的SG-DMA控制器就可以直接将内核缓存中的数据拷贝到网卡的缓冲区里,此过程不需要将数据从操作系统内核缓冲区拷贝到socket缓冲区,这样就减少了一次数据拷贝。
这就是所谓的零拷贝技术,因为我们没有从内存层面拷贝数据,也就是说去全程没有cpu来搬运数据,所有的数据都是DMA来搬运的;零拷贝技术减少了两次上下文切换和两次数据拷贝次数
使用python操作kafka
pip install kafka-python
保证zookeeper和kafka启动的情况下:
最简单的生产者:
import json
import time
import datetime
from kafka import KafkaProducer
from config import config
producer = KafkaProducer(bootstrap_servers=config.SERVER,
value_serializer=lambda m: json.dumps(m).encode())
for i in range(100):
data = {'num': i, 'ts': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
producer.send(config.TOPIC, data)
time.sleep(1)
最简单的消费者:
from config import config
from kafka import KafkaConsumer
consumer = KafkaConsumer(config.TOPIC,
bootstrap_servers=config.SERVER,
group_id='test',
auto_offset_reset='earliest')
for msg in consumer:
print(msg.value)
如果您觉得文章对您有所帮助,可以请囊中羞涩的博主吃个鸡腿饭,万分感谢。愿每一个来到这里的人生活幸福美满。
微信赞赏
支付宝赞赏
更多推荐
所有评论(0)