Gradle 依赖

dependencies {
	implementation group: 'org.elasticsearch', name: 'elasticsearch', version: '7.11.1'
	implementation group: 'org.elasticsearch.client', name: 'elasticsearch-rest-high-level-client', version: '7.11.1'
}

Client 初始化

// One client connected to multiple nodes (one HttpHost per node)
RestHighLevelClient client = new RestHighLevelClient(
        RestClient.builder(
                new HttpHost("localhost", 9200, "http"),
                new HttpHost("localhost", 9201, "http"))
                );

// 关闭
client.close();

RequestOptions

  1. 单例 RequestOptions
// Shared, immutable options reused by every request (singleton).
private static final RequestOptions COMMON_OPTIONS;

static {
    RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
    // Authentication header sent with every request.
    builder.addHeader("Authorization", "Bearer " + TOKEN);
    // Limit the heap-buffered response to 30 MB (the client default is 100 MB).
    // The value must be 30 * 1024 * 1024: multiplying by a further 1024
    // overflows int and produces a negative buffer limit.
    builder.setHttpAsyncResponseConsumerFactory(
        new HttpAsyncResponseConsumerFactory
            .HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
    // Per-request headers are added on a builder derived from COMMON_OPTIONS,
    // not here, so that they do not leak into every request.
    COMMON_OPTIONS = builder.build();
}

// 设置 Header
request.setOptions(COMMON_OPTIONS);
  1. 自定义 RequestOptions
RequestOptions.Builder options = COMMON_OPTIONS.toBuilder();
options.addHeader("cats", "knock things off of other things");
request.setOptions(options);

支持的API

单文档API

Index API

调用方法
  1. 方法一
IndexRequest request = new IndexRequest("posts");
     request.id("1");
     String jsonStr = "{" +
             "\"user\":\"kimchy\"," +
             "\"postDate\":\"2013-01-30\"," +
             "\"message\":\"trying out elasticsearch\"" +
             "}";
request.source(jsonStr, XContentType.JSON);
  • 索引
  • 索引id
  • 文档内容
  1. 方法二
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "try out elasticsearch");
IndexRequest indexRequest = new IndexRequest("posts").id("1").source(jsonMap);
  1. 方法三
 try {
     XContentBuilder builder = XContentFactory.jsonBuilder();
     builder.startObject();
     {
          builder.field("user", "kimchy");
          builder.timeField("postDate", new Date());
          builder.field("message", "trying out Elasticsearch");
      }
      builder.endObject();
      IndexRequest indexRequest = new IndexRequest("posts").id("1").source(builder);
}catch (Exception e){
      e.printStackTrace();
}
  1. 方法四
 IndexRequest indexRequest = new IndexRequest("posts")
                .id("1")
                .source(
                        "user", "kimchy",
                        "postDate", new Date(),
                        "message", "try out Elasticsearch"
                );
可选参数
// routing value
indexRequest.routing("routing");
// time out
indexRequest.timeout("1s");
indexRequest.timeout(TimeValue.timeValueSeconds(1));
// refresh
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
indexRequest.setRefreshPolicy("wait_for");
// version
indexRequest.version(2);

indexRequest.versionType(VersionType.EXTERNAL);
// operate type
indexRequest.opType(DocWriteRequest.OpType.CREATE);
indexRequest.opType("create");
// pipeline name
indexRequest.setPipeline("pipeline");

同步/异步执行
 // 同步执行
try {
    IndexResponse indexResponse = highClient.index(indexRequest, RequestOptions.DEFAULT);
}catch (Exception e){
    e.printStackTrace();
}

// 异步执行
try {
    highClient.indexAsync(indexRequest, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
        @Override
        public void onResponse(IndexResponse indexResponse) {
        }

        @Override
        public void onFailure(Exception e) {
        }
    });
}catch (Exception e){
    e.printStackTrace();
}
IndexResponse
IndexResponse indexResponse = highClient.index(indexRequest, RequestOptions.DEFAULT);

String index = indexResponse.getIndex();

// 新建数据或者更新
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED){
}else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED){
}
// 条数
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()){
}
if (shardInfo.getFailed() > 0){
    for (ReplicationResponse.ShardInfo.Failure failure: shardInfo.getFailures()){
        String reason = failure.reason();
    }
}
版本冲突
IndexRequest indexRequest = new IndexRequest("posts")
       .id("1")
       .source(
               "user", "kimchy",
               "postDate", new Date(),
               "message", "try out Elasticsearch"
       );

