python:kafka的生产者
python:实现简单的操作kafka操作生产者
·
本文实现简单的kafka生产的类
from kafka import KafkaProducer, KafkaClient
from kafka.errors import KafkaError
import json
'''
#kafaka生产者类
'''
class Producter(object):
def __init__(self,host,port):
self.producer = KafkaProducer(bootstrap_servers=['{}:{}'.format(host,port)])
self.topic=None
#配置topic
def topicconfig(self,topic):
self.topic=topic
#发送字节数据
def send_bytemsg(self,msg,partitioner=None,key=None):
try:
if isinstance(msg,str):
self.producer=KafkaProducer(value_serializer=None)
self.producer.send(self.topic, bytes(msg,encoding="utf-8"),partition=partitioner,key=key).add_callback(self.on_send_success).add_errback(self.on_send_error)
else:
raise Exception("输入的数据不是str!")
except Exception as error:
print(error)
#打印成功发送的信息
def on_send_success(self,record_metadata):
print("topic:{} partition:{} offset:{}".format(record_metadata.topic, record_metadata.partition,
record_metadata.offset))
#打印失败的信息
def on_send_error(self,excp):
print(excp)
#指定分区发送数据
def send_partitioner(self,type="str",msg=None,pid=None):
try:
if type=="str":
self.send_bytemsg(msg,partitioner=pid)
elif type=="json":
self.sen_jsonmsg(msg,partitioner=pid)
else:
print("您输入的类型有误!")
except Exception as error:
print("异常原因:{}".format(error))
#配置是json的数据
def __cjson__(self):
self.producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
#发送json的数据
def sen_jsonmsg(self,msg,partitioner=None,key=None):
try:
if isinstance(msg,dict):
self.__cjson__()
self.producer.send(self.topic,msg,key=key,partition=partitioner).add_callback(self.on_send_success).add_errback(self.on_send_error)
else:
raise Exception("发送的数据不是字典!")
except Exception as error:
print(error)
#返回topic已知的分区
def partitions_for(self):
return list(self.producer.partitions_for(self.topic))
#进行提交
def flush(self):
self.producer.flush()
def close(self):
self.producer.close(timeout=2)
a=Producter("127.0.0.1",9092)
a.topicconfig("test") #订阅topic
a.send_bytemsg("1245") #向topic的默认分区发字符串
a.sen_jsonmsg({"username":"admin"}) #向topic的默认分区发json
a.send_bytemsg("5555") #向topic的默认分区发字符串
a.send_partitioner(msg={"username":"admin"},type="json",pid=0)#向topic的0分区发json数据
a.flush() #进行数据提交,这里没搞明白,发字符串不需要提交就能发,json要提交才能发
a.close()
更多推荐
已为社区贡献1条内容
所有评论(0)