1.Kafka集群搭建好以后,运行一段时间Kafka节点挂掉,程序中出现如下错误

   ERROR Error while accepting connection (kafka.network.Acceptor)
   java.io.IOException: Too many open files 

   或者

 ERROR Error while deleting the clean shutdown file in dir /home/weihu/kafka/kafka/logs (kafka.server.LogD)
 java.nio.file.FileSystemException: /home/weihu/kafka/kafka/logs/BC_20180822_PARSE-136/leader-epoch-checkpoint: Too many open files

   使用命令:ulimit -a   查看每个用户允许打开的最大文件数
   发现系统默认的是open files (-n) 1024,问题就出现在这里。
   然后执行:ulimit -n 102400
   将open files (-n) 1024 设置成open files (-n) 102400
   lsof -p 'kafka的进程号' | wc -l 

   命令行为临时修改不能持久

在配置文件里添加
   vim /etc/security/limits.conf 
   * - nofile 102400

编辑 /etc/sysctl.conf  文件增加两行 fs.file-max = 6553600 和 fs.inode-max = 262144 。一般情况下,系统最大打开文件数比较合理的设置为每4M物理内存256,比如256M.可以用lsof -p 看打开的文件句柄数

Windows最大文件句柄是16,384,你在任务管理器的性能这一项中可以看到当前打开的句柄数.

查看系统允许打开的最大文件数

#cat /proc/sys/fs/file-max

关键:如果是systemd启动的进程,需要在*.service文件里面加上如下内容: LimitNOFILE=102400

[Unit]
Description=Apache Kafka server (broker)
After=network.target  zookeeper.service
 
[Service]
Type=simple
LimitNOFILE=102400
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.272.b10-1.el7_9.x86_64/bin"
User=root
Group=root
ExecStart=/lv01/kafka/bin/kafka-server-start.sh /lv01/kafka/config/server.properties
ExecStop=/lv01/kafka/bin/kafka-server-stop.sh
Restart=on-failure
 
[Install]
WantedBy=multi-user.target

2. 服务器磁盘空间小就二三十G,被kafka的日志文件给吃掉了

  这就需要调整kafkalog的保存时间以及segments的大小了,可以调整以下几个参数  

  # The minimum age of a log file to be eligible for deletion
  #log.retention.hours=168
    log.retention.hours=1

  # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
  # segments don't drop below log.retention.bytes.
  #log.retention.bytes=1073741824
    log.retention.bytes=50000000

  # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=50000000

3. Kafka消息过大,导致经常堵塞出现 kafka.common.MessageSizeTooLargeException

   1)producer.properties中参数 compression.codec和commpressed.topics 开启压缩功能 ;

   2)server.properties  调整  message.max.bytes 大小,同时保证这个值小于  replica.fetch.max.bytes 这个参数值

   3)consumer.properties  调整  fetch.message.max.bytes 大小,保证它大于message.max.bytes.

  在使用java实现生产者和消费者时,参考上述调整参数大小。

4.Kafka : Error from SyncGroup, The request timed out

问题现象:

连接kafka的客户端报错:

Exception in thread "main" org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: The request timed out.

解决办法:

