flink项目写入redis时报错RedisCommandInterruptedException: Command interrupted
flink项目写入redis时报错RedisCommandInterruptedException: Command interrupted具体报错如下:
·
flink项目写入redis时报错RedisCommandInterruptedException: Command interrupted
具体报错如下:
io.lettuce.core.RedisCommandInterruptedException: Command interrupted
at io.lettuce.core.protocol.AsyncCommand.await(AsyncCommand.java:87)
at io.lettuce.core.LettuceFutures.awaitOrCancel(LettuceFutures.java:112)
Caused by: java.lang.InterruptedException
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:347)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at io.lettuce.core.protocol.AsyncCommand.await(AsyncCommand.java:83)
...15 more
后面慢慢地把不必要的代码屏蔽掉,发现加入某个aggregate函数时一定会发生报错,去掉该函数就不报错了。
将aggregate前面使用.slotSharingGroup(“newGroup”), 即把该算子扔到别的slot里面执行,redis写入不再报错。
但一段时间后还是会报错,仔细研究是aggregate里面代码导致了报错,aggregate算子发送的报错 进行影响到 redis写入的sink算子报错,也是挺奇葩的(估计是redis的连接创建是在aggregate算子所属算子链 线程创建的,哪同一个job的不同task是怎么共享class中static变量的呢?)
原因总结:当与 redis的算子 同处一个算子链 的 其他算子发生报错,导致线程崩溃时,redis算子也会发生报错,导致无法写入redis。即,同一个算子链的算子都是一个线程负责,最好确保每个算子都不发生报错(脏数据在流的开始就过滤掉),或者报错时输出日志,然后把报错吞掉。
更多推荐
已为社区贡献7条内容
所有评论(0)