// Version conflict: optimistic concurrency control via sequence number and primary term
IndexRequest request = new IndexRequest("posts")
       .id("1")
       .source("field", "value")
       .setIfSeqNo(10L)
       .setIfPrimaryTerm(20);
try {
   // Send the conditional request built above, not the unconditional indexRequest.
   IndexResponse indexResponse = highClient.index(request, RequestOptions.DEFAULT);
} catch (ElasticsearchException e) {
   if (e.status() == RestStatus.CONFLICT) {
       // The expected seq_no / primary term did not match: handle the conflict here.
   }
} catch (Exception e) {
   // Any other failure: report it instead of swallowing it silently.
   e.printStackTrace();
}

// 版本冲突: 文件已存在
IndexRequest request1 = new IndexRequest("posts")
       .id("1")
       .source("field", "value")
       .opType(DocWriteRequest.OpType.CREATE);
try {
   IndexResponse response = highClient.index(request1, RequestOptions.DEFAULT);
}catch (ElasticsearchException e){
   if (e.status() == RestStatus.CONFLICT){
   }
}catch (Exception e){
   
}

Get API

调用
GetRequest getRequest = new GetRequest("posts", "1");
选项
GetRequest getRequest = new GetRequest("posts", "1");

// Disable source retrieval (it is enabled by default)
getRequest.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);

// Return only the listed fields: include patterns set, exclude list empty
String[] includes = new String[]{"message", "*Date"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
getRequest.fetchSourceContext(fetchSourceContext);

// Filter out the listed fields: include list empty, exclude patterns set.
// Distinct variable names are used so both variants can live in one scope.
String[] includes1 = Strings.EMPTY_ARRAY;
String[] excludes1 = new String[]{"message"};
FetchSourceContext fetchSourceContext1 = new FetchSourceContext(true, includes1, excludes1);
// Apply the exclude-based context (the first context was being applied again by mistake).
getRequest.fetchSourceContext(fetchSourceContext1);

try {
    getRequest.storedFields("message");
    GetResponse getResponse = highClient.get(getRequest,  RequestOptions.DEFAULT);
    String message = getResponse.getField("message").getValue();
}catch (Exception e){
    e.printStackTrace();
}

// routing value
getRequest.routing("routing");
// 执行优先级
getRequest.preference("preference");

getRequest.realtime(false);
// 请求执行之前,是否先刷新, 默认false
getRequest.refresh(true);
// 版本
getRequest.version(2);

getRequest.versionType(VersionType.EXTERNAL);
同步执行
GetResponse getResponse = highClient.get(getRequest, RequestOptions.DEFAULT);
异步执行
highClient.getAsync(getRequest, RequestOptions.DEFAULT, new ActionListener<GetResponse>() {
    @Override
    public void onResponse(GetResponse documentFields) {
    }

    @Override
    public void onFailure(Exception e) {
    }
});
Response
  1. 示例
highClient.getAsync(getRequest, RequestOptions.DEFAULT, new ActionListener<GetResponse>() {
    @Override
    public void onResponse(GetResponse documentFields) {
        String index = documentFields.getIndex();
        String id = documentFields.getId();
        if (documentFields.isExists()){
            long version = documentFields.getVersion();
            String sourceAsString = documentFields.getSourceAsString();  // 字符串
            Map<String, Object> sourceAsMap = documentFields.getSourceAsMap();  // map
            byte[] sourceAsBytes = documentFields.getSourceAsBytes(); // 字节流
        }else {

        }
    }

    @Override
    public void onFailure(Exception e) {
    }
});
  1. 当索引不存在的时候
// 当索引不存在的时候,抛出异常
GetRequest getRequest = new GetRequest("does_not_exist", "1");
try {
    GetResponse response = highClient.get(getRequest, RequestOptions.DEFAULT);
}catch (ElasticsearchException e){
    if (e.status() == RestStatus.NOT_FOUND){
    }
}catch (Exception e){
}
  1. 指定版本
try {
	 GetRequest getRequest = new GetRequest("posts", "1")
	              .version(2);
	 GetResponse response = highClient.get(getRequest, RequestOptions.DEFAULT);
 }catch (ElasticsearchException e){
      // 版本冲突
      if (e.status() == RestStatus.CONFLICT){
      }
  }catch (Exception e){
  }

GetSourceApi

调用
GetSourceRequest getSourceRequest = new GetSourceRequest("posts", "1");
选项
String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"postDate"};
getSourceRequest.fetchSourceContext(
        new FetchSourceContext(true, includes, excludes)  // 第一个参数必须是true
);
// route value
getSourceRequest.routing("routing");
// 设置执行优先顺序
getSourceRequest.preference("preference");

