Aggs创建
org.elasticsearch.search.aggregations.AggregatorFactories: createSubAggregators(创建下层aggs) / createTopLevelAggregators(创建顶层aggs)
org.elasticsearch.search.aggregations.AggregatorFactories
/**
 * Create all aggregators so that they can be consumed with multiple
 * buckets.
 * <p>
 * Used for sub-aggregations (aggregators nested under {@code parent}). Every aggregator is created with
 * {@code collectsFromSingleBucket == false}, and is wrapped for profiling when its context exposes profilers.
 *
 * @param searchContext the search context the aggregators are created in
 * @param parent        the aggregator these sub-aggregators are nested under
 * @return one aggregator per slot of an array sized by {@link #countAggregators()}
 * @throws IOException if creating any aggregator fails
 */
public Aggregator[] createSubAggregators(SearchContext searchContext, Aggregator parent) throws IOException {
    Aggregator[] aggregators = new Aggregator[countAggregators()];
    for (int i = 0; i < factories.length; ++i) {
        // TODO: sometimes even sub aggregations always get called with bucket 0, eg. if
        // you have a terms agg under a top-level filter agg. We should have a way to
        // propagate the fact that only bucket 0 will be collected with single-bucket
        // aggs
        final boolean collectsFromSingleBucket = false;
        aggregators[i] = maybeWrapForProfiling(factories[i].create(searchContext, parent, collectsFromSingleBucket));
    }
    return aggregators;
}

/**
 * Creates the top-level aggregators (those with no parent aggregator).
 * <p>
 * Top-level aggregators only ever collect into bucket 0, so each one is created with
 * {@code collectsFromSingleBucket == true} and a {@code null} parent.
 *
 * @param searchContext the search context the aggregators are created in
 * @return one aggregator per factory, wrapped for profiling when its context exposes profilers
 * @throws IOException if creating any aggregator fails
 */
public Aggregator[] createTopLevelAggregators(SearchContext searchContext) throws IOException {
    // These aggregators are going to be used with a single bucket ordinal, no need to wrap the PER_BUCKET ones
    Aggregator[] aggregators = new Aggregator[factories.length];
    for (int i = 0; i < factories.length; i++) {
        // top-level aggs only get called with bucket 0
        final boolean collectsFromSingleBucket = true;
        aggregators[i] = maybeWrapForProfiling(factories[i].create(searchContext, null, collectsFromSingleBucket));
    }
    return aggregators;
}

/**
 * Wraps {@code aggregator} in a {@link ProfilingAggregator} when its context returns non-null
 * {@code Profilers}; otherwise returns {@code aggregator} unchanged.
 */
private static Aggregator maybeWrapForProfiling(Aggregator aggregator) {
    Profilers profilers = aggregator.context().getProfilers();
    if (profilers == null) {
        return aggregator;
    }
    return new ProfilingAggregator(aggregator, profilers.getAggregationProfiler());
}
大致步骤:
build->factory->aggs
org.elasticsearch.search.internal.ShardSearchRequest ShardSearchRequest(StreamInput in)
source = in.readOptionalWriteable(SearchSourceBuilder::new);
build:
org.elasticsearch.search.builder.SearchSourceBuilder parseXContent
org.elasticsearch.search.aggregations.AggregatorFactories parseAggregators
factory:
parseSource(context, request.source());
org.elasticsearch.search.SearchService parseSource
AggregatorFactories factories = source.aggregations().build(queryShardContext, null);
aggregators:
org.elasticsearch.search.aggregations.AggregatorFactories createTopLevelAggregators
org.elasticsearch.search.aggregations.AggregatorFactories createSubAggregators
aggregation:
org.elasticsearch.search.aggregations.AggregationPhase execute
org.elasticsearch.search.aggregations.Aggregator buildAggregation
context.aggregations(new SearchContextAggregations(factories, multiBucketConsumerService.create()));
build
org.elasticsearch.search.SearchModule registerAggregations
/**
 * Parses the body of an {@code aggs}/{@code aggregations} object into an {@link AggregatorFactories.Builder}.
 * <p>
 * Every field of the object is an aggregation name (validated against {@code VALID_AGG_NAME}). Its value must be
 * an object containing exactly one aggregation type definition. It may also contain an optional {@code meta} object
 * and at most one {@code aggs}/{@code aggregations} object of sub-aggregations. Sub-aggregations are parsed
 * recursively with {@code level + 1}. Parsed builders that are {@link AggregationBuilder}s are added as
 * aggregators, and all others as pipeline aggregators.
 *
 * @param parser the parser; presumably positioned just inside the {@code aggs} object (TODO confirm with callers)
 * @param level  the nesting depth of this {@code aggs} object; only incremented on recursion here
 * @return a builder holding every aggregation parsed at this level
 * @throws ParsingException if the structure is malformed, a name is invalid, or a definition is missing or duplicated
 * @throws IOException      if reading from the parser fails
 */
