elasticsearch-rest-high-level-client
文章目录Gradle 依赖Client 初始化RequestOptions支持的API单文档APIIndex APIGet APIGetSourceApiExists APIDelete APIUpdate APITerms Vectors API多文档APIBulk APIMulti-Get APIReindex APIUpdate By Query APIDeleted By Query AP
·
Gradle 依赖
dependencies {
implementation group: 'org.elasticsearch', name: 'elasticsearch', version: '7.11.1'
implementation group: 'org.elasticsearch.client', name: 'elasticsearch-rest-high-level-client', version: '7.11.1'
}
Client 初始化
// 连接多个客户端
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("localhost", 9200, "http"),
new HttpHost("localhost", 9201, "http"))
);
// 关闭
client.close();
RequestOptions
- 单例
RequestOptions
// Singleton: build the shared options once and reuse them for every request.
private static final RequestOptions COMMON_OPTIONS;
static {
    RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
    builder.addHeader("Authorization", "Bearer " + TOKEN);
    // Cap the heap response buffer at 30 MB (the client default is 100 MB).
    // 30 * 1024 * 1024 * 1024 would overflow int and produce a negative limit.
    builder.setHttpAsyncResponseConsumerFactory(
        new HttpAsyncResponseConsumerFactory
            .HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
    // Headers must be added on the builder that is in scope in this static block.
    builder.addHeader("cats", "knock things off of other things");
    COMMON_OPTIONS = builder.build();
}
// 设置 Header
request.setOptions(COMMON_OPTIONS);
- 自定义
RequestOptions
RequestOptions.Builder options = COMMON_OPTIONS.toBuilder();
options.addHeader("cats", "knock things off of other things");
request.setOptions(options);
支持的API
单文档API
Index API
调用方法
- 方法一
IndexRequest request = new IndexRequest("posts");
request.id("1");
String jsonStr = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out elasticsearch\"" +
"}";
request.source(jsonStr, XContentType.JSON);
- 索引
- 索引id
- 文档内容
- 方法二
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "try out elasticsearch");
IndexRequest indexRequest = new IndexRequest("posts").id("1").source(jsonMap);
- 方法三
try {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.field("user", "kimchy");
builder.timeField("postDate", new Date());
builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
IndexRequest indexRequest = new IndexRequest("posts").id("1").source(builder);
}catch (Exception e){
e.printStackTrace();
}
- 方法四
IndexRequest indexRequest = new IndexRequest("posts")
.id("1")
.source(
"user", "kimchy",
"postDate", new Date(),
"message", "try out Elasticsearch"
);
可选参数
// routing value
indexRequest.routing("routing");
// time out
indexRequest.timeout("1s");
indexRequest.timeout(TimeValue.timeValueSeconds(1));
// refresh
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
indexRequest.setRefreshPolicy("wait_for");
// version
indexRequest.version(2);
indexRequest.versionType(VersionType.EXTERNAL);
// operate type
indexRequest.opType(DocWriteRequest.OpType.CREATE);
indexRequest.opType("create");
// pipeline name
indexRequest.setPipeline("pipeline");
同步/异步执行
// 同步执行
try {
IndexResponse indexResponse = highClient.index(indexRequest, RequestOptions.DEFAULT);
}catch (Exception e){
e.printStackTrace();
}
// 异步执行
try {
highClient.indexAsync(indexRequest, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
}
@Override
public void onFailure(Exception e) {
}
});
}catch (Exception e){
e.printStackTrace();
}
IndexResponse
IndexResponse indexResponse = highClient.index(indexRequest, RequestOptions.DEFAULT);
String index = indexResponse.getIndex();
// 新建数据或者更新
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED){
}else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED){
}
// 条数
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()){
}
if (shardInfo.getFailed() > 0){
for (ReplicationResponse.ShardInfo.Failure failure: shardInfo.getFailures()){
String reason = failure.reason();
}
}
版本冲突
IndexRequest indexRequest = new IndexRequest("posts")
.id("1")
.source(
"user", "kimchy",
"postDate", new Date(),
"message", "try out Elasticsearch"
);
// Version conflict: optimistic concurrency control via sequence number and primary term.
IndexRequest request = new IndexRequest("posts")
    .id("1")
    .source("field", "value")
    .setIfSeqNo(10L)
    .setIfPrimaryTerm(20);
