一、配置 ThreadPoolTaskExecutor

创建一个文件夹 config ,新建一个类 ThreadPoolConfig.java

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * 线程池配置
 **/
@Configuration
public class ThreadPoolConfig {

    /**
 	* 核心线程池大小
 	**/
    private int corePoolSize = 50;

    /**
 	* 最大可创建的线程数
 	**/
    private int maxPoolSize = 200;

    /**
 	* 队列最大长度
 	**/
    private int queueCapacity = 10000;

    /**
 	* 线程池维护线程所允许的空闲时间
 	**/
    private int keepAliveSeconds = 300;

    @Bean(name = "threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(maxPoolSize);
        executor.setCorePoolSize(corePoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // 设置处理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

}


二、使用 ThreadPoolTaskExecutor

在其他 SpringBoot 管理的 bean 下注入上面配置的类

    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

使用无返回值的线程

   threadPoolTaskExecutor.execute(()->{
     // 要执行的任务
   });

使用有返回值的线程

   Future<?> submit = threadPoolTaskExecutor.submit(() -> {
     // 要执行的任务
    
     //返回 ? 对象
     return null;
   });

值得注意的是,submit方法执行的所在的主线程会等取完返回的数据之后才会结束(只有使用了 get() 方法才会堵塞线程)
例如:

	List<Future<?>> results = new ArrayList<>();

	//这里只是举例开启10个线程
	for(int i = 0;i < 10;i++){
		Future<?> submits = threadPoolTaskExecutor.submit(() -> {
			// 要执行的任务

			//返回 ? 对象
			return null;
		});

		myresults.add(submits);
	}

	//上面的线程都执行完毕,才会执行下面取数据的代码
	for(Future<?> result :results){
		//取数据(这里举例用String对象接收)
		String string = (String)result.get();
	}

三、关于处理策略

在使用线程池并且使用有界队列的时候,如果队列满了,任务添加到线程池的时候就会有问题,针对这些问题java线程池提供了以下几种策略:

  • AbortPolicy
    该策略是线程池的默认策略。使用该策略时,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            //不做任何处理,直接抛出异常
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
  • DiscardPolicy
    这个策略是AbortPolicy的slient版本,如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常。
   public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        	//就是一个空的方法
        }
  • DiscardOldestPolicy
    这个策略从字面上也很好理解,丢弃最老的。也就是说如果队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列。
    因为队列是队尾进,队头出,所以队头元素是最老的,因此每次都是移除对头元素后再尝试入队。
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
            	//移除队头元素
                e.getQueue().poll();
                //再尝试入队
                e.execute(r);
            }
        }
  • CallerRunsPolicy
    使用此策略,如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                //直接执行run方法
                r.run();
            }
        }
  • 自定义
    如果以上策略都不符合业务场景,那么可以自己定义一个拒绝策略,只要实现RejectedExecutionHandler接口,并且实现rejectedExecution方法就可以了。具体的逻辑就在rejectedExecution方法里去定义就OK了。
    例如:我定义了我的一个拒绝策略,叫做MyRejectPolicy,里面的逻辑就是打印处理被拒绝的任务内容
public class MyRejectPolicy implements RejectedExecutionHandler{
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        //Sender是我的Runnable类,里面有message字段
        if (r instanceof Sender) {
            Sender sender = (Sender) r;
            //直接打印
            System.out.println(sender.getMessage());
        }
    }
}

处理策略这一部分原文出自:https://blog.csdn.net/jgteng/article/details/54411423

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