记录Kafka用一段时间报Failed to create new KafkaAdminClient问题
最近公司初次使用Kafka在刚使用当中一切顺利包括对接服务者和消费。但是在使用一段时间突然发现Kafka消费者在调用Kafka的时候报Failed to create new KafkaAdminClient 错在多方查找以后没有找到能解决错误的地方。在请教Kafka大神以后说检查下Kafka心跳检测突然发现没有配置心跳检测下面上代码。spring:kafka:bootstrap-servers:
·
最近公司初次使用Kafka在刚使用当中一切顺利包括对接服务者和消费。但是在使用一段时间突然发现Kafka消费者在调用Kafka的时候报Failed to create new KafkaAdminClient 错在多方查找以后没有找到能解决错误的地方。在请教Kafka大神以后说检查下Kafka心跳检测突然发现没有配置心跳检测下面上代码。
spring: kafka: bootstrap-servers: 172.16.39.6:9092 producer: # 发生错误后,消息重发的次数。 retries: 0 #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。 batch-size: 16384 # 设置生产者内存缓冲区的大 buffer-memory: 33554432 # 键的序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化方式 value-serializer: org.apache.kafka.common.serialization.StringSerializer # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。 # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。 # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。 acks: 1 #监测心跳 6秒检测一次 properties: session.timeout.ms: 6000 consumer: session.timeout: 6000 heartbeat-interval: 5000 # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D auto-commit-interval: 1S # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录) # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录 auto-offset-reset: latest # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量 enable-auto-commit: false # 键的反序列化方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化方式 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer group-id: 1 listener: # 在侦听器容器中运行的线程数。 concurrency: 5 #listner负责ack,每调用一次,就立即commit ack-mode: manual_immediate missing-topics-fatal: fals
在配置文件中添加
properties: session.timeout.ms: 6000
session.timeout: 6000
heartbeat-interval: 5000 解决心跳检测问题 特此记录一下
更多推荐
已为社区贡献1条内容
所有评论(0)