kafka安装准备

  • jdk-8u131-linux-x64.tar.gz
  • kafka_2.13-3.2.0.tgz
  • apache-zookeeper-3.6.1-bin.tar.gz
  • kafka-eagle-bin-2.1.0.tar.gz
  • zktools.zip
  • canal.deployer-1.1.4.tar.gz

kafka单机安装

Apache Kafka 官方提供了两个客户端(生产者、消费者)性能测试脚本,具体使用参照链接:Kafka 性能测试脚本详解

更新jdk

CentOS7自带了一个openjdk,使用的时候用诸多问题,例如明明配置了Java环境变量但是不能使用,这个时候需要卸载重新安装。

查看已有openjdk版本

rpm -qa|grep jdk

卸载openjdk

remove后面的参数是上面得到的结果.noarch结尾的包

yum -y remove copy-jdk-configs-3.3-10.el7_5.noarch

下载jdk1.8

下载jdk-8u40-linux-x64.tar.gz,上传到/usr/local/java

链接:百度网盘 请输入提取码
提取码:2673

解压

tar -zxvf jdk-8u40-linux-x64.tar.gz -C /usr/local/java

配置环境变量

在/etc/profile文件的末尾加上

export JAVA_HOME=/usr/local/java/jdk1.8.0_40
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH
export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH

编译使之生效

source /etc/profile

验证

java -version

下载解压kafka

获取下载地址(点开具体版本): http://kafka.apache.org/downloads, 下载 Binary 二进制版本而不是源码

通过下列命令直接下载,或者直接将下载好的 kafka_2.13-3.2.0.tgz 压缩包放到 /usr/local/ 目录,然后解压安装

cd /usr/local
wget https://mirror.bit.edu.cn/apache/kafka/3.2.0/kafka_2.13-3.2.0.tgz
tar -xzvf kafka_2.13-3.2.0.tgz
cd kafka_2.13-3.2.0

启动zookeeper

kafka需要依赖ZK,kafka安装包中已经自带了一个ZK,也可以在其他机器单独安装运行zk,改成指定已运行的ZK。
如果改成指定的ZK需要修改修改 kafka 安装目录下的 config/server.properties 文件中的 zookeeper.connect 。这里使用自带的ZK。

如果需要单独安装zookeeper: https://blog.csdn.net/luciferlongxu/article/details/126187553

后台启动ZK:

nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties >> zookeeper.nohup &

如果启动zookeeper时报错nohup

[root@rmq101 kafka_2.13-3.2.0]# nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties >> zookeeper.nohup &
[1] 1579
[root@rmq101 kafka_2.13-3.2.0]# nohup: 忽略输入重定向错误到标准输出端

修改命令,在&前加上 2>&1

[root@rmq101 kafka_2.13-3.2.0]# nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties >> zookeeper.nohup 2>&1 &
[2] 1929

检查zookeeper是否启动成功:

ps -ef|grep zookeeper

启动kafka

修改相关配置

vim config/server.properties

Broker ID启动以后就不能改了

broker.id=1

取消注释,改成本机IP:

listeners=PLAINTEXT://192.168.1.101:9092

num.partitions后面增加2行。
发送到不存在topic自动创建。允许永久删除topic。(注意:生产环境正式运行必须设置为false)

num.partitions=1
auto.create.topics.enable=true
delete.topic.enable=true

后台启动kafka(kafka安装目录下):

nohup ./bin/kafka-server-start.sh ./config/server.properties & 

如果启动kafka时报错nohup

[root@rmq101 kafka_2.13-3.2.0]# nohup ./bin/kafka-server-start.sh ./config/server.properties 2>&1 &
[2] 2277
[root@rmq101 kafka_2.13-3.2.0]# nohup: 忽略输入并把输出追加到"nohup.out"

修改命令,在&前加上 >/dev/null 2>&1 

[root@rmq101 kafka_2.13-3.2.0]# nohup ./bin/kafka-server-start.sh ./config/server.properties >/dev/null 2>&1 &
[3] 3027

通过 ps -ef|grep kafka 命令可以查看kafka是否启动成功

日志在logs目录下

创建Topic

创建一个名为lucifer-topic的topic,只有一个副本,一个分区:

sh bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic lucifer-topic

如果kafka创建topic时报错

[root@rmq101 kafka_2.13-3.2.0]# sh bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic lucifer-topic --replication-factor 1 --partitions 1
Exception in thread "main" joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option
	at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
	at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
	at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
	at joptsimple.OptionParser.parse(OptionParser.java:396)
	at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:567)
	at kafka.admin.TopicCommand$.main(TopicCommand.scala:47)
	at kafka.admin.TopicCommand.main(TopicCommand.scala)

在较新版本(2.2 及更高版本)的 Kafka 不再需要 ZooKeeper 连接字符串,即- -zookeeper localhost:2181。使用 Kafka Broker的 --bootstrap-server localhost:9092来替代。

[root@rmq101 kafka_2.13-3.2.0]# sh bin/kafka-topics.sh --create --topic lucifer-topic --replication-factor 1 --partitions 1 --bootstrap-server 192.168.1.101:9092
Created topic lucifer-topic.

修改为 --bootstrap-server 192.168.1.101:9092后执行成功,可以通过下面命令查看topic是否创建成功