getSourceRequest.realtime(false);

getSourceRequest.refresh(true);
同步调用
GetSourceResponse response = highClient.getSource(getSourceRequest, RequestOptions.DEFAULT);
异步调用
try {
       highClient.getSourceAsync(getSourceRequest, RequestOptions.DEFAULT, new ActionListener<GetSourceResponse>() {
           @Override
           public void onResponse(GetSourceResponse getSourceResponse) {
           }

           @Override
           public void onFailure(Exception e) {
           }
       });
}catch (Exception e){
     e.printStackTrace();
 }
Response
 Map<String, Object> source = getSourceResponse.getSource();

Exists API

调用方法
 // 取消不用的返回
GetRequest getRequest = new GetRequest("posts", "1");
getRequest.fetchSourceContext(new FetchSourceContext(false));
getRequest.storedFields("_none_");
同步调用
highClient.exists(getRequest, RequestOptions.DEFAULT);
异步调用
highClient.existsAsync(getRequest, RequestOptions.DEFAULT, new ActionListener<Boolean>() {
     @Override
     public void onResponse(Boolean aBoolean) {
     }
     @Override
     public void onFailure(Exception e) {
     }
 });

Delete API

调用方法
// Named deleteRequest because every following snippet refers to it by that name.
DeleteRequest deleteRequest = new DeleteRequest("posts", "1");

选项

 // route name
deleteRequest.routing("routing");
// time out
deleteRequest.timeout(TimeValue.timeValueMinutes(2));
deleteRequest.timeout("2m");
// refresh
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
deleteRequest.setRefreshPolicy("wait_for");
// version
deleteRequest.version(2);
// version type
deleteRequest.versionType(VersionType.EXTERNAL);
同步调用
DeleteResponse deleteResponse = highClient.delete(deleteRequest, RequestOptions.DEFAULT);
异步调用
highClient.deleteAsync(deleteRequest, RequestOptions.DEFAULT, new ActionListener<DeleteResponse>() {
    @Override
    public void onResponse(DeleteResponse deleteResponse) {
    }

    @Override
    public void onFailure(Exception e) {
    }
});

Response

 String index = deleteResponse.getIndex();
 String id = deleteResponse.getId();
 long version = deleteResponse.getVersion();
 ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
 if (shardInfo.getTotal() != shardInfo.getSuccessful()){
 }
 if (shardInfo.getFailed() > 0){
     for (ReplicationResponse.ShardInfo.Failure failure: shardInfo.getFailures()){
         String reason = failure.reason();
     }
 }
删除一个不存在的文档
// Delete a document id that does not exist in the "posts" index.
DeleteRequest deleteRequest1 = new DeleteRequest("posts", "does_not_exist");
DeleteResponse deleteResponse1 = highClient.delete(deleteRequest1, RequestOptions.DEFAULT);
// Inspect the response of THIS request (deleteResponse1), not an earlier one.
if (deleteResponse1.getResult() == DocWriteResponse.Result.NOT_FOUND) {
    // The document was not found, so nothing was deleted.
}
版本冲突
try {
    DeleteResponse deleteResponse2 = highClient.delete(
            new DeleteRequest("posts", "1").setIfSeqNo(100).setIfPrimaryTerm(2),
            RequestOptions.DEFAULT
    );
}catch (ElasticsearchException e){
    if (e.status() == RestStatus.CONFLICT){
    }
}catch (Exception e){
}

Update API

调用方法
  1. 方法一
UpdateRequest request = new UpdateRequest("posts", "1");

UpdateRequest updateRequest = new UpdateRequest("posts", "1")
               .doc(
                       "updated", new Date(),
                       "reason", "daily update"
               );
  1. 方法二、脚本
Map<String, Object> parameters = Collections.singletonMap("count", 4);
Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.field += params.count", parameters);
request.script(inline);

Script stored = new Script(ScriptType.STORED, null, "increment-field", parameters);
request.script(stored);
  1. 方法三、document
