Kafka安装与部署
大数据相关知识点1. Kafka介绍Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展能力……分布式系统,易于向外扩展;同时为发布和订阅提供高吞吐量;支持多订阅者,当失败时能自动平衡消费者;将消息持久化到磁盘,可用于批量消费;[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hBSM
大数据相关知识点
1. Kafka介绍
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展能力……
- 分布式系统,易于向外扩展;
- 同时为发布和订阅提供高吞吐量;
- 支持多订阅者,当失败时能自动平衡消费者;
- 将消息持久化到磁盘,可用于批量消费;
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hBSMojUf-1645255556442)(C:\Users\luffy\Desktop\luffy\note\大数据申请书知识点.assets\image-20210829191209280.png)]
Producer:Producer即生产者,消息的产生者,是消息的入口。
kafka cluster:
Broker:Broker是kafka实例,每个服务器上有一个或多个kafka的实例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1等……
Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。
Partition:Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹!
Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
Message:每一条发送的消息主体。
Consumer:消费者,即消息的消费方,是消息的出口。
Consumer Group:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高kafka的吞吐量!
Zookeeper:kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性
2. 基于Ubuntu18.04下Kafka的安装和部署
2.1 安装Java
1.安装openjdk-8-jdk
sudo apt-get update
sudo apt-get install openjdk-8-jdk
2.查看java版本
java -version
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DhEcyUfk-1645255556443)(C:\Users\luffy\Desktop\luffy\note\大数据申请书知识点.assets\image-20210829193511128.png)]
2.2 安装Zookeeper
1.从 https://zookeeper.apache.org/releases.html 下载ZooKeeper目前最新的稳定版本,当前使用3.6.3版本
2.解压apache-zookeeper-3.6.3
tar -xzvf apache-zookeeper-3.6.3.tar.gz
3.要将zookeeper运行起来,需要将样例配置zoo_sample.cfg重命名为zoo.cfg,打开可以看到一些默认配置
cd apache-zookeeper-3.6.3/conf/
mv zoo_sample.cfg zoo.cfg
cat zoo.cfg
- tickTime :
时长单位为毫秒,为zk使用的基本时间度量单位。例如,1 * tickTime是客户端与zk服务端的心跳时间,2 * tickTime是客户端会话的超时时间。
tickTime的默认值为2000毫秒,更低的tickTime值可以更快地发现超时问题,但也会导致更高的网络流量(心跳消息)和更高的CPU使用率(会话的跟踪处理)。 - clientPort :
zk服务进程监听的TCP端口,默认情况下,服务端会监听2181端口。 - dataDir :
无默认配置,必须配置,用于配置存储快照文件的目录。如果没有配置dataLogDir,那么事务日志也会存储在此目录。 - server:zookeeper服务同讯配置
4.修改zoo.cfg配置
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/bigdata/zookeeper/zookeeperData
dataDir=/bigdata/zookeeper/zookeeperLog
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
server.1=localhost:2888:3888
重点:
dataDir=/bigdata/zookeeper/zookeeperData
dataDir=/bigdata/zookeeper/zookeeperLogserver.1=localhost:2888:3888
2.3 安装Kafka
1.下载地址:https://kafka.apache.org/downloads,ubuntu下可以直接使用wget下载
wget https://artfiles.org/apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3.tar.gz
2.解压apache-zookeeper-3.6.3.tar.gz
tar -zxvf apache-zookeeper-3.6.3.tar.gz
3.在自己的kafka目录下创建日志目录
cd kafka/
mkdir logs-1
4.进入解压的kafka目录,修改kafka-server的配置文件
vim config/server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://localhost:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://localhost:9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/bigdata/kafka/logs-1
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
重点:
listeners=PLAINTEXT://localhost:9092
log.dirs=/bigdata/kafka/logs-1
5.启动zookeeper
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
查看是否启动成功
ps -ef | grep zoo
6.启动Kafka服务,使用kafka-server-start.sh,启动kafka服务
./bin/kafka-server-start.sh config/server.properties
7.创建topic,使用 kafka-topics.sh 创建单分区单副本的 topic test
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看topic列表
./bin/kafka-topics.sh --list --zookeeper localhost:2181
8.创建消息产生者,产生消息
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
9.创建消息消费者,消费消息
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
10.测试在生产消息的窗口输入内容,在消费窗口就能打印出来
11.查看topic消息
./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
3. 基于Netty到Kafka的GPS数据采集系统架构介绍
3.1 GPS数据采集
GPS设备每隔一定时间发送数据给采集端,采集端通过netty接收并解析数据后发送给kafka,服务端通过topic批量消费kafka数据,计算分析数据后分发到数据库。
3.1.1 Netty的Reactor线程模型
目前高性能网络通信服务大多是基于 epoll 机制和多线程模型组合的实现。而 Netty 可依据用户自定义的程序启动参数调整其运行期间的线程模型。etty 官方推荐使用主从 Reactor 多线程模型。其主要特点是拥有多个线程池,其中主线程池是处理新的客户端连接,处理完新连接后将新建的Socket 绑定到从线程池中的某个线程中;从线程池将负责后续对这个 Socket 的读写、编解码、业务处理工作。设计主从 Reactor 多线程模型的目的是将监听端口服务与处理数据功能剥离开来,从而提高处理数据的能力。在实际应用中,Netty 支持添加多个从线程池,可按照业务特性将不同的业务分配到不同的从线程池处理,或若干个特性相似的业务分配到同一个从线程池。
3.1.2 Kafka流式消息处理系统
Kafka 的 commit log 队列是 Kafka 消息队列概念的具体实现。生产者向 commit log 队列中发送流式消息,其他消费者可以在毫秒级延时处理这些日志的最新信息。每个数据消费者在 commit log 中有一个自己的指针,并独立移动,从而促使消费者们在分布式环境下能可靠、顺序的处理队列中的消息。commit log 可以被多个生产者和消费者所共享,并覆盖集群中的多台机器,为集群中机器提供容错保障。Kafka 作为一个现代的分布式系统还可以便捷地水平扩张和缩小。此外,Kafka 的消息代理(broker)能支持 TB 级消息的持久化。
3.2 系统架构设计
采集终端由GPS设备配合区域性的采集服务器组成。消息收集端暴露 IP 地址和端口,供采集终端连接,当有新的 TCP 连接或者新的消息发送时,都将触发消息收集端的网络通信处理程序。对于需要进一步处理的消息将由消息收集端通过异步方式推送到 Kafka 集群中。之后,再由不同的 Kafka consumer 进程按照不同的业务需求来处理被推送到 Kafka 集群中的消息。这些消息或被持久化到数据库,或进行其它实时计算。此外,Zookeeper 用来监测 Kafka 集群的运行状态,协调管理 Kafka 集群;同时 Zookeeper 还可预留作为协调管理收集端服务水平扩展业务的服务软件。
4. 交通大数据平台前端展示
4. Flink
更多推荐
所有评论(0)