1.Zookeeper 配置 SASL

若只关注 kafka 的安全认证,不需要配置 Zookeeper 的 SASL,但 kafka 会在 zk 中存储一些必要的信息,因此 zk 的安全认证也会影响到 kafka

1.1 新建 zoo_jaas.conf 文件

zoo_jaas.conf文件名、文件所在路径没有特殊要求,一般放置在${ZOOKEEPER_HOME}/conf目录下

Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_kafka="kafka@123";
};

1.Server.username、Server.password为 Zookeeper 内部通信的用户名和密码,因此保证每个 zk 节点该属性一致即可
2.Server.user_xxx 中 xxx 为自定义用户名,用于 zkClient 连接所使用的用户名和密码,即为 kafka 创建的用户名

1.2 配置 zoo.conf 文件(为zookeeper添加SASL支持)

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
zookeeper.sasl.client=true

zookeeper.sasl.client设置为true,开启客户端身份验证,否则zoo_jaas.conf中配置的用户名将不起作用,客户端仍然可以无 jaas 文件连接,只是带有 WARNNING 而已

1.3 导入依赖包

因为使用的权限验证类为:org.apache.kafka.common.security.plain.PlainLoginModule,所以需要 kafka 相关 jar 包,新建文件夹 zk_sasl_lib,如下:
所以需要导入Kafka相关jar包,kafka-clients相关jar包,在部署kafka服务下的libs目录中可以找到,根据kafka不同版本,相关jar包版本会有所变化。所需要jar包如下,在zookeeper下创建目录zk_sasl_lib将jar包放入(目录名与位置可以随便,后续引用指定即可)

kafka-clients-2.4.1.jar
lz4-java-1.6.0.jar
slf4j-api-1.7.28.jar
slf4j-log4j12-1.7.28.jar
snappy-java-1.1.7.3.jar

1.4 修改 zkEnv.sh 文件

主要目的就是将这几个jar包使zookeeper读取到
在$ZK_HOME/bin目录下找到zkEnv.sh文件,添加如下代码,注意引用的目录下jar包,与之前创建的zoo_jaas.conf文件

修改前:

export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS"

修改后:

for jar in /Users/wjun/env/zookeeper/zk_sasl_lib/*.jar;
do
        CLASSPATH="$jar:$CLASSPATH"
done

export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS -Djava.security.auth.login.config=/Users/wjun/env/zookeeper/conf/zoo_jaas.conf"

重启 Zookeeper 服务即可

2.Kakfa 配置 SASL

2.1 新建 kafka_server_jaas.conf 文件

kafka_server_jaas.conf文件名和存放路径没有要求,一般放置在${KAFKA_HOME}/config目录下(这里的Client与Zookeeper相对应,KafkaServer与后期调用时读取的KafkaClient相对应,是消费生产的账号密码,不要弄混了)

KafkaServer {
 org.apache.kafka.common.security.plain.PlainLoginModule required
 username="admin"
 password="admin@123"
 user_admin="admin@123"
 user_producer="producer@123"
 user_consumer="consumer@123";
};
Client{
 org.apache.kafka.common.security.plain.PlainLoginModule required
 username="kafka"
 password="kafka@123";
};
  • KafkaServer.username、KafkaServer.password 为 broker 内部通信的用户名密码,同上
  • KafkaServer.user_xxx 其中 xxx 必须和 KafkaServer.username 配置的用户名一致,密码也一致(这个用户就是管理员用户,不受鉴权控制)
  • KafkaServer.user_producer、KafkaServer.user_consumer 为了之后的 ACL 做准备,达到消费者生产者使用不同账号且消费者账号只能消费数据,生产者账号只能生产数据
  • Client.username、Client.password 填写 Zookeeper 中注册的账号密码,用于 broker 与 zk 的通信(若 zk 没有配置 SASL 可以忽略、若 zookeeper.sasl.client 为 false 也可以忽略只是带有⚠️,日志如下)
  • 解释:
    KafkaServer中,使用user_来定义多个用户,供客户端程序(生产者、消费者程序)认 证 使用,可以定义多个,后续配置可能还可以根据不同的用户定义ACL,username=“admin”和password=“admin@123” 这两个是kafka集群内部认证时用到的。
    Client中,主要是broker链接到zookeeper,需要和之前配置的zk_server_jaas.conf里的 user_kafka=“kafka@123””;对应,如果zk文件中配置了多个,填写其中一个即可。
[2021-06-29 17:14:30,204] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/Users/wjun/env/kafka/config/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)

2.2 修改 server.properties 文件

listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
allow.everyone.if.no.acl.found=false
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin

super.users 配置超级用户,该用户不受之后的 ACL 配置影响

2.3 修改启动脚本

修改 kafka-server-start.sh 文件,使之加载到 kafka_server_jaas.conf 文件

修改前:

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

修改后:

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/Users/wjun/env/kafka/config/kafka_server_jaas.conf"
fi

重启 kafka 服务即可

2.4 Java API 验证

代码如下:

public class TProducer {
    public static void main(String[] args) throws IOException {
        // 创建配置类
        Properties properties = new Properties();
        // 加载生产者配置文件
        properties.load(TProducer.class.getClassLoader().getResourceAsStream("producer.properties"));
        // 创建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo1", "key_1", "value_1");

        producer.send(producerRecord, (metadata, exception) -> {
            if (exception == null) {
                System.out.println("消息发送至 --> " + metadata.topic() + " 偏移量为:" + metadata.offset());
            } else {
                System.out.println("消息发送失败 " + exception.getMessage());
            }
        });

        producer.close();
    }
}

其中 producer.properties 如下:

############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092

# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none

# name of the partitioner class for partitioning events; default partition spreads data randomly
# partitioner.class=

# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=

# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=

# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=

# the maximum size of a request in bytes
#max.request.size=

# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=

# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=

key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

security.protocol=SASL_PLAINTEXT、sasl.mechanism=PLAIN 必须配置

此时运行会报

Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
	at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133)
	at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98)
	at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84)
	at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:124)
	at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
	at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
	at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:450)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:421)
	... 2 more

原因是需要用户名密码才能连接到 Kafka,即 kafka_server_jaas.conf 配置中的

方式一:
创建 kafka_client_jaas.conf 文件

KafkaClient{
 org.apache.kafka.common.security.plain.PlainLoginModule required
 username="producer"
 password="producer@123";
};

程序启动时添加参数 -Djava.security.auth.login.config=/Users/wjun/Documents/Program/Java/kafka_demo/src/main/resources/kafka_client_jaas.conf
在这里插入图片描述
方式二:
在 producer.properties 添加:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=“producer” password=“producer@123”;

启动程序成功生产数据

[main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.4.1
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: c57222ae8cd7866b
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1624965871345
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: l3Agv3weRiG27uo5EDj4KA
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
消息发送至 --> demo1 偏移量为:18

消费者同理

public class TConsumer {
    public static void main(String[] args) throws IOException {
        // 创建配置类
        Properties properties = new Properties();
        // 加载生产者配置文件
        properties.load(TProducer.class.getClassLoader().getResourceAsStream("consumer.properties"));
        // 构建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // 订阅主题
        consumer.subscribe(Collections.singletonList("demo1"));

        ConsumerRecords<String, String> records;

        while (true) {
            records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + "--" + record.value());
            }
        }
    }
}

其中 consumer.properties 如下:

# comma separated host:port pairs, each corresponding to a zk
bootstrap.servers=localhost:9092

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

#consumer group id
group.id=test-consumer-group

#consumer timeout
#consumer.timeout.ms=5000

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

#offset submission method
enable.auto.commit=true

# earliest or latest
auto.offset.reset=latest

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="producer" password="producer@123";

此时会发现并不能消费,异常信息为:

Exception in thread "main" org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [demo1]

原因是该消费者组没有消费主题的权限,即 ACL 操作,当然换成 admin 是不存在这个问题的

3.Kafka ACL

配置用户具有某个主题的写权限,即生产数据

kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:producer --producer --topic demo1

配置用户具有某个主题的读权限,即消费数据

kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:consumer --consumer --topic demo1 --group test-consumer-group

查看 ACL 列表

kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic demo1

同时也可以取消用户权限,还可以限制 ip 等,具体 ACL操作见 kafka官方文档

假设你要添加一个acl “以允许198.51.100.0和198.51.100.1,Principal为User:Bob和User:Alice对主题是Test-Topic有Read和Write的执行权限”

./kafka-acls.sh --authorizer-properties zookeeper.connect=****:2181/kafka --add --allow-principal User:Bob --allow-host 198.51.100.0 --operation Read --operation Write --topic kht1

指定消费者组

./kafka-acls.sh --authorizer-properties zookeeper.connect=****:2181/kafka --add --allow-principal User:* --consumer --topic konghaot1 --group Group-1

注:其他指令./kafka-acls.sh -h

Kafka Authorization management CLI can be found under bin directory with all the other CLIs. The CLI script is called kafka-acls.sh. Following lists all the options that the script supports:

OPTIONDESCRIPTIONDEFAULTOPTION TYPE
–addIndicates to the script that user is trying to add an acl.Action
–removeIndicates to the script that user is trying to remove an acl.Action
–listIndicates to the script that user is trying to list acls.Action
–authorizerFully qualified class name of the authorizer.kafka.security.auth.SimpleAclAuthorizerConfiguration
–authorizer-propertieskey=val pairs that will be passed to authorizer for initialization. For the default authorizer the example values are: zookeeper.connect=localhost:2181Configuration
–bootstrap-serverA list of host/port pairs to use for establishing the connection to the Kafka cluster. Only one of --bootstrap-server or --authorizer option must be specified.Configuration
–command-configA property file containing configs to be passed to Admin Client. This option can only be used with --bootstrap-server option.Configuration
–clusterIndicates to the script that the user is trying to interact with acls on the singular cluster resource.ResourcePattern
–topic [topic-name]Indicates to the script that the user is trying to interact with acls on topic resource pattern(s).ResourcePattern
–group [group-name]Indicates to the script that the user is trying to interact with acls on consumer-group resource pattern(s)ResourcePattern
–transactional-id [transactional-id]The transactionalId to which ACLs should be added or removed. A value of * indicates the ACLs should apply to all transactionalIds.ResourcePattern
–delegation-token [delegation-token]Delegation token to which ACLs should be added or removed. A value of * indicates ACL should apply to all tokens.ResourcePattern
–resource-pattern-type [pattern-type]Indicates to the script the type of resource pattern, (for --add), or resource pattern filter, (for --list and --remove), the user wishes to use. When adding acls, this should be a specific pattern type, e.g. ‘literal’ or ‘prefixed’. When listing or removing acls, a specific pattern type filter can be used to list or remove acls from a specific type of resource pattern, or the filter values of ‘any’ or ‘match’ can be used, where ‘any’ will match any pattern type, but will match the resource name exactly, and ‘match’ will perform pattern matching to list or remove all acls that affect the supplied resource(s). WARNING: ‘match’, when used in combination with the ‘–remove’ switch, should be used with care.literalConfiguration
–allow-principalPrincipal is in PrincipalType:name format that will be added to ACL with Allow permission. Default PrincipalType string “User” is case sensitive. You can specify multiple --allow-principal in a single command.Principal
–deny-principalPrincipal is in PrincipalType:name format that will be added to ACL with Deny permission. Default PrincipalType string “User” is case sensitive. You can specify multiple --deny-principal in a single command.Principal
–principalPrincipal is in PrincipalType:name format that will be used along with --list option. Default PrincipalType string “User” is case sensitive. This will list the ACLs for the specified principal. You can specify multiple --principal in a single command.Principal
–allow-hostIP address from which principals listed in --allow-principal will have access.if --allow-principal is specified defaults to * which translates to “all hosts”Host
–deny-hostIP address from which principals listed in --deny-principal will be denied access.if --deny-principal is specified defaults to * which translates to “all hosts”Host
–operationOperation that will be allowed or denied. Valid values are:ReadWriteCreateDeleteAlterDescribeClusterActionDescribeConfigsAlterConfigsIdempotentWriteAllAllOperation
–producerConvenience option to add/remove acls for producer role. This will generate acls that allows WRITE, DESCRIBE and CREATE on topic.Convenience
–consumerConvenience option to add/remove acls for consumer role. This will generate acls that allows READ, DESCRIBE on topic and READ on consumer-group.Convenience
–idempotentEnable idempotence for the producer. This should be used in combination with the --producer option. Note that idempotence is enabled automatically if the producer is authorized to a particular transactional-id.Convenience
–forceConvenience option to assume yes to all queries and do not prompt.Convenience

4.kafka只允许部分ip访问(通过kafka配置)

在Kafka中,可以通过配置安全协议和认证机制来实现安全控制。但是,如果只需要限制特定IP地址访问Kafka,则可以在Kafka的配置文件中设置权限控制,限制可访问Kafka的IP地址范围。
具体实现方法如下:
1.打开Kafka配置文件,找到以下配置项:

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#此处未进行设置则代表绑定至所有网络地址
#默认值:''(等同于不设置)
listeners=PLAINTEXT://localhost:9092

2.在该配置项后面增加以下配置项:

# Allows us to set the IP address range that can access the Kafka broker
#允许我们设置可以访问Kafka broker的IP地址范围
#默认值:''(等同于不设置)
advertised.listeners=PLAINTEXT://localhost:9092
listeners=PLAINTEXT://localhost:9092
# Only accept connections from localhost and 192.168.0.X
#仅接受来自localhost和192.168.0.X的连接
#示例:允许192.168.0.1至192.168.0.255的IP地址
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
listeners.security.protocol.map=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
advertised.listeners=PLAINTEXT_HOST://192.168.0.1:9092
listener.security.protocol.map=PLAINTEXT_HOST:PLAINTEXT

allow.auto.create.topics=false
# The broker will allow broadcast
advertised.listeners=PLAINTEXT://localhost:9092
listeners=PLAINTEXT://localhost:9092,PLAINTEXT://192.168.0.1:9092
####这里就是设置的可访问IP地址范围### 
listeners=PLAINTEXT://192.168.0.1:9092

其中,listeners配置项是Kafka broker绑定的IP地址和端口,advertised.listeners配置项是broker对外透露的IP地址和端口

5.kafka 配置文件详解

5.1 server.properties配置文件

# kafka server配置 kafka最为重要三个配置依次为:broker.id、log.dir、zookeeper.connect
 
# 每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况
broker.id=0
 
# broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置
host.name=192.168.20.112
 
# broker server服务端口
port =9092
 
# broker处理消息的最大线程数,一般情况下数量为cpu核数
num.network.threads=4
 
# broker处理磁盘IO的线程数,数值为cpu核数2倍
num.io.threads=8
 
# socket的发送缓冲区,socket的调优参数SO_SNDBUFF
socket.send.buffer.bytes=1048576
 
# socket的接受缓冲区,socket的调优参数SO_RCVBUFF
socket.receive.buffer.bytes=1048576
 
# socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
socket.request.max.bytes=104857600
 
# 每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖
num.partitions=2
 
# 数据文件保留多长时间, 存储的最大时间超过这个时间会根据log.cleanup.policy设置数据清除策略,log.retention.bytes和log.retention.minutes或log.retention.hours任意一个达到要求,都会执行删除
# 有2删除数据文件方式: 按照文件大小删除:log.retention.bytes  按照2中不同时间粒度删除:分别为分钟,小时
log.retention.hours=168
 
# topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.segment.bytes=536870912
 
# 文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略
log.retention.check.interval.ms=60000
 
# 是否开启日志清理
log.cleaner.enable=false
 
# zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.connect=192.168.20.112:2181
 
# ZooKeeper的连接超时时间
zookeeper.connection.timeout.ms=60000
 
# ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大
zookeeper.session.timeout.ms=6000
 
# ZooKeeper集群中leader和follower之间的同步时间
zookeeper.sync.time.ms =2000
 
# kafka数据的存放地址,多个地址的话用逗号分割,多个目录分布在不同磁盘上可以提高读写性能  /data/kafka-logs-1,/data/kafka-logs-2
log.dirs=/data1/ehserver/env/kafka_2.11-2.2.0/logs/kafka-logs-1
 
 
# ==========================================不重要非必须配置了解 =====================================
 
# 将默认的 delete 改成压缩模式 日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
log.cleanup.policy=compact
 
# 表示消息体的最大大小,单位是字节
message.max.bytes =6525000
 
# 一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改
background.threads =4
 
#等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,应该是一种自我保护机制。
queued.max.requests =500
 
#这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment会被 topic创建时的指定参数覆盖
#log.roll.hours =24*7
 
 
# topic每个分区的最大文件大小,一个topic的大小限制 = 分区数*log.retention.bytes。-1没有大小限log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
# log.retention.bytes=-1
 
# 日志清理运行的线程数
log.cleaner.threads = 2
 
#日志清理时候处理的最大大小
# log.cleaner.io.max.bytes.per.second=None
 
# 日志清理去重时候的缓存空间,在空间允许的情况下,越大越好
# log.cleaner.dedupe.buffer.size=500*1024*1024
 
# 日志清理时候用到的IO块大小一般不需要修改
# log.cleaner.io.buffer.size=512*1024
 
# 日志清理中hash表的扩大因子一般不需要修改
log.cleaner.io.buffer.load.factor =0.9
 
# 检查是否处罚日志清理的间隔
log.cleaner.backoff.ms =15000
 
# 日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic创建时的指定参数覆盖
log.cleaner.min.cleanable.ratio=0.5
 
# 对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖
# log.cleaner.delete.retention.ms =1day
 
# 对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖
# log.index.size.max.bytes =10*1024*1024
 
# 当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数
log.index.interval.bytes =4096
 
# 表示每当消息记录数达到1000时flush一次数据到磁盘,log文件”sync”到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个”数据可靠性"的必要手段,所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞),如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.物理server故障,将会导致没有fsync的消息丢失.
# log.flush.interval.messages=None 例如log.flush.interval.messages=1000
#检查是否需要固化到硬盘的时间间隔
log.flush.scheduler.interval.ms =3000
# 仅仅通过interval来控制消息的磁盘写入时机,是不足的.此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔达到阀值,也将触发.
# log.flush.interval.ms = None 例如:log.flush.interval.ms=1000 表示每间隔1000毫秒flush一次数据到磁盘
# 文件在索引中清除后保留的时间一般不需要去修改
log.delete.delay.ms =60000
# 控制上次固化硬盘的时间点,以便于数据恢复一般不需要去修改
log.flush.offset.checkpoint.interval.ms =60000
# 是否允许自动创建topic,若是false,就需要通过命令创建topic
auto.create.topics.enable =true
default.replication.factor =1
# ===================================以下是kafka中Leader,replicas配置参数================================
## partition leader与replicas之间通讯时,socket的超时时间
#controller.socket.timeout.ms =30000
#
## partition leader与replicas数据同步时,消息的队列尺寸
#controller.message.queue.size=10
#
##replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),并认为它是死的,不会再加入管理中
#replica.lag.time.max.ms =10000
#
## 如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效。通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移到其他follower中.在broker数量较少,或者网络不足的环境中,建议提高此值.
#replica.lag.max.messages =4000
#
## replicas
#replica.socket.timeout.ms=30*1000
#
## leader复制时候的socket缓存大小
#replica.socket.receive.buffer.bytes=64*1024
#
## replicas每次获取数据的最大大小
#replica.fetch.max.bytes =1024*1024
#
##replicas同leader之间通信的最大等待时间,失败了会重试
#replica.fetch.wait.max.ms =500
#
##fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件
#replica.fetch.min.bytes =1
#
##leader进行复制的线程数,增大这个数值会增加follower的IO
#num.replica.fetchers=1
#
## 每个replica检查是否将最高水位进行固化的频率
#replica.high.watermark.checkpoint.interval.ms =5000
#
## 是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker
#controlled.shutdown.enable =false
#
##控制器关闭的尝试次数
#controlled.shutdown.max.retries =3
#
## 每次关闭尝试的时间间隔
#controlled.shutdown.retry.backoff.ms =5000
#
## leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡
#leader.imbalance.per.broker.percentage =10
#
## 检查leader是否不平衡的时间间隔
#leader.imbalance.check.interval.seconds =300
#
##客户端保留offset信息的最大空间大小
#offset.metadata.max.bytes
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
# 这是默认配置,使用PLAINTEXT,端口是9092, tcp用来监控的kafka端口
listeners=PLAINTEXT://192.168.20.112:9092
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

5.2producer.properties配置文件

#指定kafka节点列表,用于获取metadata,不必全部指定
#需要kafka的服务器地址,来获取每一个topic的分片数等元数据信息。
metadata.broker.list=kafka01:9092,kafka02:9092,kafka03:9092
 
#生产者生产的消息被发送到哪个block,需要一个分组策略。
#指定分区处理类。默认kafka.producer.DefaultPartitioner,表通过key哈希到对应分区
#partitioner.class=kafka.producer.DefaultPartitioner
 
#生产者生产的消息可以通过一定的压缩策略(或者说压缩算法)来压缩。消息被压缩后发送到broker集群,
#而broker集群是不会进行解压缩的,broker集群只会把消息发送到消费者集群,然后由消费者来解压缩。
#是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。
#压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定。
#文本数据会以1比10或者更高的压缩比进行压缩。
compression.codec=none
 
#指定序列化处理类,消息在网络上传输就需要序列化,它有String、数组等许多种实现。
serializer.class=kafka.serializer.DefaultEncoder
 
#如果要压缩消息,这里指定哪些topic要压缩消息,默认empty,表示不压缩。
#如果上面启用了压缩,那么这里就需要设置
#compressed.topics= 
#这是消息的确认机制,默认值是0。在面试中常被问到。
#producer有个ack参数,有三个值,分别代表:
#(1)不在乎是否写入成功;
#(2)写入leader成功;
#(3)写入leader和所有副本都成功;
#要求非常可靠的话可以牺牲性能设置成最后一种。
#为了保证消息不丢失,至少要设置为1,也就
#是说至少保证leader将消息保存成功。
#设置发送数据是否需要服务端的反馈,有三个值0,1,-1,分别代表3种状态:
#0: producer不会等待broker发送ack。生产者只要把消息发送给broker之后,就认为发送成功了,这是第1种情况;
#1: 当leader接收到消息之后发送ack。生产者把消息发送到broker之后,并且消息被写入到本地文件,才认为发送成功,这是第二种情况;#-1: 当所有的follower都同步消息成功后发送ack。不仅是主的分区将消息保存成功了,
#而且其所有的分区的副本数也都同步好了,才会被认为发动成功,这是第3种情况。
request.required.acks=0
 
#broker必须在该时间范围之内给出反馈,否则失败。
#在向producer发送ack之前,broker允许等待的最大时间 ,如果超时,
#broker将会向producer发送一个error ACK.意味着上一次消息因为某种原因
#未能成功(比如follower未能同步成功)
request.timeout.ms=10000
 
#生产者将消息发送到broker,有两种方式,一种是同步,表示生产者发送一条,broker就接收一条;
#还有一种是异步,表示生产者积累到一批的消息,装到一个池子里面缓存起来,再发送给broker,
#这个池子不会无限缓存消息,在下面,它分别有一个时间限制(时间阈值)和一个数量限制(数量阈值)的参数供我们来设置。
#一般我们会选择异步。
#同步还是异步发送消息,默认“sync”表同步,"async"表异步。异步可以提高发送吞吐量,
#也意味着消息将会在本地buffer中,并适时批量发送,但是也可能导致丢失未发送过去的消息
producer.type=sync
 
#在async模式下,当message被缓存的时间超过此值后,将会批量发送给broker,
#默认为5000ms
#此值和batch.num.messages协同工作.
queue.buffering.max.ms = 5000
 
#异步情况下,缓存中允许存放消息数量的大小。
#在async模式下,producer端允许buffer的最大消息量
#无论如何,producer都无法尽快的将消息发送给broker,从而导致消息在producer端大量沉积
#此时,如果消息的条数达到阀值,将会导致producer端阻塞或者消息被抛弃,默认为10000条消息。
queue.buffering.max.messages=20000
 
#如果是异步,指定每次批量发送数据量,默认为200
batch.num.messages=500
 
#在生产端的缓冲池中,消息发送出去之后,在没有收到确认之前,该缓冲池中的消息是不能被删除的,
#但是生产者一直在生产消息,这个时候缓冲池可能会被撑爆,所以这就需要有一个处理的策略。
#有两种处理方式,一种是让生产者先别生产那么快,阻塞一下,等会再生产;另一种是将缓冲池中的消息清空。
#当消息在producer端沉积的条数达到"queue.buffering.max.meesages"后阻塞一定时间后,
#队列仍然没有enqueue(producer仍然没有发送出任何消息)
#此时producer可以继续阻塞或者将消息抛弃,此timeout值用于控制"阻塞"的时间
#-1: 不限制阻塞超时时间,让produce一直阻塞,这个时候消息就不会被抛弃
#0: 立即清空队列,消息被抛弃
queue.enqueue.timeout.ms=-1
 
 
#当producer接收到error ACK,或者没有接收到ACK时,允许消息重发的次数
#因为broker并没有完整的机制来避免消息重复,所以当网络异常时(比如ACK丢失)
#有可能导致broker接收到重复的消息,默认值为3.
message.send.max.retries=3
 
#producer刷新topic metada的时间间隔,producer需要知道partition leader
#的位置,以及当前topic的情况
#因此producer需要一个机制来获取最新的metadata,当producer遇到特定错误时,
#将会立即刷新
#(比如topic失效,partition丢失,leader失效等),此外也可以通过此参数来配置
#额外的刷新机制,默认值600000
topic.metadata.refresh.interval.ms=60000

5.3 consumer.properties配置文件

#消费者集群通过连接Zookeeper来找到broker。
#zookeeper连接服务器地址
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
 
#zookeeper的session过期时间,默认5000ms,用于检测消费者是否挂掉
zookeeper.session.timeout.ms=5000
 
#当消费者挂掉,其他消费者要等该指定时间才能检查到并且触发重新负载均衡
zookeeper.connection.timeout.ms=10000
 
#这是一个时间阈值。
#指定多久消费者更新offset到zookeeper中。
#注意offset更新时基于time而不是每次获得的消息。
#一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息
zookeeper.sync.time.ms=2000
 
#Consumer归属的组ID,broker是根据group.id来判断是队列模式还是发布订阅模式,非常重要
group.id=xxxxx
 
#这是一个数量阈值,经测试是500条。
#当consumer消费一定量的消息之后,将会自动向zookeeper提交offset信息#注意offset信息并不是每消费一次消息就向zk提交
#一次,而是现在本地保存(内存),并定期提交,默认为true
auto.commit.enable=true
 
# 自动更新时间。默认60 * 1000
auto.commit.interval.ms=1000
 
# 当前consumer的标识,可以设定,也可以有系统生成,
#主要用来跟踪消息消费情况,便于观察
conusmer.id=xxx
 
# 消费者客户端编号,用于区分不同客户端,默认客户端程序自动产生
client.id=xxxx
 
# 最大取多少块缓存到消费者(默认10)
queued.max.message.chunks=50
 
# 当有新的consumer加入到group时,将会reblance,此后将会
#有partitions的消费端迁移到新  的consumer上,如果一个
#consumer获得了某个partition的消费权限,那么它将会向zk
#注册 "Partition Owner registry"节点信息,但是有可能
#此时旧的consumer尚没有释放此节点, 此值用于控制,
#注册节点的重试次数.
rebalance.max.retries=5
 
#每拉取一批消息的最大字节数
#获取消息的最大尺寸,broker不会像consumer输出大于
#此值的消息chunk 每次feth将得到多条消息,此值为总大小,
#提升此值,将会消耗更多的consumer端内存
fetch.min.bytes=6553600
 
#当消息的尺寸不足时,server阻塞的时间,如果超时,
#消息将立即发送给consumer
#数据一批一批到达,如果每一批是10条消息,如果某一批还
#不到10条,但是超时了,也会立即发送给consumer。
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360
 
# 如果zookeeper没有offset值或offset值超出范围。
#那么就给个初始的offset。有smallest、largest、
#anything可选,分别表示给当前最小的offset、
#当前最大的offset、抛异常。默认largest
auto.offset.reset=smallest
 
# 指定序列化处理类
derializer.class=kafka.serializer.DefaultDecoder
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