It happens because some Kafka Streams messages have meta information footprint which is more than a regular one (when you don't use Kafka Streams). To fix the issue, go to __consumer_offsets topic settings and set max.message.bytes param higher than it is by default. For example, in our case we have max.message.bytes = 20971520. That will completely solve your problem.

5.假如Kafka集群中一个broker宕机无法恢复, 应该如何处理?

环境介绍

三个broker的集群, zk,kafka装在一起

| broker | IP | broker.id |
|---------|---------------|-----------|
| broker1 | 172.18.12.211 | 211 |
| broker2 | 172.18.12.212 | 212 |
| broker3 | 172.18.12.213 | 213 |

创建测试topic

#./bin/kafka-topics.sh --zookeeper 172.18.12.212:2181 --create --topic test1 --replication-factor 3 --partitions 1
Created topic "test1".

查看

#./bin/kafka-topics.sh --zookeeper 172.18.12.212:2181 --describe --topic test1
Topic:test1 PartitionCount:1 ReplicationFactor:3 Configs:
        Topic: test1 Partition: 0 Leader: 213 Replicas: 213,212,211 Isr: 213,212,211

注意当前
Replicas: 213,212,211
Isr: 213,212,211

造一些消息

#./bin/kafka-console-producer.sh --broker-list 172.18.12.212:9092 --topic test1
>1
>2
>3

kill broker2

[root@node024212 ~]# ps -ef| grep kafka
root 17633 1 1 Feb17 ? 00:55:18 /usr/local/java/bin/java -server -Xmx2g -Xms2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=85 -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Xloggc:/usr/local/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9966 -Dkafka.logs.dir=/usr/local/kafka/bin/../logs -Dlog4j.configuration=file:./bin/../config/log4j.properties -cp .:/usr/local/java/lib:/usr/local/java/jre/lib:/usr/local/kafka/bin/../libs/activation-1.1.1.jar:/usr/local/kafka/bin/../libs/aopalliance-repackaged-2.5.0-b42.jar:/usr/local/kafka/bin/../libs/argparse4j-0.7.0.jar:/usr/local/kafka/bin/../libs/audience-annotations-0.5.0.jar:/usr/local/kafka/bin/../libs/commons-lang3-3.5.jar:/usr/local/kafka/bin/../libs/compileScala.mapping:/usr/local/kafka/bin/../libs/compileScala.mapping.asc:/usr/local/kafka/bin/../libs/connect-api-2.1.0.jar:/usr/local/kafka/bin/../libs/connect-basic-auth-extension-2.1.0.jar:/usr/local/kafka/bin/../libs/connect-file-2.1.0.jar:/usr/local/kafka/bin/../libs/connect-json-2.1.0.jar:/usr/local/kafka/bin/../libs/connect-runtime-2.1.0.jar:/usr/local/kafka/bin/../libs/connect-transforms-2.1.0.jar:/usr/local/kafka/bin/../libs/guava-20.0.jar:/usr/local/kafka/bin/../libs/hk2-api-2.5.0-b42.jar:/usr/local/kafka/bin/../libs/hk2-locator-2.5.0-b42.jar:/usr/local/kafka/bin/../libs/hk2-utils-2.5.0-b42.jar:/usr/local/kafka/bin/../libs/jackson-annotations-2.9.7.jar:/usr/local/kafka/bin/../libs/jackson-core-2.9.7.jar:/usr/local/kafka/bin/../libs/jackson-databind-2.9.7.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-base-2.9.7.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-json-provider-2.9.7.jar:/usr/local/kafka/bin/../libs/jackson-module-jaxb-annotations-2.9.7.jar:/usr/local/kafka/bin/../libs/javassist-3.22.0-CR2.jar:/usr/local/kafka/bin/../libs/javax.annotation-api-1.2.jar:/usr/local/kafka/bin/../libs/javax.inject-1.jar:/usr/local/kafka/bin/../libs/javax.inject-2.5.0-b42.jar:/usr/local/kafka/bin/../libs/javax.servlet-api-3.1.0.jar:/usr/local/kafka/bin/../libs/javax.ws.rs-api-2.1.1.jar:/usr/local/kafka/bin/../libs/javax.ws.rs-api-2.1.jar:/usr/local/kafka/bin/../libs/jaxb-api-2.3.0.jar:/usr/local/kafka/bin/../libs/jersey-client-2.27.jar:/usr/local/kafka/bin/../libs/jersey-common-2.27.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-2.27.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-core-2.27.jar:/usr/local/kafka/bin/../libs/jersey-hk2-2.27.jar:/usr/local/kafka/bin/../libs/jersey-media-jaxb-2.27.jar:/usr/local/kafka/bin/../libs/jersey-server-2.27.jar:/usr/local/kafka/bin/../libs/jetty-client-9.4.12.v20180830.jar:/usr/local/kafka/bin/../libs/jetty-continuation-9.4.12.v20180830.jar:/usr/local/kafka/bin/../libs/jetty-http-9.4.12.v20180830.jar:/usr/local/kafka/bin/../libs/jetty-io-9.4.12.v20180830.jar:/usr/local/kafka/bin/../libs/jetty-security-9.4.12.v20180830.jar:/usr/local/kafka/bin/../libs/jetty-server-9.4.12.v20180830.jar:/usr/local/kafka/bin/../libs/jetty-servlet-9.4.12.v20180830.jar:/usr/local/kafka/bin/../libs/jetty-servlets-9.4.12.v20180830.jar:/usr/local/kafka/bin/../libs/jetty-util-9.4.12.v20180830.jar:/usr/local/kafka/bin/../libs/jopt-simple-5.0.4.jar:/usr/local/kafka/bin/../libs/kafka_2.12-2.1.0.jar:/usr/local/kafka/bin/../libs/kafka_2.12-2.1.0-sources.jar:/usr/local/kafka/bin/../libs/kafka-clients-2.1.0.jar:/usr/local/kafka/bin/../libs/kafka-log4j-appender-2.1.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-2.1.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-examples-2.1.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-scala_2.12-2.1.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-test-utils-2.1.0.jar:/usr/local/kafka/bin/../libs/kafka-tools-2.1.0.jar:/usr/local/kafka/bin/../libs/log4j-1.2.17.jar:/usr/local/kafka/bin/../libs/lz4-java-1.5.0.jar:/usr/local/kafka/bin/../libs/maven-artifact-3.5.4.jar:/usr/local/kafka/bin/../libs/metrics-core-2.2.0.jar:/usr/local/kafka/bin/../libs/osgi-resource-locator-1.0.1.jar:/usr/local/kafka/bin/../libs/plexus-utils-3.1.0.jar:/usr/local/kafka/bin/../libs/reflections-0.9.11.jar:/usr/local/kafka/bin/../libs/rocksdbjni-5.14.2.jar:/usr/local/kafka/bin/../libs/scala-library-2.12.7.jar:/usr/local/kafka/bin/../libs/scala-logging_2.12-3.9.0.jar:/usr/local/kafka/bin/../libs/scala-reflect-2.12.7.jar:/usr/local/kafka/bin/../libs/slf4j-api-1.7.25.jar:/usr/local/kafka/bin/../libs/slf4j-log4j12-1.7.25.jar:/usr/local/kafka/bin/../libs/snappy-java-1.1.7.2.jar:/usr/local/kafka/bin/../libs/validation-api-1.1.0.Final.jar:/usr/local/kafka/bin/../libs/zkclient-0.10.jar:/usr/local/kafka/bin/../libs/zookeeper-3.4.13.jar:/usr/local/kafka/bin/../libs/zstd-jni-1.3.5-4.jar kafka.Kafka config/server.properties
root 21806 21651 0 11:27 pts/2 00:00:00 grep --color=auto kafka
[root@node024212 ~]# kill -9 17633
[root@node024212 ~]# ps -ef| grep kafka
root 21875 21651 0 11:27 pts/2 00:00:00 grep --color=auto kafka

稍等一会, 再次describe test1

#./bin/kafka-topics.sh --zookeeper 172.18.12.212:2181 --describe --topic test1
Topic:test1 PartitionCount:1 ReplicationFactor:3 Configs:
        Topic: test1 Partition: 0 Leader: 213 Replicas: 213,212,211 Isr: 213,211

可看到副本仍然是Replicas: 213,212,211
ISR已经变为Isr: 213,211
在212启动新broker

创建一份新的配置文件, 自动一个新的broker

# cp server.properties server2.properties
# vim server2.properties
只修改这两个参数
broker.id=218
log.dirs=/DATA21/kafka/kafka-logs,/DATA22/kafka/kafka-logs,/DATA23/kafka/kafka-logs,/DATA24/kafka/kafka-logs

创建相应目录

mkdir -p /DATA21/kafka/kafka-logs
mkdir -p /DATA22/kafka/kafka-logs
mkdir -p /DATA23/kafka/kafka-logs
mkdir -p /DATA24/kafka/kafka-logs

启动新broker

./bin/kafka-server-start.sh -daemon config/server2.properties

稍等, 查看 test1 状态

#./bin/kafka-topics.sh --zookeeper 172.18.12.212:2181 --describe --topic test1
Topic:test1 PartitionCount:1 ReplicationFactor:3 Configs:
        Topic: test2 Partition: 0 Leader: 213 Replicas: 213,212,211 Isr: 213,218,211

可以看到 test1 副本仍然是Replicas: 213,212,211
ISR为Isr: 213,218,211. 也就是说缺失的副本不会自动迁移到新broker上.
使用kafka-reassign-partitions.sh重分配分区

将212删除,添加218

[root@node024211 12:04:48 /usr/local/kafka]
#echo '{"version":1,"partitions":[{"topic":"test1","partition":0,"replicas":[211,213,218]}]}' > increase-replication-factor.json

[root@node024211 12:58:30 /usr/local/kafka]
#./bin/kafka-reassign-partitions.sh --zookeeper 172.18.12.211:2181 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"test1","partition":0,"replicas":[213,212,211],"log_dirs":["any","any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

[root@node024211 12:58:49 /usr/local/kafka]
#./bin/kafka-reassign-partitions.sh --zookeeper 172.18.12.211:2181 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition test1-0 completed successfully

查看topic信息

#./bin/kafka-topics.sh --zookeeper 172.18.12.212:2181 --describe --topic test1
Topic:test1 PartitionCount:1 ReplicationFactor:3 Configs:
        Topic: test1 Partition: 0 Leader: 213 Replicas: 211,213,218 Isr: 213,211,218

验证218是否有全部数据

虽然看副本信息中已经有了218, 但是218是否包含旧消息呢?
我的办法是, kill 211,213, 然后–from-beginning 消费218数据, 实际测试也是可以的

#./bin/kafka-console-consumer.sh --bootstrap-server 172.18.12.212:9092 --topic test1 --from-beginning

看了下211 218的log文件大小也是一样的

[2019-02-21 13:29:19]#ls -l /DATA22/kafka/kafka-logs/test1-0/
[2019-02-21 13:29:19]total 8
[2019-02-21 13:29:19]-rw-r--r--. 1 root root 10485760 Feb 21 12:58 00000000000000000000.index
[2019-02-21 13:29:19]-rw-r--r--. 1 root root 381 Feb 21 13:00 00000000000000000000.log
[2019-02-21 13:29:19]-rw-r--r--. 1 root root 10485756 Feb 21 12:58 00000000000000000000.timeindex
[2019-02-21 13:29:19]-rw-r--r--. 1 root root 16 Feb 21 13:00 leader-epoch-checkpoint

更简单的办法

通过阅读文档发现
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Howtoreplaceafailedbroker?

    How to replace a failed broker?
    When a broker fails, Kafka doesn’t automatically re-replicate the data on the failed broker to other brokers. This is because in the common case, one brings down a broker to apply code or config changes, and will bring up the broker quickly afterward. Re-replicating the data in this case will be wasteful. In the rarer case that a broker fails completely, one will need to bring up another broker with the same broker id on a new server. The new broker will automatically replicate the missing data.

这上面说的,如果服务器真的坏了, 只需要新启动一个broker, 把broker.id设置为 损坏的那个broker的id, 就会自动复制过去丢失的数据

我实际测试了一下, 确实可以恢复

6、Getting: In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writes.

Kafka monitoring in cluster environment

I have an kafka cluster (3 machine with 1 zookeeper and 1 broker run on each machine) I am using kafka_exporter to monitoring consumer lag metric, it's work fine in normal case. But, when i kill 1 broker, the Prometheus cannot get metric from http://machine1:9308/metric (kafka_exporter metric endpoint), because it take a long time to get data (1,5m), so it will be timeout. Now, if I restart kafka_exporter I will see some error:

Cannot get leader of topic __consumer_offsets partition 20: kafka server: In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writes

When I run the command: kafka-topics.bat --describe --zookeeper machine1:2181,machine2:2181,machine3:2181 --topic __consumer_offsets The result are:

Topic:__consumer_offsets        PartitionCount:50       ReplicationFactor:1     Configs:compression.type=producer,cleanup.policy=compact,segment.bytes=104857600
Topic: __consumer_offsets       Partition: 0    Leader: -1      Replicas: 1     Isr: 1
Topic: __consumer_offsets       Partition: 1    Leader: 2       Replicas: 2     Isr: 2

Topic: __consumer_offsets       Partition: 49   Leader: 2       Replicas: 2     Isr: 2

Is this a configuration error? And how can I get the consumer lag in this case? The "Leader: -1" is an error? if I shutdown the machine 1 forever, it's still work fine?

解决方法:

The leader being -1 means that there is no other broker in the cluster that has a copy of the data for the partition.

The problem in your case is that the replication factor for your topic __consumer_offsets is 1, which means that there is only one broker that hosts the data of any partition in the topic. If you lose any one of the brokers, all the partitions on the broker become unavailable resulting in the topic becoming unavailable. So, your kafka_exporter will fail to read from this topic.

The fix to this if you want to continue exporting consumer offsets on a broker loss, is to reconfigure the topic __consumer_offsets to have replication factor more than 1.

Advised Config - Replication factor - 3, min.insync.replicas - 2.

kafka-eagle几个指标含义

1. Preferred Leader

默认用Replicas副本集里的第一个副本作为leader。

2. Brokers Spread

看作broker使用率,如kafka集群9个broker,某topic有7个partition,则broker spread: 7 / 9 = 77%

3. Brokers Skew

partition是否存在倾斜,如kafka集群9个broker,某topic有18个partition,正常每个broker应该2个partition。若其中有3个broker上的partition数>2,则broker skew:  3 / 9 = 33%

4. Brokers Leader Skew

leader partition是否存在倾斜,如kafka集群9个broker,某topic14个partition,则正常每个broker有2个leader partition。若其中一个broker有0个leader partition,一个有4个leader partition,则broker leader skew: (4 - 2) / 14 = 14%

由于kafka所有读写都在leader上进行, broker leader skew会导致不同broker的读写负载不均衡,配置参数 auto.leader.rebalance.enable=true 可以使kafka每5min自动做一次leader的rebalance,消除这个问题。

5. Lag

5.1 Lag计算

了解lag之前,先熟悉下面几个概念

  • LogStartOffset:表示一个Partition的起始位移,初始为0,虽然消息的增加以及日志清除策略的影响,这个值会阶段性的增大。
  • ConsumerOffset:消费位移,表示Partition的某个消费者消费到的位移位置。
  • HighWatermark:简称HW,代表消费端所能“观察”到的Partition的最高日志位移,HW大于等于ConsumerOffset的值。
  • LogEndOffset:简称LEO, 代表Partition的最高日志位移,其值对消费者不可见。比如在ISR(In-Sync-Replicas)副本数等于3的情况下,消息发送到Leader A之后会更新LEO的值,Follower B和Follower C也会实时拉取Leader A中的消息来更新自己,HW就表示A、B、C三者同时达到的日志位移,也就是A、B、C三者中LEO最小的那个值。由于B、C拉取A消息之间延时问题,所以HW必然不会一直与Leader的LEO相等,即LEO>=HW。

Lag是阻塞的消息个数,代表consumer的消费能力,实际计算公式为:Lag=HW - ConsumerOffset

5.2 Lag为负数

Kafka Manager先获取HW,再读取ConsumerOffset。两步操作存在一个时间gap,因此吞吐很大的topic上会出现HW < ConsumerOffset的情况。导致Lag负数。

kafka重要参数总结

num.partitions=1

#全局分区设置 默认1


offsets.topic.num.partitions=50

#__consumer_offsets分区设置 默认50


default.replication.factor=1

#全局副本设置 默认1


offsets.topic.replication.factor=3

#__consumer_offsets副本设置 默认3

acks

确认机制:

此配置是表明当一次produce请求被认为完成时的确认值。特别是,多少个其他brokers必须已经提交了数据到他们的log并且向他们的leader确认了这些信息。典型的值包括:
 0: 表示producer从来不等待来自broker的确认信息(和0.7一样的行为)。这个选择提供了最小的时延但同时风险最大(因为当server宕机时,数据将会丢失)。
 1:表示获得leader replica已经接收了数据的确认信息。这个选择时延较小同时确保了server确认接收成功。
-1:producer会获得所有同步replicas都收到数据的确认。同时时延最大,然而,这种方式并没有完全消除丢失消息的风险,因为同步replicas的数量可能是1。如果你想确保某些replicas接收到数据,那么你应该在topic-level设置中选项min.insync.replicas设置一下。

首先acks参数,是在kafkaProducer,也就是在生产者客户端里设置的也就是说,你往kafka写东西的时候,就可以设置这个参数。

这个参数实际上有三种值可以设置,分别是0,1,和all.

第一种选择是把参数设置成0

我的kafkaProducer在客户端,只要把消息发送出去,不管那条数据有没有在哪怕Partition Leader上落到磁盘,就不管他了,直接认为这个消息发送成功。

如果你采用这种设置的话,那么你必须注意的一点是,可能你发送出去的消息还在半路。结果呢,Partition Leader所在Broker就直接挂了,然后结果你的客户端还认为消息发送成功了,此时就会导致这条消息就丢失了。

第二种选择是设置acks=1

只要Partition Leader接收到消息而且写入本地磁盘了,就认为成功了,不管其他的Follower有没有同步过去这条消息了。

这种设置其实是kafka默认的设置方式

也就是说默认情况下,要是不设置这个参数,只要Partition Leader写成功就算成功。

但是这里有一个问题,万一Partition Leader刚刚接收到消息,Follower还没来得及同步过去,结果Leader所在的broker宕机了,此时也会导致这条消息丢失,因为人家客户端已经认为发送成功了。

最后一种情况就是设置为all

Partition Leader接收到消息之后,还必须要求ISR列表里跟Leader保持同步的那些Follower都要把消息同步过去,才能认为这条消息是写入成功了。
如果说Partition Leader刚接收到了消息,但是结果Follower没有收到消息,此时Leader宕机了,那么客户端会感知到这个消息没发送成功,他会重试再次发送消息过去。
此时可能Partition 2的Follower变成Leader了,此时ISR列表里只有最新的这个Follower转变成的Leader了,那么只要这个新的Leader接收消息就算成功了。

acks=all就代表数据一定不会丢失了吗?

当然不是,如果你的Partition只有一个副本,也就是一个Leader,任何Follower都没有,你认为acks=all有用吗?
当然没用了,因为ISR里就一个Leader,他接收完消息后宕机,也会导致数据丢失。
所以说,这个acks=all,必须跟ISR列表里至少有2个以上的副本配合使用,起码是有一个Leader和一个Follower才可以。
这样才能保证说写一条数据过去,一定是2个以上的副本都收到了才算是成功,此时任何一个副本宕机,不会导致数据丢失。

min.insync.replicas=2

当生产者设置应答为"all"(或“-1”)时,此配置指定了成功写入的副本应答的最小数。如果没满足此最小数,则生产者将引发异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)
当min.insync.replicas和acks强制更大的耐用性时。典型的情况是创建一个副本为3的topic,将min.insync.replicas设置为2,并设置acks为“all”。如果多数副本没有收到写入,这将确保生产者引发异常。


