RestHighLevelClient
依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-elasticsearch</artifactId><version>2.3.0.RELEASE</version></dependency>
·
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
<version>2.3.0.RELEASE</version>
</dependency>
配置类
/**
 * Builds the {@link RestHighLevelClient} bean for the three configured cluster nodes and
 * registers HTTP basic credentials as the default credentials provider.
 *
 * @return a high-level client built from the configured hosts and credentials
 */
@Bean
public RestHighLevelClient esRestClient() {
    // NOTE(review): "21.145.229.353" is not a valid IPv4 address (last octet > 255) — confirm the real host.
    RestClientBuilder builder = RestClient.builder(
            new HttpHost("21.145.229.153", 9200, "http"),
            new HttpHost("21.145.229.253", 9200, "http"),
            new HttpHost("21.145.229.353", 9200, "http"));

    // NOTE(review): credentials are hard-coded here; presumably they belong in externalized configuration.
    CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "1qaz!QAZ"));
    builder.setHttpClientConfigCallback(f -> f.setDefaultCredentialsProvider(credentialsProvider));

    // Return the client that was actually built (the variable is restClient, not client).
    RestHighLevelClient restClient = new RestHighLevelClient(builder);
    return restClient;
}
或者
// Authenticate with a username and password.
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "1qaz!QAZ"));
// Everything below is optional tuning.
// Raise the connect timeout (default 1 second) and the socket timeout (default 30 seconds).
// setMaxRetryTimeoutMillis is not called: RestClientBuilder in the 7.x low-level client no longer has it.
RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200))
        // The callback receives a RequestConfig.Builder, adjusts it and returns it.
        .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
                .setConnectTimeout(5000)    // connect timeout in milliseconds
                .setSocketTimeout(60000))   // socket timeout in milliseconds
        // Attach the credentials declared above so they are actually sent with requests.
        .setHttpClientConfigCallback(httpClientBuilder ->
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
// By default the Apache HTTP async client starts one dispatcher thread plus a number of worker
// threads used by the connection manager, as many as the locally detected processors
// (whatever Runtime.getRuntime().availableProcessors() returns).
// This snippet sets the I/O thread count to 1 instead.
RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200))
        .setHttpClientConfigCallback(httpClientBuilder -> {
            IOReactorConfig reactorConfig = IOReactorConfig.custom().setIoThreadCount(1).build();
            return httpClientBuilder.setDefaultIOReactorConfig(reactorConfig);
        });
// Basic authentication.
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
UsernamePasswordCredentials basicCredentials = new UsernamePasswordCredentials("user", "password");
credentialsProvider.setCredentials(AuthScope.ANY, basicCredentials);
// The callback receives the HttpAsyncClientBuilder, modifies it and returns it;
// here it installs the provider above as the default credentials provider.
RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200))
        .setHttpClientConfigCallback(
                httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
// Preemptive authentication can be disabled. Every request is then first sent without the
// authorization header; after an HTTP 401 response the same request is sent again, this time
// carrying the basic authentication header. To do that, disable it through the
// HttpAsyncClientBuilder:
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
UsernamePasswordCredentials basicCredentials = new UsernamePasswordCredentials("user", "password");
credentialsProvider.setCredentials(AuthScope.ANY, basicCredentials);
RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200))
        .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                .disableAuthCaching() // disable preemptive authentication
                .setDefaultCredentialsProvider(credentialsProvider));
//加密通信也可以通过HttpClientConfigCallback进行配置。参数HttpAsyncClientBuilder公开了配置加密通信的多种方法:
//setSSLContext、setSSLSessionStrategy和setConnectionManager,重要性依次增加。以下是一个例子:
// Load the JKS trust store from keyStorePath, unlocking it with keyStorePass.
KeyStore truststore = KeyStore.getInstance("jks");
try (InputStream trustStoreStream = Files.newInputStream(keyStorePath)) {
    truststore.load(trustStoreStream, keyStorePass.toCharArray());
}
// Build an SSLContext from that trust material (null = no custom trust strategy).
final SSLContext sslContext = SSLContexts.custom()
        .loadTrustMaterial(truststore, null)
        .build();
// Talk to the node over https using the SSLContext built above.
RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "https"))
        .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setSSLContext(sslContext));
