因为最近要从elasticsearch中获取数据给前端展示,然后自己摸索到了一些查询方法,记录一下,以防忘记

只展示业务层的代码逻辑:

一、一次普通的查询方法:

 public ResultVO<List<PageVO<PageVulVo>>> page(PageParam param, @ResTypeValue String[] resTypeValues) {
        //排序
        if (StringUtils.isEmpty(param.getSortParams())) {
            param.setSortParams("first_time desc");
        }
        String sortParams = param.getSortParams();
        //搜索字段
//        Map<String, Object> map = ParamUtils.getMapBySearch(param);
        List<SearchParam> searchParams = param.getSearchParams();
        //查询条件
        NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
        this.queryVul(searchParams, nativeSearchQueryBuilder);
        //排序和分页
        PubUtils.sortAndPage(sortParams, nativeSearchQueryBuilder, param);
        //过滤
        nativeSearchQueryBuilder.withCollapseField("code");
        //查询
        Page<RealVul> pageResult = realVulRepository.search(nativeSearchQueryBuilder.build());
        //组装
        List<PageVulVo> pageVulVoList = pageResult.getContent().stream().map(e -> new PageVulVo(e)).collect(Collectors.toList());
        //返回
        return ResultUtils.pageOk(pageVulVoList, pageResult.getTotalElements());

    }

搜索条件方法:

  public NativeSearchQueryBuilder queryVul(List<SearchParam> searchParams, NativeSearchQueryBuilder nativeSearchQueryBuilder) {
        //查询条件
        BoolQueryBuilder mustQuery = QueryBuilders.boolQuery();

        for (SearchParam searchParam : searchParams) {
            String value = searchParam.getFieldValue();
            String name = searchParam.getFieldName();

            switch (name){
                case "vul_name":
                    mustQuery.must(QueryBuilders.wildcardQuery("vul_name", "*" + value + "*"));
                    break;
                case "cve_code":
                    mustQuery.must(QueryBuilders.matchQuery("cve_code", value));
                    break;
                case "port":
                    mustQuery.must(QueryBuilders.matchQuery("port",value));
                    break;
                case "scan_type":
                    mustQuery.must(QueryBuilders.matchQuery("scan_type",value));
                    break;
                case "create_time":
                    List<Long> createTime = DateUtils.parseStringToLong(value);
                    mustQuery.must(QueryBuilders.rangeQuery("create_time").gte(createTime.get(0)).lte(createTime.get(1)));
                    break;
                default:
                    break;
            }
        }
        nativeSearchQueryBuilder.withQuery(mustQuery);

        return nativeSearchQueryBuilder;
    }

分页和排序的方法:

 public static NativeSearchQueryBuilder sortAndPage(String sortParams, NativeSearchQueryBuilder nativeSearchQueryBuilder, PageParam param) {

        Pageable pageable = PageRequest.of(param.getCurrPage() - 1, param.getPageNums());
        nativeSearchQueryBuilder.withPageable(pageable);

        if (sortParams.toLowerCase().contains("desc")) {
            nativeSearchQueryBuilder.withSort(SortBuilders.fieldSort(sortParams.substring(0, sortParams.indexOf(" "))).order(SortOrder.DESC));
        } else {
            nativeSearchQueryBuilder.withSort(SortBuilders.fieldSort(sortParams.substring(0, sortParams.indexOf(" "))).order(SortOrder.ASC));
        }
        return nativeSearchQueryBuilder;
    }

这就是普通的查询办法。

二、带有统计和求最高级别的字段值

 例如要对整个elastcsearch中的数据进行统计分组和巧合的时候,上述的方法就不能满足要求了。

例如要算出以下这种数据

 

第一列是求出最高级别,第二列是算出数量,第三和第四列是算出不同级别的数量。 

为了应对这种需求,所以新的方法:

因为用的多线程是线程池,所以要自己格外配置线程池。


    @Resource
    public ElasticsearchRestTemplate estTemplate;
    @Autowired
    public AsyncTaskExecutor asyncExecutor;



