接下来在看下consumer的配置:

1.key.deserializer
用于实现org.apache.kafka.common.serialization.Deserializer接口的键的反序列化器类。

2.value.deserializer
用于实现org.apache.kafka.common.serialization.Deserializer接口的值的反序列化器类。

3.bootstrap.servers
用于建立到Kafka集群的初始连接的主机/端口对列表。客户机将使用所有服务器,此列表只影响用于发现完整服务器集的初始主机。该列表应该以host1:port1,host2:port2,由于这些服务器仅用于初始连接,以发现完整的集群成员关系(可能会动态更改),因此此列表不需要包含完整的服务器集,为了避免连接的服务器宕机,因此需要指定多个服务器。

4.fetch.min.bytes
服务器为获取请求应该返回的最小数据量。如果可用数据不足,请求将等待大量数据累积,然后再响应请求。1字节的默认设置意味着获取请求被回答,当一个数据的单个字节可用或获取请求超时等待数据到达。将这个值设置为大于1的值将导致服务器等待更大数量的数据积累,这会在增加一些延迟的代价下提高服务器吞吐量。

5.group.id
标识此使用者所属的使用者组的惟一字符串。如果使用者通过使用subscribe(topic)或基于kafka的偏移量管理策略使用组管理功能,则需要此属性。

6.heartbeat.interval.ms
使用Kafka的组管理工具时,心跳到消费者协调器之间的预期时间,默认3000毫秒。心跳用于确保消费者会话保持活动状态,并在新消费者加入或离开组时促进再平衡。该值必须设置低于session.timeout.ms,但通常应设置不超过该值的1/3。它可以调整更低,以控制正常重新平衡的预期时间。

7.max.partition.fetch.bytes
服务器将返回的每个分区的最大数据量,默认为1048576bytes,即1M。记录由消费者分批提取,如果fetch的第一个非空分区中的第一个记录批大于此限制,则仍将返回批以确保消费者能够继续执行。broker接受的最大记录批处理大小通过message.max.bytes定义(broker配置)或max.message.bytes(topic配置)。看到fetch.max.bytes限制消费者请求大小。

8.session.timeout.ms
使用Kafka的组管理工具时用于检测客户端故障的超时时间,默认10000毫秒。客户端定期向broker发送心跳来表示其活动。如果在此会话超时过期之前broker没有收到心跳,则broker将从组中删除此客户端并启动重新平衡。注意,该值必须在broker配置中配置的group.min.session.timeout.ms和group.max.session.timeout.ms允许范围内。

9.ssl.key.password
密钥存储文件中私钥的密码。这对于客户端是可选的。

10.ssl.keystore.location
密钥存储文件的位置。这对于客户机是可选的,可以用于客户机的双向身份验证。

11.ssl.keystore.password
密钥存储文件的存储密码。这对于客户机是可选的,只有在配置了ssl.keystore.location时才需要。

12.ssl.truststore.location
信任存储库文件的位置。

13.ssl.truststore.password
信任存储文件的密码。如果没有设置密码,仍然可以访问信任存储库,但是禁用了完整性检查。

14.allow.auto.create.topics
当订阅或分配主题时,允许在代理上自动创建主题,默认为true。只有在broker允许使用'auto.create.topics'时,订阅的主题才会自动创建。当使用小于0.11.0的broker时,这个配置必须设置为“false”。

15.auto.offset.reset
当Kafka中没有初始偏移量或者服务器上的当前偏移量不再存在时(例如,因为数据已经被删除)我们该怎么做,kafka提供了可供选择的latest, earliest, none三种配置,默认为latest:
    earliest:自动重置偏移量到最早的偏移量
    latest:自动将偏移量重置为最新偏移量
    none:如果没有为使用者的组找到以前的偏移量,则向使用者抛出异常
    anything else:向使用者抛出异常

16.client.dns.lookup
控制客户端如何使用DNS查找,默认为use_all_dns_ips,可选项有default, use_all_dns_ips, resolve_canonical_bootstrap_servers_only。如果设置为use_all_dns_ips,那么当查找返回一个主机名的多个IP地址时,就会在连接失败之前尝试连接它们,对引导和广告服务器都适用。如果值是resolve_canonical_bootstrap_servers_only,则每个条目将解析并展开为规范名称列表。

