org.elasticsearch.search.aggregations.AggregatorFactories

  /**
     * Create all aggregators so that they can be consumed with multiple
     * buckets.
     */
    // 创建下层aggs
    public Aggregator[] createSubAggregators(SearchContext searchContext, Aggregator parent) throws IOException {
        Aggregator[] aggregators = new Aggregator[countAggregators()];
        for (int i = 0; i < factories.length; ++i) {
            // TODO: sometimes even sub aggregations always get called with bucket 0, eg. if
            // you have a terms agg under a top-level filter agg. We should have a way to
            // propagate the fact that only bucket 0 will be collected with single-bucket
            // aggs
            final boolean collectsFromSingleBucket = false;
            Aggregator factory = factories[i].create(searchContext, parent, collectsFromSingleBucket);
            Profilers profilers = factory.context().getProfilers();
            if (profilers != null) {
                factory = new ProfilingAggregator(factory, profilers.getAggregationProfiler());
            }
            aggregators[i] = factory;
        }
        return aggregators;
    }


    // 创建顶层aggs
    public Aggregator[] createTopLevelAggregators(SearchContext searchContext) throws IOException {
        // These aggregators are going to be used with a single bucket ordinal, no need to wrap the PER_BUCKET ones
        Aggregator[] aggregators = new Aggregator[factories.length];
        for (int i = 0; i < factories.length; i++) {
            // top-level aggs only get called with bucket 0
            final boolean collectsFromSingleBucket = true;
            Aggregator factory = factories[i].create(searchContext, null, collectsFromSingleBucket);
            Profilers profilers = factory.context().getProfilers();
            if (profilers != null) {
                factory = new ProfilingAggregator(factory, profilers.getAggregationProfiler());
            }
            aggregators[i] = factory;
        }
        return aggregators;
    }

大致步骤:

build->factory->aggs 

org.elasticsearch.search.internal.ShardSearchRequest  ShardSearchRequest(StreamInput in)

source = in.readOptionalWriteable(SearchSourceBuilder::new);
build:
org.elasticsearch.search.builder.SearchSourceBuilder parseXContent
org.elasticsearch.search.aggregations.AggregatorFactories parseAggregators

factory:

parseSource(context, request.source());
org.elasticsearch.search.SearchService   parseSource
                AggregatorFactories factories = source.aggregations().build(queryShardContext, null);


aggreators:
org.elasticsearch.search.aggregations.AggregatorFactories createTopLevelAggregators
org.elasticsearch.search.aggregations.AggregatorFactories createSubAggregators


aggregation:
org.elasticsearch.search.aggregations.AggregationPhase execute
org.elasticsearch.search.aggregations.Aggregator buildAggregation

                context.aggregations(new SearchContextAggregations(factories, multiBucketConsumerService.create()));

build