// Partial document supplied as a JSON string.
UpdateRequest request1 = new UpdateRequest("posts", "1");
String jsonString = "{" +
       "\"updated\":\"2017-01-01\"," +
       "\"reason\":\"daily update\"" +
       "}";
// Attach the document to the request created above (request1).
request1.doc(jsonString, XContentType.JSON);

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
UpdateRequest updateRequest = new UpdateRequest("posts", "1")
       .doc(jsonMap);
  1. 方法四、XContentBuilder
// Partial document supplied through an XContentBuilder.
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.timeField("updated", new Date());
    builder.field("reason", "daily update");
}
builder.endObject();
// The type name is UpdateRequest (capitalised); the lowercase form does not compile.
UpdateRequest updateRequest = new UpdateRequest("posts", "1")
        .doc(builder);
upserts
// Upsert: the document to index when the target document does not exist yet.
UpdateRequest request = new UpdateRequest("posts", "1");
// The upsert source must be a complete JSON object, so the braces are required.
String jsonString = "{\"created\":\"2017-01-01\"}";
request.upsert(jsonString, XContentType.JSON);
选项
 request.routing("routing");

request.timeout(TimeValue.timeValueSeconds(1));
request.timeout("1s");

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");

request.retryOnConflict(3);

request.fetchSource(true);  // 默认false

// Return only selected fields of the updated source.
// The field written by the update examples is "updated", so the pattern must use that name.
String[] includes = new String[]{"updated", "r*"};
String[] excludes = Strings.EMPTY_ARRAY;
request.fetchSource(
        new FetchSourceContext(true, includes, excludes)
);

// Alternatively, return everything except selected fields.
// Distinct variable names are used so both variants can live in one scope.
String[] includes1 = Strings.EMPTY_ARRAY;
String[] excludes1 = new String[]{"updated"};
request.fetchSource(
        new FetchSourceContext(true, includes1, excludes1)
);

request.setIfSeqNo(2L);
request.setIfPrimaryTerm(1L);

request.detectNoop(false);

request.scriptedUpsert(true);

request.docAsUpsert(true);

request.waitForActiveShards(2);
request.waitForActiveShards(ActiveShardCount.ALL);
同步选项
UpdateResponse updateResponse = highClient.update(request, RequestOptions.DEFAULT);
异步选项
highClient.updateAsync(request, RequestOptions.DEFAULT, new ActionListener<UpdateResponse>() {
    @Override
    public void onResponse(UpdateResponse updateResponse) {
    }

    @Override
    public void onFailure(Exception e) {
    }
});
Response
String index = updateResponse.getIndex();
String id = updateResponse.getId();
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED){
}else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED){
}else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP){
}

Term Vectors API (词频统计)

调用
TermVectorsRequest request = new TermVectorsRequest("authors", "1");
request.setFields("user");

XContentBuilder docBuilder = XContentFactory.jsonBuilder();
docBuilder.startObject().field("user", "guest-user").endObject();
TermVectorsRequest request1 = new TermVectorsRequest("authors", docBuilder);
选项
request.setFieldStatistics(false);  // 字段统计
request.setTermStatistics(true); 
request.setPositions(false);
request.setOffsets(false);
request.setPayloads(false);

Map<String, Integer> filterSettings = new HashMap<>();
filterSettings.put("max_num_terms", 3);
filterSettings.put("min_term_freq", 1);
filterSettings.put("max_term_freq", 10);
filterSettings.put("min_doc_freq", 1);
filterSettings.put("max_doc_freq", 100);
filterSettings.put("min_word_length", 1);
filterSettings.put("max_word_length", 10);
request.setFilterSettings(filterSettings);

Map<String, String> perFieldAnalyzer = new HashMap<>();
perFieldAnalyzer.put("user", "keyword");
request.setPerFieldAnalyzer(perFieldAnalyzer);

request.setRealtime(false);
request.setRouting("routing");
同步执行
TermVectorsResponse response = highClient.termvectors(request, RequestOptions.DEFAULT);
异步执行
highClient.termvectorsAsync(request, RequestOptions.DEFAULT, new ActionListener<TermVectorsResponse>() {
    @Override
    public void onResponse(TermVectorsResponse termVectorsResponse) {
    }

    @Override
    public void onFailure(Exception e) {
    }
});
Response
TermVectorsResponse response = highClient.termvectors(request, RequestOptions.DEFAULT);
// All values below describe the returned document, so read them from the response
// rather than from the request.
String index = response.getIndex();
String type = response.getType();
String id = response.getId();
boolean found = response.getFound();