private static AggregatorFactories.Builder parseAggregators(XContentParser parser, int level) throws IOException {
    Matcher validAggMatcher = VALID_AGG_NAME.matcher("");
    AggregatorFactories.Builder factories = new AggregatorFactories.Builder();
    XContentParser.Token token;
    while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
        if (token != XContentParser.Token.FIELD_NAME) {
            throw new ParsingException(parser.getTokenLocation(),
                "Unexpected token " + token + " in [aggs]: aggregations definitions must start with the name of the aggregation.");
        }
        final String aggregationName = parser.currentName();
        if (!validAggMatcher.reset(aggregationName).matches()) {
            throw new ParsingException(parser.getTokenLocation(), "Invalid aggregation name [" + aggregationName
                + "]. Aggregation names can contain any character except '[', ']', and '>'");
        }

        token = parser.nextToken();
        if (token != XContentParser.Token.START_OBJECT) {
            throw new ParsingException(parser.getTokenLocation(), "Aggregation definition for [" + aggregationName + "] starts with a ["
                + token + "], expected a [" + XContentParser.Token.START_OBJECT + "].");
        }

        BaseAggregationBuilder aggBuilder = null;
        AggregatorFactories.Builder subFactories = null;
        Map<String, Object> metaData = null;
        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
            if (token != XContentParser.Token.FIELD_NAME) {
                // No trailing varargs: ParsingException's extra args are message-format parameters, and the
                // message embeds the user-supplied aggregation name, which may legally contain "{}".
                throw new ParsingException(parser.getTokenLocation(), "Expected [" + XContentParser.Token.FIELD_NAME + "] under a ["
                    + XContentParser.Token.START_OBJECT + "], but got a [" + token + "] in [" + aggregationName + "]");
            }
            final String fieldName = parser.currentName();
            token = parser.nextToken();
            if (token != XContentParser.Token.START_OBJECT) {
                throw new ParsingException(parser.getTokenLocation(), "Expected [" + XContentParser.Token.START_OBJECT + "] under ["
                    + fieldName + "], but got a [" + token + "] in [" + aggregationName + "]");
            }
            switch (fieldName) {
                case "meta":
                    metaData = parser.map();
                    break;
                case "aggregations":
                case "aggs":
                    if (subFactories != null) {
                        throw new ParsingException(parser.getTokenLocation(),
                            "Found two sub aggregation definitions under [" + aggregationName + "]");
                    }
                    subFactories = parseAggregators(parser, level + 1);
                    break;
                default:
                    // Any other field name is treated as the aggregation type, resolved through the named-object registry.
                    if (aggBuilder != null) {
                        throw new ParsingException(parser.getTokenLocation(), "Found two aggregation type definitions in ["
                            + aggregationName + "]: [" + aggBuilder.getType() + "] and [" + fieldName + "]");
                    }
                    aggBuilder = parser.namedObject(BaseAggregationBuilder.class, fieldName, aggregationName);
                    break;
            }
        }

        if (aggBuilder == null) {
            // See above: no format varargs, so names containing "{}" are reported verbatim.
            throw new ParsingException(parser.getTokenLocation(), "Missing definition for aggregation [" + aggregationName + "]");
        }
        if (metaData != null) {
            aggBuilder.setMetaData(metaData);
        }
        if (subFactories != null) {
            aggBuilder.subAggregations(subFactories);
        }
        if (aggBuilder instanceof AggregationBuilder) {
            factories.addAggregator((AggregationBuilder) aggBuilder);
        } else {
            factories.addPipelineAggregator((PipelineAggregationBuilder) aggBuilder);
        }
    }
    return factories;
}
org.elasticsearch.common.xcontent.NamedXContentRegistry parseNamedObject
/**
 * Looks up the parser registered for {@code name} within {@code categoryClass} and uses it to parse the
 * next object from {@code parser}, casting the result to {@code categoryClass}.
 * <p>
 * An unregistered {@code categoryClass} is almost always a programming error, so it is reported with a
 * {@link NamedObjectNotFoundException} just like an unknown {@code name} within a registered category.
 *
 * @param categoryClass the category the named object belongs to
 * @param name          the name of the object to parse
 * @param parser        the parser to read the object from
 * @param context       context handed to the registered parser
 * @return the parsed object, cast to {@code categoryClass}
 * @throws NamedObjectNotFoundException if the categoryClass or name is not registered
 */
