前言

环境:centos 7.9 k8s集群、zookeeper集群

本篇将在k8s中部署kafka集群,kafka依赖zookeeper集群,zookeeper集群我们已经搭建好了,可以参考https://blog.csdn.net/MssGuo/article/details/127773132

制作对应版本的kafka镜像

由于k8s官网没有kafka的相关集群安装说明,所以只好手动制作对应版本的kafka镜像。

#kafka需要jdk,所以需要下载jdk,JDK的tar包官网:`https://www.oracle.com/java/technologies/downloads/`  自行下载;
#这里直接使用dockerhub上的jdk8镜像
docker pull ascdc/jdk8
docker tag ascdc/jdk8:latest jdk8:latest

#下载官网的kafka安装包
wget https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz
mkdir /root/kafka-cluster && cd /root/kafka-cluster && touch Dockerfile
[root@matser kafka-cluster]# ll
总用量 102604
-rw-r--r-- 1 root root       190 1110 22:09 Dockerfile
-rw-r--r-- 1 root root 105053134 1110 01:31 kafka_2.13-3.3.1.tgz
-rw-r--r-- 1 root root      7448 1110 23:12 kafka-sts.yaml
[root@matser kafka-cluster]# 
 
#开始编写Dockerfile文件,制作kafka镜像
vim Dockerfile
FROM jdk8:latest
COPY kafka_2.13-3.3.1.tgz /opt/kafka_2.13-3.3.1.tgz
WORKDIR /opt/
RUN tar -zxvf kafka_2.13-3.3.1.tgz && rm -rf kafka_2.13-3.3.1.tgz && mv kafka_2.13-3.3.1 kafka
EXPOSE 9092


#构建kafka镜像
docker build --no-cache -t kafka:v2.13-3.3.1 .
#查看镜像,已经构建成功
[root@matser kafka-cluster]# docker images | grep kafka
kafka           v2.13-3.3.1   4b5c27bf09e5   8 minutes ago   499MB
#分发镜像到node1、node2上,也可以上传到镜像仓库
[root@matser kafka-cluster]# docker save -o kafkav2.13-3.3.1.tar.gz kafka:v2.13-3.3.1 
[root@matser kafka-cluster]# scp kafkav2.13-3.3.1.tar.gz root@node1:/root
[root@matser kafka-cluster]# scp kafkav2.13-3.3.1.tar.gz root@node2:/root
#node1、node2上进行导入镜像
[root@node1 ~]# docker load -i kafkav2.13-3.3.1.tar.gz
[root@node2 ~]# docker load -i kafkav2.13-3.3.1.tar.gz

部署kafka集群

创建kafka的yaml文件

[root@matser kafka-cluster]# cat kafka-sts.yaml 
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-hs
  namespace: default
  labels:
    app: kafka
spec:
  ports:
  - port: 9092
    name: server
  clusterIP: None
  selector:
    app: kafka
--- 
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: kafka-pdb
  namespace: default
spec:
  selector:
    matchLabels:
      app: kafka
  minAvailable: 1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: default
spec:
  serviceName: kafka-hs
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      affinity:
#        podAntiAffinity:		#节点有限,这里就注释掉pod反亲和性
#          requiredDuringSchedulingIgnoredDuringExecution:
#            - labelSelector:
#                matchExpressions:
#                  - key: "app"
#                    operator: In
#                    values:
#                    - kafka
#              topologyKey: "kubernetes.io/hostname"
#        podAffinity:
#          preferredDuringSchedulingIgnoredDuringExecution:
#             - weight: 1
#               podAffinityTerm:
#                 labelSelector:
#                    matchExpressions:
#                      - key: "app"
#                        operator: In
#                        values:
#                        - zk
#                 topologyKey: "kubernetes.io/hostname"
      terminationGracePeriodSeconds: 300
      containers:
      - name: kafka
        imagePullPolicy: IfNotPresent
        image: kafka:v2.13-3.3.1
        resources:
          requests:
            memory: "300M"			#生产环境应该给大一点
            cpu: 500m				#生产环境应该给大一点
        ports:
        - containerPort: 9092
          name: server
        command:
        - sh
        - -c
        - "exec /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
          --override listeners=PLAINTEXT://:9092 \
          --override zookeeper.connect=zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181 \
          --override log.dirs=/var/lib/kafka/data \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=true \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000"
        env:
        - name: KAFKA_HEAP_OPTS
          value : "-Xmx500M -Xms500M"
        - name: KAFKA_OPTS
          value: "-Dlogging.level=INFO"
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/kafka/data
#        readinessProbe:		#就绪探针死活都是探测失败,不知道为什么,这里不使用探针了
#          exec:
#           command:
#            - sh
#            - -c
#            - "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9092"
#          tcpSocket:
#             port: 9092   
#          initialDelaySeconds: 10
#          periodSeconds: 3
#          timeoutSeconds: 2
#          failureThreshold: 2
      securityContext:
        runAsUser: 1000
        fsGroup: 1000
  volumeClaimTemplates:
  - metadata:
      name: datadir
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 500M
      storageClassName: "nfs-storageclass"
[root@matser kafka-cluster]# 

参数说明:
zookeeper.connect=zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181 指定了zookeeper集群,zk集群也是在k8s中定义好了,而且zk-0.zk-hs.default.svc.cluster.local能解析成zk的pod的IP地址

启动kafka集群

[root@matser zookeeper-cluster]#  kubectl apply -f kafka-sts.yaml		#注意:不要加 -l app=kafka,否则kubectl只会应用metadata.labels中带有该标签的资源(这里只有Service),StatefulSet和PDB会被过滤掉
[root@matser zookeeper-cluster]# kubectl  get pods
NAME                                      READY   STATUS        RESTARTS      AGE
kafka-0                                   1/1     Running   	0 (10m ago)   15m
kafka-1                                   1/1     Running   	0 (10m ago)   15m
kafka-2                                   1/1     Running   	0 (10m ago)   15m

验证kafka集群是否正常

1、查看kafka的pod日志
kubectl logs  -f  kafka-0
kubectl logs  -f  kafka-1
kubectl logs  -f  kafka-2
2、查看zookeeper是否已经注册了kafka
[root@matser ~]# kubectl exec -it zk-0 -- bash				#登录一个zookeeper的pod
 zookeeper@zk-0:/$ zkCli.sh									#登录zk集群
[zk: localhost:2181(CONNECTED) 0] ls /						#查看,可以看到有很多节点
[cluster, controller_epoch, controller, brokers, zookeeper, feature, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids			#可以看到/brokers/ids下有3个id,这就是kafka的3个broker节点
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 3] 
3、在kafka集群创建topic
[root@matser ~]# kubectl exec -it kafka-1 -- sh			#登录kafka的pod
$ cd /opt/kafka/bin										#下面就是创建一个topic
$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic first --create --partitions 1 --replication-factor 3
Created topic first.
$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic first  --describe		#查看指定topic的详细描述
Topic: first	TopicId: ucPP_X9bS8ezdmf_iqExdQ	PartitionCount: 1	ReplicationFactor: 3	Configs: compression.type=producer,min.insync.replicas=1,cleanup.policy=delete,segment.bytes=1073741824,flush.messages=9223372036854775807,file.delete.delay.ms=60000,max.message.bytes=1000012,min.compaction.lag.ms=0,message.timestamp.type=CreateTime,preallocate=false,min.cleanable.dirty.ratio=0.5,index.interval.bytes=4096,unclean.leader.election.enable=true,retention.bytes=-1,delete.retention.ms=86400000,message.timestamp.difference.max.ms=9223372036854775807,segment.index.bytes=10485760
	Topic: first	Partition: 0	Leader: 2	Replicas: 2,1,0	Isr: 2,1,0
4、查看后端存储是否有数据
[root@nginx data]# ll default-datadir-kafka-0-pvc-75b8d6ab-7e0f-41f8-a0c9-98fc9ca0529e
总用量 16
-rw-r--r-- 1 1000 nfsnobody   0 1110 23:12 cleaner-offset-checkpoint
drwxr-xr-x 2 1000 nfsnobody 167 1111 00:26 first-0
-rw-r--r-- 1 1000 nfsnobody   4 1111 00:36 log-start-offset-checkpoint
-rw-r--r-- 1 1000 nfsnobody  88 1111 00:26 meta.properties
-rw-r--r-- 1 1000 nfsnobody  26 1111 00:36 recovery-point-offset-checkpoint
-rw-r--r-- 1 1000 nfsnobody  26 1111 00:36 replication-offset-checkpoint
[root@nginx data]# 

至此,kafka集群已经搭建完毕。

排查过程

在部署kafka集群的过程中,出现了很多问题,这里记录一下:
1、创建好的镜像出现问题:容器一启动就被杀掉,一直处于重启、挂掉、再重启、再挂掉的循环中,这明显是镜像里面的进程启动失败、没有任何前台进程夯住容器导致的。但是通过kubectl logs、kubectl describe根本看不出来kafka进程为什么启动失败。所以,为了弄明白kafka进程启动失败的原因,得先想办法进入容器手动启动kafka看看。
所以,得先启动容器吧,然后在容器里面使用yaml文件的command参数来启动kafka,为了能正常启动容器,这里需要对Dockerfile文件做一下修改:

[root@matser zookeeper-cluster]# cat ../kafka-cluster/Dockerfile 
FROM jdk8:latest
COPY kafka_2.13-3.3.1.tgz /opt/kafka_2.13-3.3.1.tgz
WORKDIR /opt/
RUN tar -zxvf kafka_2.13-3.3.1.tgz && rm -rf kafka_2.13-3.3.1.tgz && mv kafka_2.13-3.3.1 kafka
EXPOSE 9092
ENTRYPOINT ["/bin/bash","-c","sleep 99999"]				#加上这条,为了让容器正常启动

创建镜像,启动pod,进入容器手动启动kafka,看看报什么错误:

/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=0 \
          --override listeners=PLAINTEXT://:9092 \
          --override zookeeper.connect=zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181 \
          --override log.dirs=/var/lib/kafka/data \
.....................................................................
报错如下:
[2022-11-10 14:38:26,375] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2022-11-10 14:38:26,713] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
[2022-11-10 14:38:26,787] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.lang.IllegalArgumentException: requirement failed: log.message.format.version 3.0-IV1 can only be used when inter.broker.protocol.version is set to version 0.11.0 or higher
	at scala.Predef$.require(Predef.scala:337)
	at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:2227)
	at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:2063)
	at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1495)
	at kafka.Kafka$.buildServer(Kafka.scala:67)
	at kafka.Kafka$.main(Kafka.scala:87)
	at kafka.Kafka.main(Kafka.scala)

原来是--override inter.broker.protocol.version=0.10.2-IV0参数的版本设置有问题。干脆删掉它,让kafka自己匹配得了。

2、就绪探针一直探测失败,导致pod无法正常running。这个问题未解决,暂时不使用探针。

3、持久化存储没有数据。
明明已经指定了kafka的数据存在 --override log.dir=/var/lib/kafka/data 目录,而且也把

volumeMounts:
        - name: datadir
          mountPath: /var/lib/kafka/data

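挂载到了存储上了,怎么nfs服务器上没有数据呢?仔细一看,好坑:command命令里的log.dir参数是从网上复制过来的,而进入kafka-0 pod里面看/opt/kafka/config/server.properties,人家配置的参数名叫log.dirs,有个s,网上很多文章用的却是log.dir。kafka启动之所以不报错,是因为log.dir本身也是合法参数,但它只在log.dirs未设置时才生效;server.properties里已经设置了log.dirs(默认是/tmp/kafka-logs),所以--override log.dir不起作用,数据仍然写到了log.dirs指向的目录。把参数改成--override log.dirs=/var/lib/kafka/data即可。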

k8s使用helm部署kafka

在上面的部署中需要自己制作镜像,太麻烦,可以在k8s集群中直接使用helm部署kafka集群。

kubectl  create ns kafka
helm  repo add bitnami https://charts.bitnami.com/bitnami
helm  repo  update
mkdir /root/kafka && cd /root/kafka/
helm search repo kafka
helm pull  bitnami/kafka --version=29.3.6
tar xf kafka-29.3.6.tgz 
cd kafka/
# 改一下存储类,资源大小,pv大小等
vim values.yaml	
helm -n kafka  template kafka-cluster ./
#安装
helm -n kafka  install kafka-cluster ./

kafka的svc地址:kafka-cluster.kafka.svc.cluster.local
kafka pod的域名:
    kafka-cluster-controller-0.kafka-cluster-controller-headless.kafka.svc.cluster.local:9092
    kafka-cluster-controller-1.kafka-cluster-controller-headless.kafka.svc.cluster.local:9092
    kafka-cluster-controller-2.kafka-cluster-controller-headless.kafka.svc.cluster.local:9092
#要使用kafka客户端链接kafka服务端,需要创建一个配置文件
cat > /root/kafka/client.properties <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="user1" \
    password="$(kubectl get secret kafka-cluster-user-passwords --namespace kafka -o jsonpath='{.data.client-passwords}' | base64 -d | cut -d , -f 1)";
EOF

# 创建kafka客户端pod
kubectl run kafka-cluster-client --restart='Never' --image m.daocloud.io/docker.io/bitnami/kafka:3.7.1-debian-12-r0 --namespace kafka --command -- sleep infinity
kubectl cp --namespace kafka /root/kafka/client.properties kafka-cluster-client:/tmp/client.properties
kubectl exec --tty -i kafka-cluster-client --namespace kafka -- bash

生产者:
kafka-console-producer.sh \
    --producer.config /tmp/client.properties \
    --broker-list kafka-cluster-controller-0.kafka-cluster-controller-headless.kafka.svc.cluster.local:9092,kafka-cluster-controller-1.kafka-cluster-controller-headless.kafka.svc.cluster.local:9092,kafka-cluster-controller-2.kafka-cluster-controller-headless.kafka.svc.cluster.local:9092 \
    --topic test
消费者:
kafka-console-consumer.sh \
    --consumer.config /tmp/client.properties \
    --bootstrap-server kafka-cluster.kafka.svc.cluster.local:9092 \
    --topic test \
    --from-beginning

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