for (TermVectorsResponse.TermVector tv : response.getTermVectorsList()) {
     String fieldname = tv.getFieldName();
     int docCount = tv.getFieldStatistics().getDocCount();
     long sumTotalTermFreq = tv.getFieldStatistics().getSumTotalTermFreq();

     if (tv.getTerms() != null) {
         List<TermVectorsResponse.TermVector.Term> terms = tv.getTerms();
         for (TermVectorsResponse.TermVector.Term term: terms){
             String termStr = term.getTerm();
             int termFreq = term.getTermFreq();
             int docFreq = term.getDocFreq();
             long totalTermFreq = term.getTotalTermFreq();
             float score = term.getScore();
             if (term.getTokens() != null){
                 List<TermVectorsResponse.TermVector.Token> tokens = term.getTokens();
                 for (TermVectorsResponse.TermVector.Token token: tokens){
                     int position = token.getPosition();
                     int startOffset = token.getStartOffset();
                     int endOffset = token.getEndOffset();
                     String payload = token.getPayload();
                 }
             }
         }
     }
 }
参考
  1. 最新Java Elasticsearch 7.10教程(六)-词频统计

多文档API

Bulk API

调用方法
BulkRequest request = new BulkRequest();
      request.add(
              new IndexRequest("posts").id("1").source(XContentType.JSON, "field", "foo")
      );
      request.add(
              new IndexRequest("posts").id("2").source(XContentType.JSON, "field", "bar")
      );

BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "3"));
request.add(new UpdateRequest("posts", "2").doc(XContentType.JSON, "other", "test"));
request.add(new IndexRequest("posts").id("4").source(XContentType.JSON, "field", "baz"));
选项
 request.timeout(TimeValue.timeValueMinutes(2));
 request.timeout("2m");

 request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
 request.setRefreshPolicy("wait_for");

 request.waitForActiveShards(2);
 request.waitForActiveShards(ActiveShardCount.ALL);

 request.pipeline("pipelineId");

 request.routing("routeId");
同步执行
 BulkResponse bulkResponse = highClient.bulk(request, RequestOptions.DEFAULT);
异步执行
highClient.bulkAsync(request, RequestOptions.DEFAULT, new ActionListener<BulkResponse>() {
  @Override
    public void onResponse(BulkResponse bulkItemResponses) {
    }

    @Override
    public void onFailure(Exception e) {
    }
});
Response
 BulkResponse bulkResponse = highClient.bulk(request, RequestOptions.DEFAULT);

 // Each item carries the response of one operation; its concrete type depends on the operation type.
 for (BulkItemResponse bulkItemResponse : bulkResponse) {
     DocWriteResponse itemResponse = bulkItemResponse.getResponse();

     switch (bulkItemResponse.getOpType()) {
         case INDEX:
         case CREATE:
             IndexResponse indexResponse = (IndexResponse) itemResponse;
             break;
         case UPDATE:
             // An update operation yields an UpdateResponse.
             UpdateResponse updateResponse = (UpdateResponse) itemResponse;
             break;
         case DELETE:
             // A delete operation yields a DeleteResponse.
             DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
             break;
     }
 }

错误检查

// 有一个失败,为true
if (bulkResponse.hasFailures()){
}

// 检查每个请求的response
for (BulkItemResponse bulkItemResponse: bulkResponse){
    if (bulkItemResponse.isFailed()){
        BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
    }
}
BulkProcessor
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
      @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            int numberOfActions = request.numberOfActions();
            log.debug("Executing bulk [{}] with {} requests",
                    executionId, numberOfActions);
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            if (response.hasFailures()) {
                log.warn("Bulk [{}] executed with failures", executionId);
            } else {
                log.debug("Bulk [{}] completed in {} milliseconds",
                        executionId, response.getTook().getMillis());
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            log.error("Failed to execute bulk", failure);
        }
};

BulkProcessor.Builder
 builder = BulkProcessor.builder(
        (r, bulkListener) -> highClient.bulkAsync(r, RequestOptions.DEFAULT, bulkListener), listener
);
builder.setBulkActions(500);  // -1 不限
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));  // -1 不限
builder.setConcurrentRequests(0);  // 0 只允许一个请求
builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));  // 重试间隔和次数

