For installing kafka-python, basic usage, starting ZooKeeper, and so on, see: Connecting to Kafka with Python

Writing a consumer that reads from a custom offset

Note how the consumer has to be written when you want to control the offset it reads from in kafka-python:

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
tp = TopicPartition("python_test", 0)  # arguments: (topic name, partition number)
consumer.assign([tp])  # declare that this consumer manages this partition manually

To read data, use:

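consumer.seek(tp, 3)  # set the offset manually, e.g. start at offset 3 (offsets are 0-based, so this is the fourth message)
consumer.seek(tp, 50)  # start reading at offset 50 instead (this overrides the previous seek)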
next(consumer)
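
Seeking past the last message makes next(consumer) wait until new data arrives, so it can help to clamp the requested offset first. The following is a minimal sketch, not part of the original post: the helper name clamped_seek is hypothetical, and it assumes a consumer that has already called assign([tp]) and a broker new enough to support beginning_offsets() and end_offsets().

```python
def clamped_seek(consumer, tp, offset):
    """Seek to `offset`, clamped to the offsets that currently exist.

    `consumer` is assumed to be a KafkaConsumer with `tp` already assigned.
    Returns the offset actually used, or None if the partition is empty.
    """
    first = consumer.beginning_offsets([tp])[tp]  # earliest available offset
    end = consumer.end_offsets([tp])[tp]          # offset the NEXT written message will get
    if end <= first:
        return None  # nothing to read yet, so do not seek
    target = max(first, min(offset, end - 1))
    consumer.seek(tp, target)
    return target
```

With 100 messages at offsets 0 to 99, clamped_seek(consumer, tp, 500) would seek to 99 rather than to an offset that does not exist yet.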

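ATTENTION: when initializing the consumer, do not pass the topic straight into the constructor as shown below. Passing a topic subscribes the consumer to it, and kafka-python does not allow a topic subscription to be combined with a manual assign():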

# Do not initialize the consumer like this
consumer = KafkaConsumer(self.topic, bootstrap_servers='localhost:9092', auto_offset_reset='latest', api_version=(0, 10, 2))
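
If a consumer created this way is handed to you anyway, one possible workaround is to drop the subscription before assigning partitions by hand. This is a hedged sketch rather than the original author's method: assign_manually is a hypothetical helper, and it relies on KafkaConsumer.subscription() returning the subscribed topics (or None) and on unsubscribe() clearing them.

```python
def assign_manually(consumer, partitions):
    """Put `consumer` into manual-assignment mode for `partitions`.

    Returns True if an existing topic subscription had to be dropped first.
    """
    had_subscription = bool(consumer.subscription())
    if had_subscription:
        # A topic subscription and a manual assignment cannot coexist,
        # so clear the subscription before assigning partitions by hand.
        consumer.unsubscribe()
    consumer.assign(list(partitions))
    return had_subscription
```

Creating the consumer without a topic, as in the first snippet, remains the simpler option; the helper only covers the case where the constructor call is outside your control.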

Example code

1. Start the producer and create data

First, start the producer:

from kafka import KafkaProducer
import datetime
import json

# Start the producer
producer = KafkaProducer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
my_topic = "python_test"

for i in range(100):
    data = {'num': i, 'data': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
    producer.send(my_topic, json.dumps(data).encode('utf-8')).get(timeout=30)

This creates a topic named python_test and writes 100 messages to it.
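
As a variation on the producer above, the JSON encoding can be moved into the producer's value_serializer option so that send() accepts plain dicts. The sketch below is illustrative only: to_json_bytes, make_record and produce_numbers are hypothetical names, and the producer passed in is assumed to have been created with value_serializer=to_json_bytes.

```python
import datetime
import json


def to_json_bytes(value):
    """Serialize a JSON-compatible object to UTF-8 bytes for Kafka."""
    return json.dumps(value).encode("utf-8")


def make_record(i, now=None):
    """Build the same payload shape as the loop above."""
    now = now or datetime.datetime.now()
    return {"num": i, "data": now.strftime("%Y-%m-%d %H:%M:%S")}


def produce_numbers(producer, topic, count):
    """Send `count` records through `producer`.

    Assumes the producer was created roughly like this:
        KafkaProducer(bootstrap_servers='localhost:9092',
                      value_serializer=to_json_bytes)
    """
    for i in range(count):
        producer.send(topic, make_record(i))
    producer.flush()  # block until all buffered records have been sent
```

Calling flush() once at the end, instead of .get(timeout=30) after every send, lets the producer batch messages; the trade-off is that a failed send is not reported per message.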

2. Consumer reading from custom offsets

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import random

consumer = KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
tp = TopicPartition("python_test", 0)  # arguments: (topic name, partition number)
consumer.assign([tp])  # declare that this consumer manages this partition manually
for i in range(10):
    random_seek = random.randint(0, 99)  # random offset; randint includes both ends, and 100 messages occupy offsets 0-99
    consumer.seek(tp, random_seek)  # set the offset
    consumer_data = next(consumer)  # read one record; note: after next(), the consumer's position advances by 1
    print(consumer_data)

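Output (in this run every offset is 5 higher than num, which suggests the topic already held 5 messages before the producer ran):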

ConsumerRecord(topic='python_test', partition=0, offset=66, timestamp=1646473931338, timestamp_type=0, key=None, value=b'{"num": 61, "data": "2022-03-05 17:52:11"}', headers=[], checksum=1900770981, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=87, timestamp=1646473931355, timestamp_type=0, key=None, value=b'{"num": 82, "data": "2022-03-05 17:52:11"}', headers=[], checksum=3522714781, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=71, timestamp=1646473931342, timestamp_type=0, key=None, value=b'{"num": 66, "data": "2022-03-05 17:52:11"}', headers=[], checksum=1871660722, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=50, timestamp=1646473931318, timestamp_type=0, key=None, value=b'{"num": 45, "data": "2022-03-05 17:52:11"}', headers=[], checksum=4078097399, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=11, timestamp=1646473931271, timestamp_type=0, key=None, value=b'{"num": 6, "data": "2022-03-05 17:52:11"}', headers=[], checksum=2130074065, serialized_key_size=-1, serialized_value_size=41, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=71, timestamp=1646473931342, timestamp_type=0, key=None, value=b'{"num": 66, "data": "2022-03-05 17:52:11"}', headers=[], checksum=1871660722, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=83, timestamp=1646473931352, timestamp_type=0, key=None, value=b'{"num": 78, "data": "2022-03-05 17:52:11"}', headers=[], checksum=1428608549, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=25, timestamp=1646473931285, timestamp_type=0, key=None, value=b'{"num": 20, "data": "2022-03-05 17:52:11"}', headers=[], checksum=1811806078, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=36, timestamp=1646473931296, timestamp_type=0, key=None, value=b'{"num": 31, "data": "2022-03-05 17:52:11"}', headers=[], checksum=330696171, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=18, timestamp=1646473931278, timestamp_type=0, key=None, value=b'{"num": 13, "data": "2022-03-05 17:52:11"}', headers=[], checksum=719857123, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
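
The value field in the records above is still raw bytes. A small helper can seek, fetch one record with a timeout, and decode the JSON payload in one step. This is a sketch under assumptions: read_json_at is a hypothetical name, the consumer is assumed to have tp assigned already, and poll() is used instead of next() so that the call returns instead of waiting forever when no record is available.

```python
import json


def read_json_at(consumer, tp, offset, timeout_ms=1000):
    """Seek to `offset` and return (offset, decoded value) of the first
    record found, or None if nothing arrives within `timeout_ms`."""
    consumer.seek(tp, offset)
    # poll() returns a dict mapping TopicPartition -> list of records.
    batches = consumer.poll(timeout_ms=timeout_ms, max_records=1)
    records = batches.get(tp, [])
    if not records:
        return None
    record = records[0]
    return record.offset, json.loads(record.value.decode("utf-8"))
```

A single poll() can come back empty even when data exists, for example right after the consumer starts, so callers may want to retry a few times before treating None as "no such offset".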