org.elasticsearch.search.SearchModule registerAggregations

   private static AggregatorFactories.Builder parseAggregators(XContentParser parser, int level) throws IOException {
        Matcher validAggMatcher = VALID_AGG_NAME.matcher("");
        AggregatorFactories.Builder factories = new AggregatorFactories.Builder();

        XContentParser.Token token = null;
        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
            if (token != XContentParser.Token.FIELD_NAME) {
                throw new ParsingException(parser.getTokenLocation(),
                        "Unexpected token " + token + " in [aggs]: aggregations definitions must start with the name of the aggregation.");
            }
            final String aggregationName = parser.currentName();
            if (!validAggMatcher.reset(aggregationName).matches()) {
                throw new ParsingException(parser.getTokenLocation(), "Invalid aggregation name [" + aggregationName
                        + "]. Aggregation names can contain any character except '[', ']', and '>'");
            }

            token = parser.nextToken();
            if (token != XContentParser.Token.START_OBJECT) {
                throw new ParsingException(parser.getTokenLocation(), "Aggregation definition for [" + aggregationName + " starts with a ["
                        + token + "], expected a [" + XContentParser.Token.START_OBJECT + "].");
            }

            BaseAggregationBuilder aggBuilder = null;
            AggregatorFactories.Builder subFactories = null;

            Map<String, Object> metaData = null;

            while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                if (token != XContentParser.Token.FIELD_NAME) {
                    throw new ParsingException(
                            parser.getTokenLocation(), "Expected [" + XContentParser.Token.FIELD_NAME + "] under a ["
                            + XContentParser.Token.START_OBJECT + "], but got a [" + token + "] in [" + aggregationName + "]",
                            parser.getTokenLocation());
                }
                final String fieldName = parser.currentName();

                token = parser.nextToken();
                if (token == XContentParser.Token.START_OBJECT) {
                    switch (fieldName) {
                    case "meta":
                        metaData = parser.map();
                        break;
                    case "aggregations":
                    case "aggs":
                        if (subFactories != null) {
                            throw new ParsingException(parser.getTokenLocation(),
                                    "Found two sub aggregation definitions under [" + aggregationName + "]");
                        }
                        subFactories = parseAggregators(parser, level + 1);
                        break;
                    default:
                        if (aggBuilder != null) {
                            throw new ParsingException(parser.getTokenLocation(), "Found two aggregation type definitions in ["
                                    + aggregationName + "]: [" + aggBuilder.getType() + "] and [" + fieldName + "]");
                        }

                        aggBuilder = parser.namedObject(BaseAggregationBuilder.class, fieldName, aggregationName);
                    }
                } else {
                    throw new ParsingException(parser.getTokenLocation(), "Expected [" + XContentParser.Token.START_OBJECT + "] under ["
                            + fieldName + "], but got a [" + token + "] in [" + aggregationName + "]");
                }
            }

            if (aggBuilder == null) {
                throw new ParsingException(parser.getTokenLocation(), "Missing definition for aggregation [" + aggregationName + "]",
                        parser.getTokenLocation());
            } else {
                if (metaData != null) {
                    aggBuilder.setMetaData(metaData);
                }

                if (subFactories != null) {
                    aggBuilder.subAggregations(subFactories);
                }

                if (aggBuilder instanceof AggregationBuilder) {
                    factories.addAggregator((AggregationBuilder) aggBuilder);
                } else {
                    factories.addPipelineAggregator((PipelineAggregationBuilder) aggBuilder);
                }
            }
        }

        return factories;
    }

org.elasticsearch.common.xcontent.NamedXContentRegistry  parseNamedObject

    /**
     * Parse a named object, throwing an exception if the parser isn't found. Throws an {@link NamedObjectNotFoundException} if the
     * {@code categoryClass} isn't registered because this is almost always a bug. Throws an {@link NamedObjectNotFoundException} if the
     * {@code categoryClass} is registered but the {@code name} isn't.
     *
     * @throws NamedObjectNotFoundException if the categoryClass or name is not registered
     */
    public <T, C> T parseNamedObject(Class<T> categoryClass, String name, XContentParser parser, C context) throws IOException {
        Map<String, Entry> parsers = registry.get(categoryClass);
        if (parsers == null) {
            if (registry.isEmpty()) {
                // The "empty" registry will never work so we throw a better exception as a hint.
                throw new NamedObjectNotFoundException("named objects are not supported for this parser");
            }
            throw new NamedObjectNotFoundException("unknown named object category [" + categoryClass.getName() + "]");
        }
        Entry entry = parsers.get(name);
        if (entry == null) {
            throw new NamedObjectNotFoundException(parser.getTokenLocation(), "unable to parse " + categoryClass.getSimpleName() +
                " with name [" + name + "]: parser not found");
        }
        if (false == entry.name.match(name, parser.getDeprecationHandler())) {
            /* Note that this shouldn't happen because we already looked up the entry using the names but we need to call `match` anyway
             * because it is responsible for logging deprecation warnings. */
            throw new NamedObjectNotFoundException(parser.getTokenLocation(),
                    "unable to parse " + categoryClass.getSimpleName() + " with name [" + name + "]: parser didn't match");
        }
        return categoryClass.cast(entry.parser.parse(parser, context));
    }

}

