前言

最近做项目,有许多业务需要处理,放到了kafka中,为了提高消费kafka效率,引入了线程池,不同的业务处理使用不同的线程池。其他暂且不论,直接上配置。

1.注解使用

提前说明下使用地方,有个印象,后需还会讲到。

  • @EnableAsync 这里会配置在2个地方:启动类和线程池配置类
  • @Async 可以写在类上或者方法上
  • @Component 注册类到ioc
  • @ConfigurationProperties 读取yml配置
  • @Autowired
  • @Qualifier

2.yml配置

配置线程池基本信息

# 线程池配置
async-pool:
  # 消费kafka使用的线程池
  consumer-kafka:
    corePoolSize: 2
    maxPoolSize: 4
    keepAliveSeconds: 120
    queueCapacity: 10
  # 正常采集上传文件线程池
  normal-collection:
    corePoolSize: 2
    maxPoolSize: 4
    keepAliveSeconds: 120
    queueCapacity: 10
  # 搬迁失败的文件重新上传线程池
  fail-file:
    corePoolSize: 2
    maxPoolSize: 4
    keepAliveSeconds: 120
    queueCapacity: 10

3.获取yml配置信息

创建父类抽象类 AbstractExecutorPool
分别创建子类继承父类 ConsumerKafkaPool 、NormalCollectionPool、FailFilePool

@Data
public abstract class AbstractExecutorPool {
    private int corePoolSize;
    private int maxPoolSize;
    private int keepAliveSeconds;
    private int queueCapacity;
}
@Component
// 读取yml配置,会自动映射
@ConfigurationProperties(prefix = "async-pool.consumer-kafka")
@Data
public class ConsumerKafkaPool extends AbstractExecutorPool {
	// 线程前缀
    private String threadNamePrefix = "handler consumer kafka executor-";
}
@Component
@ConfigurationProperties(prefix = "async-pool.normal-collection")
@Data
public class NormalCollectionPool extends AbstractExecutorPool {
    private String threadNamePrefix = "handler norma collection executor-";
}
@Component
@ConfigurationProperties(prefix = "async-pool.fail-file")
@Data
public class FailFilePool extends AbstractExecutorPool {
    private String threadNamePrefix = "handler move fail file executor-";
}

4.线程池配置

创建类 ThreadPoolConfig
线程池具体执行步骤,这里不提,说下拒绝策略 ThreadPoolExecutor 类中提供的前4个拒绝策略,也可以自定义策略。

  • AbortPolicy 默认策略,队列满时抛出异常RejectedExecutionException
  • DiscardOldestPolicy 去除队列中最早的任务,将新任务放入队列
  • DiscardPolicy 直接丢掉任务
  • CallerRunsPolicy 队列满时,主线程执行任务
  • 自定义处理策略
@Configuration
@EnableAsync
@Slf4j
public class ThreadPoolConfig {

    private final ConsumerKafkaPool consumerKafkaPool;
    private final NormalCollectionPool normalCollectionPool;
    private final FailFilePool failFilePool;
	// 构造器注入,spring4.x以后推荐使用
    @Autowired
    public ThreadPoolConfig(ConsumerKafkaPool consumerKafkaPool, NormalCollectionPool normalCollectionPool, FailFilePool failFilePool){
        this.consumerKafkaPool = consumerKafkaPool;
        this.normalCollectionPool = normalCollectionPool;
        this.failFilePool = failFilePool;
    }

    /**
     * 创建线程池 消费kafka中消息,存入阻塞队列中
     */
    @Bean(name = "asyncExecutorConsumerKafka")
    public Executor asyncExecutorConsumerKafka() {
        return initExcutor(consumerKafkaPool, consumerKafkaPool.getThreadNamePrefix(), (r, executor) -> {
            log.info("队列已满,根据业务自行处理。。。");
        });
    }
    /**
     * 创建线程池 用于正常采集搬迁的文件 上传
     */
    @Bean(name = "asyncExecutorNormalCollection")
    public Executor asyncExecutorNormalCollection() {
        ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
        return initExcutor(normalCollectionPool, normalCollectionPool.getThreadNamePrefix(), callerRunsPolicy);
    }
    /**
     * 创建线程池 用于处理搬迁失败的文件 上传
     */
    @Bean(name = "asyncExecutorFailFile")
    public Executor asyncExecutorFailFile() {
        ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
        return initExcutor(failFilePool, failFilePool.getThreadNamePrefix(), callerRunsPolicy);
    }

