RestHighLevelClient的基本使用

1. pom引入
<!-- Elasticsearch core library, pinned to 6.6.1. jackson-core is excluded from its
     transitive dependencies (presumably to avoid a clash with the Jackson version
     the host project already brings in; confirm against the project's dependency tree). -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>6.6.1</version>
    <exclusions>
        <exclusion>
            <artifactId>jackson-core</artifactId>
            <groupId>com.fasterxml.jackson.core</groupId>
        </exclusion>
    </exclusions>
</dependency>
<!-- High-level REST client, same version as the core library above. Its transitive
     org.elasticsearch:elasticsearch is excluded, presumably so that the explicitly
     declared artifact above is the one that gets resolved (confirm). -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.6.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </exclusion>
    </exclusions>
</dependency>
2. yaml配置
# Elasticsearch connection settings (read by RestClientConfig)
elasticsearch:
  clientIps: 127.0.0.1 # ES gateway address(es); the value is split on ',' so several gateways are written as ip1,ip2. Look up the real addresses in the ES instance overview; configuring at least two gateways is recommended
  httpPort: 8990  # ES gateway port
  username: test
  password: 1234
esParam:
  IndexName: testName # index name
  indexType: "type" # index type; may be "type" or "doc"
  bulkSize: 500 # number of documents per bulk request
  docNum: 200000000 # total number of documents expected to be imported
3. Rest客户端创建
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that builds the {@link RestHighLevelClient} bean.
 *
 * <p>Connection settings are injected from the {@code elasticsearch.*} properties:
 * a comma-separated list of gateway addresses, one shared HTTP port, and the
 * user name / password used for HTTP Basic authentication.
 *
 * @author mqs
 */
@Configuration
public class RestClientConfig {
    /** Comma-separated gateway addresses, e.g. {@code 10.0.0.1,10.0.0.2}. */
    @Value("${elasticsearch.clientIps}")
    private String clientIps;

    /** HTTP port shared by every gateway address. */
    @Value("${elasticsearch.httpPort}")
    private int httpPort;

    @Value("${elasticsearch.username}")
    private String username;

    @Value("${elasticsearch.password}")
    private String password;

    /**
     * Turns the comma-separated address list into one {@link HttpHost} per address.
     *
     * <p>Whitespace around each entry is stripped and empty entries (for example the one
     * produced by a trailing comma) are skipped, so {@code "a, b,"} yields hosts {@code a}
     * and {@code b} instead of a host name that contains a blank.
     *
     * @param clientIps  comma-separated gateway addresses
     * @param esHttpPort port used for every address
     * @return the configured hosts, in the order they were listed
     * @throws IllegalArgumentException if the list contains no usable address
     */
    private HttpHost[] getHttpHosts(String clientIps, int esHttpPort) {
        String[] entries = clientIps == null ? new String[0] : clientIps.split(",");
        HttpHost[] httpHosts = new HttpHost[entries.length];
        int hostCount = 0;
        for (String entry : entries) {
            String host = entry.trim();
            if (!host.isEmpty()) {
                httpHosts[hostCount++] = new HttpHost(host, esHttpPort, "http");
            }
        }
        if (hostCount == 0) {
            throw new IllegalArgumentException(
                    "elasticsearch.clientIps must contain at least one address, got: " + clientIps);
        }
        // Shrink the array only when some entries were skipped.
        return hostCount == entries.length ? httpHosts : java.util.Arrays.copyOf(httpHosts, hostCount);
    }

    /**
     * Creates the REST client with HTTP Basic authentication.
     *
     * <p>The configured user name and password are registered for {@link AuthScope#ANY},
     * i.e. they are offered to every host the client connects to.
     *
     * @return the client bean
     */
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        return new RestHighLevelClient(
                RestClient.builder(getHttpHosts(clientIps, httpPort))
                        .setHttpClientConfigCallback((HttpAsyncClientBuilder httpAsyncClientBuilder) ->
                                httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider)));
    }
}
4. 常用操作
/**
 * Index-related settings read from the {@code esParam.*} properties.
 */
@Component
@Data
public class AppConfig {
    /** Document type of the index, from {@code esParam.indexType}. */
    @Value("${esParam.indexType}")
    private String indexType;

    /**
     * Name of the target index. Bound to {@code esParam.IndexName}, which is the key the
     * YAML configuration defines; the previous placeholder {@code esParam.testName} named
     * the sample value rather than the key and could not be resolved.
     */
    @Value("${esParam.IndexName}")
    private String testName;
}
/************************************模拟实体类************************************/
/**
 * Sample entity used by the indexing and query examples.
 *
 * <p>Besides the document fields it also carries the paging parameters
 * ({@code offset}, {@code pageSize}) read by the paged search.
 */
