RestHighLevelClient的基本使用
本文介绍RestHighLevelClient的基本使用:1. pom引入;2. yaml配置;3. Rest客户端创建;4. 常用操作(新增、修改、删除、count查询、分页条件查询)。
·
RestHighLevelClient的基本使用
1. pom引入
<!-- Elasticsearch core library, declared explicitly; its transitive jackson-core is excluded. -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>6.6.1</version>
    <exclusions>
        <exclusion>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- High-level REST client; its transitive elasticsearch artifact is excluded
     (the same artifact is declared directly in the dependency above). -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.6.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </exclusion>
    </exclusions>
</dependency>
2. yaml配置
# Elasticsearch connection settings (read via ${elasticsearch.*})
elasticsearch:
  clientIps: 127.0.0.1 # ES gateway address(es), comma-separated; look up the real addresses in the ES instance overview. Configure at least two gateway addresses.
  httpPort: 8990 # ES gateway port
  username: test
  password: 1234
# Index / import settings (read via ${esParam.*})
esParam:
  IndexName: testName # index name
  indexType: "type" # index type; may be "type" or "doc"
  bulkSize: 500 # number of documents per bulk request
  docNum: 200000000 # total number of documents expected to be imported
3. Rest客户端创建
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that builds the {@link RestHighLevelClient} bean from the
 * {@code elasticsearch.*} properties.
 *
 * @author mqs
 */
@Configuration
public class RestClientConfig {
    /** Comma-separated list of Elasticsearch host names or IP addresses. */
    @Value("${elasticsearch.clientIps}")
    private String clientIps;

    /** HTTP port shared by every configured host. */
    @Value("${elasticsearch.httpPort}")
    private int httpPort;

    /** User name for HTTP Basic authentication. */
    @Value("${elasticsearch.username}")
    private String username;

    /** Password for HTTP Basic authentication. */
    @Value("${elasticsearch.password}")
    private String password;

    /**
     * Converts the comma-separated host list into {@link HttpHost} instances.
     *
     * @param clientIps  comma-separated host names or IPs; whitespace around each entry is ignored
     * @param esHttpPort port applied to every host
     * @return one {@code HttpHost} (scheme {@code http}) per entry, in configuration order
     */
    private HttpHost[] getHttpHosts(String clientIps, int esHttpPort) {
        String[] clientIpList = clientIps.split(",");
        HttpHost[] httpHosts = new HttpHost[clientIpList.length];
        for (int i = 0; i < clientIpList.length; i++) {
            // Trim so that a list written as "10.0.0.1, 10.0.0.2" does not yield
            // host names carrying leading/trailing whitespace.
            httpHosts[i] = new HttpHost(clientIpList[i].trim(), esHttpPort, "http");
        }
        return httpHosts;
    }

    /**
     * Creates a REST client that authenticates with HTTP Basic Auth.
     *
     * @return the client connected to all configured hosts
     */
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        return new RestHighLevelClient(
                RestClient.builder(getHttpHosts(clientIps, httpPort))
                        .setHttpClientConfigCallback((HttpAsyncClientBuilder httpAsyncClientBuilder) ->
                                httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider)));
    }
}
4. 常用操作
/**
 * Holds the index settings read from the {@code esParam} configuration section.
 */
@Component
@Data
public class AppConfig {
    // Elasticsearch-related settings

    /** Index type, bound to {@code esParam.indexType}. */
    @Value("${esParam.indexType}")
    private String indexType;

    /**
     * Index name. Bound to {@code esParam.IndexName}, which is the key the YAML
     * configuration declares; the previous placeholder {@code esParam.testName}
     * does not exist there and could not be resolved.
     */
    @Value("${esParam.IndexName}")
    private String testName;
}
/************************************ Sample entity ************************************/
/**
 * Sample document entity used by the indexing and query examples.
 * Also carries the paging inputs ({@code offset}, {@code pageSize}) read by the search example.
 */
@Data
public class Demo {
    private String name;       // name
    private Integer age;       // age
    private String sex;        // gender
    private String grade;      // grade
    private String subject;    // subject
    private Integer score;     // score
    // Lower bound for the createDate range filter; makeQueryParams reads it via
    // getCreateDate() and applies the "yyyy-MM-dd" format, so it was missing here.
    private String createDate;
    private Integer offset;    // paging: index of the first hit to return
    private Integer pageSize;  // paging: number of hits to return
}
/************************************** Insert *******************************************/
@Autowired
private RestHighLevelClient restHighLevelClient;
@Autowired
private AppConfig appConfig;

/**
 * Bulk insert: indexes every {@link Demo} of the batch with a single bulk request and
 * hands the request to {@code exceptionRetry} when any item failed.
 * I/O errors are logged and not rethrown.
 */
