from kafka import KafkaProducer
from kafka.errors import kafka_errors
import traceback
import json

# 假设生产的消息为json字符串
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=lambda k: json.dumps(k).encode(),
    value_serializer=lambda v: json.dumps(v).encode())

data = {'msg': 'hello kafka!'}

future = producer.send(
    'topic_001',
    key='mytopic',  # 同一个key值,会被送至同一个分区
    value=str(data),
    partition=0)  # 向分区1发送消息
print("send {}".format(str(data)))
try:
    future.get(timeout=10)  # 监控是否发送成功
except kafka_errors:  # 发送失败抛出kafka_errors
    traceback.format_exc()

执行 .../TestProj36/kafkaTestProductMessage.py

send {'msg': 'hello kafka!'}

python3.6环境测试 

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