接下来在看下producer生产者的配置:

1.key.serializer
实现org.apache.kafka.common.serialization.Serializer接口的键的序列化器类。

2.value.serializer
用于实现org.apache.kafka.common.serialization.Serializer接口的值的序列化器类。

3.acks
生产者要求leader在考虑完成请求之前收到的确认数量,默认值为1,可选项是0,1,-1,all。它控制发送的记录的持久性。有以下几个选项:
    1).acks=0:如果设置为0,那么生产者根本不会等待来自服务器的任何确认。该记录将会立即添加到套接字缓冲区并被认为已发送。在这种情况下,无法保证服务器已经接收到记录,重试配置将不会生效(因为客户机通常不会知道任何失败)。为每条记录返回的偏移量将始终设置为-1。
    2).acks=1:这意味着leader将把记录写到它的本地日志中,但是在没有等待所有follower的完全确认的情况下会做出回应。在这种情况下,如果leader在确认记录后立即失败,但是在follower复制它之前,那么记录将丢失。
    3).acks=all:这意味着leader将等待完整的同步副本集合来确认记录,即所有的follower完全确认同步成功后作出回应,这保证了只要至少有一个同步副本保持活动状态,记录就不会丢失。这是最有力的保证。这等价于ack=-1的设置。

4.bootstrap.servers
用于建立到Kafka集群的初始连接的主机/端口对列表。客户机将使用所有服务器,此列表只影响用于发现完整服务器集的初始主机。该列表应该以host1:port1,host2:port2,由于这些服务器仅用于初始连接,以发现完整的集群成员关系(可能会动态更改),因此此列表不需要包含完整的服务器集,为了避免连接的服务器宕机,因此需要指定多个服务器。

5.buffer.memory
生产者可用于缓冲等待发送到服务器的记录的内存总字节,默认33554432bytes,即32M。如果记录发送的速度比发送到服务器的速度快,那么生成器将阻塞max.block.ms之后,它将抛出一个异常。这个设置应该大致对应于生产者将使用的总内存,但不是硬性限制,因为生产者使用的所有内存都用于缓冲。一些额外的内存将用于压缩(如果启用了压缩)以及维护动态请求。

6.compression.type
生产者生成的所有数据的压缩类型。默认值是none(即没有压缩)。有效值是none、gzip、snappy、lz4或zstd。压缩是整批的数据,因此批处理的效率也会影响压缩比(批处理越多,压缩效果越好)。

7.retries
设置一个大于零的值将导致客户端重新发送任何发送失败并可能出现瞬态错误的记录,默认值2147483647。请注意,此重试与客户端在接收到错误后重新发送记录没有区别。允许重试在没有设置max.in.flight.requests.per.connection为1的情况下,但是可能会改变记录的顺序,因为如果将两个批发送到单个分区,并且第一个批失败并重试,但是第二个批成功,那么第二个批中的记录可能会首先出现。另外请注意,如果配置了delivery.timeout.ms的超时配置,那么在重试次数耗尽之前,生成请求将失败因为超时在成功确认之前先过期。用户通常不愿这个配置retries,而使用delivery.timeout.ms控制重试行为。

8.ssl.key.password
密钥存储文件中私钥的密码。这对于客户机是可选的。

9.ssl.keystore.location
密钥存储文件的位置。这对于客户机是可选的,可以用于客户机的双向身份验证。

10.ssl.keystore.password
密钥存储文件的存储密码。这对于客户机是可选的,只有在配置了ssl.keystore.location时才需要。

11.ssl.truststore.location
信任存储库文件的位置。

12.ssl.truststore.password
信任存储文件的密码。如果没有设置密码,仍然可以访问信任存储库,但是禁用了完整性检查。

13.batch.size
每当多个记录被发送到同一个分区时,生产者将尝试将记录批处理成更少的请求。这有助于提高客户端和服务器的性能。此配置以字节为单位控制默认批处理大小,默认大小为16384bytes,即16K。不会尝试对大于此大小的记录进行批处理。发送到broker的请求将包含多个批处理,每个分区都有一个批处理,其中有可供发送的数据。较小的批处理大小将使批处理不那么常见,并可能降低吞吐量(批处理大小为零将完全禁用批处理)。非常大的批处理可能会更加浪费内存,因为我们总是会分配指定批处理大小的缓冲区,以预期会有更多的记录。

14.client.dns.lookup
控制客户端如何使用DNS查找,默认为use_all_dns_ips,可选项有default, use_all_dns_ips, resolve_canonical_bootstrap_servers_only。如果设置为use_all_dns_ips,那么当查找返回一个主机名的多个IP地址时,就会在连接失败之前尝试连接它们,对引导和广告服务器都适用。如果值是resolve_canonical_bootstrap_servers_only,则每个条目将解析并展开为规范名称列表。

