订阅专栏
一、ElasticSearch的分组聚合
官网文档地址:
https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-terms-aggregation.html#_multi_field_terms_aggregation

实现背景:实现类似SQL的group by功能:

select team,age from a_person group by team,age;
1
二、多字段分组聚合的三种实现方法
在ElasticSearch中,实现多字段分组聚合的方式包括:

1、使用 Script
使用脚本script从多个字段聚合分组:

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

/**
 * 基于单bucket实现的多字段group by聚合
 * 
 * @author tang
 *
 */
public class AggsOneBucketGroupBy {

    private static final String elasticSearchClusterName = "cluster";
    private static final String elasticSearchAddress = "127.0.0.1";
    private static final int elasticSearchPort = 9300;

    private static String ALL_INDEX = "*";
    private static String SEPARATOR = "|";


    public static TermsAggregationBuilder buildTermsAggregationBuilder(List<String> aggregationFields) {
        String content = aggregationFields.stream().map(one -> String.format("doc['%s'].value", one))
                .collect(Collectors.joining("+'" + SEPARATOR + "'+"));
        
        TermsAggregationBuilder builder = AggregationBuilders.terms(aggregationFields.get(0));
        builder.script(new Script(ScriptType.INLINE, "painless", content,new HashMap<String,Object>()));
        builder.size(Integer.MAX_VALUE);
        return builder;
    }

    /**
     * 聚合结果解析
     * 
     */
    private static void resolveResult(Terms.Bucket bucket, List<String> fields, Map<String,Object> row, List<Map<String,Object>> resultSet) {
            boolean flag = false;
            for (String field : fields) {
                Terms terms = bucket.getAggregations().get(field);
                if (terms != null) {
                    for (Terms.Bucket bucket1 : terms.getBuckets()) {
                        row.put(field, bucket1.getKey());
                        
                        Map<String,Object> newRow=new LinkedHashMap<>(row);
                        resolveResult(bucket1, fields, newRow,resultSet);
                    }
                    
                    flag = true;
                    break;
                }
            }
            if (!flag) {
                // 到这里表示拿到了结果集中的一条
                // System.out.println(row);
                resultSet.add(row);
            }
        }
    
    public static void main(String[] args) throws IOException {
        Settings settings = Settings.builder()
                .put("cluster.name", elasticSearchClusterName)
                .put("client.transport.sniff", true).build();

        // 与ElasticSearch建立连接
        TransportAddress transportAddress = new InetSocketTransportAddress(InetAddress.getByName(elasticSearchAddress),
                elasticSearchPort);
        TransportClient client = new PreBuiltTransportClient(settings).addTransportAddress(transportAddress);

        // 这里为分组的字段列表
        List<String> groupFields = new ArrayList<>();
        groupFields.add("team");
        groupFields.add("age");
        
        // 构造聚合条件
        TermsAggregationBuilder aggregationBuilder=buildTermsAggregationBuilder(groupFields);
        aggregationBuilder.size(10);
        
        // 构造查询请求
        SearchRequestBuilder searchRequestBuilder=client.prepareSearch(ALL_INDEX)
                .addAggregation(aggregationBuilder);
        //System.out.println(searchRequestBuilder.toString());
        
        // 执行检索请求
        SearchResponse response =searchRequestBuilder 
                .execute()
                .actionGet();
        
        // 处理结果数据
        List<Map<String,Object>> resultSet=new LinkedList<>();
        Terms terms = response.getAggregations().get(groupFields.get(0));
        if (null != terms) {
            for (Terms.Bucket bucket : terms.getBuckets()) {
                Map<String, Object> one = new LinkedHashMap<>();
                one.put(groupFields.get(0), bucket.getKey());
                resolveResult(bucket, groupFields, one, resultSet);
            }
        }
        
        // 将结果全部打印
        resultSet.stream().forEach(row->System.out.println(row));
        System.out.println("total size = "+ resultSet.size());
    }

}


2、使用 copy_to 字段
如果您提前知道要从两个或多个字段分组,可在创建索引的mapping时创建一个新的专用copy_to字段,后续可以在这个字段上进行汇总查询。代码如下:

import java.io.IOException;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import com.google.common.collect.ImmutableMap;

/**
 * 参考教程:https://www.cnblogs.com/biehongli/p/11710704.html
 * 
 * @author tang
 * @date 2021-03-16 22:42:45
 * @since 1.0
 *
 */
public class GeneratorData001 {

    private static final String clusterName = "cluster";
    private static final String serverAddress = "127.0.0.1";
    private static final int serverPort = 9300;
    private static final String indexName="a_person";
    private static final String typeName="worker";

