本文实现简单的kafka生产的类

from kafka import KafkaProducer, KafkaClient
from kafka.errors import KafkaError
import json



'''
#kafaka生产者类
'''
class Producter(object):
    def __init__(self,host,port):
        self.producer = KafkaProducer(bootstrap_servers=['{}:{}'.format(host,port)])
        self.topic=None


    #配置topic
    def topicconfig(self,topic):
        self.topic=topic

    #发送字节数据
    def send_bytemsg(self,msg,partitioner=None,key=None):
        try:
            if isinstance(msg,str):
                self.producer=KafkaProducer(value_serializer=None)
                self.producer.send(self.topic, bytes(msg,encoding="utf-8"),partition=partitioner,key=key).add_callback(self.on_send_success).add_errback(self.on_send_error)
            else:
                raise Exception("输入的数据不是str!")
        except Exception as error:
            print(error)
    #打印成功发送的信息
    def on_send_success(self,record_metadata):
        print("topic:{} partition:{} offset:{}".format(record_metadata.topic, record_metadata.partition,
                                                       record_metadata.offset))
    #打印失败的信息
    def on_send_error(self,excp):
        print(excp)

    #指定分区发送数据
    def send_partitioner(self,type="str",msg=None,pid=None):
        try:
            if type=="str":
                self.send_bytemsg(msg,partitioner=pid)
            elif type=="json":
                self.sen_jsonmsg(msg,partitioner=pid)
            else:
                print("您输入的类型有误!")
        except Exception as  error:
            print("异常原因:{}".format(error))

    #配置是json的数据
    def __cjson__(self):
        self.producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))

    #发送json的数据
    def sen_jsonmsg(self,msg,partitioner=None,key=None):
        try:
            if isinstance(msg,dict):
                self.__cjson__()
                self.producer.send(self.topic,msg,key=key,partition=partitioner).add_callback(self.on_send_success).add_errback(self.on_send_error)
            else:
                raise Exception("发送的数据不是字典!")
        except Exception as  error:
            print(error)

    #返回topic已知的分区
    def partitions_for(self):
        return list(self.producer.partitions_for(self.topic))

    #进行提交
    def flush(self):
        self.producer.flush()
    def close(self):
        self.producer.close(timeout=2)
a=Producter("127.0.0.1",9092) 
a.topicconfig("test") #订阅topic
a.send_bytemsg("1245") #向topic的默认分区发字符串
a.sen_jsonmsg({"username":"admin"}) #向topic的默认分区发json
a.send_bytemsg("5555") #向topic的默认分区发字符串
a.send_partitioner(msg={"username":"admin"},type="json",pid=0)#向topic的0分区发json数据
a.flush() #进行数据提交,这里没搞明白,发字符串不需要提交就能发,json要提交才能发
a.close()

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