try {
    // Execute the conditional request built above, not the unconditional indexRequest.
    IndexResponse indexResponse = highClient.index(request, RequestOptions.DEFAULT);
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        // The stored seq_no / primary term did not match: handle the conflict here.
    }
} catch (Exception e) {
    // Handle I/O and other failures here.
}
// 版本冲突: 文件已存在
IndexRequest request1 = new IndexRequest("posts")
.id("1")
.source("field", "value")
.opType(DocWriteRequest.OpType.CREATE);
try {
IndexResponse response = highClient.index(request1, RequestOptions.DEFAULT);
}catch (ElasticsearchException e){
if (e.status() == RestStatus.CONFLICT){
}
}catch (Exception e){
}
Get API
调用
GetRequest getRequest = new GetRequest("posts", "1");
选项
GetRequest getRequest = new GetRequest("posts", "1");
// Disable _source retrieval (it is enabled by default).
getRequest.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);
// Include only the listed fields; exclude nothing.
String[] includes = new String[]{"message", "*Date"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
getRequest.fetchSourceContext(fetchSourceContext);
// Exclude the listed fields; include everything else.
// Distinct variable names so both variants can live in the same scope.
String[] includes1 = Strings.EMPTY_ARRAY;
String[] excludes1 = new String[]{"message"};
FetchSourceContext fetchSourceContext1 = new FetchSourceContext(true, includes1, excludes1);
// Apply the context that was just built (the original re-applied the previous one).
getRequest.fetchSourceContext(fetchSourceContext1);
try {
getRequest.storedFields("message");
GetResponse getResponse = highClient.get(getRequest, RequestOptions.DEFAULT);
String message = getResponse.getField("message").getValue();
}catch (Exception e){
e.printStackTrace();
}
// routing value
getRequest.routing("routing");
// 执行优先级
getRequest.preference("preference");
getRequest.realtime(false);
// 请求执行之前,是否先刷新, 默认false
getRequest.refresh(true);
// 版本
getRequest.version(2);
getRequest.versionType(VersionType.EXTERNAL);
同步执行
GetResponse getResponse = highClient.get(getRequest, RequestOptions.DEFAULT);
异步执行
highClient.getAsync(getRequest, RequestOptions.DEFAULT, new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse documentFields) {
}
@Override
public void onFailure(Exception e) {
}
});
Response
- 示例
highClient.getAsync(getRequest, RequestOptions.DEFAULT, new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse documentFields) {
String index = documentFields.getIndex();
String id = documentFields.getId();
if (documentFields.isExists()){
long version = documentFields.getVersion();
String sourceAsString = documentFields.getSourceAsString(); // 字符串
Map<String, Object> sourceAsMap = documentFields.getSourceAsMap(); // map
byte[] sourceAsBytes = documentFields.getSourceAsBytes(); // 字节流
}else {
}
}
@Override
public void onFailure(Exception e) {
}
});
- 当索引不存在的时候
// 当索引不存在的时候,抛出异常
GetRequest getRequest = new GetRequest("does_not_exist", "1");
try {
GetResponse response = highClient.get(getRequest, RequestOptions.DEFAULT);
}catch (ElasticsearchException e){
if (e.status() == RestStatus.NOT_FOUND){
}
}catch (Exception e){
}
- 指定版本
try {
GetRequest getRequest = new GetRequest("posts", "1")
.version(2);
GetResponse response = highClient.get(getRequest, RequestOptions.DEFAULT);
}catch (ElasticsearchException e){
// 版本冲突
if (e.status() == RestStatus.CONFLICT){
}
}catch (Exception e){
}
GetSourceApi
调用
GetSourceRequest getSourceRequest = new GetSourceRequest("posts", "1");
选项
String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"postDate"};
getSourceRequest.fetchSourceContext(
new FetchSourceContext(true, includes, excludes) // 第一个参数必须是true
);
// route value
getSourceRequest.routing("routing");
// 设置执行优先顺序
getSourceRequest.preference("preference");
getSourceRequest.realtime(false);
getSourceRequest.refresh(true);
同步调用
GetSourceResponse response = highClient.getSource(getSourceRequest, RequestOptions.DEFAULT);
异步调用
try {
highClient.getSourceAsync(getSourceRequest, RequestOptions.DEFAULT, new ActionListener<GetSourceResponse>() {
@Override
public void onResponse(GetSourceResponse getSourceResponse) {
}
@Override
public void onFailure(Exception e) {
}
});
}catch (Exception e){
e.printStackTrace();
}
Response
Map<String, Object> source = getSourceResponse.getSource();
Exists API
调用方法
// 取消不用的返回
GetRequest getRequest = new GetRequest("posts", "1");
getRequest.fetchSourceContext(new FetchSourceContext(false));
getRequest.storedFields("_none_");
同步调用
highClient.exists(getRequest, RequestOptions.DEFAULT);
异步调用
highClient.existsAsync(getRequest, RequestOptions.DEFAULT, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean aBoolean) {
}
@Override
public void onFailure(Exception e) {
}
});
Delete API
调用方法
// Named deleteRequest so that the option and execution snippets below refer to this object.
DeleteRequest deleteRequest = new DeleteRequest("posts", "1");
选项
// route name
deleteRequest.routing("routing");
// time out
deleteRequest.timeout(TimeValue.timeValueMinutes(2));
deleteRequest.timeout("2m");
// refresh
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
deleteRequest.setRefreshPolicy("wait_for");
// version
deleteRequest.version(2);
// version type
deleteRequest.versionType(VersionType.EXTERNAL);
同步调用
DeleteResponse deleteResponse = highClient.delete(deleteRequest, RequestOptions.DEFAULT);
异步调用
highClient.deleteAsync(deleteRequest, RequestOptions.DEFAULT, new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
}
@Override
public void onFailure(Exception e) {
}
});
Response
String index = deleteResponse.getIndex();
String id = deleteResponse.getId();
long version = deleteResponse.getVersion();
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()){
}
if (shardInfo.getFailed() > 0){
for (ReplicationResponse.ShardInfo.Failure failure: shardInfo.getFailures()){
String reason = failure.reason();
}
}
删除一个不存在的文档
// Deleting a document id that does not exist does not throw; the result is NOT_FOUND.
DeleteRequest deleteRequest1 = new DeleteRequest("posts", "does_not_exist");
DeleteResponse deleteResponse1 = highClient.delete(deleteRequest1, RequestOptions.DEFAULT);
// Inspect the response of this very call (deleteResponse1).
if (deleteResponse1.getResult() == DocWriteResponse.Result.NOT_FOUND) {
    // The document was not found: handle it here.
}
版本冲突
try {
DeleteResponse deleteResponse2 = highClient.delete(
new DeleteRequest("posts", "1").setIfSeqNo(100).setIfPrimaryTerm(2),
RequestOptions.DEFAULT
);
}catch (ElasticsearchException e){
if (e.status() == RestStatus.CONFLICT){
}
}catch (Exception e){
}
Update API
调用方法
- 方法一
UpdateRequest request = new UpdateRequest("posts", "1");
UpdateRequest updateRequest = new UpdateRequest("posts", "1")
.doc(
"updated", new Date(),
"reason", "daily update"
);
- 方法二、脚本
Map<String, Object> parameters = Collections.singletonMap("count", 4);
Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.field += params.count", parameters);
request.script(inline);
Script stored = new Script(ScriptType.STORED, null, "increment-field", parameters);
request.script(stored);
- 方法三、document
// Partial document supplied as a JSON string.
UpdateRequest request1 = new UpdateRequest("posts", "1");
String jsonString = "{" +
    "\"updated\":\"2017-01-01\"," +
    "\"reason\":\"daily update\"" +
    "}";
