概念

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等

  • Kafka 是一个分布式流处理框架
  • 可以像一个消息中间件一样读写数据流(即,发布和订阅消息
  • Kafka 还拥有分布式流处理以及存储到磁盘的功能,所以比消息中间件更加强

特性

  • 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
  • 可扩展性:kafka集群支持热扩展
  • 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
  • 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
  • 高并发:支持数千个客户端同时读写

基本术语概念

  • Topic:一组消息数据的标记符;
  • Producer:生产者,用于生产数据,可将生产后的消息送入指定的Topic;
  • Consumer:消费者,获取数据,可消费指定的Topic;
  • Group:消费者组,同一个group可以有多个消费者,一条消息在一个group中,只会被一个消费者获取;
  • Partition:分区,为了保证kafka的吞吐量,一个Topic可以设置多个分区。同一分区只能被一个消费者订阅。

使用场景

  • 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
  • 消息系统:解耦和生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 流式处理:比如spark streaming和storm。
  • 事件源

介绍

Kafka有两种主要的消息传递模式:点对点传递模式、发布-订阅模式【大部分的消息系统选用发布-订阅模式】

点对点传递模式

​ 在点对点消息系统中,消息持久化到一个队列中。此时,将有一个或多个消费者消费队列中的数据。但是一条消息只能被消费一次。当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序。

这种架构描述示意图如下:

img

生产者发送一条消息到queue,只有一个消费者能收到

发布-订阅消息传递模式

​ 在发布-订阅消息系统中,消息被持久化到一个topic中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。

该模式的示例图如下:

img发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息

示例

生产者

新建 kafka_network.py

class Kakfa_network:
    def __init__(self):
        self.is_connect = False
        self.logger = Logger('kafka_network.txt').logger
        self.config_filepath = settings.SYS_CONFIG_FILE_PATH
        self.set_config_values()

    def set_config_values(self):
        try:
            with open(self.config_filepath) as sys_config_obj:
                sys_config_obj_data = json.load(sys_config_obj)
                self.isDebug = sys_config_obj_data['isDebug']
                self.version = sys_config_obj_data['version']
                kafka_config = sys_config_obj_data['kafka']
                self.urls = list(kafka_config['urls'])
                self.bootstrap_servers = []
                for server in self.urls:
                    url_str = server['host'] + ':' + str(server['port'])
                    self.bootstrap_servers.append(url_str)
                self.weight_topic = kafka_config['weight_topic']
                self.kafka_uname = kafka_config['uname']
                self.kafka_upassword = kafka_config['upassword']
                self.communication_protocol = kafka_config['communication_protocol']
                self.security_protocol = kafka_config['security_protocol']
                self.sasl_mechanism = kafka_config['sasl_mechanism']
                self.logger.info("当前kafka主机地址和端口号:" + str(self.bootstrap_servers))
                self.logger.info("当前主程序使用的kafka topic为:" + self.weight_topic)
        except:
            self.logger.error('%s' % traceback.format_exc())
            self.logger.error('open %s fail, please check config' % self.config_filepath)
            time.sleep(10)
            quit()

    def init_kafka_producer(self):
        flag = False
        try:
            self.logger.info("********************* connect kafka  !************************")
            self.logger.info("当前kafka主机地址和端口号: " + str(self.bootstrap_servers))
            self.kafka_producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers,
                                               security_protocol=self.security_protocol,
                                               sasl_mechanism=self.sasl_mechanism,
                                               sasl_plain_username=self.kafka_uname,
                                               sasl_plain_password=self.kafka_upassword,
                                               max_request_size= 104857600)
            flag = True
        except KafkaError as e:
            self.kafka_producer = None
            self.logger.error("*********************connect kafka  data error !************************")
            self.logger.error(str(e))
            flag = False
        self.is_connect = flag
        return flag

    def sender_withlog(self, topic="", value="", collection_time=0):
        if collection_time == 0:
            collection_time = int(round(time.time() * 1000))
        time_str = time.strftime("%Y-%m-%d,%H:%M:%S", time.localtime(collection_time / 1000))
        if not self.is_connect:
            self.logger.info("*********************↑↑↑ %s sender withlog kafka 没有链接 !尝试重新连接************************" % time_str)
            return self.init_kafka_producer()
        try:
            self.logger.info("↑↑↑↑↑↑往kafka发送出去大小为{sz}byte({kb}kb)的数据!topic = {tp},时间id标识:{tm}".format(sz=sys.getsizeof(value),kb=sys.getsizeof(value)/1024,tp=topic, tm=time_str))
            self.kafka_producer.send(topic=topic, value=value, timestamp_ms=collection_time).add_callback(
                    self.send_success).add_errback(self.send_error)
            self.kafka_producer.flush()
            flag = True
        except KafkaError as e:
            self.continue_fail_counts += 1
            self.logger.error(
                "********************* %s sender withlog kafka send data error !************************" % time_str)
            self.logger.error(str(e))
            flag = False
        return flag


    def upload_data(self, device_id="", collection_time=0, base64_str="", hz=16000,
                   bits=16, size=0, duration=0):
        json_str = json.dumps(
            {
                'uuid': str(uuid.uuid1()),
                'ver': self.communication_protocol,
                'sysType': 3,
                'sysTag': 'AICP_SOUND',
                'collectTimestamp': collection_time,
                'deviceInfo': {
                    'carID': device_id,
                    'railType': 0
                },
                'locationInfo': {
                    'unitId': '',
                    'sty': ''
                },
                'segmentId': '',
                'inspectionInfo': '',
                'originData': {
                    'base64': base64_str,
                    'hz': hz,
                    'bits': bits,
                    'size': size,
                    'duration': duration
                },
            }
        )
        json_str = json_str.encode('utf-8')
        flag = self.sender_withlog(topic=self.weight_topic, value=json_str, collection_time=collection_time)
        if self.continue_fail_counts == 60:
            self.is_connect = False
        return flag

    def send_success(self, *args, **kwargs):
        self.continue_fail_counts = 0
        self.logger.info("↘↘↘send_success  kafka 发送成功,服务器反馈接收成功!")
        return self.continue_fail_counts

    def send_error(self, *args, **kwargs):
        self.logger.info("↘↘↘send_error  kafka 发送成功,但是服务器反馈接收失败!当前连续发送失败次数%d,失败原因:%s"%(self.continue_fail_counts,str(args)))
        self.continue_fail_counts += 1
        return self.continue_fail_counts

    def start_run(self):
        self.logger.info(u'Kafka开始运行')
        self.init_kafka_producer()
        # db = tools.Db()
        if not self.init_kafka_producer():
            self.logger.info(u"init Kafka fail!")
        else:
            self.logger.info(u"init Kafka success !  begin upload notice data ")
            while True:
                time.sleep(30)
                    res = self.upload_data(device_id=result_data[0][1], collection_time=0, base64_str="", hz=16000,
                                           bits=16, size=0, duration=0)
               


if __name__ == "__main__":
    upload_unread_data = Kakfa_network()
    logger = upload_unread_data.logger
    try:
        upload_unread_data.start_run()
    except:
        logger.error('%s' % traceback.format_exc())
消费者

新建consumer.py

from kafka import KafkaConsumer

consumer = KafkaConsumer('test_rhj', group_id='123456', bootstrap_servers=['10.43.35.25:4531'])
for msg in consumer:
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print recv
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