public ResultVO<List<PageVO<PageVulByIpVo>>> pageVulByIp(PageParam param, @ResTypeValue String[] resTypeValue) throws ExecutionException, InterruptedException {
        //查询条件
        NativeSearchQueryBuilder nativeSearchQueryBuilder = queryPreposition(param);
        //分组
        nativeSearchQueryBuilder.addAggregation(
                AggregationBuilders.terms("ip_group").field("ip").size(Integer.MAX_VALUE)
                        //风险等级
                        .subAggregation(AggregationBuilders.max("vul_severity_max").field("vul_severity"))
                        //ip名称
                        .subAggregation(AggregationBuilders.terms("ip_name").field("ip"))
                        //发现时间
                        .subAggregation(AggregationBuilders.max("create_time").field("create_time"))
                        //排序+分页
                        .subAggregation(new BucketSortPipelineAggregationBuilder("bucket_sort", null)
                                .from(param.getCurrPage()).size(param.getPageNums()))
        );
        nativeSearchQueryBuilder.addAggregation(
                //统计总数
                AggregationBuilders.terms("ip_count").field("ip").size(Integer.MAX_VALUE)
        );
        //获取分页中的ip集合
        List<String> keyList = new ArrayList<>();
        //储存风险等级
        HashMap<String, Integer> vulSeverityMap = new HashMap<>();
        //储存ip内容;顺序为:ip,create_time
        HashMap<String, String> ipContent = new HashMap<>();
        //查询结果
        AggregatedPage<RealVul> pageResult = estTemplate.queryForPage(nativeSearchQueryBuilder.build(), RealVul.class);
        //进行分组
        Terms stringTerms = (Terms) Objects.requireNonNull(pageResult.getAggregation("ip_group"));
        Terms ipCount = (Terms) Objects.requireNonNull(pageResult.getAggregation("ip_count"));
        //获取总数
        long total = Long.valueOf(ipCount.getBuckets().size());

        for (Terms.Bucket bucket : stringTerms.getBuckets()) {
            String ip = bucket.getKeyAsString();
            //获取风险等级
            Map<String, Aggregation> aggregationMap = bucket.getAggregations().asMap();
            double vulSeverity = ((ParsedMax) aggregationMap.get("vul_severity_max")).getValue();
            Integer value = Integer.valueOf((int) vulSeverity);
            //获取时间
            Aggregation time = aggregationMap.get("create_time");
            String createTimeString = ((ParsedMax) time).getValueAsString();
            //获取ip地址
            Terms ipName = (Terms) Objects.requireNonNull(aggregationMap.get("ip_name"));
            String ipString = ipName.getBuckets().get(0).getKeyAsString();
            ipContent.put(ip,ipString+"/"+createTimeString);
            vulSeverityMap.put(ip, value);
            //获取ip
            keyList.add(ip);
        }
        //获取当前用户所需的数据
        List<PageVulByIpVo> vulByIpVoList = new ArrayList<>();
        //资产相关的名称
        BaseVO<List<AssetRestVO>> assetByIPList = assetClient.getAssetByIPList(keyList);
        List<AssetRestVO> data = assetByIPList.getData();
        //组装
        for (String ip : keyList) {
            //多线程查询字段
            Future<Integer> vulSize = asyncExecutor.submit(this.getVulSize(Arrays.asList(ip), null, null, null));
            Future<Map> stateMap = asyncExecutor.submit(this.getStateMap(null, Arrays.asList(ip), null, null, null));
            Future<Map> vulSeveritysMap = asyncExecutor.submit(this.getVulSeverityMap(Arrays.asList(ip), null, null, null));
            //组装
            Integer vulSizeOne = vulSize.get();
            Map stateMapOne = stateMap.get();
            Map vulSeverity = vulSeveritysMap.get();
            Integer vulSeverityMax = vulSeverityMap.get(ip);
            String[] content = ipContent.get(ip).split("/");
            AssetRestVO assetVo=new AssetRestVO();
            if (!CollectionUtils.isEmpty(data)){
                assetVo = data.stream().filter(assetRestVO -> ip.equals(assetRestVO.getIp())).collect(Collectors.toList()).get(0);
            }
            PageVulByIpVo vulByIpVo = new PageVulByIpVo(content, vulSeverityMax, vulSizeOne, stateMapOne, vulSeverity, assetVo);
            vulByIpVoList.add(vulByIpVo);
        }
        //返回
        return ResultUtils.pageOk(vulByIpVoList, total);
    }