// Attach the document to the request created in this snippet.
request1.doc(jsonString, XContentType.JSON);
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
UpdateRequest updateRequest = new UpdateRequest("posts", "1")
.doc(jsonMap);
- 方法四、XContentBuilder
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.timeField("updated", new Date());
builder.field("reason", "daily update");
}
builder.endObject();
// Partial document supplied through the XContentBuilder assembled above.
UpdateRequest updateRequest = new UpdateRequest("posts", "1")
    .doc(builder);
upserts
UpdateRequest request = new UpdateRequest("posts", "1");
// The upsert document must be a complete JSON object, braces included.
String jsonString = "{\"created\":\"2017-01-01\"}";
request.upsert(jsonString, XContentType.JSON);
选项
request.routing("routing");
request.timeout(TimeValue.timeValueSeconds(1));
request.timeout("1s");
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");
request.retryOnConflict(3);
request.fetchSource(true); // 默认false
String[] includes = new String[]{"update", "r*"};
String[] excludes = Strings.EMPTY_ARRAY;
request.fetchSource(
new FetchSourceContext(true, includes, excludes)
);
String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"update"};
request.fetchSource(
new FetchSourceContext(true, includes, excludes)
);
request.setIfSeqNo(2L);
request.setIfPrimaryTerm(1L);
request.detectNoop(false);
request.scriptedUpsert(true);
request.docAsUpsert(true);
request.waitForActiveShards(2);
request.waitForActiveShards(ActiveShardCount.ALL);
同步执行
UpdateResponse updateResponse = highClient.update(request, RequestOptions.DEFAULT);
异步执行
highClient.updateAsync(request, RequestOptions.DEFAULT, new ActionListener<UpdateResponse>() {
@Override
public void onResponse(UpdateResponse updateResponse) {
}
@Override
public void onFailure(Exception e) {
}
});
Response
String index = updateResponse.getIndex();
String id = updateResponse.getId();
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED){
}else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED){
}else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP){
}
Term Vectors API (统计)
调用
TermVectorsRequest request = new TermVectorsRequest("authors", "1");
request.setFields("user");
XContentBuilder docBuilder = XContentFactory.jsonBuilder();
docBuilder.startObject().field("user", "guest-user").endObject();
TermVectorsRequest request1 = new TermVectorsRequest("authors", docBuilder);
选项
request.setFieldStatistics(false); // 字段统计
request.setTermStatistics(true);
request.setPositions(false);
request.setOffsets(false);
request.setPayloads(false);
Map<String, Integer> filterSettings = new HashMap<>();
filterSettings.put("max_num_terms", 3);
filterSettings.put("min_term_freq", 1);
filterSettings.put("max_term_freq", 10);
filterSettings.put("min_doc_freq", 1);
filterSettings.put("max_doc_freq", 100);
filterSettings.put("min_word_length", 1);
filterSettings.put("max_word_length", 10);
request.setFilterSettings(filterSettings);
Map<String, String> perFieldAnalyzer = new HashMap<>();
perFieldAnalyzer.put("user", "keyword");
request.setPerFieldAnalyzer(perFieldAnalyzer);
request.setRealtime(false);
request.setRouting("routing");
同步执行
TermVectorsResponse response = highClient.termvectors(request, RequestOptions.DEFAULT);
异步执行
highClient.termvectorsAsync(request, RequestOptions.DEFAULT, new ActionListener<TermVectorsResponse>() {
@Override
public void onResponse(TermVectorsResponse termVectorsResponse) {
}
@Override
public void onFailure(Exception e) {
}
});
Response
TermVectorsResponse response = highClient.termvectors(request, RequestOptions.DEFAULT);
// Read the document coordinates from the response, not from the request that was sent.
String index = response.getIndex();
String type = response.getType();
String id = response.getId();
// Whether the document was found.
boolean found = response.getFound();
for (TermVectorsResponse.TermVector tv : response.getTermVectorsList()) {
String fieldname = tv.getFieldName();
int docCount = tv.getFieldStatistics().getDocCount();
long sumTotalTermFreq = tv.getFieldStatistics().getSumTotalTermFreq();
if (tv.getTerms() != null) {
List<TermVectorsResponse.TermVector.Term> terms = tv.getTerms();
for (TermVectorsResponse.TermVector.Term term: terms){
String termStr = term.getTerm();
int termFreq = term.getTermFreq();
int docFreq = term.getDocFreq();
long totalTermFreq = term.getTotalTermFreq();
float score = term.getScore();
if (term.getTokens() != null){
List<TermVectorsResponse.TermVector.Token> tokens = term.getTokens();
for (TermVectorsResponse.TermVector.Token token: tokens){
int position = token.getPosition();
int startOffset = token.getStartOffset();
int endOffset = token.getEndOffset();
String payload = token.getPayload();
}
}
}
}
}
参考
多文档API
Bulk API
调用方法
BulkRequest request = new BulkRequest();
request.add(
new IndexRequest("posts").id("1").source(XContentType.JSON, "field", "foo")
);
request.add(
new IndexRequest("posts").id("2").source(XContentType.JSON, "field", "bar")
);
BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "3"));
request.add(new UpdateRequest("posts", "2").doc(XContentType.JSON, "other", "test"));
request.add(new IndexRequest("posts").id("4").source(XContentType.JSON, "field", "baz"));
选项
request.timeout(TimeValue.timeValueMinutes(2));
request.timeout("2m");
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");
request.waitForActiveShards(2);
request.waitForActiveShards(ActiveShardCount.ALL);
request.pipeline("pipelineId");
request.routing("routeId");
同步执行
BulkResponse bulkResponse = highClient.bulk(request, RequestOptions.DEFAULT);
异步执行
highClient.bulkAsync(request, RequestOptions.DEFAULT, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {
}
@Override
public void onFailure(Exception e) {
}
});
Response
BulkResponse bulkResponse = highClient.bulk(request, RequestOptions.DEFAULT);
// Iterate over the per-operation results and cast each one to the type matching its operation.
for (BulkItemResponse bulkItemResponse : bulkResponse) {
    DocWriteResponse itemResponse = bulkItemResponse.getResponse();
    switch (bulkItemResponse.getOpType()) {
        case INDEX:
        case CREATE:
            IndexResponse indexResponse = (IndexResponse) itemResponse;
            break;
        case UPDATE:
            // An update operation yields an UpdateResponse.
            UpdateResponse updateResponse = (UpdateResponse) itemResponse;
            break;
        case DELETE:
            // A delete operation yields a DeleteResponse.
            DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
            break;
    }
}
错误检查
// 有一个失败,为true
if (bulkResponse.hasFailures()){
}
// 检查每个请求的response
for (BulkItemResponse bulkItemResponse: bulkResponse){
if (bulkItemResponse.isFailed()){
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
}
}
BulkProcessor
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
int numberOfActions = request.numberOfActions();
log.debug("Executing bulk [{}] with {} requests",
executionId, numberOfActions);
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
log.warn("Bulk [{}] executed with failures", executionId);
} else {
log.debug("Bulk [{}] completed in {} milliseconds",
executionId, response.getTook().getMillis());
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
log.error("Failed to execute bulk", failure);
}
};
BulkProcessor.Builder
builder = BulkProcessor.builder(
(r, bulkListener) -> highClient.bulkAsync(r, RequestOptions.DEFAULT, bulkListener), listener
);
builder.setBulkActions(500); // -1 不限
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); // -1 不限
builder.setConcurrentRequests(0); // 0 只允许一个请求
builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); // 重试间隔和次数
BulkProcessor bulkProcessor = builder.build();
bulkProcessor.add(new DeleteRequest("posts", "3"));
bulkProcessor.add(new UpdateRequest("posts", "2").doc(XContentType.JSON, "other", "test"));
bulkProcessor.add(new IndexRequest("posts").id("4").source(XContentType.JSON, "field", "baz"));
关闭
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS); // close with a 30-second timeout; per the original note, returns false if not finished in time
bulkProcessor.close(); // close immediately
Multi-Get API
调用方法
MultiGetRequest request = new MultiGetRequest();
request.add(new MultiGetRequest.Item("index", "example_id"));
request.add(new MultiGetRequest.Item("index", "another_id"));
选项
// 搜索包含字段
String[] includes = new String[] {"foo", "*r"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.add(new MultiGetRequest.Item("index", "example_id").fetchSourceContext(fetchSourceContext));
// 剔除搜索字段
// Exclude the listed fields; include everything else.
String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"foo", "*r"};
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.add(new MultiGetRequest.Item("index", "example_id").fetchSourceContext(fetchSourceContext));
request.add(new MultiGetRequest.Item("index", "with_routing").routing("some_routing"));
request.add(new MultiGetRequest.Item("index", "with_version").versionType(VersionType.EXTERNAL).version(10123L));
request.preference("some_preference");
request.realtime(false);
request.refresh(true);
Response
request.add(new MultiGetRequest.Item("index", "example_id").storedFields("foo"));
MultiGetResponse responses = highClient.mget(request, RequestOptions.DEFAULT);
MultiGetItemResponse itemResponse = responses.getResponses()[0];
String value = itemResponse.getResponse().getField("foo").getValue();
同步
MultiGetResponse response = highClient.mget(request, RequestOptions.DEFAULT);
异步
highClient.mgetAsync(request, RequestOptions.DEFAULT, new ActionListener<MultiGetResponse>() {
@Override
public void onResponse(MultiGetResponse multiGetItemResponses) {
}
@Override
public void onFailure(Exception e) {
}
});
Response
MultiGetResponse response = highClient.mget(request, RequestOptions.DEFAULT);
MultiGetItemResponse firstItem = response.getResponses()[0];
GetResponse firstGet = firstItem.getResponse();
String index = firstItem.getIndex();
String id = firstItem.getId();
if (firstGet.isExists()){
long version = firstGet.getVersion();
String sourceAsString = firstGet.getSourceAsString();
Map<String, Object> sourceAsMap = firstGet.getSourceAsMap();
byte[] sourceAsBytes = firstGet.getSourceAsBytes();
}
异常
MultiGetRequest request = new MultiGetRequest();
request.add(new MultiGetRequest.Item("index", "example_id").version(1000L));
MultiGetResponse response = highClient.mget(request, RequestOptions.DEFAULT);
MultiGetItemResponse item = response.getResponses()[0];
Exception e = item.getFailure().getFailure();
ElasticsearchException ee = (ElasticsearchException) e;
Reindex API (重新索引记录)
调用
ReindexRequest request = new ReindexRequest();
request.setSourceIndices("source1", "source2");
选项
ReindexRequest request = new ReindexRequest();
request.setSourceIndices("source1", "source2");
request.setDestIndex("dest");
request.setDestVersionType(VersionType.EXTERNAL);
request.setDestOpType("create");
request.setConflicts("proceed");
request.setSourceQuery(new TermQueryBuilder("user", "kimchy"));
request.setMaxDocs(10);
request.setSourceBatchSize(100);
request.setDestPipeline("my_pipeline");
执行脚本
request.setScript(
new Script(
ScriptType.INLINE,
"painless",
"if (ctx._source.user == 'kimchy') { ctx._source.likes++; }",
Collections.emptyMap()
)
);
设置远程信息
// Reindex from a remote cluster; remoteHost, remotePort, user and password are supplied by the caller.
request.setRemoteInfo(
    new RemoteInfo(
        "http", remoteHost, remotePort, null,
        new BytesArray(new MatchAllQueryBuilder().toString()),
        user, password, Collections.emptyMap(),
        // Socket timeout: 100 milliseconds (100 microseconds is below millisecond resolution).
        new TimeValue(100, TimeUnit.MILLISECONDS),
        // Connect timeout: 100 seconds.
        new TimeValue(100, TimeUnit.SECONDS)
    )
);
request.setSlices(2);
request.setScroll(TimeValue.timeValueMinutes(10)); // time of keeps the "search context" alive.
选项
request.setTimeout(TimeValue.timeValueMinutes(2));
request.setRefresh(true);
同步
BulkByScrollResponse bulkByScrollResponse = highClient.reindex(request, RequestOptions.DEFAULT);
异步
highClient.reindexAsync(request, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
}
@Override
public void onFailure(Exception e) {
}
});
Response
TimeValue timeValue = bulkByScrollResponse.getTook();
boolean timeOut = bulkByScrollResponse.isTimedOut();
long totalDocs = bulkByScrollResponse.getTotal();
long updateDocs = bulkByScrollResponse.getUpdated();
long createDocs = bulkByScrollResponse.getCreated();
long deleteDocs = bulkByScrollResponse.getDeleted();
long batches = bulkByScrollResponse.getBatches();
long noops = bulkByScrollResponse.getNoops();
long versionConflicts = bulkByScrollResponse.getVersionConflicts();
long bulkRetries = bulkByScrollResponse.getBulkRetries();
long searchRetries = bulkByScrollResponse.getSearchRetries();
TimeValue throttledMillis = bulkByScrollResponse.getStatus().getThrottled();
TimeValue throttledUntilMillis = bulkByScrollResponse.getStatus().getThrottledUntil();
List<ScrollableHitSource.SearchFailure> searchFailures = bulkByScrollResponse.getSearchFailures();
List<BulkItemResponse.Failure> bulkFailures = bulkByScrollResponse.getBulkFailures();
Update By Query API
作用
根据查询条件,批量更新一个或多个 index 中匹配的文档。
调用方法
UpdateByQueryRequest request = new UpdateByQueryRequest("source1", "source2");
选项
request.setConflicts("proceed");
request.setQuery(new TermQueryBuilder("user", "kimchy"));
request.setMaxDocs(10);
request.setBatchSize(100);
request.setPipeline("my_pipeline");
request.setScript(
new Script(
ScriptType.INLINE,
"painless",
"if (ctx._source.user == 'kimchy') {ctx._source.likes++; }",
Collections.emptyMap()
)
);
request.setSlices(2);
request.setScroll(TimeValue.timeValueMinutes(10));
request.setRouting("=cat"); // 筛选匹配
request.setTimeout(TimeValue.timeValueMinutes(2));
request.setRefresh(true);
request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
同步执行
BulkByScrollResponse bulkByScrollResponse = highClient.updateByQuery(request, RequestOptions.DEFAULT);
异步执行
highClient.updateByQueryAsync(request, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
}
@Override
public void onFailure(Exception e) {
}
});
Response
// Read the statistics from bulkByScrollResponse, the value returned by updateByQuery.
TimeValue timeTaken = bulkByScrollResponse.getTook();
boolean timedOut = bulkByScrollResponse.isTimedOut();
long totalDocs = bulkByScrollResponse.getTotal();
long updatedDocs = bulkByScrollResponse.getUpdated();
long deletedDocs = bulkByScrollResponse.getDeleted();
long batches = bulkByScrollResponse.getBatches();
long noops = bulkByScrollResponse.getNoops();
long versionConflicts = bulkByScrollResponse.getVersionConflicts();
long bulkRetries = bulkByScrollResponse.getBulkRetries();
long searchRetries = bulkByScrollResponse.getSearchRetries();
TimeValue throttledMillis = bulkByScrollResponse.getStatus().getThrottled();
TimeValue throttledUntilMillis =
    bulkByScrollResponse.getStatus().getThrottledUntil();