增删改查
//插入数据
IndexRequest request = new IndexRequest("es_user");
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user_name", "张启桥");
jsonMap.put("post_date", new Date());
jsonMap.put("age", 23);
jsonMap.put("gender", "男");
jsonMap.put("height", 180);
jsonMap.put("address", "四川省成都市犀浦镇百草路12号");
request.source(jsonMap);
IndexResponse response = null;
try {
response = restHighLevelClient.index(request, ElasticsearchConfig.COMMON_OPTIONS);
if(response.getResult().name().equalsIgnoreCase("created")){
return R.ok("创建成功!");
}else {
return R.error("失败!");
}
} catch (IOException e) {
e.printStackTrace();
}
}
//批量插入数据
/**
* 造点假数据
*/
List<Map<String, Object>> list = new ArrayList<>();
for (int i = 0; i < 100; i++) {
Map<String, Object> map = new HashMap<>();
map.put("address", "四川省成都市犀浦镇百草路"+12+i+"号");
map.put("gender", "男");
map.put("user_name", RandomStringUtils.randomAlphanumeric(10));
map.put("post_date", new Date());
map.put("age", 23+i);
map.put("height", 155+i);
list.add(map);
}
/**
* 批量从插入数据
*/
BulkRequest request =new BulkRequest();
for(int j=0;j<list.size();j++){
Map<String,Object> item = list.get(j);
request.add(new IndexRequest("es_user").
source(item));
}
try {
BulkResponse bulk = restHighLevelClient.bulk(request, ElasticsearchConfig.COMMON_OPTIONS);
if(bulk.status().getStatus() == 200){
return R.ok("批量操作成功!");
}
} catch (IOException e) {
e.printStackTrace();
}
//查询多个id的数据
SearchRequest request = new SearchRequest();
request.indices("eslog");
SearchSourceBuilder builder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
//addIds后面是多个id
boolQueryBuilder.filter(QueryBuilders.idsQuery().addIds("oI9GVHQBH0SEUrtlhvX7", "oY9HVHQBH0SEUrtlaPUO", "3Fz9aHQBxI7zG-AK_rLc"));
builder.query(boolQueryBuilder);
request.source(builder);
List<Map<String, Object>> list = new ArrayList<>();
Map<String, Object> map = new HashMap<>();
try {
SearchResponse response = restHighLevelClient.search(request, ElasticsearchConfig.COMMON_OPTIONS);
SearchHit[] searchHits = response.getHits().getHits();
for (SearchHit hit : searchHits) {
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
list.add(sourceAsMap);
}
map.put("data", list);
return R.ok("查询成功!").put("result", map);
} catch (IOException e) {
e.printStackTrace();
}
//根据id修改数据:
Map<String, Object> map = new HashMap<>();
map.put("id", 5);
map.put("address", "四川省成都市犀浦镇百草路" + 10082 + "号");
map.put("gender", "nv");
map.put("user_name", RandomStringUtils.randomAlphanumeric(10));
map.put("post_date", new Date());
map.put("age", 23);
map.put("height", 168);
UpdateRequest request = new UpdateRequest("es_user",map.get("id").toString()).doc(map);
try {
UpdateResponse update = restHighLevelClient.update(request, ElasticsearchConfig.COMMON_OPTIONS);
if(update.status().getStatus() == 200){
return R.ok("修改成功");
}else {
return R.error("修改失败@");
}
} catch (IOException e) {
e.printStackTrace();
}
//id查询一条
public Map<String, Object> getDataById(String id) {
/**这里的id是es插入数据生成的id,不是数据中的id**/
GetRequest request = new GetRequest("eslog", id);
GetResponse response = null;
try {
response = restHighLevelClient.get(request, EsConfig.COMMON_OPTIONS);
Map<String, Object> sourceAsMap = response.getSourceAsMap();
/**把es生成的那个id也放进map**/
sourceAsMap.put("_id", id);
return sourceAsMap;
} catch (IOException e) {
log.error(e.getMessage(), e);
/**抛个自定义异常**/
throw new RuntimeException("^^^^^^")
}
return null;
}
/**
 * Runs a delete-by-query on the "eslog" index for documents whose "id.keyword" term equals
 * the given id.
 *
 * @param id value matched against the "id.keyword" field
 * @return the "updated" counter from the delete-by-query status when the request completes,
 *         or -1 when the request fails with an {@link IOException} and the error helper
 *         returns normally
 */
public Long deleteById(String id) {
    DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest();
    deleteRequest.indices("eslog");
    deleteRequest.setQuery(new TermQueryBuilder("id.keyword", id));

    // Limits: maximum number of documents to process, and the batch size.
    deleteRequest.setSize(10);
    deleteRequest.setBatchSize(1000);
    // Run in parallel slices.
    deleteRequest.setSlices(2);
    // Timing: the scroll value controls how long the search context stays alive; then the timeout.
    deleteRequest.setScroll(TimeValue.timeValueMinutes(10));
    deleteRequest.setTimeout(TimeValue.timeValueMinutes(2));
    // Refresh the index.
    deleteRequest.setRefresh(true);

    try {
        BulkByScrollResponse result = restHighLevelClient.deleteByQuery(deleteRequest, EsConfig.COMMON_OPTIONS);
        // Callers are told that 0 means success and -1 means failure.
        // NOTE(review): this reads the "updated" counter rather than "deleted" — confirm that is
        // intended before relying on the value for anything beyond the 0 / -1 convention.
        return result.getStatus().getUpdated();
    } catch (IOException e) {
        log.error(e.getMessage(), e);
        ErrorUtils.throwIDaasException(AuditErrorCode.INNER_ERROR);
        return -1L;
    }
}
更多推荐
已为社区贡献1条内容
所有评论(0)