Today I was using Spark to write data into Kafka with transactional.id configured, i.e. with transactional writes enabled. Each batch was fairly large: at least 300,000+ records per write, with a dozen or so fields per record. The job frequently failed with the following error:

    The Producer settings were as follows:

     ps: as mentioned in a previous post, two Producers must not commit transactions using the same transactional.id. Kafka transactions are built on top of idempotence, and idempotence is in turn implemented with a monotonically increasing seqNum. So if two Producers commit transactions with the same transactional.id, an error is raised.
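The fencing behavior described above can be illustrated with a toy simulation (this is NOT the real Kafka client API; `initProducerId` and `commit` here are hypothetical stand-ins for the coordinator's bookkeeping): each registration under a transactional.id bumps the epoch, invalidating the previous holder.

```java
import java.util.HashMap;
import java.util.Map;

// Toy simulation of Kafka's producer-fencing mechanism (not the real client API).
// The coordinator tracks the latest epoch per transactional.id; registering a new
// producer bumps the epoch, fencing off the previous holder.
public class FencingDemo {
    static final Map<String, Short> epochs = new HashMap<>();

    // Mimics InitProducerId: hands out a fresh epoch for this transactional.id.
    static short initProducerId(String txnId) {
        short next = (short) (epochs.getOrDefault(txnId, (short) -1) + 1);
        epochs.put(txnId, next);
        return next;
    }

    // Mimics a transactional commit: only the latest epoch is accepted.
    static boolean commit(String txnId, short epoch) {
        return epochs.get(txnId) == epoch; // stale epoch => "ProducerFencedException"
    }

    public static void main(String[] args) {
        short p1 = initProducerId("my-txn-id"); // first producer, epoch 0
        short p2 = initProducerId("my-txn-id"); // second producer fences the first
        System.out.println("p1 commit ok: " + commit("my-txn-id", p1)); // false
        System.out.println("p2 commit ok: " + commit("my-txn-id", p2)); // true
    }
}
```

The real broker does the same bookkeeping inside the transaction coordinator, which is why the second producer wins and the first one fails on its next transactional call.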

    The epoch mentioned in the error records the state the transaction is in (think of it as the transaction's metadata). Quoting the Kafka design doc: "We introduce the notion of a producer epoch, which enables us to ensure that there is only one legitimate active instance of a producer with a given TransactionalId, and hence enables us to maintain transaction guarantees in the event of failures."

    The detailed flow of Kafka transactions is documented on the official wiki (see the second link in the references), so I won't copy it here... each phase of a Kafka transaction has a corresponding state. I'll dig into it further when I find the time... honestly I'd rather not invest too much time in Kafka internals, since it is quite stable after all.

Root cause analysis:

    The error says the epoch has expired. Since the program verifiably has only one Producer sending data and the transactional.id is never reused, the likely cause is that a single batch was too large and the transaction timed out.

    There are two parameters related to transaction timeouts:

  • transaction.max.timeout.ms -- A broker property that specifies the maximum number of milliseconds until a transaction is aborted and forgotten. Default in many Kafka versions seems to be 900000 (15 minutes). Documentation from Kafka says:

    The maximum allowed timeout for transactions. If a client’s requested transaction time exceeds this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction.

  • transaction.timeout.ms -- A producer client property that sets the timeout in milliseconds when a transaction is created. Default in many Kafka versions seems to be 60000 (1 minute). Documentation from Kafka says:

    The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.

    The logs show that writing 300,000 records takes nowhere near 15 minutes, so I increased transaction.timeout.ms to 10 minutes; after that the job ran without any problems.
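The fix can be sketched as a producer config. This is a minimal sketch: the broker address and transactional.id value are placeholders, and only the Properties construction is shown, since building the actual KafkaProducer requires the kafka-clients dependency.

```java
import java.util.Properties;

// Sketch of the producer config used to raise the transaction timeout to
// 10 minutes (600000 ms). Note: transaction.timeout.ms must stay at or below
// the broker-side transaction.max.timeout.ms (default 900000 ms / 15 min),
// or the broker rejects the InitProducerIdRequest.
public class TxnTimeoutConfig {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");       // placeholder address
        props.put("transactional.id", "spark-writer-txn-1");  // placeholder id
        props.put("enable.idempotence", "true");              // required for transactions
        props.put("transaction.timeout.ms", "600000");        // 10 min, up from the 60000 default
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("transaction.timeout.ms"));
        // A real producer would then be built from these props, e.g.:
        // KafkaProducer<String, String> p = new KafkaProducer<>(props, keySer, valSer);
    }
}
```

If the timeout ever needs to exceed 15 minutes, the broker-side transaction.max.timeout.ms would have to be raised as well.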

References:

    https://stackoverflow.com/questions/53058715/what-is-reason-for-getting-producerfencedexception-during-producer-send
    https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka (Exactly Once state transitions)
 