broker.id.generation.enable 和 reserved.broker.max.id

来配合生成新的 broker.id。
#broker.id.generation.enable参数是用来配置是否开启自动生成 broker.id 的功能,默认情况下为true,即开启此功能。自动生成的broker.id有一个默认值,默认值为1000,也就是说默认情况下自动生成的 broker.id 从1001开始。


port

#broker server服务端口


logs.dir

#kafka数据的存放地址,多个地址的话用逗号分割,多个目录分布在不同磁盘上可以提高读写性能 /data/kafka-logs-1,/data/kafka-logs-2


log.retention.{hous|minutes|ms}

#日志留存时间,默认只保留最近7天的数据。


log.retention.bytes

#空间维度上的留存策略,控制着Kafka集群需要为每个消息日志保存多大的数据。对于大小超过该参数的分区日志而言,Kafka会自动清理该分区的过期日志段文件。默认为-1,表示不依据日志大小来清除日志。


zookeeper.connect

#无默认值,可以为一个CSV(comma-separated values)逗号分隔值列表,如设置为zk1:2181,zk2:2181,zk3:2181
listeners

#broker监听器的CSV列表,格式是[协议]://[主机名]:[端口], [[协议]://[主机名]:[端口]]。该参数主要用于客户端连接broker使用,可以认为是broker端开放给clients的监听端口。如果不指定主机名,则表示绑定默认网卡;如果主机名是0.0.0.0,则表示绑定所有网卡。


