     * Create all aggregators so that they can be consumed with multiple
     * buckets.
    // Creates the lower-level (sub) aggregations.
    // For every entry in factories an Aggregator is created with the given parent and
    // collectsFromSingleBucket = false; when the aggregator's context exposes profilers,
    // the aggregator is wrapped in a ProfilingAggregator before being stored.
    // NOTE(review): the closing braces of the if/for/method bodies are not present in this excerpt.
    public Aggregator[] createSubAggregators(SearchContext searchContext, Aggregator parent) throws IOException {
        // NOTE(review): the array is sized with countAggregators() while the loop is bounded by
        // factories.length — presumably the two are equal; confirm against the full class.
        Aggregator[] aggregators = new Aggregator[countAggregators()];
        for (int i = 0; i < factories.length; ++i) {
            // TODO: sometimes even sub aggregations always get called with bucket 0, eg. if
            // you have a terms agg under a top-level filter agg. We should have a way to
            // propagate the fact that only bucket 0 will be collected with single-bucket
            // aggs
            final boolean collectsFromSingleBucket = false;
            // Despite its name, "factory" holds the created Aggregator, not an AggregatorFactory.
            Aggregator factory = factories[i].create(searchContext, parent, collectsFromSingleBucket);
            Profilers profilers = factory.context().getProfilers();
            if (profilers != null) {
                // Wrap the aggregator with the aggregation profiler obtained from the context's profilers.
                factory = new ProfilingAggregator(factory, profilers.getAggregationProfiler());
            aggregators[i] = factory;
        return aggregators;

    // Creates the top-level aggregations.
    // For every entry in factories an Aggregator is created with a null parent and
    // collectsFromSingleBucket = true; when the aggregator's context exposes profilers,
    // the aggregator is wrapped in a ProfilingAggregator before being stored.
    // NOTE(review): the closing braces of the if/for/method bodies are not present in this excerpt.
    public Aggregator[] createTopLevelAggregators(SearchContext searchContext) throws IOException {
        // These aggregators are going to be used with a single bucket ordinal, no need to wrap the PER_BUCKET ones
        Aggregator[] aggregators = new Aggregator[factories.length];
        for (int i = 0; i < factories.length; i++) {
            // top-level aggs only get called with bucket 0
            final boolean collectsFromSingleBucket = true;
            // Despite its name, "factory" holds the created Aggregator; null is passed as the parent.
            Aggregator factory = factories[i].create(searchContext, null, collectsFromSingleBucket);
            Profilers profilers = factory.context().getProfilers();
            if (profilers != null) {
                // Wrap the aggregator with the aggregation profiler obtained from the context's profilers.
                factory = new ProfilingAggregator(factory, profilers.getAggregationProfiler());
            aggregators[i] = factory;
        return aggregators;


build->factory->aggs  ShardSearchRequest(StreamInput in)

source = in.readOptionalWriteable(SearchSourceBuilder::new);
build: parseXContent parseAggregators


parseSource(context, request.source());   parseSource
                AggregatorFactories factories = source.aggregations().build(queryShardContext, null);

aggregators: createTopLevelAggregators createSubAggregators

aggregation: execute buildAggregation

                context.aggregations(new SearchContextAggregations(factories, multiBucketConsumerService.create()));

build registerAggregations

   // Parses the contents of an [aggs] object into an AggregatorFactories.Builder.
   // The outer loop reads one named aggregation per iteration; the inner loop reads the fields of
   // that aggregation's definition ("meta", "aggregations"/"aggs", or an aggregation type).
   // Sub-aggregations are parsed recursively with level + 1.
   // Throws ParsingException for unexpected tokens, invalid names, duplicate sub-aggregation or
   // type definitions, and definitions with no aggregation type.
   // NOTE(review): this excerpt is missing closing braces and several statements (flagged inline).
   private static AggregatorFactories.Builder parseAggregators(XContentParser parser, int level) throws IOException {
        // One Matcher is created up front and re-targeted with reset(...) for each aggregation name.
        Matcher validAggMatcher = VALID_AGG_NAME.matcher("");
        AggregatorFactories.Builder factories = new AggregatorFactories.Builder();

        XContentParser.Token token = null;
        // Outer loop: one named aggregation per iteration, until the enclosing object ends.
        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
            if (token != XContentParser.Token.FIELD_NAME) {
                throw new ParsingException(parser.getTokenLocation(),
                        "Unexpected token " + token + " in [aggs]: aggregations definitions must start with the name of the aggregation.");
            final String aggregationName = parser.currentName();
            if (!validAggMatcher.reset(aggregationName).matches()) {
                throw new ParsingException(parser.getTokenLocation(), "Invalid aggregation name [" + aggregationName
                        + "]. Aggregation names can contain any character except '[', ']', and '>'");

            // The aggregation name must be followed by an object holding its definition.
            token = parser.nextToken();
            if (token != XContentParser.Token.START_OBJECT) {
                throw new ParsingException(parser.getTokenLocation(), "Aggregation definition for [" + aggregationName + " starts with a ["
                        + token + "], expected a [" + XContentParser.Token.START_OBJECT + "].");

            // Accumulated while reading the fields of this aggregation's definition.
            BaseAggregationBuilder aggBuilder = null;
            AggregatorFactories.Builder subFactories = null;

            Map<String, Object> metaData = null;

            // Inner loop: one field of the definition per iteration.
            while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                if (token != XContentParser.Token.FIELD_NAME) {
                    throw new ParsingException(
                            parser.getTokenLocation(), "Expected [" + XContentParser.Token.FIELD_NAME + "] under a ["
                            + XContentParser.Token.START_OBJECT + "], but got a [" + token + "] in [" + aggregationName + "]",
                final String fieldName = parser.currentName();

                // Every field value inside a definition is required to be an object.
                token = parser.nextToken();
                if (token == XContentParser.Token.START_OBJECT) {
                    switch (fieldName) {
                    case "meta":
                        // NOTE(review): the right-hand side of this assignment is missing in this excerpt.
                        metaData =;
                    case "aggregations":
                    case "aggs":
                        if (subFactories != null) {
                            throw new ParsingException(parser.getTokenLocation(),
                                    "Found two sub aggregation definitions under [" + aggregationName + "]");
                        // Recurse one level deeper for the nested aggregations.
                        subFactories = parseAggregators(parser, level + 1);
                        // NOTE(review): no break/default label is visible before this point; presumably the
                        // lines below handle any other field name as an aggregation type — confirm.
                        if (aggBuilder != null) {
                            throw new ParsingException(parser.getTokenLocation(), "Found two aggregation type definitions in ["
                                    + aggregationName + "]: [" + aggBuilder.getType() + "] and [" + fieldName + "]");

                        // The field name is used as the named-object name to look up the builder's parser;
                        // the aggregation name is passed as the parsing context.
                        aggBuilder = parser.namedObject(BaseAggregationBuilder.class, fieldName, aggregationName);
                } else {
                    throw new ParsingException(parser.getTokenLocation(), "Expected [" + XContentParser.Token.START_OBJECT + "] under ["
                            + fieldName + "], but got a [" + token + "] in [" + aggregationName + "]");

            // A definition that never named an aggregation type is rejected.
            if (aggBuilder == null) {
                throw new ParsingException(parser.getTokenLocation(), "Missing definition for aggregation [" + aggregationName + "]",
            } else {
                // NOTE(review): the bodies of the next two ifs are not present in this excerpt.
                if (metaData != null) {

                if (subFactories != null) {

                // Register the builder as a regular or a pipeline aggregation depending on its type.
                if (aggBuilder instanceof AggregationBuilder) {
                    factories.addAggregator((AggregationBuilder) aggBuilder);
                } else {
                    factories.addPipelineAggregator((PipelineAggregationBuilder) aggBuilder);

        return factories;

org.elasticsearch.common.xcontent.NamedXContentRegistry  parseNamedObject

     * Parse a named object, throwing an exception if the parser isn't found. Throws an {@link NamedObjectNotFoundException} if the
     * {@code categoryClass} isn't registered because this is almost always a bug. Throws an {@link NamedObjectNotFoundException} if the
     * {@code categoryClass} is registered but the {@code name} isn't.
     * @throws NamedObjectNotFoundException if the categoryClass or name is not registered
    // Looks up the parser registered under (categoryClass, name) and uses it to parse from the
    // given XContentParser, casting the result to categoryClass.
    // Throws NamedObjectNotFoundException when the category or the name has no registered parser.
    // NOTE(review): closing braces are not present in this excerpt.
    public <T, C> T parseNamedObject(Class<T> categoryClass, String name, XContentParser parser, C context) throws IOException {
        // First level of the lookup: all parsers registered for this category.
        Map<String, Entry> parsers = registry.get(categoryClass);
        if (parsers == null) {
            if (registry.isEmpty()) {
                // The "empty" registry will never work so we throw a better exception as a hint.
                throw new NamedObjectNotFoundException("named objects are not supported for this parser");
            throw new NamedObjectNotFoundException("unknown named object category [" + categoryClass.getName() + "]");
        // Second level of the lookup: the entry registered under this name.
        Entry entry = parsers.get(name);
        if (entry == null) {
            throw new NamedObjectNotFoundException(parser.getTokenLocation(), "unable to parse " + categoryClass.getSimpleName() +
                " with name [" + name + "]: parser not found");
        // NOTE(review): the left part of this condition is truncated in this excerpt; per the comment
        // below it is a `match` call that also logs deprecation warnings — confirm against upstream.
        if (false ==, parser.getDeprecationHandler())) {
            /* Note that this shouldn't happen because we already looked up the entry using the names but we need to call `match` anyway
             * because it is responsible for logging deprecation warnings. */
            throw new NamedObjectNotFoundException(parser.getTokenLocation(),
                    "unable to parse " + categoryClass.getSimpleName() + " with name [" + name + "]: parser didn't match");
        // Delegate to the registered parser and cast its result to the requested category type.
        return categoryClass.cast(entry.parser.parse(parser, context));


org.elasticsearch.plugins.SearchPlugin AggregationSpec

         * Specification for an {@link Aggregation}.
         * @param name the name by which this aggregation might be parsed or deserialized. Make sure that the {@link AggregationBuilder}
         *        returns this from {@link NamedWriteable#getWriteableName()}.
         * @param reader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a
         *        {@link StreamInput}
         * @param parser the parser that reads the aggregation builder from xcontent
         * @deprecated Use the ctor that takes a {@link ContextParser} instead
        // Constructor taking an Aggregator.Parser. The parser is adapted to the two-argument form the
        // super constructor receives: the lambda takes (p, aggName) and forwards them to
        // parser.parse in swapped order (aggName, p).
        public AggregationSpec(String name, Writeable.Reader<? extends AggregationBuilder> reader, Aggregator.Parser parser) {
            super(name, reader, (p, aggName) -> parser.parse(aggName, p));


将aggs的collector放入 queryCollectors中 preProcess

context.queryCollectors().put(AggregationPhase.class, collector);  executeInternal

                // plug in additional collectors, like aggregations

                shouldRescore = searchWithCollector(searchContext, searcher, query, collectors, hasFilterCollector, timeoutSet);

以LeafReader为单位进行搜索  search

   * Lower-level search API.
   * <p>
   * {@link LeafCollector#collect(int)} is called for every document. <br>
   * <p>
   * NOTE: this method executes the searches on all given leaves exclusively.
   * To search across all the searchers leaves use {@link #leafContexts}.
   * @param leaves 
   *          the searchers leaves to execute the searches on
   * @param weight
   *          to match documents
   * @param collector
   *          to receive hits
   * @throws BooleanQuery.TooManyClauses If a query would exceed 
   *         {@link BooleanQuery#getMaxClauseCount()} clauses.
  // Runs the search one leaf (segment reader context) at a time: for each leaf a LeafCollector is
  // obtained from the collector and a BulkScorer from the weight, then the scorer feeds hits into
  // the leaf collector. CollectionTerminatedException is caught at both steps.
  // NOTE(review): closing braces (and any statements inside the catch blocks) are not present in this excerpt.
  protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector)
      throws IOException {

    // TODO: should we make this
    // threaded...?  the Collector could be sync'd?
    // always use single thread:
    for (LeafReaderContext ctx : leaves) { // search each subreader
      final LeafCollector leafCollector;
      try {
        leafCollector = collector.getLeafCollector(ctx);
      } catch (CollectionTerminatedException e) {
        // there is no doc of interest in this reader context
        // continue with the following leaf
      // A null scorer is skipped by the check below.
      BulkScorer scorer = weight.bulkScorer(ctx);
      if (scorer != null) {
        try {
            // Collect the hits: score this leaf into the leaf collector, passing the reader's live docs.
          scorer.score(leafCollector, ctx.reader().getLiveDocs());
        } catch (CollectionTerminatedException e) {
          // collection was terminated prematurely
          // continue with the following leaf


// AggregatorFactory for a filter aggregation: holds the filter Query built from the supplied
// QueryBuilder and a Weight that is populated later by getWeight().
// NOTE(review): closing braces are not present in this excerpt.
public class FilterAggregatorFactory extends AggregatorFactory {

    // Cached Weight for the filter; stays null until getWeight() creates it.
    private Weight weight;
    // The filter query, produced in the constructor from the filter builder.
    private Query filter;

    // Passes the common arguments to the super constructor and converts the filter builder into a
    // Query using the query shard context.
    public FilterAggregatorFactory(String name, QueryBuilder filterBuilder, QueryShardContext queryShardContext,
            AggregatorFactory parent, AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData) throws IOException {
        super(name, queryShardContext, parent, subFactoriesBuilder, metaData);
        filter = filterBuilder.toQuery(queryShardContext);

     * Returns the {@link Weight} for this filter aggregation, creating it if
     * necessary. This is done lazily so that the {@link Weight} is only created
     * if the aggregation collects documents reducing the overhead of the
     * aggregation in the case where no documents are collected.
     * Note that as aggregations are initialised and executed in a serial manner,
     * no concurrency considerations are necessary here.
    // Returns the Weight for the filter, creating and caching it in the weight field on first use.
    // An IOException raised while creating it is rethrown as AggregationInitializationException.
    // NOTE(review): closing braces are not present in this excerpt.
    public Weight getWeight() {
        // Only build the Weight when it has not been created yet.
        if (weight == null) {
            IndexSearcher contextSearcher = queryShardContext.searcher();
            try {
                // The filter is rewritten first; the Weight is created with COMPLETE_NO_SCORES and a boost of 1f.
                weight = contextSearcher.createWeight(contextSearcher.rewrite(filter), ScoreMode.COMPLETE_NO_SCORES, 1f);
            } catch (IOException e) {
                throw new AggregationInitializationException("Failed to initialse filter", e);
        return weight;

    // Builds the FilterAggregator for this factory. The Weight is handed over as a lambda that calls
    // getWeight(), so it is only created when that lambda is invoked.
    // collectsFromSingleBucket is not used in the visible body.
    public Aggregator createInternal(SearchContext searchContext,
                                        Aggregator parent,
                                        boolean collectsFromSingleBucket,
                                        List<PipelineAggregator> pipelineAggregators,
                                        Map<String, Object> metaData) throws IOException {
        return new FilterAggregator(name, () -> this.getWeight(), factories, searchContext, parent, pipelineAggregators, metaData);



        // Buffers one shard's query result. When the buffer is full (index == bufferSize) the buffered
        // aggregations and/or top docs are first reduced into a single entry stored at slot 0, and
        // index is reset to 1 so that new results are appended after it. The method is synchronized.
        // NOTE(review): closing braces are not present in this excerpt.
        private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
            if (index == bufferSize) {
                if (hasAggs) {
                    // The reduce context is obtained by applying the controller's function to false.
                    ReduceContext reduceContext = controller.reduceContextFunction.apply(false);
                    // The reduce operation is executed at this point
                    InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext);
                    // Clear the buffer and keep only the reduced aggregations in slot 0.
                    Arrays.fill(aggsBuffer, null);
                    aggsBuffer[0] = reducedAggs;
                if (hasTopDocs) {
                    TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer),
                        // we have to merge here in the same way we collect on a shard
                        querySearchResult.from() + querySearchResult.size(), 0);
                    // Clear the buffer and keep only the merged top docs in slot 0.
                    Arrays.fill(topDocsBuffer, null);
                    topDocsBuffer[0] = reducedTopDocs;
                // Slot 0 now holds the reduced entry; the next result goes to slot 1.
                index = 1;
                if (hasAggs) {
                        // NOTE(review): the call this argument list belongs to is missing in this excerpt.
                        topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases);
            // Claim the next free buffer slot for this result.
            final int i = index++;
            if (hasAggs) {
                aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs();
            if (hasTopDocs) {
                final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null
                topDocsStats.add(topDocs, querySearchResult.searchTimedOut(), querySearchResult.terminatedEarly());
                setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex());
                topDocsBuffer[i] = topDocs.topDocs;
            // Record the search shard target of this result under its shard index.
            processedShards[querySearchResult.getShardIndex()] = querySearchResult.getSearchShardTarget();

    // Chooses the container that collects per-shard query results for a search request.
    // A QueryPhaseResultConsumer is returned when the request is not a scroll, has aggregations or
    // top docs, and its batched reduce size is smaller than the number of shards; the other visible
    // return is an anonymous ArraySearchPhaseResults whose reduce() reduces all results at once and
    // notifies the listener.
    // NOTE(review): closing braces and the body of consumeResult are not present in this excerpt.
    ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResults(SearchProgressListener listener,
                                                                     SearchRequest request,
                                                                     int numShards) {
        SearchSourceBuilder source = request.source();
        boolean isScrollRequest = request.scroll() != null;
        // Aggregations are present only when the source exists and defines them.
        final boolean hasAggs = source != null && source.aggregations() != null;
        // Top docs are expected unless the source explicitly sets size to 0.
        final boolean hasTopDocs = source == null || source.size() != 0;
        final int trackTotalHitsUpTo = resolveTrackTotalHits(request);
        if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
            // no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
            // Is the choice of implementation decided by comparing batchedReduceSize with numShards?
            // (The check below returns the QueryPhaseResultConsumer when batchedReduceSize < numShards.)
            if (request.getBatchedReduceSize() < numShards) {
                // only use this if there are aggs and if there are more shards than we should reduce at once
                return new QueryPhaseResultConsumer(listener, this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs,
                    trackTotalHitsUpTo, request.isFinalReduce());
        return new ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
            void consumeResult(SearchPhaseResult result) {

            ReducedQueryPhase reduce() {
                // Reduce every collected result in one pass, then report the outcome to the listener.
                List<SearchPhaseResult> resultList = results.asList();
                final ReducedQueryPhase reducePhase =
                    reducedQueryPhase(resultList, isScrollRequest, trackTotalHitsUpTo, request.isFinalReduce());
                listener.notifyReduce(listener.searchShards(resultList), reducePhase.totalHits, reducePhase.aggregations);
                return reducePhase;