List<ScrollableHitSource.SearchFailure> searchFailures =
    bulkByScrollResponse.getSearchFailures();
List<BulkItemResponse.Failure> bulkFailures =
    bulkByScrollResponse.getBulkFailures();
Delete By Query API
调用方法
DeleteByQueryRequest request = new DeleteByQueryRequest("source1", "source2");
选项
request.setConflicts("proceed");
request.setQuery(new TermQueryBuilder("user", "kimchy"));
request.setMaxDocs(10);
request.setBatchSize(100);
request.setSlices(2);
request.setScroll(TimeValue.timeValueMinutes(10));
request.setRouting("=cat");
request.setTimeout(TimeValue.timeValueMinutes(2));
request.setRefresh(true);
request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
同步
BulkByScrollResponse bulkByScrollResponse = highClient.deleteByQuery(request, RequestOptions.DEFAULT);
异步
highClient.deleteByQueryAsync(request, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
}
@Override
public void onFailure(Exception e) {
}
});
Response
// Read the statistics from bulkByScrollResponse, the value returned by deleteByQuery.
TimeValue timeTaken = bulkByScrollResponse.getTook();
boolean timedOut = bulkByScrollResponse.isTimedOut();
long totalDocs = bulkByScrollResponse.getTotal();
long deletedDocs = bulkByScrollResponse.getDeleted();
long batches = bulkByScrollResponse.getBatches();
long noops = bulkByScrollResponse.getNoops();
long versionConflicts = bulkByScrollResponse.getVersionConflicts();
long bulkRetries = bulkByScrollResponse.getBulkRetries();
long searchRetries = bulkByScrollResponse.getSearchRetries();
TimeValue throttledMillis = bulkByScrollResponse.getStatus().getThrottled();
TimeValue throttledUntilMillis =
    bulkByScrollResponse.getStatus().getThrottledUntil();
