Kafka transaction error: Producer attempted an operation with an old epoch
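Today I used Spark to write data to Kafka with transactional.id configured, i.e. with transactional writes enabled. Each write is fairly large: at least 300,000 records at a time, each with a dozen or so fields. The job frequently failed with the error quoted in the title.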
The Producer was configured as follows:
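PS: as mentioned in an earlier post, two Producers must not use the same transactional.id to commit transactions. Kafka transactions are built on top of idempotence, and idempotence in turn relies on a monotonically increasing sequence number (seqNum). If two Producers use the same transactional.id, the older instance is fenced off and its requests fail with an error.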
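To make the sequence-number idea concrete, here is a minimal toy model in Python. It is only a sketch: `PartitionLog` and its return strings are made up for illustration, and the real broker tracks sequences per batch with more bookkeeping than shown here.

```python
class PartitionLog:
    """Toy broker-side log for one partition that deduplicates by sequence number."""

    def __init__(self):
        self.records = []
        self.last_seq = {}  # producer_id -> last accepted sequence number

    def append(self, producer_id, seq, value):
        last = self.last_seq.get(producer_id, -1)
        if seq <= last:
            return "duplicate"      # a retry of something already written
        if seq != last + 1:
            return "out_of_order"   # a gap: an earlier record is missing
        self.records.append(value)
        self.last_seq[producer_id] = seq
        return "appended"


log = PartitionLog()
results = [
    log.append(producer_id=1, seq=0, value="a"),
    log.append(producer_id=1, seq=1, value="b"),
    log.append(producer_id=1, seq=1, value="b"),  # retry of the same record
    log.append(producer_id=1, seq=3, value="d"),  # seq 2 never arrived
]
print(results)
```

The retried record is dropped instead of being written twice, which is the property transactions build on.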
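The epoch mentioned in the error is a counter attached to the producer for a given transactional.id (it is not a record of the transaction's state). Each time a producer initializes with that transactional.id the epoch is bumped, and requests carrying an older epoch are rejected. The Kafka design document puts it this way: "We introduce the notion of a producer epoch, which enables us to ensure that there is only one legitimate active instance of a producer with a given TransactionalId, and hence enables us to maintain transaction guarantees in the event of failures."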
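The fencing behavior can be sketched with a toy coordinator. This is an illustration under simplifying assumptions, not Kafka's implementation: `ToyCoordinator`, `ProducerFenced`, and the id `"tx-1"` are hypothetical names.

```python
class ProducerFenced(Exception):
    """Raised when a request carries an epoch older than the current one."""


class ToyCoordinator:
    def __init__(self):
        self.epochs = {}  # transactional_id -> current epoch

    def init_producer(self, transactional_id):
        # Every initialization with the same transactional.id bumps the epoch.
        epoch = self.epochs.get(transactional_id, -1) + 1
        self.epochs[transactional_id] = epoch
        return epoch

    def check(self, transactional_id, epoch):
        if epoch < self.epochs[transactional_id]:
            raise ProducerFenced("operation attempted with an old epoch")
        return "ok"


coordinator = ToyCoordinator()
old_epoch = coordinator.init_producer("tx-1")  # first instance
new_epoch = coordinator.init_producer("tx-1")  # second instance, same id

try:
    old_result = coordinator.check("tx-1", old_epoch)
except ProducerFenced as exc:
    old_result = "fenced: " + str(exc)

new_result = coordinator.check("tx-1", new_epoch)
print(old_result)
print(new_result)
```

Only the most recently initialized instance keeps working; the older one is rejected as soon as it sends anything.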
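The detailed flow of a Kafka transaction is described on the official wiki (see the second link in the references), so I won't copy it here. Each phase of a Kafka transaction has a corresponding state. I may dig into it later when I have time. Personally I don't want to invest too much time in Kafka; after all, it is quite stable.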
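Root cause analysis:
The error says the epoch is stale. I was sure the program has only one Producer sending and that the transactional id is never reused, so the most likely cause is that a single write was so large that the transaction timed out. When a transaction times out, the transaction coordinator aborts it and bumps the producer epoch, so the producer's later requests arrive with an old epoch.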
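The timeout scenario can be modeled in a few lines. This is a rough sketch under stated assumptions: `ToyTxnCoordinator` and `run` are hypothetical, the clock is passed in explicitly, and the 90-second write duration is an invented figure, not a measurement from this job.

```python
class ToyTxnCoordinator:
    """Toy model: a timed-out transaction is aborted and the epoch is bumped."""

    def __init__(self, timeout_ms):
        self.timeout_ms = timeout_ms
        self.epoch = 0
        self.started_at = None

    def begin(self, now_ms):
        self.started_at = now_ms
        return self.epoch

    def expire_if_timed_out(self, now_ms):
        if self.started_at is not None and now_ms - self.started_at > self.timeout_ms:
            self.started_at = None  # the transaction is aborted
            self.epoch += 1         # the producer's epoch is now stale
            return True
        return False

    def commit(self, producer_epoch, now_ms):
        self.expire_if_timed_out(now_ms)
        if producer_epoch < self.epoch:
            return "fenced: old epoch"
        self.started_at = None
        return "committed"


def run(timeout_ms, write_duration_ms):
    coordinator = ToyTxnCoordinator(timeout_ms)
    epoch = coordinator.begin(now_ms=0)
    return coordinator.commit(epoch, now_ms=write_duration_ms)


short_timeout = run(timeout_ms=60_000, write_duration_ms=90_000)
long_timeout = run(timeout_ms=600_000, write_duration_ms=90_000)
print(short_timeout)
print(long_timeout)
```

With a single producer and a unique id, a write that outlasts the timeout still ends up with a stale epoch, which matches the symptom here.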
There are two parameters related to transaction timeouts:
- transaction.max.timeout.ms: A broker property that specifies the maximum number of milliseconds until a transaction is aborted and forgotten. Default in many Kafka versions seems to be 900000 (15 minutes). Documentation from Kafka says: "The maximum allowed timeout for transactions. If a client’s requested transaction time exceeds this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction."
- transaction.timeout.ms: A producer client property that sets the timeout in milliseconds when a transaction is created. Default in many Kafka versions seems to be 60000 (1 minute). Documentation from Kafka says: "The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction."
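The logs show that writing 300,000 records takes far less than 15 minutes, so I increased transaction.timeout.ms to 10 minutes (600000 ms), which is still below the broker-side transaction.max.timeout.ms. After the change the job ran without the error.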
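As a small sketch of that change, the snippet below builds the producer properties as a plain dictionary and checks the requested timeout against the broker limit. The helper `transactional_producer_config` and the id `"my-spark-writer"` are hypothetical, and the 900000 ms limit is the assumed broker default; how the properties are handed to Spark depends on the job and is not shown.

```python
BROKER_TRANSACTION_MAX_TIMEOUT_MS = 900_000  # assumed broker default (15 minutes)


def transactional_producer_config(transactional_id, timeout_ms,
                                  broker_max_ms=BROKER_TRANSACTION_MAX_TIMEOUT_MS):
    """Build producer properties; reject a timeout the broker would refuse."""
    if timeout_ms > broker_max_ms:
        raise ValueError(
            f"transaction.timeout.ms={timeout_ms} exceeds "
            f"transaction.max.timeout.ms={broker_max_ms}"
        )
    return {
        "transactional.id": transactional_id,
        "enable.idempotence": "true",
        "transaction.timeout.ms": str(timeout_ms),
    }


config = transactional_producer_config("my-spark-writer", 10 * 60 * 1000)
for key, value in sorted(config.items()):
    print(f"{key}={value}")
```

If a write ever needs more than 15 minutes, the broker-side transaction.max.timeout.ms has to be raised as well, otherwise the producer's initialization request is rejected.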
References:
https://stackoverflow.com/questions/53058715/what-is-reason-for-getting-producerfencedexception-during-producer-send
https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka (state transitions for exactly-once)