org.elasticsearch.plugins.SearchPlugin AggregationSpec

        /**
         * Specification for an {@link Aggregation}.
         *
         * @param name the name by which this aggregation might be parsed or deserialized. Make sure that the {@link AggregationBuilder}
         *        returns this from {@link NamedWriteable#getWriteableName()}.
         * @param reader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a
         *        {@link StreamInput}
         * @param parser the parser the reads the aggregation builder from xcontent
         * @deprecated Use the ctor that takes a {@link ContextParser} instead
         */
        @Deprecated
        public AggregationSpec(String name, Writeable.Reader<? extends AggregationBuilder> reader, Aggregator.Parser parser) {
            super(name, reader, (p, aggName) -> parser.parse(aggName, p));
        }

collect:

将aggs的collect放入 queryCollectors中

org.elasticsearch.search.aggregations.AggregationPhase preProcess

context.queryCollectors().put(AggregationPhase.class, collector);

org.elasticsearch.search.query.QueryPhase  executeInternal

                // plug in additional collectors, like aggregations
                collectors.add(createMultiCollectorContext(searchContext.queryCollectors().values()));



                shouldRescore = searchWithCollector(searchContext, searcher, query, collectors, hasFilterCollector, timeoutSet);

以leaveReader为单位进行搜索 

org.apache.lucene.search.IndexSearcher  search


  /**
   * Lower-level search API.
   * 
   * <p>
   * {@link LeafCollector#collect(int)} is called for every document. <br>
   * 
   * <p>
   * NOTE: this method executes the searches on all given leaves exclusively.
   * To search across all the searchers leaves use {@link #leafContexts}.
   * 
   * @param leaves 
   *          the searchers leaves to execute the searches on
   * @param weight
   *          to match documents
   * @param collector
   *          to receive hits
   * @throws BooleanQuery.TooManyClauses If a query would exceed 
   *         {@link BooleanQuery#getMaxClauseCount()} clauses.
   */
  protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector)
      throws IOException {

    // TODO: should we make this
    // threaded...?  the Collector could be sync'd?
    // always use single thread:
    for (LeafReaderContext ctx : leaves) { // search each subreader
      final LeafCollector leafCollector;
      try {
        leafCollector = collector.getLeafCollector(ctx);
      } catch (CollectionTerminatedException e) {
        // there is no doc of interest in this reader context
        // continue with the following leaf
        continue;
      }
      BulkScorer scorer = weight.bulkScorer(ctx);
      if (scorer != null) {
        try {
            // 收集信息
          scorer.score(leafCollector, ctx.reader().getLiveDocs());
        } catch (CollectionTerminatedException e) {
          // collection was terminated prematurely
          // continue with the following leaf
        }
      }
    }
  }

过滤桶:

org.elasticsearch.search.aggregations.bucket.filter.FilterAggregatorFactory

public class FilterAggregatorFactory extends AggregatorFactory {

    private Weight weight;
    private Query filter;

    public FilterAggregatorFactory(String name, QueryBuilder filterBuilder, QueryShardContext queryShardContext,
            AggregatorFactory parent, AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData) throws IOException {
        super(name, queryShardContext, parent, subFactoriesBuilder, metaData);
        filter = filterBuilder.toQuery(queryShardContext);
    }

    /**
     * Returns the {@link Weight} for this filter aggregation, creating it if
     * necessary. This is done lazily so that the {@link Weight} is only created
     * if the aggregation collects documents reducing the overhead of the
     * aggregation in the case where no documents are collected.
     * 
     * Note that as aggregations are initialsed and executed in a serial manner,
     * no concurrency considerations are necessary here.
     */
    public Weight getWeight() {
        if (weight == null) {
            IndexSearcher contextSearcher = queryShardContext.searcher();
            try {
                weight = contextSearcher.createWeight(contextSearcher.rewrite(filter), ScoreMode.COMPLETE_NO_SCORES, 1f);
            } catch (IOException e) {
                throw new AggregationInitializationException("Failed to initialse filter", e);
            }
        }
        return weight;
    }

    @Override
    public Aggregator createInternal(SearchContext searchContext,
                                        Aggregator parent,
                                        boolean collectsFromSingleBucket,
                                        List<PipelineAggregator> pipelineAggregators,
                                        Map<String, Object> metaData) throws IOException {
        return new FilterAggregator(name, () -> this.getWeight(), factories, searchContext, parent, pipelineAggregators, metaData);
    }

}