//算出数量
 public Callable<Integer> getVulSize(List<String> ipList, Integer bsId, Integer areaId, Integer orgId) {
        Callable<Integer> callable = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
                if (!CollectionUtils.isEmpty(ipList)) {
                    boolQueryBuilder.must(QueryBuilders.termsQuery("ip", ipList));
                }
                if (null != bsId) {
                    boolQueryBuilder.must(QueryBuilders.matchQuery("bs_id", bsId));
                }
                if (null != areaId) {
                    boolQueryBuilder.must(QueryBuilders.matchQuery("area_id", areaId));
                }
                if (null != orgId) {
                    boolQueryBuilder.must(QueryBuilders.matchQuery("org_id", orgId));
                }

                //查询条件
                SearchQuery searchQuery = new NativeSearchQueryBuilder()
                        .withIndices(TableEnums.VULCENTER_REAL_VUL_RESULT)
                        .withQuery(boolQueryBuilder)
                        .withCollapseField("code")
                        .build();
                //查询
                int count = (int) estTemplate.count(searchQuery);
                return count;

            }
        };
        return callable;

    }




  /**
     * 获取整改状态
     */
    public Callable<Map> getStateMap(String cveCode, List<String> ipList, Integer bsId, Integer areaId, Integer orgId) {
        Callable<Map> callable = new Callable<Map>() {
            @Override
            public Map call() throws Exception {
                Map<String, Integer> stateMap = new HashMap<>();
                stateMap.put("finish", 0);
                stateMap.put("doing", 0);
                stateMap.put("wait_submit", 0);
                stateMap.put("waiting_process", 0);
                stateMap.put("ignore", 0);
                //查询条件
                //查询整改状态条件
                for (RealVulStateEnum stateEnum : RealVulStateEnum.values()) {
                    int value = stateEnum.getValue();
                    //查询条件
                    NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
                    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
                    if (!StringUtils.isEmpty(cveCode)) {
                        boolQueryBuilder.must(QueryBuilders.matchQuery("cve_code", cveCode));
                    }
                    if (!CollectionUtils.isEmpty(ipList)) {
                        boolQueryBuilder.must(QueryBuilders.termsQuery("ip", ipList));
                    }
                    if (null != bsId) {
                        boolQueryBuilder.must(QueryBuilders.matchQuery("bs_id", bsId));
                    }
                    if (null != areaId) {
                        boolQueryBuilder.must(QueryBuilders.matchQuery("area_id", areaId));
                    }
                    if (null != orgId) {
                        boolQueryBuilder.must(QueryBuilders.matchQuery("org_id", orgId));
                    }
                    boolQueryBuilder.must(QueryBuilders.matchQuery("state", value));
                    nativeSearchQueryBuilder.withQuery(boolQueryBuilder);
                    //过滤
                    nativeSearchQueryBuilder.withCollapseField("code");
                    nativeSearchQueryBuilder.withIndices(TableEnums.VULCENTER_REAL_VUL_RESULT);
                    long count = estTemplate.count(nativeSearchQueryBuilder.build());
                    switch (RealVulStateEnum.getRealVulStateEnum(value)) {
                        case PENDING:
                            stateMap.put("waiting_process", (int) count);
                            break;
                        case PROCESSING:
                            stateMap.put("doing", (int) count);
                            break;
                        case RECTIFIED:
                        case CHECKED:
                            stateMap.put("finish", stateMap.get("finish") + (int) count);
                            break;
                        case SUBMIT:
                        case SUBMIT_REJECT:
                            stateMap.put("wait_submit", stateMap.get("wait_submit") + (int) count);
                            break;
                        default:
                            stateMap.put("ignore", (int) count);
                            break;
                    }
                }
                return stateMap;
            }
        };
        return callable;

    }






