flink项目写入redis时报错RedisCommandInterruptedException: Command interrupted


具体报错如下:

io.lettuce.core.RedisCommandInterruptedException: Command interrupted
	at io.lettuce.core.protocol.AsyncCommand.await(AsyncCommand.java:87)
	at io.lettuce.core.LettuceFutures.awaitOrCancel(LettuceFutures.java:112)
Caused by: java.lang.InterruptedException
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:347)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
	at io.lettuce.core.protocol.AsyncCommand.await(AsyncCommand.java:83)
	...15 more

后面慢慢地把不必要的代码屏蔽掉,发现加入某个aggregate函数时一定会发生报错,去掉该函数就不报错了。

将aggregate前面使用.slotSharingGroup(“newGroup”), 即把该算子扔到别的slot里面执行,redis写入不再报错。

但一段时间后还是会报错,仔细研究是aggregate里面代码导致了报错,aggregate算子发送的报错 进行影响到 redis写入的sink算子报错,也是挺奇葩的(估计是redis的连接创建是在aggregate算子所属算子链 线程创建的,哪同一个job的不同task是怎么共享class中static变量的呢?)

原因总结:当与 redis的算子 同处一个算子链 的 其他算子发生报错,导致线程崩溃时,redis算子也会发生报错,导致无法写入redis。即,同一个算子链的算子都是一个线程负责,最好确保每个算子都不发生报错(脏数据在流的开始就过滤掉),或者报错时输出日志,然后把报错吞掉。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