List<ScrollableHitSource.SearchFailure> searchFailures =
    bulkByScrollResponse.getSearchFailures();
List<BulkItemResponse.Failure> bulkFailures =
    bulkByScrollResponse.getBulkFailures();
Rethrottle API
调用方法
// taskId 是另外一个任务的标识id
// Declared under the same name that the three rethrottle calls below pass in.
RethrottleRequest rethrottleRequest = new RethrottleRequest(taskId, 100.0F);
highClient.reindexRethrottle(rethrottleRequest, RequestOptions.DEFAULT);
highClient.updateByQueryRethrottle(rethrottleRequest, RequestOptions.DEFAULT);
highClient.deleteByQueryRethrottle(rethrottleRequest, RequestOptions.DEFAULT);
异步调用
// Asynchronous variant: pass the rethrottle request; the listener is notified on completion or failure.
highClient.reindexRethrottleAsync(rethrottleRequest, RequestOptions.DEFAULT, new ActionListener<ListTasksResponse>() {
    @Override
    public void onResponse(ListTasksResponse listTasksResponse) {
        // Handle the successful rethrottle response here.
    }
    @Override
    public void onFailure(Exception e) {
        // Handle the failure here.
    }
});
Multi Term Vectors API
调用方法
MultiTermVectorsRequest request = new MultiTermVectorsRequest();
TermVectorsRequest termVectorsRequest = new TermVectorsRequest("authors", "1");
termVectorsRequest.setFields("user");
request.add(termVectorsRequest);
XContentBuilder docBuilder = XContentFactory.jsonBuilder();
docBuilder.startObject().field("user", "guest-user").endObject();
TermVectorsRequest termVectorsRequest1 = new TermVectorsRequest("authors", docBuilder);
request.add(termVectorsRequest1);
TermVectorsRequest termVectorsRequest2 = new TermVectorsRequest("authors", "fake_id");
termVectorsRequest2.setFields("user");
String[] ids = {"1", "2"};
MultiTermVectorsRequest request1 = new MultiTermVectorsRequest(ids, termVectorsRequest2);
同步调用
MultiTermVectorsResponse response = highClient.mtermvectors(request, RequestOptions.DEFAULT);
异步调用
highClient.mtermvectorsAsync(request, RequestOptions.DEFAULT, new ActionListener<MultiTermVectorsResponse>() {
@Override
public void onResponse(MultiTermVectorsResponse multiTermVectorsResponse) {
}
@Override
public void onFailure(Exception e) {
}
});
Response
List<TermVectorsResponse> termVectorsResponseList = response.getTermVectorsResponses();
if (termVectorsResponseList != null){
for (TermVectorsResponse vectorsResponse: termVectorsResponseList){
}
}
更多推荐
已为社区贡献3条内容
所有评论(0)