    public static void main(String[] args) throws IOException {
        Settings settings = Settings.builder()
                .put("cluster.name", clusterName)
                .put("client.transport.sniff", true).build();

        TransportAddress transportAddress=new InetSocketTransportAddress(InetAddress.getByName(serverAddress), serverPort);
        TransportClient client = new PreBuiltTransportClient(settings).addTransportAddress(transportAddress);

        Map<String, ? extends Object> indexSettings = ImmutableMap.of(
                "number_of_shards", 1, 
                "number_of_replicas", 1,
                "analysis.analyzer.underline.type", "pattern", 
                "analysis.analyzer.underline.pattern", "_");

        try {
            try {
                DeleteIndexResponse deleteIndexResponse = client.admin().indices().prepareDelete(indexName)
                        .execute().actionGet();
                if(!deleteIndexResponse.isAcknowledged()) {
                    throw new RuntimeException("delete index failed");
                }
            }catch(IndexNotFoundException e) {
                System.out.println("Index not exist!");
            }
            
            // 配置索引的mapping
            XContentBuilder contentBuilder = XContentFactory.jsonBuilder().startObject()
                    // 开启字段动态映射
                    .field("dynamic", "true").startObject("properties")
                    // 创建integer类型的id字段,并在文档中存储
                    .startObject("id").field("type", "integer").field("store", "true").endObject()
                    // 创建String类型的name字段,不分词,不建索引
                    .startObject("name").field("type", "keyword").field("index", "false").field("copy_to", "combine").endObject()
                    // 创建integer类型的age字段,默认不分词
                    .startObject("age").field("type", "integer").endObject()
                    // 创建integer类型的salary字段,默认不分词
                    .startObject("salary").field("type", "integer").endObject()
                    // 创建String类型的team字段,不分词,不建索引
                    .startObject("team").field("type", "keyword").field("index", "false").field("copy_to", "combine").endObject()
                    // 创建String类型的position字段,不分词,但是建索引
                    .startObject("position").field("type", "text").field("index", "true").endObject()
                    // 创建String类型的description字段,即分词(使用标准分词器),建立索引
                    .startObject("description").field("type", "text").field("store", "false").field("index", "true").endObject()
                    //.field("analyzer", "ik_smart").endObject()
                    // 创建String类型的addr字段,即分词(使用标准分词器),又建立索引、在文档中存储
                    .startObject("addr").field("type", "text").field("store", "true").field("index", "true").endObject()
                    //.field("analyzer", "ik_smart").endObject()
                    // 利用copy_to创建组合字段combine
                    .startObject("combine").field("type", "keyword").endObject()
                    .endObject() //End of properties
                    .endObject();
            //System.out.println(contentBuilder.string());
            
            // 创建索引
            CreateIndexResponse createIndexResponse = client.admin().indices()
                    .prepareCreate(indexName)
                    .addMapping(typeName, contentBuilder)
                    .setSettings(indexSettings)
                    .execute().actionGet();
            if(!createIndexResponse.isAcknowledged()) {
                throw new RuntimeException("create index failed");
            }

            //向索引中添加10条数据
            for (int i = 0; i < 10; ++i) {
                Map<String, Object> doc = new HashMap<>();
                doc.put("id", i);
                doc.put("name", "张三");
                doc.put("age", 22 + i);
                doc.put("salary", 1232 * i);
                doc.put("team", "研发部");
                doc.put("position", "hello world!");
                doc.put("description", "这是一条描述信息:" + i);
                client.prepareIndex(indexName, typeName, String.valueOf(i))
                        .setSource(doc)
                        .execute().actionGet();
            }
            
            for (int i = 10; i < 20; ++i) {
                Map<String, Object> doc = new HashMap<>();
                doc.put("id", i);
                doc.put("name", "张三");
                doc.put("age", 22 + i);
                doc.put("salary", 1232 * i);
                doc.put("team", "测试部");
                doc.put("position", "hello world!");
                doc.put("description", "这是一条描述信息:" + i);
                client.prepareIndex(indexName, typeName, String.valueOf(i))
                .setSource(doc)
                .execute().actionGet();
            }
            
            for (int i = 20; i < 30; ++i) {
                Map<String, Object> doc = new HashMap<>();
                doc.put("id", i);
                doc.put("name", "张三");
                doc.put("age", 22 + i);
                doc.put("salary", 1232 * i);
                doc.put("team", "运维部");
                doc.put("position", "hello world!");
                doc.put("description", "这是一条描述信息:" + i);
                client.prepareIndex(indexName, typeName, String.valueOf(i))
                .setSource(doc)
                .execute().actionGet();
            }
            
            for (int i = 30; i < 40; ++i) {
                Map<String, Object> doc = new HashMap<>();
                doc.put("id", i);
                doc.put("name", "张三");
                doc.put("age", 22 + i);
                doc.put("salary", 1232 * i);
                doc.put("team", "行政部");
                doc.put("position", "hello world!");
                doc.put("description", "这是一条描述信息:" + i);
                client.prepareIndex(indexName, typeName, String.valueOf(i))
                .setSource(doc)
                .execute().actionGet();
            }

            SearchRequestBuilder searchRequestBuilder = client.prepareSearch(indexName)
                    .setQuery(QueryBuilders.matchQuery("combine", "张三 研发部").operator(Operator.AND));
            System.out.println(searchRequestBuilder);

            System.out.println("=============================");
            SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
            System.out.println(searchResponse);
            System.out.println("=============================");
            Arrays.asList(searchResponse.getHits().getHits()).stream().forEach(i -> System.out.println(i.getSourceAsString()));
        }catch(Exception e) {
            e.printStackTrace();
        } finally {
            client.close();
        }
    }

}

