由于kafka依赖zookeeper,因此需要使用 docker 同时安装zookeeperkafka

1、拉取安装镜像

# 1、下载zookeeper镜像
➜  ~ docker pull wurstmeister/zookeeper
# 2、下载kafka镜像
➜  ~ docker pull wurstmeister/kafka

2、启动服务

# 启动 zookeeper
## docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper
➜  ~ docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2  --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper
# 启动kafka镜像生成容器
## docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT={host-ip}:{zookeeper-port}/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://{host-ip}:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka
➜  ~ docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=host.docker.internal:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://host.docker.internal:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka

参数说明:

-d:参数指定容器后台运行

–name kafka:参数指定容器别名

-e 参数可以设置 docker 容器内环境变量,每个变量的解释:

-e KAFKA_BROKER_ID=0:在kafka集群中,每个kafka都有一个BROKER_ID来区分自己

-e KAFKA_ZOOKEEPER_CONNECT=host.docker.internal:2181/kafka:配置zookeeper管理kafka的路径host.docker.internal:2181/kafka

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://host.docker.internal:9092:把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP,类如 Java 程序访问出现无法连接。

-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092:配置kafka的监听端口

-v参数设置:

-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间

注意点:

比如我的电脑是 mac,在 host-ip 这块就不能填本机 ip(windows 和 linux 可以),需要填docker.for.mac.host.internal或者host.docker.internal,zookeeper 端口启动在 2181,kafka 即将启动在 9092,那么我的命令可以是这样的docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=host.docker.internal:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://host.docker.internal:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka,如果要对日志文件做限制,可以参靠上面的命令。

由于 macOS 的 docker 底层实现的不同,主要原因是 macOS 的 docker 在容器和宿主之间无法通过 ip 直接通信。因此在安装的时候需要特殊注意与 ip 相关的设置,当容器需要访问宿主ip时,需要使用docker.for.mac.host.internal或者host.docker.internal代替。

这里向zookeeper注册的时候,使用的是host.docker.internal,我们在程序中连接kafka的时候,直接使用localhost会报错,如:Error connecting to node host.docker.internal:9092,其处理方式:

  • 我们可以在本机的 host 文件中,添加映射,将 127.0.0.1 host.docker.internal;
  • 不使用host.docker.internal,而是使用自己主机IP,但是当IP发生变化的时候,就要重新配置。
# 测试容器内访问宿主机ip
curl host.docker.internal:2181

3、验证

(1)进入 kafka 容器

➜  ~ docker exec -it kafka bash

(2)进入 kafka 容器中的脚本目录

注意,此时应该已经进入到了容器中的bash。进入 kafka 的脚本目录,其中kafka_2.13-2.7.0可能会随着版本而变化数字。

bash-5.1# cd opt/kafka_2.13-2.7.0/bin/

通过 ls 可以看到许多的.sh 脚本

bash-5.1# ls
connect-distributed.sh               kafka-console-producer.sh            kafka-leader-election.sh             kafka-run-class.sh                   trogdor.sh
connect-mirror-maker.sh              kafka-consumer-groups.sh             kafka-log-dirs.sh                    kafka-server-start.sh                windows
connect-standalone.sh                kafka-consumer-perf-test.sh          kafka-mirror-maker.sh                kafka-server-stop.sh                 zookeeper-security-migration.sh
kafka-acls.sh                        kafka-delegation-tokens.sh           kafka-preferred-replica-election.sh  kafka-streams-application-reset.sh   zookeeper-server-start.sh
kafka-broker-api-versions.sh         kafka-delete-records.sh              kafka-producer-perf-test.sh          kafka-topics.sh                      zookeeper-server-stop.sh
kafka-configs.sh                     kafka-dump-log.sh                    kafka-reassign-partitions.sh         kafka-verifiable-consumer.sh         zookeeper-shell.sh
kafka-console-consumer.sh            kafka-features.sh                    kafka-replica-verification.sh        kafka-verifiable-producer.sh

(3)测试 kafka 生产者和消费者

启动 kafka 生产者

运行 kafka 生产者发送消息

./kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic

看到出现了个对话提示的小>就可以发送消息了,不过不要着急,先把消费者启动了。

启动 kafka 消费者

另起一个终端,进入 kafka 容器,进入/opt/kafka_2.13-2.7.0/bin目录,运行 kafka 消费者接收消息

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
测试发送和接收消息

在生产者中发送消息,消费者中应该能够收到对应的消息。

生产者

bash-5.1# ./kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
>111111
>222222
>33333
>

消费者

bash-5.1# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
111111
222222
33333
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