15.client.id
在发出请求时传递给服务器的id字符串。这样做的目的是通过允许在服务器端请求日志记录中包含逻辑应用程序名称,从而能够跟踪ip/port之外的请求源。

16.connections.max.idle.ms
在此配置指定的毫秒数之后关闭空闲连接,默认为540000毫秒,即9分钟。

17.delivery.timeout.ms
调用send()后报告成功或失败的时间上限返回,默认120000毫秒,即2分钟。这限制了记录在发送前延迟的总时间、等待代理确认的时间(如果预期的话)以及可恢复发送失败的允许时间。如果遇到不可恢复的错误,重试已经结束,或者记录被添加到到达较早交付截止日期的批处理中,生产者可能会报告在此配置之前发送记录失败。此配置的值应该大于或等于request.timeout.ms和linger.ms的总和。

18.linger.ms
生产者组将在请求传输之间到达的任何记录分组为单个批处理请求。通常情况下,只有当记录到达的速度比发送的速度快时,才会发生这种情况。然而,在某些情况下,即使在中等负载下,客户端也可能希望减少请求的数量。此设置通过添加少量的人工延迟来实现这一点——也就是说,生产者不是立即发送一个记录,而是等待到给定的延迟才允许发送其他记录,以便发送的记录可以成批一起发送。这可以看作类似于TCP中的Nagle算法。这个设置给出了批处理延迟的上限:一旦我们得到批处理。不管这个设置如何,分区的记录大小都会被立即发送,但是如果我们为这个分区积累的字节数少于这个数目,我们就会在指定的时间内等待更多的记录出现。此设置的默认值为0(即没有延迟),例如,设置linger.ms=5可以减少发送的请求数量,但在没有负载的情况下,发送的记录的延迟将增加5ms。

19.max.block.ms
配置控制KafkaProducer.send()和KafkaProducer.partitionsFor()将阻塞多长时间,默认60000毫秒。这些方法可能因为缓冲区已满或元数据不可用而被阻塞。用户提供的序列化器或分拆器中的阻塞不会计入此超时。

20.max.request.size
请求的最大大小,以字节为单位,默认为1048576bytes,即1M。此设置将限制生产者在单个请求中发送的记录批的数量,以避免发送大量请求。这也有效地限制了未压缩的最大记录批大小。注意,服务器对记录批处理大小有自己的上限(如果启用了压缩,则在压缩之后),上限可能与此不同。

21.partitioner.class
实现org.apache.kafka.clients.producer.Partitioner的接口类,默认为org.apache.kafka.clients.producer.internals.DefaultPartitioner。

22.receive.buffer.bytes
读取数据时要使用的TCP接收缓冲区(SO_RCVBUF)的大小,默认为32768bytes,即32K。如果值为-1,将使用OS默认值。

23.request.timeout.ms
配置控制客户端等待请求响应的最大时间,默认30000毫秒。如果在超时结束前没有收到响应,客户端将在必要时重新发送请求,或者在重试耗尽时请求失败。这个应该比replica.lag.time.max.ms(broker上面配置)大,用于减少由于不必要的生产者重试而导致的消息重复的可能性。

24.sasl.client.callback.handler.class
实现AuthenticateCallbackHandler接口的SASL客户端回调处理程序类的完全限定名。

25.sasl.jaas.config
使用JAAS配置文件使用的格式为SASL连接的JAAS登录上下文参数。该值的格式是:'loginModuleClass controlFlag (optionName=optionValue)*;'。对于broker,配置必须使用监听器前缀和小写的SASL机制名称作为前缀。例如,listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required;

26.sasl.kerberos.service.name
Kafka运行的Kerberos主体名。这可以在Kafka的JAAS配置或Kafka的配置中定义。

27.sasl.login.callback.handler.class
实现AuthenticateCallbackHandler接口的SASL登录回调处理程序类的完全限定名。对于broker,登录回调处理程序配置必须带有监听器前缀和小写的SASL机制名称,如listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler。

28.sasl.login.class
实现登录接口的类的完全限定名。对于broker,登录配置必须使用监听器前缀和小写的SASL机制名称作为前缀。例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin。

29.sasl.mechanism
用于客户端连接的SASL机制。这可以是安全提供程序可用的任何机制。GSSAPI是默认机制。

30.security.protocol
用于与broker通信的协议,默认为PLAINTEXT,可选项有PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。

