Flink向Kafka发送数据时的异常问题整理
问题原因依赖版本问题,我的kafka连接器用的是flink-connector-kafka_2.11,而kafka集群安装的是0.10.0版本的,官网上说flink-connector-kafka_2.11虽然会适配Kafka的最新版本,但是对于Kafka0.11.x和0.10.x版本,建议分别使用专用的flink-connector-kafka-0.11_2.11和flink-connector
flink消费kafka时正常,但是向kafka生产消息时产生报错:
Caused by: java.lang.ClassNotFoundException:org.apache.kafka.common.errors.InvalidTxnStateException
问题原因:依赖版本问题,我的kafka连接器用的是flink-connector-kafka_2.11,而kafka集群安装的是0.10.0版本的,官网上说flink-connector-kafka_2.11虽然会适配Kafka的最新版本,但是对于Kafka 0.11.x和0.10.x版本,建议分别使用专用的flink-connector-kafka-0.11_2.11和flink-connector-kafka-0.10_2.11。于是将依赖改为
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.10.0</version>
</dependency>
后异常得到解决,然后运行flink整合kafka生产者,又出现新的报错:
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
报错原因:producer在向kafka broker写的时候,刚好发生选举,本来是向broker0上写的,选举之后broker1成为leader,所以无法写成功,就抛异常了。
解决办法:修改producer的重试参数retries参数,默认是0, 一般设置为3, 我设置retries=10,异常得到解决
//当生产者发送失败的时候重试的次数
props.put("retries", 10);
顺便整理出如下异常:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for binlogCsbbroker-2 due to 30026 ms has passed since batch creation plus linger time
报错原因:具体原因我自己还没有找到,但是网友们都说是因为kafka在批量写的时候,这一批次的数据没有在30s内还处理完,(30s为request.timeout.ms默认值),这一批次的数据就过期了,所以抛出异常
解决办法:增大request.timeout.ms, 我在生产环境配置的是request.timeout.ms=60000 // 由原来默认的30s改成60s
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
报错原因:kafka client与broker断开连接了
解决办法:重启服务
更多推荐
所有评论(0)