安装openjdk-8-jdk

sudo apt-get install openjdk-8-jdk

查看java版本,看看是否安装成功

wzj@wzj-ThinkPad-T61:~$ java -version
java version "1.8.0_171"
Java(TM) SE Runtime Environment (build 1.8.0_171-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.171-b11, mixed mode)

kafka下载

wget https://dlcdn.apache.org/kafka/3.1.0/kafka_2.12-3.1.0.tgz

解压Kafka

tar -xzf kafka_2.12-3.1.0.tgz

配置清除策略

root@LAPTOP-0G7ACGOA:~/kafka/kafka_2.12-3.1.0/config# vim server.properties


log.retention.hours=48 #数据最多保存48小时

log.retention.bytes=1073741824 #数据最多1G

启动zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties &
(使用 bin/zookeeper-server-start.sh -daemon config/zookeeper.properties 以守护进程启动)

启动kafka

bin/kafka-server-start.sh config/server.properties &

启动zookeeper报ZooKeeper audit is disabled错

修改zkServer.cmd 添加"-Dzookeeper.audit.enable=true"

创建主题topic

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test --partitions 2 --replication-factor 1

用下面指令创建

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

报错“Exception in thread "main" joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option”

在较新版本(2.2 及更高版本)的 Kafka 不再需要 ZooKeeper 连接字符串,即- -zookeeper localhost:2181。使用 Kafka Broker的 --bootstrap-server localhost:9092来替代- -zookeeper localhost:2181。

发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

接收消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

再次启动报错

[2022-03-16 16:18:48,212] ERROR Error while loading log dir /tmp/kafka-logs (kafka.server.LogDirFailureChannel)
java.io.IOException: Invalid argument
        at java.io.RandomAccessFile.setLength(Native Method)
        at kafka.log.AbstractIndex.$anonfun$resize$1(AbstractIndex.scala:189)
        at kafka.log.AbstractIndex.resize(AbstractIndex.scala:175)
        at kafka.log.AbstractIndex.$anonfun$trimToValidSize$1(AbstractIndex.scala:241)
        at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:241)
        at kafka.log.LogSegment.recover(LogSegment.scala:378)
        at kafka.log.LogLoader$.recoverSegment(LogLoader.scala:375)
        at kafka.log.LogLoader$.recoverLog(LogLoader.scala:426)
        at kafka.log.LogLoader$.$anonfun$load$11(LogLoader.scala:168)
        at kafka.log.LogLoader$.load(LogLoader.scala:280)
        at kafka.log.UnifiedLog$.apply(UnifiedLog.scala:1785)
        at kafka.log.LogManager.loadLog(LogManager.scala:282)
        at kafka.log.LogManager.$anonfun$loadLogs$13(LogManager.scala:368)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

linux会定时清理/tmp目录下的文件,我的kafkari日志文件目录正是放在了/tmp/kafka-logs目录下,导致被定时给清理掉了,所以kafka在尝试读取或追加日志时就会出错。

Kafka最新指令

启动命令:
bin/kafka-server-start.sh -daemon config/server.properties

创建topic
./kafka-topics.sh --create --bootstrap-server spark01:9092 --replication-factor 1 --partitions 1 --topic test2

查看topic
./kafka-topics.sh --bootstrap-server spark01:9092 --list

向指定topic中生产数据
./kafka-console-producer.sh --broker-list spark01:9092 --topic test2
例如:{"id":"1","name":"xiaoming","age":"20"}

查看topic具体内容
./kafka-console-consumer.sh --bootstrap-server spark01:9092 --topic test2 --from-beginning

创建消费者组
./kafka-console-consumer.sh --bootstrap-server spark01:9092 --topic test2 --group kafkatest

查看消费者组
./kafka-consumer-groups.sh --bootstrap-server spark01:9092 --list

查看消费者详情
./kafka-consumer-groups.sh --bootstrap-server spark01:9092 --describe  --group kafkatest

消费数据
./kafka-console-consumer.sh --bootstrap-server spark01:9092 --topic test2 --from-beginning

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