Java 多线程执行:等待所有线程返回结果后,再继续执行后续逻辑

1. 实现 Callable 接口

1)配置线程池
package com.neusoft.demo.server.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * Thread pool configuration; also enables Spring's asynchronous method execution.
 */
@EnableAsync(proxyTargetClass = true)
@Configuration
public class AsycTaskExecutorConfig {

    /**
     * Creates the application-wide task executor.
     *
     * <p>The method is declared with the concrete {@link ThreadPoolTaskExecutor} type rather
     * than the {@code TaskExecutor} interface so that injection points typed as
     * {@code ThreadPoolTaskExecutor} can be matched from the bean definition itself.
     * Injection points typed as {@code TaskExecutor} keep working because the concrete
     * class implements that interface.
     *
     * @return the configured thread pool executor
     */
    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // Core pool size.
        taskExecutor.setCorePoolSize(50);
        // Maximum pool size.
        taskExecutor.setMaxPoolSize(100);
        // Capacity of the task queue.
        taskExecutor.setQueueCapacity(1000);
        // Keep-alive time, in seconds, for idle threads.
        taskExecutor.setKeepAliveSeconds(100);
        // Thread name prefix (left disabled, as in the original configuration).
        //taskExecutor.setThreadNamePrefix("asyncTaskExecutor-");
        // When the pool is saturated, run the rejected task on the submitting thread.
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return taskExecutor;
    }
}

2)编写线程任务类,实现 Callable 接口
package com.neusoft.demo.server.thread;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.neusoft.demo.server.model.TrafficFlow;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;

/**
 * @author dume
 * @create 2021-10-26 9:44
 **/
@Scope("prototype")
@Configuration
public class ThreadTaskProcess implements Callable {
    private static final Logger log = LoggerFactory.getLogger("adminLogger");
    @Qualifier("MyHighLevelClient")
    @Autowired
    private RestHighLevelClient rhlClient;
    @Value("${elasticsearch.motor-vehicle-node-name}")
    private String motorVehicleNode;

    private int searchnum;
    private String EndTime;
    private String BeginTime;
    private String Precision;
    private String DeviceCityCode;
    private String DeviceID;
    private String PlateColor;
    private String Direction;


    public ThreadTaskProcess getInstance(String EndTime, String BeginTime, String Precision, String DeviceCityCode, String DeviceID, String PlateColor, String Direction,int searchnum) {
        this.EndTime = EndTime;
        this.BeginTime = BeginTime;
        this.Precision = Precision;
        this.DeviceCityCode = DeviceCityCode;
        this.DeviceID = DeviceID;
        this.PlateColor = PlateColor;
        this.Direction = Direction;
        this.searchnum = searchnum;
        return this;
    }
    @Override
    public Object call() throws Exception {
        //es请求
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        BoolQueryBuilder bqb = QueryBuilders.boolQuery();
        if (BeginTime != null && EndTime != null) {
            RangeQueryBuilder rqb = QueryBuilders.rangeQuery("PassTime");
            rqb.gte(BeginTime);
            rqb.lt(EndTime);
            rqb.format("yyyyMMddHHmmss");
            bqb.must(rqb);
        }
        if(StringUtils.isNotBlank(DeviceCityCode)){
            if(DeviceCityCode.contains(",")){
                String[] strings = DeviceCityCode.split(",");
                TermsQueryBuilder tqb = QueryBuilders.termsQuery("DeviceCityCode",strings);
                bqb.must(tqb);
            }else{
                TermQueryBuilder tqb = QueryBuilders.termQuery("DeviceCityCode",DeviceCityCode);
                bqb.must(tqb);
            }

        }
        if(StringUtils.isNotBlank(DeviceID)){
            TermQueryBuilder tqb = QueryBuilders.termQuery("DeviceID",DeviceID);
            bqb.must(tqb);
        }
        if(StringUtils.isNotBlank(PlateColor)){

            if(PlateColor.length()==1){
                TermQueryBuilder tqb = QueryBuilders.termQuery("PlateColor",PlateColor);
                bqb.must(tqb);
            }else{
                TermQueryBuilder tqb = QueryBuilders.termQuery("PlateColor","6");
                bqb.mustNot(tqb);
            }
        }
        if(StringUtils.isNotBlank(Direction)){
            TermQueryBuilder tqb = QueryBuilders.termQuery("Direction",Direction);
            bqb.must(tqb);
        }

        sourceBuilder.query(bqb);
        sourceBuilder.size(0);
        sourceBuilder.timeout(new TimeValue(600000));
        DateHistogramAggregationBuilder fieldBuilder = AggregationBuilders
                .dateHistogram("articles_over_time")
                .field("PassTime")
                .dateHistogramInterval(new DateHistogramInterval(Precision+"m"))
                .format("yyyy/MM/dd HH:mm")
                .order(BucketOrder.key(true));
        SearchRequest searchRequest = new SearchRequest(motorVehicleNode).source(sourceBuilder.aggregation(fieldBuilder));
        SearchResponse searchResponse = null;
        JSONArray array = new JSONArray();
        JSONArray arrayback = new JSONArray();
        try {
            searchResponse = rhlClient.search(searchRequest, RequestOptions.DEFAULT);
            Histogram histo = searchResponse.getAggregations().get("articles_over_time");
            List<Histogram.Bucket> buckets = (List<Histogram.Bucket>)histo.getBuckets();
            array = JSONArray.parseArray(JSONArray.toJSONString(buckets));
            if(CollectionUtils.isNotEmpty(array)){
                for(int i=0;i<array.size();i++){
                    JSONObject object1 = new JSONObject();
                    object1.put("num",String.valueOf(i+1));
                    object1.put("datetime",array.getJSONObject(i).getString("keyAsString"));
                    object1.put("flownumber",array.getJSONObject(i).getString("docCount"));
                    arrayback.add(object1);
                }
            }
        } catch (IOException e1) {
            e1.printStackTrace();
        }

        TrafficFlow trafficFlow = new TrafficFlow();
        trafficFlow.setSearchnum(searchnum);
        if(CollectionUtils.isNotEmpty(arrayback)){
            List<TrafficFlow> list = JSONArray.parseArray(JSONArray.toJSONString(arrayback),TrafficFlow.class);
            trafficFlow.setTrafficFlows(list);
        }
        return trafficFlow;

    }




}

