采用多线程+ES批量写入
采用多线程+ES批量写入
·
前言
将Mysql的数据使用分页、多线程的方式批量导入到ElasticSearch
一、自定义线程池
自定义线程池实现:new ThreadPoolExecutor(7个参数)
- corePoolSize:核心线程数(默认值:1)
allowCoreThreadTimeout=false为默认值
如果设置allowCoreThreadTimeout=false后, 当前线程数大于corePoolSize,如果线程空闲等待时间超过keepAliveTime,则该线程会被回收。
如果设置allowCoreThreadTimeout=true后, 当前线程数小于corePoolSize时,线程池的线程空闲等待时间超过keepAliveTime,也会被回收 - maximumPoolSize:最大线程数(默认值:Integer.MAX_VALUE)
- keepAliveTime:空闲线程存活时间(默认值:60秒)
- unit:时间单位 (秒)
- workQueue:工作队列大小(默认值:Integer.MAX_VALUE)
- threadFactory:线程工厂
- handler:拒绝策略。(默认AbortPolicy策略)
JDK默认的拒绝策略有四种:
- 1.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。(默认值)
- 2.DiscardPolicy:丢弃任务,但是不抛出异常。可能导致无法发现系统的异常状态。
- 3.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务。
- 4.CallerRunsPolicy:由调用线程处理该任务。
二、上代码
/*
*多线程批量导入
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SearchApplication.class)
public class test {
@Autowired
private ItemClient itemClient;
@Autowired
private ObjectMapper objectMapper;
@Autowired
private RestHighLevelClient highLevelClient;
/**
* @param :
* @return void
* @description 批量导入数据
*/
@Test
public void IndexTest() throws JsonProcessingException {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
new ThreadPoolExecutor.DiscardOldestPolicy());
int pageSize = 10000;
//查询数据总条数count
Integer itemCount = itemClient.selectCount();
int totalPageNum = (int)Math.ceil(itemCount /pageSize );//有余数的时候自动加一
for (int pageNum = 1; pageNum <= totalPageNum; pageNum++) {
//调用feign接口查询数据(分页查询数据)
PageDTO<Item> itemPageDTO = itemClient.selectAll(pageNum, pageSize);
if (itemPageDTO.getList() != null && itemPageDTO.getList().size() != 0) {
threadPool.execute(new Runnable() {
@Override
public void run() {
//加入线程池任务中
try {
//创建批量操作请求对象
BulkRequest bulkRequest = new BulkRequest();//底层集合缓存
for (Item item : itemPageDTO.getList()) {
//导入数据
ItemDoc itemDoc = new ItemDoc(item);
//创建请求对象
IndexRequest request = new IndexRequest("item").id(itemDoc.getId().toString());
//填充内容
String json = objectMapper.writeValueAsString(itemDoc);
request.source(json, XContentType.JSON);
//把数据添加批量操作对象缓存中
bulkRequest.add(request);
}
highLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
try {
//延迟3秒等elasticsearch完成写入数据
Thread.sleep(3000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
}
更多推荐
已为社区贡献2条内容
所有评论(0)