17.connections.max.idle.ms
在此配置指定的毫秒数之后关闭空闲连接,默认为540000毫秒,即9分钟。

18.default.api.timeout.ms
指定客户端api的超时时间,默认60000毫秒。此配置用于未指定超时参数的所有客户端操作的默认超时。

19.enable.auto.commit
消费者的偏移量是否将定期在后台提交,默认为true。

20.exclude.internal.topics
是否应该从订阅中排除与订阅模式匹配的内部主题,默认为true。始终可以显式地订阅内部主题。

21.fetch.max.bytes
服务器应该为获取请求返回的最大数据量,默认值为52428800bytes,即50M。记录是由使用者分批获取的,如果获取的第一个非空分区中的第一个记录批大于此值,则仍将返回该记录批,以确保使用者能够继续执行。因此,这不是一个绝对的最大值。broker接受的最大记录批处理大小通过message.max.bytes定义(broker配置)或max.message.bytes(topic配置)。请注意,使用者并行执行多个读取操作。

22.group.instance.id
最终用户提供的使用者实例的唯一标识符。只允许非空字符串。如果设置,则将consumer视为静态成员,这意味着在任何时候consumer组中只允许一个具有此ID的实例。这可以与更大的会话超时结合使用,以避免由于暂时不可用(例如进程重启)而导致的组重新平衡。如果没有设置,消费者将作为动态成员加入组,这是传统行为。

23.isolation.level
控制如何读取以事务方式编写的消息,默认为read_uncommitted,可选项有read_uncommitted,read_uncommitted。如果设置为read_committed, consumer.poll()将只返回已提交的事务性消息。如果设置为read_uncommitted'(默认值),consumer.poll()将返回所有消息,甚至是已经中止的事务性消息。非事务性消息将以两种模式无条件返回。
消息将总是按偏移顺序返回。因此,在read_committed模式中,consumer.poll()将只返回到最后一个稳定偏移量(LSO)的消息,LSO小于第一个打开的事务的偏移量。特别是,任何出现在属于正在进行的事务的消息之后的消息将被扣留,直到相关事务完成。因此,read_committed消费者将无法在有飞行事务时读取高水位。
此外,在read_committed中,seekToEnd方法将返回LSO。

24.max.poll.interval.ms
使用消费者组管理时poll()调用之间的最大延迟,默认为300000毫秒,即5分钟。这为消费者在获取更多记录之前的空闲时间设置了一个上限。如果在超时结束之前没有调用poll(),则认为消费者失败,组将重新进行平衡,以便将分区重新分配给另一个成员。

25.max.poll.records
对poll()的一次调用中返回的最大记录数,默认500。

26.partition.assignment.strategy
受支持的负责分区分配策略的类名或类类型的列表(按偏好排序),当使用组管理时,客户端将使用该策略在消费者实例之间分配分区所有权,默认策略为org.apache.kafka.clients.consumer.RangeAssignor。实现org.apache.kafka.clients.consumer.ConsumerPartitionAssignor接口允许插入自定义分配策略。

27.receive.buffer.bytes
读取数据时要使用的TCP接收缓冲区(SO_RCVBUF)的大小,默认为65536bytes,即64K。如果值为-1,将使用OS默认值。

28.request.timeout.ms
配置控制客户端等待请求响应的最大时间,默认为30000毫秒。如果在超时结束前没有收到响应,客户端将在必要时重新发送请求,或者在重试耗尽时请求失败。

29.sasl.client.callback.handler.class
实现AuthenticateCallbackHandler接口的SASL客户端回调处理程序类的完全限定名。

30.sasl.jaas.config
使用JAAS配置文件使用的格式为SASL连接的JAAS登录上下文参数。该值的格式是:'loginModuleClass controlFlag (optionName=optionValue)*;'。对于broker,配置必须使用listener前缀和小写的SASL机制名称作为前缀,如listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required;