3)编写逻辑,线程执行完成后获取返回值
package com.neusoft.demo.server.service.impl;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.neusoft.demo.server.model.BeginAndEndDate;
import com.neusoft.demo.server.model.TrafficFlow;
import com.neusoft.demo.server.service.VbdTrafficFlowService;
import com.neusoft.demo.server.thread.ThreadTaskProcess;
import com.neusoft.demo.server.utils.DateSplitUtils;
import com.neusoft.demo.server.utils.RedisUtil;
import org.apache.commons.collections4.CollectionUtils;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.concurrent.FutureTask;
import java.util.stream.Collectors;

/**
 * Traffic flow statistics service. A request is cut into time slices, one
 * {@link ThreadTaskProcess} is run per slice on the thread pool, and the per-slice results
 * are merged in slice order once every task has returned.
 *
 * @author dume
 * @create 2021-10-20 17:54
 **/
@Service
public class VbdTrafficFlowServiceImpl implements VbdTrafficFlowService {
    private static final Logger log = LoggerFactory.getLogger("adminLogger");

    /** Pattern of the {@code BeginTime} / {@code EndTime} request parameters. */
    private static final String REQUEST_TIME_PATTERN = "yyyyMMddHHmmss";

    private static final int ONE_DAY_MILLIS = 24 * 60 * 60 * 1000;
    private static final int FIVE_MINUTES_MILLIS = 5 * 60 * 1000;
    private static final int FIFTEEN_MINUTES_MILLIS = 15 * 60 * 1000;
    private static final int THIRTY_MINUTES_MILLIS = 30 * 60 * 1000;
    private static final int SIXTY_MINUTES_MILLIS = 60 * 60 * 1000;

    @Qualifier("MyHighLevelClient")
    @Autowired
    private RestHighLevelClient rhlClient;
    @Value("${elasticsearch.motor-vehicle-node-name}")
    private String motorVehicleNode;

    @Autowired
    protected ThreadPoolTaskExecutor executorService;

    @Autowired
    private ObjectFactory<ThreadTaskProcess> processFactory;

