python kafka获取最新的offset
主要使用:consumer.end_offsets()即可示例代码首先运行生产者:from kafka import KafkaProducerimport datetimeimport json# 启动生产者producer = KafkaProducer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))my_topic =
·
主要使用:consumer.end_offsets()
即可
示例代码
首先运行生产者:
from kafka import KafkaProducer
import datetime
import json
# 启动生产者
producer = KafkaProducer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
my_topic = "python_test"
for i in range(100):
data = {'num': i, 'data': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
producer.send(my_topic, json.dumps(data).encode('utf-8')).get(timeout=30)
然后在消费者这里获取:
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
tp = TopicPartition("python_test", 0) # 参数是[topic名称,partition]
consumer.assign([tp]) # 这里是声明我要手动管理这个consumer的这个partition啦
off_set_dict: dict = consumer.end_offsets([tp])
latest_offset = list(off_set_dict.values())[0] # 这是最新的offset
print(latest_offset)
更多推荐
所有评论(0)