@Data
public class Demo {
    /** Person name. */
    private String name;
    /** Age. */
    private Integer age;
    /** Sex. */
    private String sex;
    /** Grade. */
    private String grade;
    /** Subject. */
    private String subject;
    /** Score. */
    private Integer score;
    /**
     * Lower bound for the {@code createDate} range filter built in {@code makeQueryParams},
     * which sends it with the {@code yyyy-MM-dd} format.
     * NOTE(review): modelled as a String to match that format; confirm the intended type.
     */
    private String createDate;
    /** Index of the first hit to return (paging). */
    private Integer offset;
    /** Maximum number of hits to return (paging). */
    private Integer pageSize;
}

/**************************************新增*******************************************/
@Autowired
private RestHighLevelClient restHighLevelClient;
@Autowired
private AppConfig appConfig;

/**
 * Bulk-indexes every record of {@code dataList} into the configured index.
 *
 * <p>Each record is converted to a map and added to one bulk request. If the response
 * reports failures, the failed items are handed to {@code exceptionRetry}. An I/O error
 * while sending the request is logged and not rethrown.
 */
public void syncData() {
    // Records to index
    List<Demo> list = dataList;
    if (list == null || list.isEmpty()) {
        // Nothing to send; an empty bulk request fails the client's request validation.
        return;
    }
    BulkRequest request = new BulkRequest();
    // Keep the number of documents per bulk request under control: with very large
    // batches the synchronisation may end up incomplete.
    for (Demo e : list) {
        // Convert to a map; any conversion approach works, here a reflection-based helper is used.
        Map<String, Object> map = CommUtil.objectToMap(e);
        request.add(new IndexRequest(appConfig.getTestName(), appConfig.getIndexType()).source(map, XContentType.JSON));
    }
    try {
        BulkResponse response = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
        if (response.hasFailures()) {
            exceptionRetry(request, response);
        }
    } catch (IOException e) {
        log.error("ES批量同步出错: {}", e.getMessage(), e);
    }
}

/**
 * Retries, once, the items of a bulk request that failed.
 *
 * <p>The failed items are located through their item id, which is used as the position
 * in the original request, and copied into a new bulk request. After a one-second pause
 * the new request is sent and its outcome is passed to {@code exceptionLog}. Errors during
 * the retry are logged and not rethrown.
 *
 * @param request  the bulk request that was originally sent
 * @param response the response to {@code request}, containing at least one failure
 */
private void exceptionRetry(BulkRequest request, BulkResponse response) {
    List<DocWriteRequest<?>> list = request.requests();
    BulkRequest requestRetry = new BulkRequest();
    // Keep the number of documents per bulk request under control: with very large
    // batches the synchronisation may end up incomplete.
    for (BulkItemResponse bir : response) {
        if (bir.isFailed()) {
            int docIndex = bir.getItemId();
            IndexRequest ir = (IndexRequest) list.get(docIndex);
            requestRetry.add(new IndexRequest(appConfig.getTestName(), appConfig.getIndexType()).source(ir.sourceAsMap(), XContentType.JSON));
        }
    }
    if (requestRetry.numberOfActions() == 0) {
        // No failed item was found, so there is nothing to retry.
        return;
    }
    try {
        // Pause for one second before retrying.
        Thread.sleep(1000);
        BulkResponse responseRetry = restHighLevelClient.bulk(requestRetry, RequestOptions.DEFAULT);
        // Record the items that still fail after the retry.
        exceptionLog(requestRetry, responseRetry);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can still observe the interruption.
        Thread.currentThread().interrupt();
        log.error("ES同步重试出错!", e);
    } catch (Exception e) {
        log.error("ES同步重试出错!", e);
    }
}

/**
 * Logs every item of a retried bulk request that failed again.
 *
 * <p>For each failed item the failure message and the source of the matching request
 * (looked up by item id) are written to the error log.
 *
 * @param request  the retried bulk request
 * @param response the response to {@code request}
 */
private void exceptionLog(BulkRequest request, BulkResponse response) {
    List<DocWriteRequest<?>> submitted = request.requests();
    for (BulkItemResponse item : response) {
        if (!item.isFailed()) {
            continue;
        }
        IndexRequest failedRequest = (IndexRequest) submitted.get(item.getItemId());
        // Failure reason first, then the data that could not be written.
        String reason = item.getFailureMessage();
        String payload = failedRequest.sourceAsMap().toString();
        log.error("同步重试失败reason=[{}],data=[{}]", reason, payload);
    }
}

/**************************************修改*******************************************/
/**
 * Partially updates the document with the given id.
 *
 * <p>The entries of {@code data} are sent as the partial document of an update request
 * against {@code indexName} and the configured index type. The result status is logged
 * at info level; an I/O error is logged and not rethrown.
 *
 * @param data      fields to write into the document
 * @param indexName index that holds the document
 * @param id        id of the document to update
 */
public void updateById(Map<String, Object> data, String indexName, String id) {
    // Guarded because serialising the payload is wasted work when debug logging is off.
    if (log.isDebugEnabled()) {
        log.debug("es开始更新数据:{}", JSON.toJSONString(data));
    }
    try {
        UpdateRequest request = new UpdateRequest(indexName, appConfig.getIndexType(), id).doc(data);
        UpdateResponse response = restHighLevelClient.update(request, RequestOptions.DEFAULT);
        log.info("更新状态:{}", response.getResult());
    } catch (IOException e) {
        log.error("更新写入异常:{}", e.getMessage(), e);
    }
    log.debug("es更新数据完成");
}