/**
     * 获取风险等级
     *
     * @param asList
     * @return
     */
    private Callable<Map> getVulSeverityMap(List<String> asList, Integer bsId, Integer areaId, Integer orgId) {
        Callable<Map> callable = new Callable<Map>() {
            @Override
            public Map call() throws Exception {
                //初始化数量
                Map<String, Integer> vulSeverityMap = new HashMap<>();
                vulSeverityMap.put("super_risk", 0);
                vulSeverityMap.put("high_risk", 0);
                vulSeverityMap.put("medium_risk", 0);
                vulSeverityMap.put("low_risk", 0);
                vulSeverityMap.put("unknown", 0);
                //查询条件 依次统计5个风险等级
                for (VulSeverityEnum vulSeverityEnum : VulSeverityEnum.values()) {
                    int value = vulSeverityEnum.getValue();
                    NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
                    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
                    //查询条件
                    if (!CollectionUtils.isEmpty(asList)) {
                        boolQueryBuilder.must(QueryBuilders.termsQuery("ip", asList));
                    }
                    if (null != bsId) {
                        boolQueryBuilder.must(QueryBuilders.matchQuery("bs_id", bsId));
                    }
                    if (null != areaId) {
                        boolQueryBuilder.must(QueryBuilders.matchQuery("area_id", areaId));
                    }
                    if (null != orgId) {
                        boolQueryBuilder.must(QueryBuilders.matchQuery("org_id", orgId));
                    }
                    boolQueryBuilder.must(QueryBuilders.matchQuery("vul_severity", value));
                    nativeSearchQueryBuilder.withQuery(boolQueryBuilder);
                    //过滤
                    nativeSearchQueryBuilder.withCollapseField("code");
                    nativeSearchQueryBuilder.withIndices(TableEnums.VULCENTER_REAL_VUL_RESULT);
                    long count = estTemplate.count(nativeSearchQueryBuilder.build());
                    switch (VulSeverityEnum.getVulSeverityEnum(value)) {
                        case UNKNOWN:
                            vulSeverityMap.put("unknown", (int) count);
                            break;
                        case LOW:
                            vulSeverityMap.put("low_risk", (int) count);
                            break;
                        case MEDIUM:
                            vulSeverityMap.put("medium_risk", (int) count);
                            break;
                        case HIGH:
                            vulSeverityMap.put("high_risk", (int) count);
                            break;
                        default:
                            vulSeverityMap.put("super_risk", (int) count);
                            break;
                    }
                }
                return vulSeverityMap;
            }
        };
        return callable;
    }

三、批量修改


        //批量修改状态
        List<UpdateQuery> updateQueryList = new ArrayList<>();
        IndexRequest indexRequest = new IndexRequest();
        indexRequest.source("state", RealVulStateEnum.PENDING.getValue());
        for (String id : idList) {
            UpdateQuery updateQuery = new UpdateQueryBuilder().withClass(RealVul.class).withId(id).withIndexRequest(indexRequest).build();
            updateQueryList.add(updateQuery);
        }
        estTemplate.bulkUpdate(updateQueryList);

四、对于数据很大的es进行统计分组,一次性的计算,可用startScroll 来进行计算

    /**
     * scroll游标快照超时时间,单位ms
     */
    private static final long SCROLL_TIMEOUT = 60 * 1000;

    /**
     * 多线程调用(线程池)
     */
    @Autowired
    public AsyncTaskExecutor asyncExecutor;

  public void selectVul() throws Exception {
   Future<Integer> vul = asyncExecutor.submit(this.vulInsertMysql(VulTypeEnum.IP_INSERT.getValue()));
、、、、、、、



}

 public Callable<Integer> vulInsertMysql(Integer type) {
        Callable<Integer> callable = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
                nativeSearchQueryBuilder.withPageable(PageRequest.of(0, 500));
                //分组
                nativeSearchQueryBuilder.addAggregation(
                        AggregationBuilders.terms("ip_group").field("ip").size(Integer.MAX_VALUE)
                                //风险等级
                                .subAggregation(AggregationBuilders.max("vul_severity_max").field("vul_severity"))
                );
                //漏洞
                AggregatedPage<RealVul> vulIp = (AggregatedPage<RealVul>) elasticsearchRestTemplate.startScroll(SCROLL_TIMEOUT, nativeSearchQueryBuilder.build(), RealVul.class);
                while (vulIp.hasContent()) {
                    List<VulIpSeverityInfo> vulList = vulIp.getContent().stream().map(e -> {
                        VulIpSeverityInfo vulIpSeverityInfo = new VulIpSeverityInfo(e.getIp(), e.getVulSeverity());
                        return vulIpSeverityInfo;
                    }).collect(Collectors.toList());
                    listToPage(vulList, type);
                    //取下一页,scrollId在es服务器上可能会发生变化,需要用最新的。发起continueScroll请求会重新刷新快照保留时间
                    vulIp = (AggregatedPage<RealVul>) elasticsearchRestTemplate.continueScroll(vulIp.getScrollId(), SCROLL_TIMEOUT, RealVul.class);
                }
                //及时释放es服务器资源
                log.info("漏洞表的ip全部取出");
                elasticsearchRestTemplate.clearScroll(vulIp.getScrollId());
                return 1;
            }
        };
        log.info("漏洞统计ip开始");
        return callable;

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