python使用Kafka
概念Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等Kafka 是一个分布式流处理框架可以像一个消息中间件一样读写数据流(即,发布和订阅消息)Kafka 还拥有分布式流处理以及存储到磁盘的功能,所以比消息中间件更加强特性高吞吐量、低延迟:
概念
Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等
- Kafka 是一个分布式流处理框架
- 可以像一个消息中间件一样读写数据流(即,发布和订阅消息)
- Kafka 还拥有分布式流处理以及存储到磁盘的功能,所以比消息中间件更加强
特性
- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
- 可扩展性:kafka集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
- 高并发:支持数千个客户端同时读写
基本术语概念
- Topic:一组消息数据的标记符;
- Producer:生产者,用于生产数据,可将生产后的消息送入指定的Topic;
- Consumer:消费者,获取数据,可消费指定的Topic;
- Group:消费者组,同一个group可以有多个消费者,一条消息在一个group中,只会被一个消费者获取;
- Partition:分区,为了保证kafka的吞吐量,一个Topic可以设置多个分区。同一分区只能被一个消费者订阅。
使用场景
- 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
- 消息系统:解耦和生产者和消费者、缓存消息等。
- 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
- 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
- 流式处理:比如spark streaming和storm。
- 事件源
介绍
Kafka有两种主要的消息传递模式:点对点传递模式、发布-订阅模式【大部分的消息系统选用发布-订阅模式】
点对点传递模式
在点对点消息系统中,消息持久化到一个队列中。此时,将有一个或多个消费者消费队列中的数据。但是一条消息只能被消费一次。当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序。
这种架构描述示意图如下:
生产者发送一条消息到queue,只有一个消费者能收到。
发布-订阅消息传递模式
在发布-订阅消息系统中,消息被持久化到一个topic中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。
该模式的示例图如下:
发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。
示例
生产者
新建 kafka_network.py
class Kakfa_network:
def __init__(self):
self.is_connect = False
self.logger = Logger('kafka_network.txt').logger
self.config_filepath = settings.SYS_CONFIG_FILE_PATH
self.set_config_values()
def set_config_values(self):
try:
with open(self.config_filepath) as sys_config_obj:
sys_config_obj_data = json.load(sys_config_obj)
self.isDebug = sys_config_obj_data['isDebug']
self.version = sys_config_obj_data['version']
kafka_config = sys_config_obj_data['kafka']
self.urls = list(kafka_config['urls'])
self.bootstrap_servers = []
for server in self.urls:
url_str = server['host'] + ':' + str(server['port'])
self.bootstrap_servers.append(url_str)
self.weight_topic = kafka_config['weight_topic']
self.kafka_uname = kafka_config['uname']
self.kafka_upassword = kafka_config['upassword']
self.communication_protocol = kafka_config['communication_protocol']
self.security_protocol = kafka_config['security_protocol']
self.sasl_mechanism = kafka_config['sasl_mechanism']
self.logger.info("当前kafka主机地址和端口号:" + str(self.bootstrap_servers))
self.logger.info("当前主程序使用的kafka topic为:" + self.weight_topic)
except:
self.logger.error('%s' % traceback.format_exc())
self.logger.error('open %s fail, please check config' % self.config_filepath)
time.sleep(10)
quit()
def init_kafka_producer(self):
flag = False
try:
self.logger.info("********************* connect kafka !************************")
self.logger.info("当前kafka主机地址和端口号: " + str(self.bootstrap_servers))
self.kafka_producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers,
security_protocol=self.security_protocol,
sasl_mechanism=self.sasl_mechanism,
sasl_plain_username=self.kafka_uname,
sasl_plain_password=self.kafka_upassword,
max_request_size= 104857600)
flag = True
except KafkaError as e:
self.kafka_producer = None
self.logger.error("*********************connect kafka data error !************************")
self.logger.error(str(e))
flag = False
self.is_connect = flag
return flag
def sender_withlog(self, topic="", value="", collection_time=0):
if collection_time == 0:
collection_time = int(round(time.time() * 1000))
time_str = time.strftime("%Y-%m-%d,%H:%M:%S", time.localtime(collection_time / 1000))
if not self.is_connect:
self.logger.info("*********************↑↑↑ %s sender withlog kafka 没有链接 !尝试重新连接************************" % time_str)
return self.init_kafka_producer()
try:
self.logger.info("↑↑↑↑↑↑往kafka发送出去大小为{sz}byte({kb}kb)的数据!topic = {tp},时间id标识:{tm}".format(sz=sys.getsizeof(value),kb=sys.getsizeof(value)/1024,tp=topic, tm=time_str))
self.kafka_producer.send(topic=topic, value=value, timestamp_ms=collection_time).add_callback(
self.send_success).add_errback(self.send_error)
self.kafka_producer.flush()
flag = True
except KafkaError as e:
self.continue_fail_counts += 1
self.logger.error(
"********************* %s sender withlog kafka send data error !************************" % time_str)
self.logger.error(str(e))
flag = False
return flag
def upload_data(self, device_id="", collection_time=0, base64_str="", hz=16000,
bits=16, size=0, duration=0):
json_str = json.dumps(
{
'uuid': str(uuid.uuid1()),
'ver': self.communication_protocol,
'sysType': 3,
'sysTag': 'AICP_SOUND',
'collectTimestamp': collection_time,
'deviceInfo': {
'carID': device_id,
'railType': 0
},
'locationInfo': {
'unitId': '',
'sty': ''
},
'segmentId': '',
'inspectionInfo': '',
'originData': {
'base64': base64_str,
'hz': hz,
'bits': bits,
'size': size,
'duration': duration
},
}
)
json_str = json_str.encode('utf-8')
flag = self.sender_withlog(topic=self.weight_topic, value=json_str, collection_time=collection_time)
if self.continue_fail_counts == 60:
self.is_connect = False
return flag
def send_success(self, *args, **kwargs):
self.continue_fail_counts = 0
self.logger.info("↘↘↘send_success kafka 发送成功,服务器反馈接收成功!")
return self.continue_fail_counts
def send_error(self, *args, **kwargs):
self.logger.info("↘↘↘send_error kafka 发送成功,但是服务器反馈接收失败!当前连续发送失败次数%d,失败原因:%s"%(self.continue_fail_counts,str(args)))
self.continue_fail_counts += 1
return self.continue_fail_counts
def start_run(self):
self.logger.info(u'Kafka开始运行')
self.init_kafka_producer()
# db = tools.Db()
if not self.init_kafka_producer():
self.logger.info(u"init Kafka fail!")
else:
self.logger.info(u"init Kafka success ! begin upload notice data ")
while True:
time.sleep(30)
res = self.upload_data(device_id=result_data[0][1], collection_time=0, base64_str="", hz=16000,
bits=16, size=0, duration=0)
if __name__ == "__main__":
upload_unread_data = Kakfa_network()
logger = upload_unread_data.logger
try:
upload_unread_data.start_run()
except:
logger.error('%s' % traceback.format_exc())
消费者
新建consumer.py
from kafka import KafkaConsumer
consumer = KafkaConsumer('test_rhj', group_id='123456', bootstrap_servers=['10.43.35.25:4531'])
for msg in consumer:
recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
print recv
更多推荐
所有评论(0)