/**************************************删除*******************************************/
/**
 * Deletes every document whose {@code score} field equals the given value.
 *
 * <p>The delete-by-query request targets the configured index and index type.
 * An I/O error is logged and not rethrown.
 *
 * @param score value the {@code score} field must match
 */
public void deleteByQueryRequest(String score) {
    // Use the index accessor AppConfig actually exposes.
    DeleteByQueryRequest delReq = new DeleteByQueryRequest(appConfig.getTestName());
    delReq.setDocTypes(appConfig.getIndexType());
    delReq.setQuery(new TermQueryBuilder("score", score));
    try {
        restHighLevelClient.deleteByQuery(delReq, RequestOptions.DEFAULT);
    } catch (IOException e) {
        log.error("ES按条件删除出错: {}", e.getMessage(), e);
    }
}

/**************************************count查询***************************************/
/**
 * Counts the documents in {@code indexName} that match the filters derived from {@code demo}.
 *
 * <p>This is an instance method because it relies on the injected client and on
 * {@code makeQueryParams}, neither of which is reachable from a static context.
 *
 * @param indexName index to count in
 * @param demo      carrier of the filter values
 * @return the number of matching documents, or {@code 0} if the request fails
 */
public long count(String indexName, Demo demo) {
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    // Build the filter conditions
    BoolQueryBuilder boolBuilder = makeQueryParams(demo);
    sourceBuilder.query(boolBuilder);
    CountRequest countRequest = new CountRequest(indexName);
    countRequest.source(sourceBuilder);
    long count = 0L;
    try {
        CountResponse countResponse = restHighLevelClient.count(countRequest, RequestOptions.DEFAULT);
        count = countResponse != null ? countResponse.getCount() : 0;
    } catch (Exception e) {
        // Best effort: report the failure and fall back to 0.
        log.error("ES统计查询出错: {}", e.getMessage(), e);
    }
    return count;
}
/*****************************构建查询参数****************************************/
/**
 * Builds the bool query for the filter values present on {@code demo}.
 *
 * <p>Each clause is added as a {@code must} condition and only when its value is set:
 * a term match on {@code age}, a lower-bounded range on {@code createDate}
 * (format {@code yyyy-MM-dd}), and a wildcard "contains" match on {@code name}.
 *
 * @param demo carrier of the filter values
 * @return the assembled bool query; it has no clauses when no filter value is set
 */
private BoolQueryBuilder makeQueryParams(Demo demo) {
    BoolQueryBuilder filters = new BoolQueryBuilder();

    // Exact match
    Integer age = demo.getAge();
    if (age != null) {
        filters.must(termQuery("age", String.valueOf(age)));
    }

    // Range match
    boolean hasCreateDate = !StringUtils.isEmpty(demo.getCreateDate());
    if (hasCreateDate) {
        filters.must(rangeQuery("createDate").gte(demo.getCreateDate()).format("yyyy-MM-dd"));
    }

    // Fuzzy (contains) match
    String name = demo.getName();
    if (!StringUtils.isEmpty(name)) {
        String pattern = "*" + name + "*";
        filters.must(QueryBuilders.wildcardQuery("name", pattern));
    }

    return filters;
}

/**************************************分页条件查询***************************************/
/**
 * Runs a paged, filtered search and maps the hits to {@link Demo} objects.
 *
 * <p>The filters come from {@code makeQueryParams(demo)}; hits are sorted by {@code age}
 * in descending order. {@code offset} and {@code pageSize} are applied only when they
 * are set on {@code demo}; otherwise the search source keeps its own defaults. A failed
 * search is logged and yields an empty list.
 *
 * @param indexName index to search
 * @param demo      carrier of the filter values and paging parameters
 * @return the mapped hits, possibly empty, never {@code null}
 */
private List<Demo> getListFromEs(String indexName, Demo demo) {
    Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();
    SearchRequest request = new SearchRequest(indexName);
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    sourceBuilder.query(makeQueryParams(demo)); // filter conditions
    // Paging: skip unset values instead of failing on unboxing null.
    if (demo.getOffset() != null) {
        sourceBuilder.from(demo.getOffset());
    }
    if (demo.getPageSize() != null) {
        sourceBuilder.size(demo.getPageSize());
    }
    sourceBuilder.sort("age", SortOrder.DESC); // sort field
    request.source(sourceBuilder);

    SearchHit[] hits = new SearchHit[0];
    try {
        hits = restHighLevelClient.search(request, RequestOptions.DEFAULT).getHits().getHits();
    } catch (Exception e) {
        log.error("ES查询出错: {}", e.getMessage(), e);
    }

    List<Demo> data = new ArrayList<>(hits.length);
    for (SearchHit hit : hits) {
        data.add(gson.fromJson(hit.getSourceAsString(), Demo.class));
    }
    return data;
}
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