public void syncData() {
    // Batch of documents to index.
    // NOTE(review): dataList is not defined in the visible code - presumably provided by the enclosing class; confirm.
    List<Demo> list = dataList;
    if (list == null || list.isEmpty()) {
        // Nothing to send; a bulk request without any action fails validation.
        return;
    }
    // The request object was used below but never declared.
    BulkRequest request = new BulkRequest();
    // Keep the number of documents per bulk request under control: with very large or
    // very many documents the synchronisation may end up incomplete.
    for (Demo e : list) {
        // Convert to a map in whatever way suits the project; here a custom reflection-based helper is used.
        Map<String, Object> map = CommUtil.objectToMap(e);
        request.add(new IndexRequest(appConfig.getTestName(), appConfig.getIndexType()).source(map, XContentType.JSON));
    }
    try {
        BulkResponse response = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
        if (response.hasFailures()) {
            exceptionRetry(request, response);
        }
    } catch (IOException e) {
        // Log through the logger (with stack trace) instead of printStackTrace().
        log.error("ES批量同步出错: {}", e.getMessage(), e);
    }
}
/**
 * Retries the failed items of a bulk request once, after a one-second pause.
 * Items that fail again are reported through {@code exceptionLog}.
 *
 * @param request  the bulk request that was sent
 * @param response the response to {@code request}; its failed items are retried
 */
private void exceptionRetry(BulkRequest request, BulkResponse response) {
    List<DocWriteRequest<?>> list = request.requests();
    BulkRequest requestRetry = new BulkRequest();
    // Keep the number of documents per bulk request under control: with very large or
    // very many documents the synchronisation may end up incomplete.
    for (BulkItemResponse bir : response) {
        if (bir.isFailed()) {
            // The item id is the position of the item inside the original request.
            int docIndex = bir.getItemId();
            IndexRequest ir = (IndexRequest) list.get(docIndex);
            requestRetry.add(new IndexRequest(appConfig.getTestName(), appConfig.getIndexType()).source(ir.sourceAsMap(), XContentType.JSON));
        }
    }
    if (requestRetry.requests().isEmpty()) {
        // No failed item was found, so there is nothing to resend.
        return;
    }
    try {
        // On error, sleep 1s before retrying.
        Thread.sleep(1000);
        BulkResponse responseRetry = restHighLevelClient.bulk(requestRetry, RequestOptions.DEFAULT);
        // Record the documents that still fail after the retry.
        exceptionLog(requestRetry, responseRetry);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can still observe the interruption.
        Thread.currentThread().interrupt();
        log.error("ES同步重试被中断!", e);
    } catch (Exception e) {
        log.error("ES同步重试出错!", e);
    }
}
/**
 * Inspects the outcome of a retried bulk request and logs, for every item that
 * failed again, the failure message together with the document source.
 *
 * @param request  the retried bulk request
 * @param response the response to {@code request}
 */
private void exceptionLog(BulkRequest request, BulkResponse response) {
    List<DocWriteRequest<?>> sentRequests = request.requests();
    for (BulkItemResponse item : response) {
        if (!item.isFailed()) {
            continue;
        }
        // The item id is used as the index of the matching request in the list.
        IndexRequest failedRequest = (IndexRequest) sentRequests.get(item.getItemId());
        // Record the failure reason and the data that could not be written.
        String reason = item.getFailureMessage();
        String payload = failedRequest.sourceAsMap().toString();
        log.error("同步重试失败reason=[{}],data=[{}]", reason, payload);
    }
}
/************************************** Update *******************************************/
/**
 * Partially updates the document identified by its unique id.
 * I/O errors are logged and not rethrown.
 *
 * @param data      fields to merge into the existing document
 * @param indexName index that holds the document
 * @param id        unique id of the document
 */
public void updateById(Map<String, Object> data, String indexName, String id) {
    // Guard kept so the JSON serialisation only happens when debug output is wanted;
    // the message is now emitted at DEBUG to match the guard (it was logged at INFO).
    if (log.isDebugEnabled()) {
        log.debug("es开始更新数据:{}", JSON.toJSONString(data));
    }
    try {
        UpdateRequest request = new UpdateRequest(indexName, appConfig.getIndexType(), id).doc(data);
        UpdateResponse response = restHighLevelClient.update(request, RequestOptions.DEFAULT);
        log.info("更新状态:{}", response.getResult());
        // Only report completion when the update call actually returned.
        log.debug("es更新数据完成");
    } catch (IOException e) {
        log.error("更新写入异常:{}", e.getMessage(), e);
    }
}
/************************************** Delete *******************************************/
/**
 * Deletes every document whose {@code score} field matches the given value.
 * I/O errors are logged and not rethrown.
 *
 * @param score value the {@code score} term query must match
 */
