python kafka 消费者实例-指定分区
kafka python 消费者客户端
·
topic = self.config_params['kafka']['topic_order']
server = self.config_params['kafka']['server']
group_id = self.config_params['kafka']['group_id']
consumer = KafkaConsumer(
client_id='test',
group_id=group_id,
bootstrap_servers=server,
enable_auto_commit=True,
fetch_max_wait_ms=100,
max_poll_records=10000,
api_version=(0, 10)
)
partition_num = 2
partition = list(consumer.partitions_for_topic(topic=topic))[partition_num]
topic_partition = TopicPartition(topic=topic, partition=partition)
consumer.assign([topic_partition])
end_offset = consumer.end_offsets([topic_partition])[topic_partition]
cur_offset = consumer.position(topic_partition)
consumer.seek(topic_partition, end_offset)
cur_offset = consumer.position(topic_partition)
try:
while True:
msg = consumer.poll()
time.sleep(0.05)
if msg:
for i in msg.values():
for k in i:
json_data = json.loads(k.value)
except Exception as e:
print(e)
更多推荐
已为社区贡献2条内容
所有评论(0)