    /**
     * Queries the traffic flow for the given period and filters.
     *
     * <p>Periods of at most one day are answered by a single task. Longer periods are first
     * widened to multiples of the precision and then split into one-day slices that are
     * queried in parallel.
     *
     * <p>If the time strings cannot be parsed the error is logged and the request is sent
     * as a single slice with the original strings.
     *
     * @param EndTime        end of the period, formatted as {@code yyyyMMddHHmmss}
     * @param BeginTime      start of the period, formatted as {@code yyyyMMddHHmmss}
     * @param Precision      bucket width in minutes as a numeric string; must not be null
     * @param DeviceCityCode city code filter, forwarded to every task
     * @param DeviceID       device filter, forwarded to every task
     * @param PlateColor     plate colour filter, forwarded to every task
     * @param Direction      direction filter, forwarded to every task
     * @return a JSON object of the form {@code {"data": {"dataList": [...]}}}
     * @throws NullPointerException  if {@code Precision} is null
     * @throws NumberFormatException if the period is longer than one day and
     *                               {@code Precision} is not an integer
     */
    @Override
    public JSONObject getTrafficFlow(String EndTime, String BeginTime, String Precision, String DeviceCityCode, String DeviceID, String PlateColor, String Direction) {
        // Fail fast: a null precision cannot be aligned against or forwarded to the tasks.
        if (Precision == null) {
            throw new NullPointerException("Precision must not be null");
        }

        List<BeginAndEndDate> slices = buildSlices(BeginTime, EndTime, Precision);

        // One task per slice; the 1-based index lets the results be put back in order.
        List<FutureTask<Object>> futureTasks = new ArrayList<>(slices.size());
        for (int i = 0; i < slices.size(); i++) {
            BeginAndEndDate slice = slices.get(i);
            futureTasks.add(
                    new FutureTask<Object>(
                            processFactory.getObject().getInstance(
                                    slice.getEndTime(),
                                    slice.getBeginTime(),
                                    Precision,
                                    DeviceCityCode,
                                    DeviceID,
                                    PlateColor,
                                    Direction,
                                    i + 1
                            )
                    )
            );
        }

        // A FutureTask is itself the result handle, so it is executed directly instead of
        // being wrapped in a second future by submit().
        for (FutureTask<Object> futureTask : futureTasks) {
            executorService.execute(futureTask);
        }

        List<TrafficFlow> sliceResults = collectResults(futureTasks);

        JSONObject data = new JSONObject();
        data.put("dataList", mergeInOrder(sliceResults));
        JSONObject object = new JSONObject();
        object.put("data", data);
        return object;
    }

    /**
     * Maps a precision string to a duration in milliseconds.
     *
     * @param Precision precision in minutes: {@code "5"}, {@code "15"}, {@code "30"} or {@code "60"}
     * @return the matching duration in milliseconds; any other value yields sixty minutes
     */
    public int getMinutes(String Precision) {
        switch (Precision) {
            case "5":
                return FIVE_MINUTES_MILLIS;
            case "15":
                return FIFTEEN_MINUTES_MILLIS;
            case "30":
                return THIRTY_MINUTES_MILLIS;
            case "60":
            default:
                return SIXTY_MINUTES_MILLIS;
        }
    }

    /** Splits the requested period into the time slices that are queried independently. */
    private List<BeginAndEndDate> buildSlices(String beginTime, String endTime, String precision) {
        Date start = new Date();
        Date end = new Date();
        try {
            // SimpleDateFormat is not thread-safe, so a new instance is used per call
            // instead of a shared static one.
            SimpleDateFormat requestTimeFormat = new SimpleDateFormat(REQUEST_TIME_PATTERN);
            start = requestTimeFormat.parse(beginTime);
            end = requestTimeFormat.parse(endTime);
        } catch (Exception e) {
            log.error("时间转化失败", e);
        }

        List<BeginAndEndDate> slices = new ArrayList<>();
        if (end.getTime() - start.getTime() <= ONE_DAY_MILLIS) {
            slices.add(newSlice(beginTime, endTime));
            return slices;
        }

        // Widen the period to multiples of the precision: start rounds down, end rounds up.
        // long arithmetic avoids int overflow for large precision values.
        long precisionMillis = Integer.parseInt(precision) * 60_000L;
        long startMillis = start.getTime();
        long endMillis = end.getTime();
        long alignedStart = startMillis - (startMillis % precisionMillis);
        long endRemainder = endMillis % precisionMillis;
        long alignedEnd = endRemainder == 0 ? endMillis : endMillis + (precisionMillis - endRemainder);

        List<DateSplitUtils.DateSplit> dateSplits =
                DateSplitUtils.splitDate(
                        new Date(alignedStart),
                        new Date(alignedEnd),
                        DateSplitUtils.IntervalType.DAY,
                        1);
        for (DateSplitUtils.DateSplit dateSplit : dateSplits) {
            // NOTE(review): these strings are passed to the tasks as-is; confirm that
            // DateSplitUtils formats them as yyyyMMddHHmmss.
            slices.add(newSlice(dateSplit.getStartDateTimeStr(), dateSplit.getEndDateTimeStr()));
        }
        return slices;
    }