31.sasl.kerberos.service.name
Kafka运行的Kerberos主体名。这可以在Kafka的JAAS配置或Kafka的配置中定义。

32.sasl.login.callback.handler.class
实现AuthenticateCallbackHandler接口的SASL登录回调处理程序类的完全限定名。对于broker,登录回调处理程序配置必须带有监听器前缀和小写的SASL机制名称,如listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler。

33.sasl.login.class
实现登录接口的类的完全限定名。对于broker,登录配置必须使用监听器前缀和小写的SASL机制名称作为前缀。例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin。

34.sasl.mechanism
用于客户端连接的SASL机制。这可以是安全提供程序可用的任何机制。GSSAPI是默认机制。

35.security.protocol
用于与broker通信的协议,默认为PLAINTEXT,可选项有PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。

36.send.buffer.bytes
发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小,默认为131072bytes,即128K。如果值为-1,将使用OS默认值。

37.ssl.enabled.protocols
为SSL连接启用的协议列表。在使用Java 11或更新版本运行时,默认为“TLSv1.2,TLSv1.3”,否则为“TLSv1.2”。对于Java 11的默认值,如果客户端和服务器都支持TLSv1.3,那么它们会更喜欢TLSv1.3,否则会退回到TLSv1.2(假设两者都至少支持TLSv1.2)。这个默认值应该适用于大多数情况。另外,请参阅“ssl.protocol”的配置文档。

38.ssl.keystore.type
密钥存储文件的文件格式。这对于客户机是可选的。

39.ssl.protocol
用于生成SSLContext的SSL协议。在使用Java11或更新版本运行时,默认为“TLSv1.3”,否则为“TLSv1.2”。这个值对于大多数用例来说都是合适的。最近的jvm允许的值是“TLSv1.2”和“TLSv1.3”。旧的jvm可能支持“TLS”、“TLSv1.1”、“SSL”、“SSLv2”和“SSLv3”,但由于已知的安全漏洞,不建议使用它们。使用此配置的默认值和'ssl.enabled'。如果服务器不支持“TLSv1.3”,客户端将降级到“TLSv1.2”。如果这个配置被设置为'TLSv1.2',客户端将不会使用'TLSv1.3',即使它是ssl.enabled中的值之一。协议和服务器只支持“TLSv1.3”。

40.ssl.provider
用于SSL连接的安全提供程序的名称。默认值是JVM的默认安全提供程序。

41.ssl.truststore.type
信任存储区文件的文件格式,默认JKS。

42.auto.commit.interval.ms
如果enable.auto.commit被设置为true,那么消费者偏移量自动提交给Kafka的频率,默认5000毫秒。

43.check.crcs
自动检查所使用记录的CRC32,默认为true。这确保了消息不会在网络上或磁盘上发生损坏。这个检查会增加一些开销,因此在寻求极端性能的情况下可能会禁用它。

44.client.id
在发出请求时传递给服务器的id字符串。这样做的目的是通过允许在服务器端请求日志记录中包含逻辑应用程序名称,从而能够跟踪ip/port之外的请求源。

45.client.rack
此客户机的机架标识符。它可以是任何字符串值,指示此客户端的物理位置。它与代理配置'broker.rack'相对应。

46.fetch.max.wait.ms
如果没有足够的数据来立即满足fetch.min.bytes给出的要求,那么服务器在回答取回请求之前阻塞的最大时间,默认500毫秒。

47.interceptor.classes
用作拦截器的类列表。实现了org.apache.kafka.clients.producer。ProducerInterceptor接口允许在将记录发布到Kafka集群之前拦截(并可能更改)生产者收到的记录。默认情况下,没有拦截器。

48.metadata.max.age.ms
在一定后,即使没有看到任何分区leader变更,我们也会强制刷新元数据,以主动发现任何新的代理或分区,默认为300000毫秒,即5分钟。

49.metric.reporters
用作度量报告器的类列表。实现org.apache.kafka.common.metrics.MetricsReporter接口允许插入将被通知新度量创建的类。JmxReporter一直包含用来注册JMX统计信息。