其他示例教程:https://elasticstack.blog.csdn.net/article/details/99689986

3、直接 multi_terms 聚合
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

/**
 * 基于迭代嵌套bucket的聚合计算
 * 
 * @author tang
 *
 */
public class AggsIteratorBucketGroupBy {

    private static final String elasticSearchClusterName = "cluster";
    private static final String elasticSearchAddress = "127.0.0.1";
    private static final int elasticSearchPort = 9300;

    private static String ALL_INDEX = "*";

    /**
     * 构造分组聚合Builder
     */
    public static AggregationBuilder buildTermsAggregationBuilder(List<String> aggregationFields){
        Iterator<String> iterator = aggregationFields.iterator();
        TermsAggregationBuilder builderRoot = null;
        TermsAggregationBuilder builderCursor = null;
        while(iterator.hasNext()){
            String field = iterator.next();
            TermsAggregationBuilder builder = AggregationBuilders.terms(aggregationFields.stream().collect(Collectors.joining())).field(field);
            builder.size(Integer.MAX_VALUE);
            if(builderRoot == null){
                builderRoot = builder;
                builderCursor = builderRoot;
            }else {
                builderCursor.subAggregation(builder);
                builderCursor = builder;
            }
        }
        return builderRoot;
    }

    /**
     * 聚合结果解析
     */
    private static void resolveResult(Terms.Bucket bucket, List<String> fields, Map<String,Object> row, List<Map<String,Object>> resultSet) {
            boolean flag = false;
            for (String field : fields) {
                Terms terms = bucket.getAggregations().get(field);
                if (terms != null) {
                    for (Terms.Bucket bucket1 : terms.getBuckets()) {
                        row.put(field, bucket1.getKey());
                        
                        Map<String,Object> newRow=new LinkedHashMap<>(row);
                        resolveResult(bucket1, fields, newRow,resultSet);
                    }
                    
                    flag = true;
                    break;
                }
            }
            if (!flag) {
                // 到这里表示拿到了结果集中的一条
                // System.out.println(row);
                resultSet.add(row);
            }
        }
    
    public static void main(String[] args) throws IOException {
        Settings settings = Settings.builder()
                .put("cluster.name", elasticSearchClusterName)
                .put("client.transport.sniff", true).build();

        // 与ElasticSearch建立连接
        TransportAddress transportAddress = new InetSocketTransportAddress(InetAddress.getByName(elasticSearchAddress),    elasticSearchPort);
        TransportClient client = new PreBuiltTransportClient(settings).addTransportAddress(transportAddress);

        // 这里为分组的字段列表
        List<String> groupFields = new ArrayList<>();
        groupFields.add("team");
        groupFields.add("age");
        
        // 构造聚合条件
        AggregationBuilder aggregationBuilder=buildTermsAggregationBuilder(groupFields);
        
        // 构造查询请求,注意:ElasticSearch的分组聚合查询无法分页
        SearchRequestBuilder searchRequestBuilder=client.prepareSearch(ALL_INDEX)
                .addAggregation(aggregationBuilder);
        //System.out.println(searchRequestBuilder.toString());
        
        // 执行检索请求
        SearchResponse response =searchRequestBuilder 
                .execute()
                .actionGet();
        
        // 处理结果数据
        List<Map<String,Object>> resultSet=new LinkedList<>();
        Terms terms = response.getAggregations().get(groupFields.stream().collect(Collectors.joining()));
        if (null != terms) {
            for (Terms.Bucket bucket : terms.getBuckets()) {
                Map<String, Object> one = new LinkedHashMap<>();
                one.put(groupFields.get(0), bucket.getKey());
                resolveResult(bucket, groupFields, one, resultSet);
            }
        }
        
        // 将结果全部打印
        resultSet.stream().forEach(row->System.out.println(row));
        System.out.println("total size = "+ resultSet.size());
    }

}


其他示例教程:https://www.jianshu.com/p/bb723817ad1e
 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