python如何向kafka发送数据?
from kafka import KafkaProducerfrom kafka.errors import kafka_errorsimport tracebackimport json# 假设生产的消息为json字符串producer = KafkaProducer(bootstrap_servers=['localhost:9092'],key_serializer=lambda k: j
·
from kafka import KafkaProducer
from kafka.errors import kafka_errors
import traceback
import json
# 假设生产的消息为json字符串
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
key_serializer=lambda k: json.dumps(k).encode(),
value_serializer=lambda v: json.dumps(v).encode())
data = {'msg': 'hello kafka!'}
future = producer.send(
'topic_001',
key='mytopic', # 同一个key值,会被送至同一个分区
value=str(data),
partition=0) # 向分区1发送消息
print("send {}".format(str(data)))
try:
future.get(timeout=10) # 监控是否发送成功
except kafka_errors: # 发送失败抛出kafka_errors
traceback.format_exc()
执行 .../TestProj36/kafkaTestProductMessage.py
send {'msg': 'hello kafka!'}
python3.6环境测试
更多推荐
已为社区贡献8条内容
所有评论(0)