python 消费kafka数据
消费topic下所有数据。持续消费kafka中的数据。
·
安装kafka 库
pip install kafka-python
消费 topic 下所有数据
from kafka import KafkaConsumer, TopicPartition
# 创建消费者
consumer = KafkaConsumer(bootstrap_servers='host:port')
topic = 'test_topic'
# 获取topic的分区
partition = list(consumer.partitions_for_topic(topic=topic))[0]
print(partition)
topic_partition = TopicPartition(topic=topic, partition=partition)
# 指定消费的topic
consumer.assign([topic_partition])
# 获取 offset 的起始位置和结束位置
start_offset = consumer.beginning_offsets([topic_partition])[topic_partition]
end_offset = consumer.end_offsets([topic_partition])[topic_partition]
print(start_offset)
print(end_offset)
# 将 offset 重置到起始位置,也可以重置到任意位置 consumer.seek()
consumer.seek_to_beginning((topic_partition))
# end_offset 是对应没有数据的 offset 消费会被阻塞, 消费到end_offset - 1就已经获取全部数据了
for i in range(start_offset, end_offset):
msg = next(consumer)
print(msg)
print(msg.value)
持续消费kafka中的数据
# 没有数据的时候会被阻塞,直到有可以消费的数据
for msg in consumer:
print(msg)
print(msg.value)
更多推荐
已为社区贡献2条内容
所有评论(0)