[root@rmq101 kafka_2.13-3.2.0]# sh bin/kafka-topics.sh --bootstrap-server 192.168.1.101:9092 --describe --topic lucifer-topic
Topic: lucifer-topic	TopicId: awcXOAvsTUmWltdSsgPz2Q	PartitionCount: 1	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: lucifer-topic	Partition: 0	Leader: 101	Replicas: 101	Isr: 101
[root@rmq101 kafka_2.13-3.2.0]# 

或者通过下面命令查看已经创建的 topic:

# 2.2 以前版本
sh bin/kafka-topics.sh -list -zookeeper localhost:2181
# 2.2以后版本
sh bin/kafka-topics.sh -list -bootstrap-server 192.168.1.101:9092

启动Producer

打开一个窗口,在kafka解压目录下:

sh bin/kafka-console-producer.sh --broker-list 192.168.1.101:9092 --topic lucifer-topic

启动Consumer

在一个新的远程窗口中:当生产者发送消息,消费者就能实时收到消息

sh bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.101:9092 --topic lucifer-topic --from-beginning

查看消息内容

生产者发送消息:

[root@rmq101 kafka_2.13-3.2.0]# sh bin/kafka-console-producer.sh --broker-list 192.168.1.101:9092 --topic lucifer-topic
>hello, Im ^H^H^H^H^H^H^H^H
>hello, I'm lucifer from producer client
>how are you
>are you ready
>go
>very good
>

消费者收到消息:

[root@rmq101 kafka_2.13-3.2.0]# sh bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.101:9092 --topic lucifer-topic --from-beginning
hello, Im         
hello, I'm lucifer from producer client
how are you
are you ready
go
very good

在 /tmp/kafka-logs/ 目录下查看内容:

/tmp/kafka-logs/lucifer-topic-0/00000000000000000000.log

[root@rmq101 lucifer-topic-0]# /usr/local/kafka_2.13-3.2.0/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/lucifer-topic-0/00000000000000000000.log --print-data-log
Dumping /tmp/kafka-logs/lucifer-topic-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: 0 lastSequence: 0 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1659453466769 size: 86 magic: 2 compresscodec: none crc: 842542502 isvalid: true
| offset: 0 CreateTime: 1659453466769 keySize: -1 valueSize: 18 sequence: 0 headerKeys: [] payload: hello, Im 
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: 1 lastSequence: 1 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 86 CreateTime: 1659453478488 size: 107 magic: 2 compresscodec: none crc: 2883387325 isvalid: true
| offset: 1 CreateTime: 1659453478488 keySize: -1 valueSize: 39 sequence: 1 headerKeys: [] payload: hello, I'm lucifer from producer client
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: 2 lastSequence: 2 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 193 CreateTime: 1659453503674 size: 79 magic: 2 compresscodec: none crc: 1027305610 isvalid: true
| offset: 2 CreateTime: 1659453503674 keySize: -1 valueSize: 11 sequence: 2 headerKeys: [] payload: how are you
baseOffset: 3 lastOffset: 3 count: 1 baseSequence: 3 lastSequence: 3 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 272 CreateTime: 1659453513330 size: 81 magic: 2 compresscodec: none crc: 3754693900 isvalid: true
| offset: 3 CreateTime: 1659453513330 keySize: -1 valueSize: 13 sequence: 3 headerKeys: [] payload: are you ready
baseOffset: 4 lastOffset: 4 count: 1 baseSequence: 4 lastSequence: 4 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 353 CreateTime: 1659453515317 size: 70 magic: 2 compresscodec: none crc: 3065572672 isvalid: true
| offset: 4 CreateTime: 1659453515317 keySize: -1 valueSize: 2 sequence: 4 headerKeys: [] payload: go
baseOffset: 5 lastOffset: 5 count: 1 baseSequence: 5 lastSequence: 5 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 423 CreateTime: 1659453522193 size: 77 magic: 2 compresscodec: none crc: 350799265 isvalid: true
| offset: 5 CreateTime: 1659453522193 keySize: -1 valueSize: 9 sequence: 5 headerKeys: [] payload: very good
baseOffset: 6 lastOffset: 6 count: 1 baseSequence: 6 lastSequence: 6 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 500 CreateTime: 1659453880343 size: 68 magic: 2 compresscodec: none crc: 3512345729 isvalid: true
| offset: 6 CreateTime: 1659453880343 keySize: -1 valueSize: 0 sequence: 6 headerKeys: [] payload: 

/tmp/kafka-logs/lucifer-topic-0/00000000000000000000.index

[root@rmq101 lucifer-topic-0]# /usr/local/kafka_2.13-3.2.0/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/lucifer-topic-0/00000000000000000000.index --print-data-log
Dumping /tmp/kafka-logs/lucifer-topic-0/00000000000000000000.index
offset: 0 position: 0

/tmp/kafka-logs/lucifer-topic-0/00000000000000000000.timeindex

[root@rmq101 lucifer-topic-0]# /usr/local/kafka_2.13-3.2.0/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/lucifer-topic-0/00000000000000000000.timeindex --print-data-log
Dumping /tmp/kafka-logs/lucifer-topic-0/00000000000000000000.timeindex
timestamp: 0 offset: 0
Found timestamp mismatch in :/tmp/kafka-logs/lucifer-topic-0/00000000000000000000.timeindex
  Index timestamp: 0, log timestamp: 1659453466769


删除kafka全部数据步骤:
1、停止每台机器上的kafka;
2、删除kafka存储目录(server.properties文件log.dirs配置,默认为“/tmp/kafka-logs”)全部topic的数据目录;
3、删除zookeeper上与kafka相关的znode节点;除了/zookeeper
4、重启kafka。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