    /** Creates one slice descriptor from a begin and an end time string. */
    private static BeginAndEndDate newSlice(String beginTime, String endTime) {
        BeginAndEndDate slice = new BeginAndEndDate();
        slice.setBeginTime(beginTime);
        slice.setEndTime(endTime);
        return slice;
    }

    /**
     * Waits for every task and gathers the non-null results. A failed task is logged and
     * skipped; if the waiting thread is interrupted the interrupt flag is restored and the
     * results gathered so far are returned.
     */
    private List<TrafficFlow> collectResults(List<FutureTask<Object>> futureTasks) {
        List<TrafficFlow> sliceResults = new ArrayList<>(futureTasks.size());
        for (FutureTask<Object> futureTask : futureTasks) {
            try {
                TrafficFlow sliceResult = (TrafficFlow) futureTask.get();
                if (sliceResult != null) {
                    sliceResults.add(sliceResult);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("多线程获取返回结果失败", e);
                break;
            } catch (Exception e) {
                log.error("多线程获取返回结果失败", e);
            }
        }
        return sliceResults;
    }

    /** Orders the slice results by sequence number, flattens their rows and renumbers them from 1. */
    private static List<TrafficFlow> mergeInOrder(List<TrafficFlow> sliceResults) {
        sliceResults.sort(Comparator.comparing(TrafficFlow::getSearchnum));

        List<TrafficFlow> merged = new ArrayList<>();
        for (TrafficFlow sliceResult : sliceResults) {
            // Slices that produced no rows are dropped.
            if (CollectionUtils.isNotEmpty(sliceResult.getTrafficFlows())) {
                merged.addAll(sliceResult.getTrafficFlows());
            }
        }

        int num = 1;
        for (TrafficFlow row : merged) {
            row.setNum(num);
            num++;
        }
        return merged;
    }
}

2. 实现 Runnable 接口,使用计数器(CountDownLatch)

/**
 * Worker task that processes one batch and then signals completion on a shared
 * {@link CountDownLatch}.
 */
public class MyThread implements Runnable {
    private final List<MyVo> myVos;
    private final CountDownLatch latch;

    /**
     * @param myVos the batch this task works on
     * @param latch the latch that is counted down once when this task ends
     */
    public MyThread(List<MyVo> myVos, CountDownLatch latch) {
        this.myVos = myVos;
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            // Business logic for the batch goes here.
        } finally {
            // Count down even when the logic above throws; otherwise the thread blocked in
            // latch.await() would wait forever.
            latch.countDown();
        }
    }
}



// 创建线程池
    // Worker pool: 12 core threads, a nominal maximum of 24, 60 s keep-alive for threads
    // above the core size, backed by an unbounded queue.
    // NOTE(review): with an unbounded queue the pool does not grow past the core size, so
    // the maximum of 24 has no effect - confirm whether a bounded queue was intended.
    ExecutorService executorService =
            new ThreadPoolExecutor(
                    12,
                    24,
                    60L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>());


/**
 * Submits one {@code MyThread} per batch, blocks until every task has counted the latch
 * down, and then continues with the follow-up work.
 *
 * <p>If the waiting thread is interrupted, the interrupt flag is restored and the method
 * returns without running the follow-up work, because the tasks may not have finished.
 *
 * <p>NOTE(review): the pool is a field but is shut down here, so tasks submitted by a
 * later call would be rejected - confirm that this method is only meant to run once.
 */
public void executorMothod() {
    // Placeholder: fill with the batches to process.
    List<List<MyVo>> list = new ArrayList<>();
    // One count per batch; each task counts down once when it ends.
    CountDownLatch latch = new CountDownLatch(list.size());
    try {
        for (List<MyVo> myVos : list) {
            executorService.submit(new MyThread(myVos, latch));
        }
        // Block until every task has counted down.
        latch.await();
    } catch (InterruptedException e) {
        // Restore the flag so callers further up can see the interruption.
        Thread.currentThread().interrupt();
        log.error(e.getMessage(), e);
        return;
    } finally {
        if (executorService != null && !executorService.isShutdown()) {
            executorService.shutdown();
        }
    }

    log.info("线程执行完毕!");

    // Follow-up work goes here.
}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