50.metrics.num.samples
计算指标而维护的样本数量,默认为2。

51.metrics.recording.level
度量标准的最高记录级别,默认info,可选项为INFO, DEBUG。

52.metrics.sample.window.ms
度量样本计算结束的时间窗口,默认30000毫秒。

53.reconnect.backoff.max.ms
重新连接到多次连接失败的代理时等待的最大时间(以毫秒为单位),默认1000毫秒。如果这样做,每台主机的回退量将在每次连续连接失败时呈指数增长,直到这个最大值。在计算回退增加后,增加20%的随机抖动以避免连接风暴。

54.reconnect.backoff.ms
尝试重新连接到给定主机之前等待的基本时间,默认50毫秒。这避免了在紧循环中重复连接到主机。此回退适用于客户端对broker的所有连接尝试。

55.retry.backoff.ms
尝试对给定主题分区重试失败的请求之前等待的时间,默认100毫秒。这避免了在某些故障场景下,在紧密循环中重复发送请求。

56.sasl.kerberos.kinit.cmd
Kerberos kinit命令路径,默认为/usr/bin/kinit。

57.sasl.kerberos.min.time.before.relogin
刷新尝试之间的登录线程休眠时间,默认60000毫秒,即1分钟。

58.sasl.kerberos.ticket.renew.jitter
随机不稳定的百分比增加到更新时间,默认0.05。法定值在0至0.25(含25%)之间。目前只适用于oauthholder。

59.sasl.kerberos.ticket.renew.window.factor
登录线程将休眠直到到达从上次刷新到票证到期的指定窗口时间因子,此时它将尝试更新票证,默认为0.8。

60.sasl.login.refresh.buffer.seconds
刷新凭据时在凭据过期前要维护的缓冲区时间,以秒为单位。如果刷新发生在比缓冲区秒数更接近过期的时候,那么刷新将被上移,以尽可能多地维护缓冲区时间。合法值在0~3600秒(1小时)之间;如果没有指定值,则使用默认值300秒(5分钟)。这个值和sasl.login.refresh.min.period.seconds都会被忽略如果他们的总和超过了凭据的剩余生命周期。目前只适用于oauthholder。

61.sasl.login.refresh.min.period.seconds
登录刷新线程在刷新凭据之前等待的最小时间,以秒为单位。合法值在0到900秒之间(15分钟);如果没有指定值,则使用默认值60秒(1分钟)。此值和sasl.login.refresh.buffer.seconds将会被忽略如果他们的总和超过了凭据的剩余生命周期。目前只适用于oauthholder。

62.sasl.login.refresh.window.factor
登录刷新线程将休眠,直到达到与凭据的生存期相关的指定窗口因子,此时它将尝试刷新凭据。合法值在0.5(50%)和1.0(100%)之间;如果没有指定值,则使用缺省值0.8(80%)。目前只适用于oauthholder。

63.sasl.login.refresh.window.jitter
添加到登录刷新线程睡眠时间中的相对于凭据生命周期的最大随机时基误差。法定值在0至0.25(含25%)之间;如果没有指定值,则使用默认值0.05(5%)。目前只适用于oauthholder。

64.security.providers
可配置创建器类的列表,每个创建器类返回实现安全算法的提供程序,需要实现org.apache.kafka.common.security.auth.SecurityProviderCreator接口。

65.ssl.cipher.suites
密码套件列表。这是身份验证、加密、MAC和密钥交换算法的命名组合,用于使用TLS或SSL网络协议协商网络连接的安全设置。默认情况下,支持所有可用的密码套件。

66.ssl.endpoint.identification.algorithm
使用服务器证书验证服务器主机名的端点识别算法,默认https。

67.ssl.keymanager.algorithm
密钥管理器工厂用于SSL连接的算法。默认值是为Java虚拟机配置的密钥管理器工厂算法SunX509。

68.ssl.secure.random.implementation
用于SSL加密操作的SecureRandom PRNG实现。

69.ssl.trustmanager.algorithm
信任管理器工厂用于SSL连接的算法。默认值是PKIX,为Java虚拟机配置的信任管理器工厂算法。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