topic = self.config_params['kafka']['topic_order']
server = self.config_params['kafka']['server']
group_id = self.config_params['kafka']['group_id']
consumer = KafkaConsumer(
    client_id='test',
    group_id=group_id,
    bootstrap_servers=server,
    enable_auto_commit=True,
    fetch_max_wait_ms=100,
    max_poll_records=10000,
    api_version=(0, 10)
)
partition_num = 2
partition = list(consumer.partitions_for_topic(topic=topic))[partition_num]
topic_partition = TopicPartition(topic=topic, partition=partition)
consumer.assign([topic_partition])
end_offset = consumer.end_offsets([topic_partition])[topic_partition]
cur_offset = consumer.position(topic_partition)
consumer.seek(topic_partition, end_offset)
cur_offset = consumer.position(topic_partition)
try:
    while True:
    	msg = consumer.poll()
    	time.sleep(0.05)
    	if msg:
        	for i in msg.values():
            	for k in i:
                	json_data = json.loads(k.value)
except Exception as e:
    print(e)
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