BulkProcessor bulkProcessor = builder.build();
bulkProcessor.add(new DeleteRequest("posts", "3"));
bulkProcessor.add(new UpdateRequest("posts", "2").doc(XContentType.JSON, "other", "test"));
bulkProcessor.add(new IndexRequest("posts").id("4").source(XContentType.JSON, "field", "baz"));

关闭
 boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);  // graceful close: waits up to 30 seconds; returns false if pending requests did not finish before the timeout

 bulkProcessor.close(); // close immediately

Multi-Get API

调用方法
MultiGetRequest request = new MultiGetRequest();
request.add(new MultiGetRequest.Item("index", "example_id"));
request.add(new MultiGetRequest.Item("index", "another_id"));
选项
 // Return only the listed fields for this item
String[] includes = new String[] {"foo", "*r"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.add(new MultiGetRequest.Item("index", "example_id").fetchSourceContext(fetchSourceContext));
// Exclude the listed fields for this item.
// Distinct variable names are used so both variants can live in one scope;
// the type name is FetchSourceContext (capitalised).
String[] includes1 = Strings.EMPTY_ARRAY;
String[] excludes1 = new String[]{"foo", "*r"};
FetchSourceContext fetchSourceContext1 = new FetchSourceContext(true, includes1, excludes1);
request.add(new MultiGetRequest.Item("index", "example_id").fetchSourceContext(fetchSourceContext1));

request.add(new MultiGetRequest.Item("index", "with_routing").routing("some_routing"));
        request.add(new MultiGetRequest.Item("index", "with_version").versionType(VersionType.EXTERNAL).version(10123L));
 
request.preference("some_preference");
request.realtime(false);
request.refresh(true);        
Response
request.add(new MultiGetRequest.Item("index", "example_id").storedFields("foo"));
MultiGetResponse responses = highClient.mget(request, RequestOptions.DEFAULT);
MultiGetItemResponse itemResponse = responses.getResponses()[0];
String value = itemResponse.getResponse().getField("foo").getValue(); 
同步
MultiGetResponse response = highClient.mget(request, RequestOptions.DEFAULT);
异步
highClient.mgetAsync(request, RequestOptions.DEFAULT, new ActionListener<MultiGetResponse>() {
     @Override
     public void onResponse(MultiGetResponse multiGetItemResponses) {
     }

     @Override
     public void onFailure(Exception e) {
     }
 });
Response
MultiGetResponse response = highClient.mget(request, RequestOptions.DEFAULT);
MultiGetItemResponse firstItem = response.getResponses()[0];
GetResponse firstGet = firstItem.getResponse();
String index = firstItem.getIndex();
String id = firstItem.getId();
if (firstGet.isExists()){
    long version = firstGet.getVersion();
    String sourceAsString = firstGet.getSourceAsString();
    Map<String, Object> sourceAsMap = firstGet.getSourceAsMap();
    byte[] sourceAsBytes = firstGet.getSourceAsBytes();
}
异常
MultiGetRequest request = new MultiGetRequest();
request.add(new MultiGetRequest.Item("index", "example_id").version(1000L));
MultiGetResponse response = highClient.mget(request, RequestOptions.DEFAULT);
MultiGetItemResponse item = response.getResponses()[0];
Exception e = item.getFailure().getFailure();
ElasticsearchException ee = (ElasticsearchException) e;

Reindex API (重新索引记录)

调用
 ReindexRequest request = new ReindexRequest();
request.setSourceIndices("source1", "source2");
选项
ReindexRequest request = new ReindexRequest();
request.setSourceIndices("source1", "source2");
request.setDestIndex("dest");
request.setDestVersionType(VersionType.EXTERNAL);
request.setDestOpType("create");
request.setConflicts("proceed");
request.setSourceQuery(new TermQueryBuilder("user", "kimchy"));
request.setMaxDocs(10);
request.setSourceBatchSize(100);
request.setDestPipeline("my_pipeline");
执行脚本
request.setScript(
        new Script(
                ScriptType.INLINE,
                "painless",
                "if (ctx._source.user == 'kimchy') { ctx._source.likes++; }",
                Collections.emptyMap()
        )
);
设置远程信息
// Reindex from a remote cluster. remoteHost, remotePort, user and password
// are supplied by the surrounding code.
request.setRemoteInfo(
     new RemoteInfo(
                 "http", remoteHost, remotePort, null,
                 new BytesArray(new MatchAllQueryBuilder().toString()),
                 user, password, Collections.emptyMap(),
                 // Socket timeout: 100 milliseconds (microseconds would be effectively zero).
                 new TimeValue(100, TimeUnit.MILLISECONDS),
                 // Connect timeout: 100 seconds.
                 new TimeValue(100, TimeUnit.SECONDS)
         )
 );
request.setSlices(2);
request.setScroll(TimeValue.timeValueMinutes(10));  // time of keeps the "search context" alive.
选项
request.setTimeout(TimeValue.timeValueMinutes(2));
request.setRefresh(true);
同步
BulkByScrollResponse bulkByScrollResponse = highClient.reindex(request, RequestOptions.DEFAULT);
异步
highClient.reindexAsync(request, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
    @Override
    public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
    }

    @Override
    public void onFailure(Exception e) {
    }
});
Response
TimeValue timeValue = bulkByScrollResponse.getTook();
boolean timeOut = bulkByScrollResponse.isTimedOut();
long totalDocs = bulkByScrollResponse.getTotal();
long updateDocs = bulkByScrollResponse.getUpdated();
long createDocs = bulkByScrollResponse.getCreated();
long deleteDocs = bulkByScrollResponse.getDeleted();
long batches = bulkByScrollResponse.getBatches();
long noops = bulkByScrollResponse.getNoops();
long versionConflicts = bulkByScrollResponse.getVersionConflicts();
long bulkRetries = bulkByScrollResponse.getBulkRetries();
long searchRetries = bulkByScrollResponse.getSearchRetries();
TimeValue throttledMillis = bulkByScrollResponse.getStatus().getThrottled();
TimeValue throttledUntilMillis = bulkByScrollResponse.getStatus().getThrottledUntil();
List<ScrollableHitSource.SearchFailure> searchFailures = bulkByScrollResponse.getSearchFailures();
List<BulkItemResponse.Failure> bulkFailures = bulkByScrollResponse.getBulkFailures();