副本同步leader能力参数


1、replica.lag.time.max.ms

#replicas响应leader的最长等待时间,若是超过这个时间,就将replicas排除在ISR之外

Kafka判断ISR中的follower和leader同步的根据是参数 replica.lag.time.max.ms 默认是10s

就是说如果follower落后leader 10s,则认为他失效了,会踢出ISR集合。

随着 follower 副本不断与 leader 副本进行消息同步, follower 副本的 LEO 也会逐渐后移 ,
并最终追赶上 leader 副本,此时该 follower 副本就有资格进入 ISR 集合。追赶上 leader 副本的
判定准则是此副本的 LEO 是否不小于 leader 副本的 HW,注意这里并不是和 leader 副本的 LEO
相比 。


2、replica.lag.max.messages

#如果relicas落后太多,将会认为此partition relicas已经失效。而一般情况下,因为网络延迟等原因,总会导致replicas中消息同步滞后。如果消息严重滞后,leader将认为此relicas网络延迟较大或者消息吞吐能力有限。在broker数量较少,或者网络不足的环境中,建议提高此值.
Kafka 0.9.0.0版本后移除了replica.lag.max.messages参数,只保留了replica.lag.time.max.ms作为ISR中副本管理的参数


3、num.replica.fetchers

(默认:1) - follower从leader拉取消息进行同步数据,是由fetcher线程完成的,fetcher线程数由此参数以及要连接的broker共同决定的控制,