31.send.buffer.bytes
发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小,默认为131072bytes,即128K。如果值为-1,将使用OS默认值。

32.ssl.enabled.protocols
为SSL连接启用的协议列表。在使用Java 11或更新版本运行时,默认为“TLSv1.2,TLSv1.3”,否则为“TLSv1.2”。对于Java 11的默认值,如果客户端和服务器都支持TLSv1.3,那么它们会更喜欢TLSv1.3,否则会退回到TLSv1.2(假设两者都至少支持TLSv1.2)。这个默认值应该适用于大多数情况。另外,请参阅“ssl.protocol”的配置文档。

33.ssl.keystore.type
密钥存储文件的文件格式。这对于客户机是可选的。

34.ssl.protocol
用于生成SSLContext的SSL协议。在使用Java11或更新版本运行时,默认为“TLSv1.3”,否则为“TLSv1.2”。这个值对于大多数用例来说都是合适的。最近的jvm允许的值是“TLSv1.2”和“TLSv1.3”。旧的jvm可能支持“TLS”、“TLSv1.1”、“SSL”、“SSLv2”和“SSLv3”,但由于已知的安全漏洞,不建议使用它们。使用此配置的默认值和'ssl.enabled'。如果服务器不支持“TLSv1.3”,客户端将降级到“TLSv1.2”。如果这个配置被设置为'TLSv1.2',客户端将不会使用'TLSv1.3',即使它是ssl.enabled中的值之一。协议和服务器只支持“TLSv1.3”。

35.ssl.provider
用于SSL连接的安全提供程序的名称。默认值是JVM的默认安全提供程序。

36.ssl.truststore.type
信任存储区文件的文件格式,默认JKS。

37.enable.idempotence
默认为false。当设置为“true”时,生产者将确保在流中准确地写入每个消息的一个副本。如果为“false”,则由于broker失败等原因,生产者重试,可能会在流中写入重试消息的副本。注意,启用幂等性如果设置max.in.flight.requests.per.connection小于或等于5,则重试大于0,且ack必须为“all”。如果用户没有显式地设置这些值,那么将选择合适的值。如果设置了不兼容的值,则会抛出ConfigException。

38.interceptor.classes
用作拦截器的类列表。实现了org.apache.kafka.clients.producer。ProducerInterceptor接口允许在将记录发布到Kafka集群之前拦截(并可能更改)生产者收到的记录。默认情况下,没有拦截器。

39.max.in.flight.requests.per.connection
客户端在阻塞之前在单个连接上发送的未确认请求的最大数量,默认为5。注意,如果该设置设置为大于1,并且发送失败,则会有重试导致消息重新排序的风险(如果启用了重试)。

40.metadata.max.age.ms
在一段时间(以毫秒为单位)后,即使我们没有看到任何分区leader变更,我们也会强制刷新元数据,以主动发现任何新的代理或分区,默认为300000毫秒,即5分钟。

41.metadata.max.idle.ms
控制生产者为空闲主题缓存元数据的时间,默认为300000毫秒,即5分钟。如果自上次生成主题以来所经过的时间超过了元数据空闲时间,那么就会忘记该主题的元数据,下一次访问它时将强制执行元数据获取请求。

42.metric.reporters
用作度量报告器的类列表。实现org.apache.kafka.common.metrics.MetricsReporter接口允许插入将被通知新度量创建的类。JmxReporter一直包含用来注册JMX统计信息。

43.metrics.num.samples
计算指标而维护的样本数量,默认为2。

44.metrics.recording.level
度量标准的最高记录级别,默认info,可选项为INFO, DEBUG。

45.metrics.sample.window.ms
度量样本计算结束的时间窗口,默认30000毫秒。

46.reconnect.backoff.max.ms
重新连接到多次连接失败的代理时等待的最大时间(以毫秒为单位),默认1000毫秒。如果这样做,每台主机的回退量将在每次连续连接失败时呈指数增长,直到这个最大值。在计算回退增加后,增加20%的随机抖动以避免连接风暴。

47.reconnect.backoff.ms
尝试重新连接到给定主机之前等待的基本时间,默认50毫秒。这避免了在紧循环中重复连接到主机。此回退适用于客户端对broker的所有连接尝试。

48.retry.backoff.ms
尝试对给定主题分区重试失败的请求之前等待的时间,默认100毫秒。这避免了在某些故障场景下,在紧密循环中重复发送请求。

49.sasl.kerberos.kinit.cmd
Kerberos kinit命令路径,默认为/usr/bin/kinit。

50.sasl.kerberos.min.time.before.relogin
刷新尝试之间的登录线程休眠时间,默认60000毫秒,即1分钟。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