Update By Query API

作用

根据查询条件,直接更新一个或多个 index 中匹配的文档(不会写入另一个 index)

调用方法
 UpdateByQueryRequest request = new UpdateByQueryRequest("source1", "source2");
选项
 request.setConflicts("proceed");
 request.setQuery(new TermQueryBuilder("user", "kimchy"));
 request.setMaxDocs(10);
 request.setBatchSize(100);
 request.setPipeline("my_pipeline");

 request.setScript(
         new Script(
                 ScriptType.INLINE,
                 "painless",
                 "if (ctx._source.user == 'kimchy') {ctx._source.likes++; }",
                 Collections.emptyMap()
         )
 );
 request.setSlices(2);
 request.setScroll(TimeValue.timeValueMinutes(10));
 request.setRouting("=cat");  // 筛选匹配

 request.setTimeout(TimeValue.timeValueMinutes(2));
 request.setRefresh(true);
 request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
同步执行
BulkByScrollResponse bulkByScrollResponse = highClient.updateByQuery(request, RequestOptions.DEFAULT);
异步执行
highClient.updateByQueryAsync(request, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
    @Override
    public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
    }

    @Override
    public void onFailure(Exception e) {
    }
});
Response
// Read the result from bulkByScrollResponse, the variable returned by updateByQuery above.
TimeValue timeTaken = bulkByScrollResponse.getTook();
boolean timedOut = bulkByScrollResponse.isTimedOut();
long totalDocs = bulkByScrollResponse.getTotal();
long updatedDocs = bulkByScrollResponse.getUpdated();
long deletedDocs = bulkByScrollResponse.getDeleted();
long batches = bulkByScrollResponse.getBatches();
long noops = bulkByScrollResponse.getNoops();
long versionConflicts = bulkByScrollResponse.getVersionConflicts();
long bulkRetries = bulkByScrollResponse.getBulkRetries();
long searchRetries = bulkByScrollResponse.getSearchRetries();
TimeValue throttledMillis = bulkByScrollResponse.getStatus().getThrottled();
TimeValue throttledUntilMillis =
        bulkByScrollResponse.getStatus().getThrottledUntil();
List<ScrollableHitSource.SearchFailure> searchFailures =
        bulkByScrollResponse.getSearchFailures();
List<BulkItemResponse.Failure> bulkFailures =
        bulkByScrollResponse.getBulkFailures();

Delete By Query API