    private Executor initExcutor(AbstractExecutorPool abstractExecutorPool,String threadName, RejectedExecutionHandler rejectedExecutionHandler){
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
        threadPool.setCorePoolSize(abstractExecutorPool.getCorePoolSize());
        threadPool.setMaxPoolSize(abstractExecutorPool.getMaxPoolSize());
        threadPool.setKeepAliveSeconds(abstractExecutorPool.getKeepAliveSeconds());
        threadPool.setQueueCapacity(abstractExecutorPool.getQueueCapacity());
        threadPool.setThreadNamePrefix(threadName);
        threadPool.setRejectedExecutionHandler(rejectedExecutionHandler);
        return threadPool;
    }
}

5.启动类

启动类上 加注解 @EnableAsync ,这里就不贴代码了

6.调用方法2种方式

创建接口

public interface TestService {
    void asynctest(String value);
}

使用注解@Async

该注解可以使用在方法或者类上,使用在方法上生命该方法可异步执行,在类上,该类所有方法可异步执行。

@Slf4j
@Service
public class TestServiceImpl implements TestService {
	@Override
	// 使用注解调用,不使用的请去掉
	@Async("asyncExecutorConsumerKafka")
	public void asynctest(String value) {
	    String threadName = Thread.currentThread().getName();
	    log.info(">>线程{},正在处理:{}", threadName, value);
	    try {
	        Thread.sleep(5000);
	    } catch (InterruptedException e) {
	        e.printStackTrace();
	    }
	    log.info(">>线程test{},处理完成:{}", threadName, value);
	}
}

调用

@Component
@Slf4j
public class KafkaCustomTest {
    @Autowired
    private TestService testService ;
    @Autowired
    @Qualifier("asyncExecutorConsumerKafka")
    private Executor asyncExecutorConsumerKafka;
    @KafkaListener(topics = { "test" }, autoStartup = "true")
    public void normal(ConsumerRecord<String, String> record, Acknowledgment ack) throws IOException, InterruptedException {
        String value = record.value();
        // 使用注解调用start
        testService.asynctest(value);
        // 使用注解调用end   
        // 不使用使用注解调用start  不适用注解调用请去掉实现类中的@Async
//        asyncExecutorConsumerKafka.execute(()->{
//            moveFileService.moveFile(moveFile);
//        });
        // 不使用使用注解调用end
     
        long offset = record.offset();
        log.info(">>>该消息offset:{},消息:{},已提交异步处理。", offset, value);
        ack.acknowledge();
    }
}

实践过程中的坑和经验包

ThreadPoolTaskExecutor 和 ThreadPoolExecutor

顺便提一句 ThreadPoolTaskExecutor 和 ThreadPoolExecutor 区别

  • ThreadPoolTaskExecutor 是spring提供的
  • ThreadPoolExecutor 是jdk提供的

ThreadPoolExecutor 可以在代码中写成静态的增长调用,
如:

private static ThreadPoolExecutor  pool = new ThreadPoolExecutor(5, 10, 20L,
            TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1),
            new ThreadPoolExecutor.AbortPolicy());
public static ThreadPoolExecutor getPool() {
	return pool;
}

但是 ThreadPoolTaskExecutor 写成静态的就会失效,具体区别了解不深。

LinkedBlockingQueue 和 ArrayBlockingQueue

  • LinkedBlockingQueue中的锁是分离的,生产者的锁PutLock,消费者的锁takeLock
  • ArrayBlockingQueue生产者和消费者使用的是同一把锁

ThreadPoolExecutor 可以自己指定使用那个队列。

ThreadPoolTaskExecutor 使用的LinkedBlockingQueue

/**
	 * Create the BlockingQueue to use for the ThreadPoolExecutor.
	 * <p>A LinkedBlockingQueue instance will be created for a positive
	 * capacity value; a SynchronousQueue else.
	 * @param queueCapacity the specified queue capacity
	 * @return the BlockingQueue instance
	 * @see java.util.concurrent.LinkedBlockingQueue
	 * @see java.util.concurrent.SynchronousQueue
	 */
	protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
		if (queueCapacity > 0) {
			return new LinkedBlockingQueue<>(queueCapacity);
		}
		else {
			return new SynchronousQueue<>();
		}
	}

好好学习,天天努力

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