安装 kafka-python 库

pip install kafka-python

消费 topic 下所有数据

from kafka import KafkaConsumer, TopicPartition


# Read every record currently stored in a topic (all partitions), then stop.
# Create the consumer.
consumer = KafkaConsumer(bootstrap_servers='host:port')
topic = 'test_topic'
# Look up the topic's partition ids; the client returns None when it has no
# metadata for the topic, so fail with a clear message instead of a TypeError.
partition_ids = consumer.partitions_for_topic(topic)
if not partition_ids:
    raise RuntimeError('no partitions found for topic %r' % topic)
print(sorted(partition_ids))
topic_partitions = [
    TopicPartition(topic=topic, partition=partition_id)
    for partition_id in sorted(partition_ids)
]
# Manually assign every partition so the whole topic is read, not just one.
consumer.assign(topic_partitions)
# Snapshot the first and the next-to-be-written offset of each partition.
start_offsets = consumer.beginning_offsets(topic_partitions)
end_offsets = consumer.end_offsets(topic_partitions)
print(start_offsets)
print(end_offsets)
# Rewind to the beginning; consumer.seek() can be used to jump to an
# arbitrary offset instead.
consumer.seek_to_beginning(*topic_partitions)
# Track the partitions that still have snapshot data left to read. Partitions
# that are already empty are paused so records produced to them while we are
# reading the others are left for later consumption.
pending = set()
for topic_partition in topic_partitions:
    if start_offsets[topic_partition] < end_offsets[topic_partition]:
        pending.add(topic_partition)
    else:
        consumer.pause(topic_partition)
# end_offset is the offset of the next record to be written, so the last
# existing record is at most end_offset - 1. Stop on the record's own offset
# rather than counting records: offsets may have gaps (e.g. compacted topics),
# and counting would then block forever waiting for records that do not exist.
while pending:
    msg = next(consumer)
    print(msg)
    print(msg.value)
    topic_partition = TopicPartition(msg.topic, msg.partition)
    if msg.offset >= end_offsets[topic_partition] - 1:
        pending.discard(topic_partition)
        consumer.pause(topic_partition)
# Re-enable fetching so the consumer can keep being used afterwards.
consumer.resume(*topic_partitions)

持续消费 Kafka 中的数据

# Iterating the consumer blocks while no data is available and resumes as
# soon as a new record can be consumed.
for record in consumer:
    # Emit the full record first, then its payload, one per line.
    print(record, record.value, sep='\n')

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