调用方法
DeleteByQueryRequest request = new DeleteByQueryRequest("source1", "source2");
选项
request.setConflicts("proceed");
request.setQuery(new TermQueryBuilder("user", "kimchy"));
request.setMaxDocs(10);
request.setBatchSize(100);
request.setSlices(2);
request.setScroll(TimeValue.timeValueMinutes(10));
request.setRouting("=cat");

request.setTimeout(TimeValue.timeValueMinutes(2));
request.setRefresh(true);
request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
同步
BulkByScrollResponse bulkByScrollResponse = highClient.deleteByQuery(request, RequestOptions.DEFAULT);
异步
highClient.deleteByQueryAsync(request, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
    @Override
    public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
    }

    @Override
    public void onFailure(Exception e) {
    }
});
Response
// Read the result from bulkByScrollResponse, the variable returned by deleteByQuery above.
TimeValue timeTaken = bulkByScrollResponse.getTook();
boolean timedOut = bulkByScrollResponse.isTimedOut();
long totalDocs = bulkByScrollResponse.getTotal();
long deletedDocs = bulkByScrollResponse.getDeleted();
long batches = bulkByScrollResponse.getBatches();
long noops = bulkByScrollResponse.getNoops();
long versionConflicts = bulkByScrollResponse.getVersionConflicts();
long bulkRetries = bulkByScrollResponse.getBulkRetries();
long searchRetries = bulkByScrollResponse.getSearchRetries();
TimeValue throttledMillis = bulkByScrollResponse.getStatus().getThrottled();
TimeValue throttledUntilMillis =
        bulkByScrollResponse.getStatus().getThrottledUntil();
List<ScrollableHitSource.SearchFailure> searchFailures =
        bulkByScrollResponse.getSearchFailures();
List<BulkItemResponse.Failure> bulkFailures =
        bulkByScrollResponse.getBulkFailures();

Rethrottle API

调用方法
// taskId identifies the other (already running) task whose throttle is changed;
// 100.0F is the new throttle value.
RethrottleRequest rethrottleRequest = new RethrottleRequest(taskId, 100.0F);
// Use the call that matches the kind of task being rethrottled.
highClient.reindexRethrottle(rethrottleRequest, RequestOptions.DEFAULT);
highClient.updateByQueryRethrottle(rethrottleRequest, RequestOptions.DEFAULT);
highClient.deleteByQueryRethrottle(rethrottleRequest, RequestOptions.DEFAULT);
异步调用
// Asynchronous variant: pass the RethrottleRequest, not an unrelated request object.
highClient.reindexRethrottleAsync(rethrottleRequest, RequestOptions.DEFAULT, new ActionListener<ListTasksResponse>() {
    @Override
    public void onResponse(ListTasksResponse listTasksResponse) {
        // Called when the rethrottle call completes successfully.
    }

    @Override
    public void onFailure(Exception e) {
        // Called when the rethrottle call fails.
    }
});

Multi Term Vectors API

调用方法
MultiTermVectorsRequest request = new MultiTermVectorsRequest();
TermVectorsRequest termVectorsRequest = new TermVectorsRequest("authors", "1");
termVectorsRequest.setFields("user");
request.add(termVectorsRequest);

XContentBuilder docBuilder = XContentFactory.jsonBuilder();
docBuilder.startObject().field("user", "guest-user").endObject();
TermVectorsRequest termVectorsRequest1 = new TermVectorsRequest("authors", docBuilder);
request.add(termVectorsRequest1);

TermVectorsRequest termVectorsRequest2 = new TermVectorsRequest("authors", "fake_id");
termVectorsRequest2.setFields("user");
String[] ids = {"1", "2"};
MultiTermVectorsRequest request1 = new MultiTermVectorsRequest(ids, termVectorsRequest2);
同步调用
MultiTermVectorsResponse response = highClient.mtermvectors(request, RequestOptions.DEFAULT);
异步调用
highClient.mtermvectorsAsync(request, RequestOptions.DEFAULT, new ActionListener<MultiTermVectorsResponse>() {
    @Override
    public void onResponse(MultiTermVectorsResponse multiTermVectorsResponse) {
    }

    @Override
    public void onFailure(Exception e) {
    }
});
Response
List<TermVectorsResponse> termVectorsResponseList = response.getTermVectorsResponses();
if (termVectorsResponseList != null){
    for (TermVectorsResponse vectorsResponse: termVectorsResponseList){            
    }
}

[1] java-rest-high-search

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