public void deleteByQueryRequest(String score) {
    // AppConfig exposes only getTestName() as index name; the getInstPrcsIndex()
    // accessor used before is not defined there.
    // NOTE(review): confirm this is the index the delete is meant to target.
    DeleteByQueryRequest delReq = new DeleteByQueryRequest(appConfig.getTestName());
    delReq.setDocTypes(appConfig.getIndexType());
    delReq.setQuery(new TermQueryBuilder("score", score));
    try {
        restHighLevelClient.deleteByQuery(delReq, RequestOptions.DEFAULT);
    } catch (IOException e) {
        // The logger already records the stack trace; printStackTrace() was redundant.
        log.error("ES按条件删除出错: {}", e.getMessage(), e);
    }
}
/************************************** Count query ***************************************/
/**
 * Counts the documents of an index that match the filter fields set on {@code demo}.
 *
 * <p>Declared as an instance method: it uses the injected {@code restHighLevelClient}
 * and the instance method {@code makeQueryParams}, neither of which is reachable
 * from a static context.
 *
 * @param indexName index to count in
 * @param demo      carrier of the filter values
 * @return number of matching documents, or {@code 0} when the request fails
 */
public long count(String indexName, Demo demo) {
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    // Build the filter conditions.
    BoolQueryBuilder boolBuilder = makeQueryParams(demo);
    sourceBuilder.query(boolBuilder);
    CountRequest countRequest = new CountRequest(indexName);
    countRequest.source(sourceBuilder);
    long count = 0L;
    try {
        CountResponse countResponse = restHighLevelClient.count(countRequest, RequestOptions.DEFAULT);
        count = countResponse != null ? countResponse.getCount() : 0;
    } catch (Exception e) {
        // Best effort: report the failure through the logger and fall back to 0.
        log.error("ES统计数量出错: {}", e.getMessage(), e);
    }
    return count;
}
/***************************** Build query parameters ****************************************/
/**
 * Assembles a bool query from whichever filter fields are set on {@code demo};
 * each present field contributes one {@code must} clause.
 *
 * @param demo carrier of the filter values
 * @return the bool query; it has no clauses when no filter field is set
 */
private BoolQueryBuilder makeQueryParams(Demo demo) {
    BoolQueryBuilder conditions = new BoolQueryBuilder();

    // Exact match on age.
    Integer age = demo.getAge();
    if (age != null) {
        conditions.must(termQuery("age", String.valueOf(age)));
    }

    // Range match: createDate greater than or equal to the given date.
    boolean hasCreateDate = !StringUtils.isEmpty(demo.getCreateDate());
    if (hasCreateDate) {
        conditions.must(rangeQuery("createDate").gte(demo.getCreateDate()).format("yyyy-MM-dd"));
    }

    // Fuzzy match: name contains the given text.
    String name = demo.getName();
    if (!StringUtils.isEmpty(name)) {
        String pattern = String.format("*%s*", name);
        conditions.must(QueryBuilders.wildcardQuery("name", pattern));
    }

    return conditions;
}
/************************************** Paged conditional query ***************************************/
/**
 * Runs a paged, filtered search sorted by {@code age} descending and maps every hit to a {@link Demo}.
 *
 * @param indexName index to search
 * @param demo      carrier of the filter values and of the paging inputs ({@code offset}, {@code pageSize})
 * @return the matching documents; empty when nothing matches or the search fails
 */
private List<Demo> getListFromEs(String indexName, Demo demo) {
    Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();
    // Use the indexName parameter; the identifier "index" used before is not defined.
    SearchRequest request = new SearchRequest(indexName);
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    sourceBuilder.query(makeQueryParams(demo)); // query conditions
    // Paging: only override from/size when provided, so a null value no longer
    // fails on unboxing and the search falls back to the builder defaults.
    if (demo.getOffset() != null) {
        sourceBuilder.from(demo.getOffset());
    }
    if (demo.getPageSize() != null) {
        sourceBuilder.size(demo.getPageSize());
    }
    sourceBuilder.sort("age", SortOrder.DESC); // sort field
    request.source(sourceBuilder);
    SearchHit[] hits = new SearchHit[0];
    try {
        hits = restHighLevelClient.search(request, RequestOptions.DEFAULT).getHits().getHits();
    } catch (Exception e) {
        // Best effort: log and return an empty result.
        log.error("ES查询出错: {}", e.getMessage(), e);
    }
    List<Demo> data = new ArrayList<>(hits.length);
    for (SearchHit hit : hits) {
        data.add(gson.fromJson(hit.getSourceAsString(), Demo.class));
    }
    return data;
}
更多推荐
已为社区贡献1条内容
所有评论(0)