Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目

01

kafka简介

kafka和我们之前说过的rabbitmq以及mq都是采用发布——订阅的模式来实现的。简单来说,一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息。

在发布-订阅消息系统中,消息被持久化到一个topic中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。该模式的示例图如下:

kafka结构如下:

上图中一个topic配置了3个partition。Partition1有两个offset:0和1。Partition2有4个offset。Partition3有1个offset。副本的id和副本所在的机器的id恰好相同。

如果一个topic的副本数为3,那么Kafka将在集群中为每个partition创建3个相同的副本。集群中的每个broker存储一个或多个partition。多个producer和consumer可同时生产和消费数据。

下面对结构中的构成做一个简单介绍,因为这个模式在其他很多的生产消费模式中都有,所以也没有什么特别难理解的。

Partition:

topic中的数据分割为一个或多个partition。每个topic至少有一个partition。一个partition只能被同一组的一个consumer消费,当消费者数量多于partition的数量时,多余的消费者空闲。不同组的consumer可以消费同一个partition。但是一个consumer可以消费多个partition,每个partition中的数据使用多个segment文件存储。partition中的数据是有序的。如果topic有多个partition,消费数据时就不能保证数据顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数设为1。

Broker:

Kafka 集群包含一个或多个服务器,服务器节点称为broker。

broker存储topic的数据。如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。

如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。

如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。

Consumer group:

每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。一个topic 可以配置几个partition,produce发送的消息分发到不同的partition中,consumer接受数据的时候是按照group来接受,kafka确保每个partition只能同一个group中的同一个consumer消费,如果想要重复消费,那么需要其他的组来消费。
新版kafka把这个offsert保存到了一个__consumer_offsert的topic下 这个__consumer_offsert 有50个分区,通过将group的id哈希值%50的值来确定要保存到那一个分区.  这样也是为了考虑到zookeeper不擅长大量读写的原因。
所以,如果要一个group用几个consumer来同时读取的话,需要多线程来读取,一个线程相当于一个consumer实例。当consumer的数量大于分区的数量的时候,有的consumer线程会读取不到数据。 
假设一个topic test 被groupA消费了,现在启动另外一个新的groupB来消费test,默认test-groupB的offset不是0,而是没有新建立,除非当test有数据的时候,groupB会收到该数据,该条数据也是第一条数据,groupB的offset也是刚初始化的offsert, 除非用显式的用–from-beginnging 来获取从0开始数据

02


zookeeper和kafka安装

1、zookeeper安装

wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.6/apache-zookeeper-3.5.6-bin.tar.gz
tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz
cd apache-zookeeper-3.5.6-bin


启动zookeeper
cp zoo_sample.cfg zoo.cfg
bin/zkServer.sh start

 

启动zookeeper
cp zoo_sample.cfg zoo.cfg
bin/zkServer.sh start

2、安装kafka

wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.2/kafka_2.12-2.6.2.tgz
tar -zvxf kafka_2.12-2.6.2.tgz
cd kafka_2.12-2.6.2
cp config/zookeeper.properties config/zookeeper.properties.bak
#修改server.properties
vi config/server.properties
#启动单个broker
broker.id=1
listeners=PLAINTEXT://hostip:9092


#listeners是kafka真正bind的地址


#启动kafka
bin/kafka-server-start.sh config/server.properties


#验证zookeeper 是否注册了kafka
cd apache-zookeeper-3.5.6-bin
bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 0]
#如果为空,重新启动kafka

3、简单测试

创建一个topic为testopic的主题
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
删除主题
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic testopic
查看主题
bin/kafka-topics.sh --list --zookeeper localhost:2181

往主题testopic发消息
bin/kafka-console-producer.sh --brokerist hostip:9092 --topic testopic

启动一个消费者,消费者会接收到消息
bin/kafka-console-consumer.sh --bootstrap-server hostip:9092 --topic testopic --from-beginning

03


PyKafka对Kafka的基本操作

首先安装pykafka

pip install pykafka

producer 往topic生产消息

from pykafka import KafkaClient
from pykafka.exceptions import SocketDisconnectedError, LeaderNotAvailable
client = KafkaClient(hosts='hostip:9092')
#注意kafka接收bytes类型,所以需要将字符串转换bytes
topic = client.topics['testopic'.encode()]
#这里设置异步发送消息
producer = topic.get_sync_producer()
try:
    for i in range(1,10):
        producer.produce(('this is iot inn test msg number={}'.format(i).encode()))


except (SocketDisconnectedError, LeaderNotAvailable) as e:
    producer = topic.get_producer()
    producer.stop()
    producer.start()
    for i in range(1,10):
      producer.produce(('this is iot inn test msg number'.encode()))

kafka同步生产者:这个生产者写一条消息的时候,它就立马发送到某个分区去。follower还需要从leader拉取消息到本地,follower再向leader发送确认,leader再向客户端发送确认。由于这一套流程之后,客户端才能得到确认,所以很慢。
kafka异步生产者:这个生产者写一条消息的时候,先是写到某个缓冲区,这个缓冲区里的数据还没写到broker集群里的某个分区的时候,它就返回到client去了。虽然效率快,但是不能保证消息一定被发送出去了。

consumer读取topic消息

from __future__ import division


import math
from itertools import islice


from pykafka import KafkaClient
from pykafka.common import OffsetType


client = KafkaClient(hosts='10.171.4.81:9092')
topic = client.topics['testopic'.encode()]


consumer = topic.get_simple_consumer(
    consumer_group='test',
    auto_offset_reset=OffsetType.LATEST,
    reset_offset_on_start=True)
print(consumer)
#接收最后10条消息
LAST_N_MESSAGES = 10
MAX_PARTITION_REWIND = int(math.ceil(LAST_N_MESSAGES / len(consumer._partitions)))


offsets = [(p, op.last_offset_consumed - MAX_PARTITION_REWIND)
           for p, op in consumer._partitions.items()]


offsets = [(p, (o if o > -1 else -2)) for p, o in offsets]


consumer.reset_offsets(offsets)


for message in islice(consumer, LAST_N_MESSAGES):
    print(message.offset, message.value)

查看brokers 

from pykafka import KafkaClient
client = KafkaClient(hosts="10.171.4.81:9092")
print (client.brokers)


for n in client.brokers:
    host = client.brokers[n].host
    port = client.brokers[n].port
    id = client.brokers[n].id
    print ("host=%s | port=%s | broker.id=%s " %(host,port,id))

查看所有的topics

from pykafka import KafkaClient
client = KafkaClient(hosts="10.171.4.81:9092")
for topic in client.topics:
    print (topic)

听说关注公众号的都是大牛

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