4、replica.fetch.max.bytes

(默认: 1MB) – broker可复制的消息的最大字节数。这个值应该比message.max.bytes大,否则broker会接收此消息,但无法将此消息复制出去,从而造成数据丢失。


message.max.bytes

(默认:1000000)– broker能接收消息的最大字节数,这个值应该比消费端的fetch.message.max.bytes更小才对,否则broker就会因为消费端无法使用这个消息而挂起。

修改 Topic 级 max.message.bytes,要考虑以下两个
还要修改 Broker的 replica.fetch.max.bytes 保证复制正常
消费还要修改配置 fetch.message.max.bytes


fetch.message.max.bytes (默认 1MB)

#消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。
所以,如果你一定要选择kafka来传送大的消息,还有些事项需要考虑。要传送大的消息,不是当出现问题之后再来考虑如何解决,而是在一开始设计的时候,就要考虑到大消息对集群和主题的影响。


auto.leader.rebalance.enable=false

#是否自动平衡broker之间的分配策略

    如果为false时,某个broker挂了,那分布在他上的leader副本就会自动切换到其他活着的broker上,但是挂掉的broker重启之后,集群并不会将他之前的leader副本再切换回来,这样就会使其他broker上leader副本数较多,而该broker上无leader副本(无新主题创建),从而造成负载不均衡的情况。
    这时我们可以通过 kafka-preferred-replica-election.sh 脚本来重新平衡集群中的leader副本。

    如果为true的话,controller角色就会每五分钟(leader.imbalance.check.interval.seconds默认)检查一下集群不平衡的状态(比较leader.imbalance.per.broker.percentage),进而重新平衡leader副本