Query阶段的获取的结果都会走下面的流程

org.elasticsearch.action.search.AbstractSearchAsyncAction#onShardResult

org.elasticsearch.action.search.SearchPhaseController.QueryPhaseResultConsumer#consumeResult

org.elasticsearch.action.search.SearchPhaseController.QueryPhaseResultConsumer#consumeInternal 

        private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
            if (index == bufferSize) {
                if (hasAggs) {
                    ReduceContext reduceContext = controller.reduceContextFunction.apply(false);
                    // 会在此时机执行reduce操作
                    InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext);
                    Arrays.fill(aggsBuffer, null);
                    aggsBuffer[0] = reducedAggs;
                }
                if (hasTopDocs) {
                    TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer),
                        // we have to merge here in the same way we collect on a shard
                        querySearchResult.from() + querySearchResult.size(), 0);
                    Arrays.fill(topDocsBuffer, null);
                    topDocsBuffer[0] = reducedTopDocs;
                }
                numReducePhases++;
                index = 1;
                if (hasAggs) {
                    progressListener.notifyPartialReduce(progressListener.searchShards(processedShards),
                        topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases);
                }
            }
            final int i = index++;
            if (hasAggs) {
                aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs();
            }
            if (hasTopDocs) {
                final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null
                topDocsStats.add(topDocs, querySearchResult.searchTimedOut(), querySearchResult.terminatedEarly());
                setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex());
                topDocsBuffer[i] = topDocs.topDocs;
            }
            processedShards[querySearchResult.getShardIndex()] = querySearchResult.getSearchShardTarget();
        }

org.elasticsearch.action.search.SearchPhaseController#newSearchPhaseResults 

    ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResults(SearchProgressListener listener,
                                                                     SearchRequest request,
                                                                     int numShards) {
        SearchSourceBuilder source = request.source();
        boolean isScrollRequest = request.scroll() != null;
        final boolean hasAggs = source != null && source.aggregations() != null;
        final boolean hasTopDocs = source == null || source.size() != 0;
        final int trackTotalHitsUpTo = resolveTrackTotalHits(request);
        if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
            // no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
            // 由BatchedReduceSize 和 numShards对比来确定使用哪个?
            if (request.getBatchedReduceSize() < numShards) {
                // only use this if there are aggs and if there are more shards than we should reduce at once
                return new QueryPhaseResultConsumer(listener, this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs,
                    trackTotalHitsUpTo, request.isFinalReduce());
            }
        }
        return new ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
            @Override
            void consumeResult(SearchPhaseResult result) {
                super.consumeResult(result);
                listener.notifyQueryResult(result.queryResult().getShardIndex());
            }

            @Override
            ReducedQueryPhase reduce() {
                List<SearchPhaseResult> resultList = results.asList();
                final ReducedQueryPhase reducePhase =
                    reducedQueryPhase(resultList, isScrollRequest, trackTotalHitsUpTo, request.isFinalReduce());
                listener.notifyReduce(listener.searchShards(resultList), reducePhase.totalHits, reducePhase.aggregations);
                return reducePhase;
            }
        };
    }

 

参考:

es lucene搜索及聚合流程源码分析https://www.bbsmax.com/A/kmzLMnmbJG/Collector(一)-htmlhttps://www.amazingkoala.com.cn/Lucene/Search/2019/0812/82.html

ElasticSearch global ordinals&execution_hint - 简书1、global ordinals (1)what's this? 当我们使用doc values或者fielddata存储时,在磁盘中存储的值不是真正的字段值,而是一个字典...https://www.jianshu.com/p/6bf310afdb21

Kahan's Summation Formula原理—它是如何处理大数吃小数的_收拾行囊重新出发-CSDN博客_kahan summationKahan's Summation Formula原理—它是如何避免大数吃小数的Kahan求和公式原理:       首先,这个算法就是用来求和的,求a1+a2+a3+...为什么不直接相加呢,而要用Kahan求和公式呢,这个算法的用武之地在哪呢,一一道来       kahan求和算法能避免大数吃小数的情况。       大数吃小数是什么意思呢?举个例子,我们用两https://blog.csdn.net/qq_28372387/article/details/48345619

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