public <T, C> T parseNamedObject(Class<T> categoryClass, String name, XContentParser parser, C context) throws IOException {
    final Map<String, Entry> categoryParsers = registry.get(categoryClass);
    if (categoryParsers == null) {
        // A completely empty registry can never resolve anything, so give that more helpful hint instead.
        final String reason = registry.isEmpty()
            ? "named objects are not supported for this parser"
            : "unknown named object category [" + categoryClass.getName() + "]";
        throw new NamedObjectNotFoundException(reason);
    }

    final Entry namedEntry = categoryParsers.get(name);
    if (namedEntry == null) {
        throw new NamedObjectNotFoundException(parser.getTokenLocation(), "unable to parse " + categoryClass.getSimpleName() +
            " with name [" + name + "]: parser not found");
    }

    // The entry was found by name, so this should always match; `match` is still called because it is
    // what emits deprecation warnings for deprecated names.
    final boolean nameMatches = namedEntry.name.match(name, parser.getDeprecationHandler());
    if (nameMatches == false) {
        throw new NamedObjectNotFoundException(parser.getTokenLocation(),
            "unable to parse " + categoryClass.getSimpleName() + " with name [" + name + "]: parser didn't match");
    }

    final Object parsed = namedEntry.parser.parse(parser, context);
    return categoryClass.cast(parsed);
}
}
org.elasticsearch.plugins.SearchPlugin AggregationSpec
/**
 * Specification for an {@link Aggregation}, built from a legacy {@link Aggregator.Parser}.
 *
 * @param name the name by which this aggregation might be parsed or deserialized. The {@link AggregationBuilder}
 *        must return the same value from {@link NamedWriteable#getWriteableName()}.
 * @param reader the reader registered for this aggregation's builder, typically a reference to a constructor that
 *        takes a {@link StreamInput}
 * @param parser the parser that reads the aggregation builder from xcontent
 * @deprecated Use the ctor that takes a {@link ContextParser} instead
 */
@Deprecated
public AggregationSpec(String name, Writeable.Reader<? extends AggregationBuilder> reader, Aggregator.Parser parser) {
    // Adapts the legacy parser's (name, xContentParser) argument order to the (xContentParser, context)
    // shape passed to super; the context argument is forwarded to the legacy parser as the aggregation name.
    super(name, reader, (xContentParser, aggregationName) -> parser.parse(aggregationName, xContentParser));
}
collect:
将aggs的collector放入queryCollectors中
org.elasticsearch.search.aggregations.AggregationPhase preProcess
context.queryCollectors().put(AggregationPhase.class, collector);
org.elasticsearch.search.query.QueryPhase executeInternal
// plug in additional collectors, like aggregations
collectors.add(createMultiCollectorContext(searchContext.queryCollectors().values()));
shouldRescore = searchWithCollector(searchContext, searcher, query, collectors, hasFilterCollector, timeoutSet);
以LeafReader为单位进行搜索
org.apache.lucene.search.IndexSearcher search
/**
 * Lower-level search API that runs {@code weight} against each of the given leaves in turn.
 *
 * <p>
 * {@link LeafCollector#collect(int)} is invoked for every matching document.
 *
 * <p>
 * NOTE: only the leaves passed in are searched. To search across all of the
 * searcher's leaves use {@link #leafContexts}.
 *
 * @param leaves
 *          the searcher's leaves to execute the search on
 * @param weight
 *          used to match documents
 * @param collector
 *          receives the hits
 * @throws BooleanQuery.TooManyClauses If a query would exceed
 *         {@link BooleanQuery#getMaxClauseCount()} clauses.
 */
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector)
    throws IOException {
  // TODO: should we make this threaded...? the Collector could be sync'd?
  // For now the leaves are always visited sequentially on the calling thread.
  for (LeafReaderContext leaf : leaves) {
    LeafCollector leafCollector;
    try {
      leafCollector = collector.getLeafCollector(leaf);
    } catch (CollectionTerminatedException ignored) {
      // the collector wants no documents from this leaf; move on to the next one
      continue;
    }

    BulkScorer bulkScorer = weight.bulkScorer(leaf);
    if (bulkScorer == null) {
      // no scorer for this leaf; move on to the next one
      continue;
    }

    try {
      // score every live document of this leaf into the leaf collector
      bulkScorer.score(leafCollector, leaf.reader().getLiveDocs());
    } catch (CollectionTerminatedException ignored) {
      // collection stopped early for this leaf; continue with the following one
    }
  }
}
过滤桶:
org.elasticsearch.search.aggregations.bucket.filter.FilterAggregatorFactory
/**
 * Factory for the filter bucket aggregation. It converts the filter {@link QueryBuilder} into a
 * {@link Query} up front, and builds the Lucene {@link Weight} lazily the first time it is requested.
 */
public class FilterAggregatorFactory extends AggregatorFactory {

    /** Lazily created by {@link #getWeight()}; {@code null} until then. */
    private Weight weight;
    /** The filter query, resolved once in the constructor. */
    private final Query filter;

    public FilterAggregatorFactory(String name, QueryBuilder filterBuilder, QueryShardContext queryShardContext,
            AggregatorFactory parent, AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData) throws IOException {
        super(name, queryShardContext, parent, subFactoriesBuilder, metaData);
        filter = filterBuilder.toQuery(queryShardContext);
    }

    /**
     * Returns the {@link Weight} for this filter aggregation, creating it if
     * necessary. This is done lazily so that the {@link Weight} is only created
     * if the aggregation collects documents, which reduces the overhead of the
     * aggregation when no documents are collected.
     *
     * Note that as aggregations are initialised and executed in a serial manner,
     * no concurrency considerations are necessary here.
     *
     * @throws AggregationInitializationException if rewriting the filter or creating the weight fails
     */
    public Weight getWeight() {
        if (weight == null) {
            IndexSearcher contextSearcher = queryShardContext.searcher();
            try {
                // Scores are not needed to decide bucket membership, hence COMPLETE_NO_SCORES.
                weight = contextSearcher.createWeight(contextSearcher.rewrite(filter), ScoreMode.COMPLETE_NO_SCORES, 1f);
            } catch (IOException e) {
                throw new AggregationInitializationException("Failed to initialise filter", e);
            }
        }
        return weight;
    }

    @Override
    public Aggregator createInternal(SearchContext searchContext,
                                     Aggregator parent,
                                     boolean collectsFromSingleBucket,
                                     List<PipelineAggregator> pipelineAggregators,
                                     Map<String, Object> metaData) throws IOException {
        // The aggregator receives a supplier, so the weight is only built if it is actually asked for.
        return new FilterAggregator(name, this::getWeight, factories, searchContext, parent, pipelineAggregators, metaData);
    }
}
Query阶段获取的结果都会走下面的流程
org.elasticsearch.action.search.AbstractSearchAsyncAction#onShardResult
org.elasticsearch.action.search.SearchPhaseController.QueryPhaseResultConsumer#consumeResult
org.elasticsearch.action.search.SearchPhaseController.QueryPhaseResultConsumer#consumeInternal
/**
 * Buffers one shard's query-phase result, partially reducing the buffer first whenever it is full.
 * <p>
 * When {@code index == bufferSize}:
 * <ul>
 * <li>the buffered aggregations are reduced with {@code InternalAggregations.topLevelReduce} using
 * {@code reduceContextFunction.apply(false)} (presumably a non-final reduce context; TODO confirm);</li>
 * <li>the buffered top docs are merged down to {@code from + size} hits;</li>
 * <li>each merged result is kept in slot 0 and {@code numReducePhases} is incremented;</li>
 * <li>{@code index} restarts at 1.</li>
 * </ul>
 * The incoming result is then stored in the next free slot, and its shard target is recorded in
 * {@code processedShards}. The method is {@code synchronized}, so calls on the same consumer are serialized.
 *
 * @param querySearchResult the query-phase result of a single shard
 */