leader.imbalance.per.broker.percentage = 10

#leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡
比例计算公式为:
(leader不是AR副本集的preferred replica的数量)/(broker 的AR副本数量)


leader.imbalance.check.interval.seconds = 300

#检查leader是否不平衡的时间间隔


unclean.leader.election.enable

#版本不一样默认值也不一样,建议设置为false

unclean .leader .election .enable 是关闭 Unclean Leader 选举的。何谓 Unclean?还记得Kafka有多个副本这件事吗?每个分区都有多个副本来提供高可用。在这些副 本中只能有一个副本对外提供服务,即所谓的Leader副本。
那么问题了,这些副本都有资格竞争Leader吗?显然不是,只有保存数据比较多的那些副本 才有资格竞选,那些落后进度太多的副本没资格做这件事情
好了,现在出现这种情况了:假设那些保存数据比较多的副本都挂了怎么办?我们还要不要进行 Leader选举了?此时这个参数就派上用场了。
如果设置成false,那么就坚持之前的原则,坚决不能让那些落后太多的副本竞选Leader。这样 做的后果是这个分区就不可用了,因为没有Leader 了。
反之如果是true,那么Kafka允许你从 那些"跑的慢"的副本中选一个出来当Leader。这样做的后果是数据有可能就丢失了,因为这 些副本保存的数据本来就不全,当了 Leader之后它本人就变得膨胀了,认为自己的数据才是权 威的。
这个参数在最新版的Kafka中默认就是false,本来不需要我特意提的,但是比较搞笑的是社区 对这个参数的默认值来来回回改了好几版了,鉴于我不知道你用的是哪个版本的Kafka,所以建 议你还是显式地把它设置成false吧。
注:虽然设置为false会可能导致该partition不可用,但是设置为ture会有丢数据的风险。


log.roll.{hours,ms}

日志滚动的周期时间,到达指定周期时间时,强制生成一个新的segment
默认值168(7 day)

log.segment.bytes

每个segment的最大容量。到达指定容量时,将强制生成一个新的segment
默认值1G(-1 为不限制)

**log.retention.check.interval.ms **

日志片段文件检查的周期时间    
默认值60000

Kafka的日志实际上是开始是在缓存中的(linux页缓存),然后根据一定策略定期一批一批写入到日志文件中去,以提高吞吐量.
log.flush.interval.messages

消息达到多少条时将数据写入到日志文件
默认值10000

log.flush.interval.ms

当达到该时间时,强制执行一次flush    
默认值null

log.flush.shceduler.interval.ms

周期性检查,是否需要将信息flush   

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