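I recently came across an article on a WeChat official account about inserting data with multiple threads, and wrote my own example by following along. My understanding is limited, so corrections are welcome.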

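Business scenario: the data volume is large, and inserting it into the database takes a long time
Solution: insert the data with multiple threads, using Spring Boot + a thread pool + MyBatis-Plus
Implementation: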

  1. Configuration file application.properties
# Async thread pool configuration
# Core pool size
async.executor.thread.core_pool_size = 30
# Maximum pool size
async.executor.thread.max_pool_size = 50
# Queue capacity
async.executor.thread.queue_capacity = 10000
# Name prefix for the threads in the pool
async.executor.thread.name.prefix = async-batchInsert-

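PS: I looked up how to size the thread pool and still do not fully understand it, so I am quoting the guidance as I found it. The first step is to decide whether the workload is compute-intensive or IO-intensive, and at first I did not see how to tell the two apart. Roughly, a compute-intensive task keeps the CPU busy for most of its run time, while an IO-intensive task spends most of its time waiting on disk, network, or the database; a batch insert belongs to the second kind.
Compute-intensive: thread count = number of CPU cores + 1 (it can also be set to number of CPU cores * 2)
IO-intensive: thread count = number of CPU cores / (1 - blocking coefficient), where 0.8 <= blocking coefficient <= 0.9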
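
To make the two formulas concrete, here is a minimal sketch in plain Java. The class name PoolSizing and its method names are hypothetical, and the code only restates the formulas quoted above; it is not a tuning recommendation.

```java
// Hypothetical helper that restates the two sizing formulas quoted above.
class PoolSizing {

    // Compute-intensive: thread count = CPU cores + 1
    static int cpuBound(int cores) {
        return cores + 1;
    }

    // IO-intensive: thread count = CPU cores / (1 - blocking coefficient)
    static int ioBound(int cores, double blockingCoefficient) {
        if (blockingCoefficient < 0.0 || blockingCoefficient >= 1.0) {
            throw new IllegalArgumentException("blocking coefficient must be in [0, 1)");
        }
        return (int) Math.round(cores / (1.0 - blockingCoefficient));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("cores = " + cores);
        System.out.println("compute-intensive = " + cpuBound(cores));
        System.out.println("IO-intensive (0.8) = " + ioBound(cores, 0.8));
        System.out.println("IO-intensive (0.9) = " + ioBound(cores, 0.9));
    }
}
```

On an 8-core machine these formulas give 9 threads for compute-intensive work and between 40 and 80 threads for IO-intensive work.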

  2. Thread pool initialization
@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {
    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;

    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;

    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;

    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;

    @Bean(name="asyncServiceExecutor")
    public Executor asyncServiceExecutor(){
        log.warn("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Core pool size
        executor.setCorePoolSize(corePoolSize);
        // Maximum pool size
        executor.setMaxPoolSize(maxPoolSize);
        // Queue capacity
        executor.setQueueCapacity(queueCapacity);
        // Name prefix for the threads in the pool
        executor.setThreadNamePrefix(namePrefix);
        // Rejection policy: how to handle a new task once the queue is full and the pool has reached max size
        // CALLER_RUNS: the task is not run on a pool thread; the calling thread runs it instead
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Initialize the executor
        executor.initialize();
        return executor;
    }
}

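PS: Remember to add the @EnableAsync annotation, which enables Spring's asynchronous method execution; the official documentation has the details: Annotation Type EnableAsync. The four rejection policies are:
ThreadPoolExecutor.AbortPolicy: rejects the task and throws RejectedExecutionException (the default).
ThreadPoolExecutor.DiscardPolicy: discards the task without throwing an exception.
ThreadPoolExecutor.DiscardOldestPolicy: discards the task at the head of the queue, then retries submitting the new task.
ThreadPoolExecutor.CallerRunsPolicy: runs the task on the thread that submitted it.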
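
To see CallerRunsPolicy in action, here is a small sketch that uses java.util.concurrent.ThreadPoolExecutor directly rather than Spring's ThreadPoolTaskExecutor. The class name CallerRunsDemo is hypothetical, and the pool is deliberately tiny (one thread, a queue of one) so that the third task is rejected and falls back to the submitting thread.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: records which thread ran each of three tasks.
class CallerRunsDemo {

    static Map<Integer, String> run() {
        // One worker thread and room for one queued task, so a third task must be rejected
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Map<Integer, String> ranOn = new ConcurrentHashMap<>();
        try {
            // Task 1 occupies the only worker until the latch is released
            pool.execute(() -> {
                awaitQuietly(release);
                ranOn.put(1, Thread.currentThread().getName());
            });
            // Task 2 fills the queue
            pool.execute(() -> ranOn.put(2, Thread.currentThread().getName()));
            // Task 3 is rejected, so CallerRunsPolicy runs it on the submitting thread
            pool.execute(() -> ranOn.put(3, Thread.currentThread().getName()));
            release.countDown();
            pool.shutdown();
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the pool", e);
        } finally {
            pool.shutdownNow();
        }
        return ranOn;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        run().forEach((task, thread) -> System.out.println("task " + task + " ran on " + thread));
    }
}
```

One consequence of this policy is back-pressure: while the submitting thread is busy running a rejected task, it cannot submit more work, which slows the producer down instead of dropping tasks.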
  3. Entity class

public class Student implements Serializable {
    private Integer id;

    private String name;

    /**
     * 0 - male, 1 - female, 2 - unknown
     */
    private String gender;

    private static final long serialVersionUID = 1L;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getGender() {
        return gender;
    }

    public void setGender(String gender) {
        this.gender = gender;
    }

    @Override
    public boolean equals(Object that) {
        if (this == that) {
            return true;
        }
        if (that == null) {
            return false;
        }
        if (getClass() != that.getClass()) {
            return false;
        }
        Student other = (Student) that;
        return (this.getId() == null ? other.getId() == null : this.getId().equals(other.getId()))
            && (this.getName() == null ? other.getName() == null : this.getName().equals(other.getName()))
            && (this.getGender() == null ? other.getGender() == null : this.getGender().equals(other.getGender()));
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((getId() == null) ? 0 : getId().hashCode());
        result = prime * result + ((getName() == null) ? 0 : getName().hashCode());
        result = prime * result + ((getGender() == null) ? 0 : getGender().hashCode());
        return result;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(getClass().getSimpleName());
        sb.append(" [");
        sb.append("Hash = ").append(hashCode());
        sb.append(", id=").append(id);
        sb.append(", name=").append(name);
        sb.append(", gender=").append(gender);
        sb.append(", serialVersionUID=").append(serialVersionUID);
        sb.append("]");
        return sb.toString();
    }
}
  4. Data access layer
/**
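 * Common DAO base interface, generated automatically by MybatisGenerator; do not modify
 * @param <Model> The Model Class (a generic type parameter, not a class named Model)
 * @param <PK> The Primary Key Class (if there is no primary key, Model can be used as a placeholder; for a composite key, use the Key class)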
 */
public interface MyBatisBaseDao<Model, PK extends Serializable> {
    int deleteByPrimaryKey(PK id);

    int insert(Model record);

    int insertSelective(Model record);

    Model selectByPrimaryKey(PK id);

    int updateByPrimaryKeySelective(Model record);

    int updateByPrimaryKey(Model record);

    int batchInsert(List<Model> records);
}
/**
 * StudentMapper extends the base DAO interface
 */
@Repository
public interface StudentMapper extends MyBatisBaseDao<Student, Integer> {
    int batchInsert(List<Student> list);
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.demo.mapper.StudentMapper">
  <resultMap id="BaseResultMap" type="com.example.demo.entity.Student">
    <id column="id" jdbcType="INTEGER" property="id" />
    <result column="name" jdbcType="VARCHAR" property="name" />
    <result column="gender" jdbcType="CHAR" property="gender" />
  </resultMap>
  <sql id="Base_Column_List">
    id, `name`, gender
  </sql>
  <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
    select 
    <include refid="Base_Column_List" />
    from student
    where id = #{id,jdbcType=INTEGER}
  </select>
  <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
    delete from student
    where id = #{id,jdbcType=INTEGER}
  </delete>
  <insert id="insert" keyColumn="id" keyProperty="id" parameterType="com.example.demo.entity.Student" useGeneratedKeys="true">
    insert into student (`name`, gender)
    values (#{name,jdbcType=VARCHAR}, #{gender,jdbcType=CHAR})
  </insert>
  <insert id="insertSelective" keyColumn="id" keyProperty="id" parameterType="com.example.demo.entity.Student" useGeneratedKeys="true">
    insert into student
    <trim prefix="(" suffix=")" suffixOverrides=",">
      <if test="name != null">
        `name`,
      </if>
      <if test="gender != null">
        gender,
      </if>
    </trim>
    <trim prefix="values (" suffix=")" suffixOverrides=",">
      <if test="name != null">
        #{name,jdbcType=VARCHAR},
      </if>
      <if test="gender != null">
        #{gender,jdbcType=CHAR},
      </if>
    </trim>
  </insert>
  <update id="updateByPrimaryKeySelective" parameterType="com.example.demo.entity.Student">
    update student
    <set>
      <if test="name != null">
        `name` = #{name,jdbcType=VARCHAR},
      </if>
      <if test="gender != null">
        gender = #{gender,jdbcType=CHAR},
      </if>
    </set>
    where id = #{id,jdbcType=INTEGER}
  </update>
  <update id="updateByPrimaryKey" parameterType="com.example.demo.entity.Student">
    update student
    set `name` = #{name,jdbcType=VARCHAR},
      gender = #{gender,jdbcType=CHAR}
    where id = #{id,jdbcType=INTEGER}
  </update>
  <update id="batchUpdate" parameterType="java.util.List">
    update student
    <trim prefix="set" suffixOverrides=",">
      <trim prefix="name = case" suffix="end,">
        <foreach collection="list" index="index" item="item">
          when id = #{item.id,jdbcType=INTEGER} then #{item.name,jdbcType=VARCHAR}
        </foreach>
      </trim>
      <trim prefix="gender = case" suffix="end,">
        <foreach collection="list" index="index" item="item">
          when id = #{item.id,jdbcType=INTEGER} then #{item.gender,jdbcType=CHAR}
        </foreach>
      </trim>
    </trim>
    where id in
    <foreach close=")" collection="list" item="item" open="(" separator=", ">
      #{item.id,jdbcType=INTEGER}
    </foreach>
  </update>
  <update id="batchUpdateSelective" parameterType="java.util.List">
    update student
    <trim prefix="set" suffixOverrides=",">
      <trim prefix="name = case" suffix="end,">
        <foreach collection="list" index="index" item="item">
          <if test="item.name != null">
            when id = #{item.id,jdbcType=INTEGER} then #{item.name,jdbcType=VARCHAR}
          </if>
        </foreach>
      </trim>
      <trim prefix="gender = case" suffix="end,">
        <foreach collection="list" index="index" item="item">
          <if test="item.gender != null">
            when id = #{item.id,jdbcType=INTEGER} then #{item.gender,jdbcType=CHAR}
          </if>
        </foreach>
      </trim>
    </trim>
    where id in
    <foreach close=")" collection="list" item="item" open="(" separator=", ">
      #{item.id,jdbcType=INTEGER}
    </foreach>
  </update>
  <insert id="batchInsert" parameterType="java.util.List">
    insert into student (`name`, gender)
    values
    <foreach collection="list" item="item" separator=",">
      (#{item.name,jdbcType=VARCHAR}, #{item.gender,jdbcType=CHAR})
    </foreach>
  </insert>
</mapper>

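PS: The mapper contains some code that this example does not use, but since this is a learning exercise, I think the syntax is worth keeping as a reference.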
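
For readers unfamiliar with the foreach element in batchInsert, the following sketch shows the rough shape of the single statement it expands to. This is not MyBatis code: the class name BatchInsertSqlShape is hypothetical, each ? stands in for one bound parameter, and the exact whitespace MyBatis produces will differ.

```java
import java.util.Collections;

// Hypothetical illustration of the multi-row INSERT that the foreach expands to.
class BatchInsertSqlShape {

    // One "(?, ?)" tuple per row, joined by commas, behind a single INSERT
    static String render(int rowCount) {
        if (rowCount <= 0) {
            throw new IllegalArgumentException("rowCount must be positive");
        }
        return "insert into student (`name`, gender) values "
                + String.join(", ", Collections.nCopies(rowCount, "(?, ?)"));
    }

    public static void main(String[] args) {
        System.out.println(render(3));
    }
}
```

Because every row adds another tuple to the same statement, the statement grows with the list, so passing moderately sized lists keeps each statement short.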

  5. Utility class
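import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SplitListUtils {
    /**
     * Splits dataList into consecutive chunks of at most offset elements each.
     * offset is the chunk size; dataSize is the total number of elements.
     */
    public static <T> List<List<T>> splitList(int offset, int dataSize, Collection<T> dataList) {
        // Integer ceiling division avoids the precision loss of a float cast on very large sizes
        int chunkCount = (dataSize + offset - 1) / offset;
        return Stream.iterate(0, i -> i + 1).limit(chunkCount).parallel().map(
                n -> dataList.stream().skip((long) n * offset).limit(offset).parallel().collect(Collectors.toList()))
                .collect(Collectors.toList());
    }
}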
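
The stream-based splitList works, but every chunk restarts from the beginning of the collection and skips forward. When the input is already a List, a plain loop over subList does the same job in one pass. The sketch below is an alternative, not the original author's code; the class name ListChunks and the method name chunk are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical alternative to splitList for inputs that are already a List.
class ListChunks {

    static <T> List<List<T>> chunk(List<T> source, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < source.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, source.size());
            // Copy the subList view so each chunk is independent of the source list
            chunks.add(new ArrayList<>(source.subList(from, to)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            numbers.add(i);
        }
        for (List<Integer> part : chunk(numbers, 100)) {
            System.out.println("chunk of size " + part.size() + " starting at " + part.get(0));
        }
    }
}
```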
  6. Service layer
public interface AsyncExecutorService {
   void executeAsync(List<Student> list, CountDownLatch countDownLatch);

   List<Student> getStudentList();
}
@Service
@Slf4j
public class AsyncExecutorServiceImpl implements AsyncExecutorService {
    @Autowired
    private StudentMapper studentMapper;

    @Override
    @Async("asyncServiceExecutor")
    public void executeAsync(List<Student> list, CountDownLatch countDownLatch) {
        // Insert this batch into the database on a pool thread
        try {
            log.warn("start executeAsync");
            studentMapper.batchInsert(list);
            log.warn("end executeAsync");
        } finally {
            // Critical: countDown must run whether or not the code above throws, otherwise await never returns
            countDownLatch.countDown();
        }

    }

    @Override
    public List<Student> getStudentList() {
        int i = 20000;
        Student stu;
        String[] genders = new String[]{"0", "1", "2"};
        List<Student> resultList = new ArrayList<>();
        while (i > 0) {
            stu = new Student();
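            // nextInt(genders.length) covers all three codes; nextInt(2) could never pick "2"
            stu.setGender(genders[new Random().nextInt(genders.length)]);
            stu.setName("Code name: " + i);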
            i--;
            resultList.add(stu);
        }
        return resultList;
    }
}

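PS: In the service layer, the @Async("asyncServiceExecutor") annotation makes the method run on the thread pool; asyncServiceExecutor is the name of the Bean registered in step 2 (thread pool initialization). The call to countDownLatch.countDown() determines whether the blocked thread can eventually be released.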

  7. Controller layer
@RestController
@Slf4j
@RequestMapping("/async/executor")
public class AsyncExecutorController {
    @Autowired
    private AsyncExecutorService asyncExecutorService;

    @GetMapping("testAsync")
    public String testAsync() {
        log.info("start testAsync.");
        List<Student> list = asyncExecutorService.getStudentList();
        List<List<Student>> lists = SplitListUtils.splitList(100, list.size(), list);
        CountDownLatch countDownLatch = new CountDownLatch(lists.size());
        for (List<Student> itemList : lists) {
            asyncExecutorService.executeAsync(itemList, countDownLatch);
        }
        try {
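            // Block until every batch has finished, so the code below runs only after all inserts are done
            countDownLatch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still see the interruption
            Thread.currentThread().interrupt();
            log.error("Interrupted while waiting: " + e.getMessage());
        }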
        return "success";
    }
}

PS: Blocking on countDownLatch.await() is essential: only after all worker threads have finished can the code that follows see the final result.
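
One thing to keep in mind: when an @Async method returns void, an exception thrown inside it is not propagated to the caller (Spring passes it to an AsyncUncaughtExceptionHandler), so the controller returns "success" even if a batch failed. The sketch below shows the same latch pattern in plain Java with two additions: a failure counter and a timeout on await. It is an illustration under assumptions, not the original code; the class name LatchDemo, the method runBatches, the pool size of 4, and the simulated failure are all hypothetical stand-ins for studentMapper.batchInsert.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo of the latch pattern with failure counting and a bounded wait.
class LatchDemo {

    // Runs batchCount tasks; every failEvery-th batch fails (0 means none). Returns the number of failed batches.
    static int runBatches(int batchCount, int failEvery) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch latch = new CountDownLatch(batchCount);
        AtomicInteger failed = new AtomicInteger();
        for (int i = 1; i <= batchCount; i++) {
            final int batchNo = i;
            pool.execute(() -> {
                try {
                    // Stand-in for the real insert call
                    if (failEvery > 0 && batchNo % failEvery == 0) {
                        throw new IllegalStateException("simulated failure in batch " + batchNo);
                    }
                } catch (RuntimeException e) {
                    failed.incrementAndGet();
                } finally {
                    // Always count down, even on failure, so the waiting thread is released
                    latch.countDown();
                }
            });
        }
        try {
            // A bounded wait keeps the caller from blocking forever if a task hangs
            if (!latch.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for batches");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
        return failed.get();
    }

    public static void main(String[] args) {
        System.out.println("failed batches: " + runBatches(10, 3));
    }
}
```

With a count like this, the caller can report how many batches failed instead of returning a fixed result.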