private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
// Buffer is full: collapse it into slot 0 before accepting the new result.
if (index == bufferSize) {
if (hasAggs) {
ReduceContext reduceContext = controller.reduceContextFunction.apply(false);
// A (partial) reduce of the buffered aggregations is performed at this point
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext);
Arrays.fill(aggsBuffer, null);
aggsBuffer[0] = reducedAggs;
}
if (hasTopDocs) {
TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer),
// we have to merge here in the same way we collect on a shard
querySearchResult.from() + querySearchResult.size(), 0);
Arrays.fill(topDocsBuffer, null);
topDocsBuffer[0] = reducedTopDocs;
}
numReducePhases++;
// slot 0 now holds the merged result, so new results start at slot 1
index = 1;
if (hasAggs) {
// report the partially reduced aggregations to the progress listener
progressListener.notifyPartialReduce(progressListener.searchShards(processedShards),
topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases);
}
}
// claim the next free slot for this shard's result
final int i = index++;
if (hasAggs) {
aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs();
}
if (hasTopDocs) {
final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null
topDocsStats.add(topDocs, querySearchResult.searchTimedOut(), querySearchResult.terminatedEarly());
// tag the hits with their shard index before buffering them
setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex());
topDocsBuffer[i] = topDocs.topDocs;
}
processedShards[querySearchResult.getShardIndex()] = querySearchResult.getSearchShardTarget();
}
org.elasticsearch.action.search.SearchPhaseController#newSearchPhaseResults
/**
 * Chooses how the query-phase results of {@code numShards} shards are accumulated.
 * <p>
 * A {@code QueryPhaseResultConsumer} is returned when all of these hold:
 * <ul>
 * <li>the request is not a scroll;</li>
 * <li>it needs aggregations or top docs;</li>
 * <li>its batched reduce size is smaller than {@code numShards}.</li>
 * </ul>
 * That consumer reduces results incrementally in batches. Otherwise all results are kept and reduced once,
 * and the listener is notified per shard result and after the reduce.
 *
 * @param listener  the progress listener to notify
 * @param request   the search request being executed
 * @param numShards the number of shards whose results will be consumed
 * @return the result holder to feed shard results into
 */
ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResults(SearchProgressListener listener,
                                                                 SearchRequest request,
                                                                 int numShards) {
    final SearchSourceBuilder source = request.source();
    final boolean isScrollRequest = request.scroll() != null;
    final boolean hasAggs = source != null && source.aggregations() != null;
    final boolean hasTopDocs = source == null || source.size() != 0;
    final int trackTotalHitsUpTo = resolveTrackTotalHits(request);

    // Scroll requests never reduce incrementally (they typically hit a single shard). For everything else,
    // batching pays off only when there is something to reduce and more shards than fit in one batch.
    final boolean reduceIncrementally = isScrollRequest == false
        && (hasAggs || hasTopDocs)
        && request.getBatchedReduceSize() < numShards;
    if (reduceIncrementally) {
        return new QueryPhaseResultConsumer(listener, this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs,
            trackTotalHitsUpTo, request.isFinalReduce());
    }

    return new ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
        @Override
        void consumeResult(SearchPhaseResult result) {
            super.consumeResult(result);
            listener.notifyQueryResult(result.queryResult().getShardIndex());
        }

        @Override
        ReducedQueryPhase reduce() {
            final List<SearchPhaseResult> shardResults = results.asList();
            final ReducedQueryPhase reduced =
                reducedQueryPhase(shardResults, isScrollRequest, trackTotalHitsUpTo, request.isFinalReduce());
            listener.notifyReduce(listener.searchShards(shardResults), reduced.totalHits, reduced.aggregations);
            return reduced;
        }
    };
}
参考:
es lucene搜索及聚合流程源码分析: https://www.bbsmax.com/A/kmzLMnmbJG/
Collector(一): https://www.amazingkoala.com.cn/Lucene/Search/2019/0812/82.html
ElasticSearch global ordinals & execution_hint - 简书: https://www.jianshu.com/p/6bf310afdb21 (摘要: 1、global ordinals (1) what's this? 当我们使用doc values或者fielddata存储时,在磁盘中存储的值不是真正的字段值,而是一个字典...)
更多推荐
所有评论(0)