Analyzing a production java.util.concurrent.RejectedExecutionException: event executor terminated error
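Today our ops team noticed a service failure while running a feature in production. Following up, we found that the problem was on the Redis side, as shown below: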
[ERROR] [2022-05-26 00:05:01.518] [c.q.d.utils.DataAccessSampleRedisUtil:326][Thread-154] Service name: ivdg-data-access-service--> Error querying cached data: Unknown redis exception; nested exception is java.util.concurrent.RejectedExecutionException: event executor terminated
org.springframework.data.redis.RedisSystemException: Unknown redis exception; nested exception is java.util.concurrent.RejectedExecutionException: event executor terminated
at org.springframework.data.redis.FallbackExceptionTranslationStrategy.getFallback(FallbackExceptionTranslationStrategy.java:53)
at org.springframework.data.redis.FallbackExceptionTranslationStrategy.translate(FallbackExceptionTranslationStrategy.java:43)
at org.springframework.data.redis.connection.lettuce.LettuceConnection.convertLettuceAccessException(LettuceConnection.java:273)
at org.springframework.data.redis.connection.lettuce.LettuceScriptingCommands.convertLettuceAccessException(LettuceScriptingCommands.java:236)
at org.springframework.data.redis.connection.lettuce.LettuceScriptingCommands.evalSha(LettuceScriptingCommands.java:195)
at org.springframework.data.redis.connection.DefaultedRedisConnection.evalSha(DefaultedRedisConnection.java:1502)
at org.springframework.data.redis.connection.DefaultStringRedisConnection.evalSha(DefaultStringRedisConnection.java:1757)
at sun.reflect.GeneratedMethodAccessor606.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.data.redis.core.CloseSuppressingInvocationHandler.invoke(CloseSuppressingInvocationHandler.java:61)
at com.sun.proxy.$Proxy258.evalSha(Unknown Source)
at org.springframework.data.redis.core.script.DefaultScriptExecutor.eval(DefaultScriptExecutor.java:77)
at org.springframework.data.redis.core.script.DefaultScriptExecutor.lambda$execute$0(DefaultScriptExecutor.java:68)
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:228)
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:188)
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:175)
at org.springframework.data.redis.core.script.DefaultScriptExecutor.execute(DefaultScriptExecutor.java:58)
at org.springframework.data.redis.core.script.DefaultScriptExecutor.execute(DefaultScriptExecutor.java:52)
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:350)
at com.qsdi.dataAccess.utils.DataAccessSampleRedisUtil.lambda$scanData$7(DataAccessSampleRedisUtil.java:324)
at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:559)
at com.qsdi.dataAccess.utils.DataAccessSampleRedisUtil.scanData(DataAccessSampleRedisUtil.java:318)
at com.qsdi.dataAccess.service.impl.DataSamplingInspectionHourStatisticsServiceImpl.handleHourData(DataSamplingInspectionHourStatisticsServiceImpl.java:69)
at com.qsdi.dataAccess.service.impl.DataSamplingInspectionHourStatisticsServiceImpl.statisticsDataByTime(DataSamplingInspectionHourStatisticsServiceImpl.java:58)
at com.qsdi.dataAccess.job.HourStatisticsByVehicleJobHandler.execute(HourStatisticsByVehicleJobHandler.java:29)
at com.xxl.job.core.thread.JobThread.run(JobThread.java:163)
Caused by: java.util.concurrent.RejectedExecutionException: event executor terminated
at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:926)
at io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:353)
at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:346)
at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:828)
at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:818)
at io.netty.util.concurrent.AbstractScheduledEventExecutor.schedule(AbstractScheduledEventExecutor.java:261)
at io.netty.util.concurrent.AbstractScheduledEventExecutor.schedule(AbstractScheduledEventExecutor.java:177)
at io.netty.util.concurrent.AbstractEventExecutorGroup.schedule(AbstractEventExecutorGroup.java:50)
at io.netty.util.concurrent.AbstractEventExecutorGroup.schedule(AbstractEventExecutorGroup.java:32)
at io.lettuce.core.protocol.CommandExpiryWriter.potentiallyExpire(CommandExpiryWriter.java:168)
at io.lettuce.core.protocol.CommandExpiryWriter.write(CommandExpiryWriter.java:115)
at io.lettuce.core.RedisChannelHandler.dispatch(RedisChannelHandler.java:195)
at io.lettuce.core.StatefulRedisConnectionImpl.dispatch(StatefulRedisConnectionImpl.java:176)
at io.lettuce.core.AbstractRedisAsyncCommands.dispatch(AbstractRedisAsyncCommands.java:474)
at io.lettuce.core.AbstractRedisAsyncCommands.evalsha(AbstractRedisAsyncCommands.java:512)
at sun.reflect.GeneratedMethodAccessor608.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at io.lettuce.core.FutureSyncInvocationHandler.handleInvocation(FutureSyncInvocationHandler.java:63)
at io.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:79)
at com.sun.proxy.$Proxy254.evalsha(Unknown Source)
at org.springframework.data.redis.connection.lettuce.LettuceScriptingCommands.evalSha(LettuceScriptingCommands.java:193)
... 23 common frames omitted
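As the stack trace above shows, the exception is raised while calling Redis to run a Lua script query. Specifically, it is thrown when the execute method of the SingleThreadEventExecutor class is called. The source is as follows: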
private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    addTask(task);
    if (!inEventLoop) {
        startThread();
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}
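Looking at this method, what it does is roughly submit the current write event to the SingleThreadEventExecutor task queue. The error is raised inside the addTask method: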
/**
 * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
 * before.
 */
protected void addTask(Runnable task) {
    ObjectUtil.checkNotNull(task, "task");
    if (!offerTask(task)) {
        reject(task);
    }
}
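This method calls reject(task) to refuse the task if it could not be inserted into the task queue. In our stack trace, however, the exception is thrown one frame deeper: offerTask calls the no-argument reject(), and that is the "event executor terminated" exception we see as the root cause. So let's look at the offerTask method: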
final boolean offerTask(Runnable task) {
    if (isShutdown()) {
        reject();
    }
    return taskQueue.offer(task);
}

@Override
public boolean isShutdown() {
    return state >= ST_SHUTDOWN;
}

private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;
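To make the state check concrete, here is a minimal sketch that models it with plain JDK classes. It is not Netty code: the class name StateCheckSketch and the helper accepts are hypothetical, and only the constant values and the `state >= ST_SHUTDOWN` comparison are taken from the excerpt above.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RejectedExecutionException;

/** Hypothetical, simplified model of the state check; not the Netty implementation. */
public class StateCheckSketch {
    // Same numeric ordering as the constants quoted from SingleThreadEventExecutor.
    public static final int ST_NOT_STARTED = 1;
    public static final int ST_STARTED = 2;
    public static final int ST_SHUTTING_DOWN = 3;
    public static final int ST_SHUTDOWN = 4;
    public static final int ST_TERMINATED = 5;

    private volatile int state = ST_NOT_STARTED;
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();

    public void setState(int newState) {
        state = newState;
    }

    public boolean isShutdown() {
        return state >= ST_SHUTDOWN;
    }

    /** Mirrors the shape of offerTask: reject first, otherwise enqueue. */
    public boolean offerTask(Runnable task) {
        if (isShutdown()) {
            throw new RejectedExecutionException("event executor terminated");
        }
        return taskQueue.offer(task);
    }

    /** Returns true if an executor in the given state would accept a task. */
    public static boolean accepts(int state) {
        StateCheckSketch sketch = new StateCheckSketch();
        sketch.setState(state);
        try {
            return sketch.offerTask(() -> { });
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        for (int st = ST_NOT_STARTED; st <= ST_TERMINATED; st++) {
            System.out.println("state " + st + " accepts tasks: " + accepts(st));
        }
    }
}
```

The point of the sketch is the boundary: ST_SHUTTING_DOWN (3) still accepts tasks, while ST_SHUTDOWN (4) and ST_TERMINATED (5) reject them with the same message that appears in our log.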
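So offerTask checks whether the current SingleThreadEventExecutor is already shut down or terminated (state >= ST_SHUTDOWN). That raises the question: what put the SingleThreadEventExecutor into the terminated state? In the logs from the incident, SingleThreadEventExecutor had also printed an error about a heap out-of-memory condition. Searching for where that log line is printed leads to: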
private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }
            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
                    int oldState = state;
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }
                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    if (logger.isErrorEnabled()) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                "be called before run() implementation terminates.");
                    }
                }
                try {
                    // Run all remaining tasks and shutdown hooks. At this point the event loop
                    // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
                    // graceful shutdown with quietPeriod.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                    // Now we want to make sure no more tasks can be added from this point. This is
                    // achieved by switching the state. Any new tasks beyond this point will be rejected.
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
                            break;
                        }
                    }
                    // We have the final set of tasks in the queue now, no more can be added, run all remaining.
                    // No need to loop here, this is the final pass.
                    confirmShutdown();
                } finally {
                    try {
                        cleanup();
                    } finally {
                        // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
                        // the future. The user may block on the future and once it unblocks the JVM may terminate
                        // and start unloading classes.
                        // See https://github.com/netty/netty/issues/6596.
                        FastThreadLocal.removeAll();
                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.countDown();
                        int numUserTasks = drainTasks();
                        if (numUserTasks > 0 && logger.isWarnEnabled()) {
                            logger.warn("An event executor terminated with " +
                                    "non-empty task queue (" + numUserTasks + ')');
                        }
                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
}
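This is the doStartThread method, and its finally block is also where the state is set to ST_TERMINATED. Roughly speaking, the method starts the real thread behind the event loop, the one that polls for read and write events, and it is called the first time execute is invoked. Note that the executor variable here is a java.util.concurrent.Executor, not the event loop itself. After some reading I realized that every EventLoop in an EventLoopGroup holds a reference to such an executor and hands its polling loop to it. As far as I can tell, in Netty 4.1 the default is a ThreadPerTaskExecutor, which starts one new thread for each event loop rather than sharing a fork/join pool, so the number of threads matches the number of event loops. In other words, the EventLoop object mainly holds the task queue and the state, while the logic that actually polls the task queue runs on the thread obtained through that executor.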
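The sequence described above (the loop thread dies from a Throwable, the finally block marks the executor terminated, and later submissions are rejected) can be sketched with JDK classes only. This is a hypothetical model, not Netty: the class DyingLoopSketch and its methods are made up for illustration, and to keep it short the simulated OutOfMemoryError is thrown by a task and allowed to escape the loop, which is an assumption about how the error escaped rather than something established by the logs.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of an event loop whose thread dies; not the Netty implementation. */
public class DyingLoopSketch {
    private static final int ST_STARTED = 2;
    private static final int ST_SHUTDOWN = 4;
    private static final int ST_TERMINATED = 5;

    private final AtomicInteger state = new AtomicInteger(ST_STARTED);
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final CountDownLatch terminated = new CountDownLatch(1);

    public DyingLoopSketch() {
        Thread loopThread = new Thread(() -> {
            try {
                for (;;) {
                    tasks.take().run(); // in this sketch, a Throwable from a task escapes the loop
                }
            } catch (Throwable t) {
                // Netty logs "Unexpected exception from an event executor" at the equivalent point.
            } finally {
                // One-way transition: nothing ever moves the state back to ST_STARTED.
                state.set(ST_TERMINATED);
                terminated.countDown();
            }
        }, "sketch-loop");
        loopThread.setDaemon(true);
        loopThread.start();
    }

    public void execute(Runnable task) {
        if (state.get() >= ST_SHUTDOWN) {
            throw new RejectedExecutionException("event executor terminated");
        }
        tasks.add(task);
    }

    public void awaitTermination() {
        try {
            terminated.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Kills the loop with a simulated error, then reports whether a new task is rejected. */
    public static boolean rejectedAfterError() {
        DyingLoopSketch loop = new DyingLoopSketch();
        loop.execute(() -> { throw new OutOfMemoryError("simulated"); });
        loop.awaitTermination();
        try {
            loop.execute(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected after error: " + rejectedAfterError());
    }
}
```

The caller that hits the rejection is unrelated to the task that killed the loop. That matches what we saw in production: the failing Redis call only reported a symptom of an earlier error.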
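With that, the picture is clear, and the incident itself is quite simple. First, the out-of-memory error drove the event loop into the terminated state, which, judging by the code I read, appears to be irreversible. After that, whenever a later channel write submits a task to the event loop (in our trace it is the command-timeout task that CommandExpiryWriter schedules while writing the command), the state check finds the executor terminated and the task is rejected.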
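The same one-way lifecycle exists in the JDK's own executors, which makes the irreversibility easy to try out without Netty. The sketch below uses only java.util.concurrent; the class OneWayLifecycleSketch and the helper rejects are hypothetical names, and the JDK executor is an analogy, not what Lettuce uses internally.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: a terminated executor never accepts tasks again. */
public class OneWayLifecycleSketch {
    /** Returns true if the executor refuses a new task. */
    public static boolean rejects(ExecutorService executor) {
        try {
            executor.execute(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService first = Executors.newSingleThreadExecutor();
        first.shutdown();
        first.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("first terminated: " + first.isTerminated());
        System.out.println("first rejects: " + rejects(first));

        // There is no restart method; the only way forward is a new instance.
        ExecutorService replacement = Executors.newSingleThreadExecutor();
        System.out.println("replacement rejects: " + rejects(replacement));
        replacement.shutdown();
    }
}
```

If the analogy holds for the event loop, the existing client cannot heal itself once it is terminated. Until the underlying memory problem is fixed, recovery would presumably mean recreating the affected resources or restarting the service.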
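The key to fixing this bug is finding the cause of the out-of-memory error, which I won't go into here. I couldn't find an explanation of this exception online, so I traced through the code and am recording the result here. Once I have worked out where the memory problem actually is, I will post the solution.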