Spring boot中使用线程池如何控制主线程和子线程的事务
目录一、使用场景二、思路三、代码及注释如下:四、测试验证:1、情况1:子线程中有一个执行失败2、情况2、删除t_person表中id为201的数据重新插入编辑3、情况3:主线程报错就不演示了一、使用场景数据库有两张表 t_person 和 t_school 如下:前端传来10000条person数据要插入到t_person,同时要删除t_school表中id为1的数据(为提高效率采用线程池做)二
目录
2、情况2、删除 t_person表中id为201的数据重新插入编辑
一、使用场景
数据库有两张表 t_person 和 t_school 如下:前端传来10000条person数据要插入到t_person,同时要删除t_school表中id为1的数据(为提高效率采用线程池做)
二、思路
1、要保证主线程和子线程使用的同一个sqlSession
2、手动控制提交和回滚
3、将10000条数据均分成10份,每份1000条,创建10个任务,放入线程池执行!
三、代码及注释如下:
1、核心业务代码
@Service
public class PersonServiceImpl extends ServiceImpl<PersonMapper, Person> implements IPersonService {
@Autowired
private SqlSessionTemplate sqlSessionTemplate;
@Autowired
private SchoolMapper schoolMapper;
private ArrayBlockingQueue queue=new ArrayBlockingQueue(8,true);
private ThreadPoolExecutor.CallerRunsPolicy policy=new ThreadPoolExecutor.CallerRunsPolicy();
//1、创建核心线程为10的线程池
private ThreadPoolExecutor executor = new ThreadPoolExecutor(10,15,10, TimeUnit.SECONDS
,queue,policy);
@Override
public int insertPerson(Person person) {
return this.baseMapper.insert(person);
}
@Override
@Transactional
public void inserPersonBatch(List<Person> list) throws SQLException {
//2、根据sqlSessionTemplate获取SqlSession工厂
SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
SqlSession sqlSession = sqlSessionFactory.openSession();
//3、获取Connection来手动控制事务
Connection connection = sqlSession.getConnection();
try{
//4、设置手动提交
connection.setAutoCommit(false);
//5、获取PersonMapper(此处是由于无法通过this.baseMapper调用自定义的saveBatch方法)
PersonMapper mapper = sqlSession.getMapper(PersonMapper.class);
//6、主线程去删除t_school表中id为1的数据
schoolMapper.deleteById("1");
//7、将传入List中的10000个数据按1000一组均分成10组
List<List<Person>> lists = ListUtils.averageAssign(list,1000);
//8、新建任务列表
List<Callable<Integer>> callableList = new ArrayList<>();
//9、根据均分的5组数据分别新建5个Callable任务
for(int i = 0; i < lists.size(); i++){
List<Person> insertList = lists.get(i);
Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int n = 0;
try{
n = mapper.saveBatch(insertList);
}catch (Exception e){
//插入失败返回0
return n;
}
//插入成功返回成功提交数
return n;
}
};
callableList.add(callable);
}
//10、任务放入线程池开始执行
List<Future<Integer>> futures = executor.invokeAll(callableList);
//11、对比每个任务的返回值 <= 0 代表执行失败
for(Future<Integer> future : futures){
if(future.get() <= 0){
//12、只要有一组任务失败回滚整个connection
connection.rollback();
return;
}
}
//13、主线程和子线程都执行成功 直接提交
connection.commit();
System.out.println("添加成功!");
}catch (Exception e){
//14、主线程报错回滚
connection.rollback();
log.error(e.toString());
throw new SQLException("出现异常!");
}
return;
}
}
2、PersonMapper中自定义批量插入
<insert id="saveBatch" parameterType="list">
insert into t_person(id,name,age,addr,classes,school_id)
values
<foreach collection="list" index="index" item="item" separator=",">
(
#{item.id},
#{item.name},
#{item.age},
#{item.addr},
#{item.classes},
#{item.schoolId}
)
</foreach>
</insert>
3、均分List工具类
public class ListUtils {
public static <T> List<List<T>> averageAssign(List<T> source, int limit) {
if (null == source || source.isEmpty()) {
return Collections.emptyList();
}
List<List<T>> result = new ArrayList<>();
int listCount = (source.size() - 1) / limit + 1;
int remaider = source.size() % listCount; // (先计算出余数)
int number = source.size() / listCount; // 然后是商
int offset = 0;// 偏移量
for (int i = 0; i < listCount; i++) {
List<T> value;
if (remaider > 0) {
value = source.subList(i * number + offset, (i + 1) * number + offset + 1);
remaider--;
offset++;
} else {
value = source.subList(i * number + offset, (i + 1) * number + offset);
}
result.add(value);
}
return result;
}
}
四、测试验证:
controller层如下:传入10000条数据
@GetMapping("/addBatch")
public void addBatch() {
List<Person> list = new ArrayList<>();
for(int i = 1; i <= 10000; i++){
Person p = new Person();
p.setId(i);
p.setName("张三" + i);
p.setAge(i);
p.setAddr("重庆");
p.setClasses("一班");
p.setSchoolId(i);
list.add(p);
}
try{
this.iPersonService.inserPersonBatch(list);
}catch (Exception e){
e.printStackTrace();
}
}
1、情况1:子线程中有一个执行失败
t_person表主键唯一 10000条Person数据id按1—10000设置
如图t_person表中已经有一条id为201的数据 所以线程池中有一个任务执行会失败!
我们打断点来看:此时已经分配好10个任务
如下图:插入id为201的数据时失败,线程池第一个任务执行失败返回0,其余全部成功返回1000
执行rollback回滚
执行完毕观察数据库:
t_school表数据没有被删,
t_person表数据也没有变化
2、情况2、删除 t_person表中id为201的数据重新插入
此时10个任务全部执行成功:
执行commit
执行完毕观察数据库:
t_school表数据已被删除
t_person表中10000条数据也成功插入:
3、情况3:主线程报错就不演示了
以上测试成功!
更多推荐
所有评论(0)