前些天看完了一本关于elasticsearch的书籍,并且做了一个elasticsearch相关项目,对于es也算是有了一定程度的了解,不过看书向来都是一边看一边忘的,故以此文章记录一些es的简单用法。

依赖

由于本人用的es版本为es 7.2的镜像,故所有依赖都是es 7.2 版本。以下为依赖代码。

	<!-- Elasticsearch core library; every artifact below is pinned to the same 7.2.0 version. -->
	<dependency>
	    <groupId>org.elasticsearch</groupId>
	    <artifactId>elasticsearch</artifactId>
	    <version>7.2.0</version>
	</dependency>
	<!-- This dependency is optional on 7.x, but it is kept because the sample code uses TransportClient. -->
	<dependency>
	    <groupId>org.elasticsearch.client</groupId>
	    <artifactId>transport</artifactId>
	    <version>7.2.0</version>
	</dependency>
	<!-- High Level REST Client (RestHighLevelClient). -->
	<dependency>
	    <groupId>org.elasticsearch.client</groupId>
	    <artifactId>elasticsearch-rest-high-level-client</artifactId>
	    <version>7.2.0</version>
	</dependency>
	<!-- Low-level REST client (RestClient / RestClientBuilder). -->
	<dependency>
	    <groupId>org.elasticsearch.client</groupId>
	    <artifactId>elasticsearch-rest-client</artifactId>
	    <version>7.2.0</version>
	</dependency>

配置

下面为ElasticsearchConfig的配置代码

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.InetAddress;

/**
 * Spring configuration that exposes the Elasticsearch client beans used by the samples.
 *
 * <p>Connection settings are injected from the {@code elasticsearch.host},
 * {@code elasticsearch.port}, {@code elasticsearch.scheme} and
 * {@code elasticsearch.timeout} properties.
 *
 * @Author lidai
 * @Date 2020/1/7 16:20
 */
@Configuration
public class ElasticsearchConfig {

    @Value("${elasticsearch.host}")
    private String host;

    @Value("${elasticsearch.port}")
    private int port;

    @Value("${elasticsearch.scheme}")
    private String scheme;

    @Value("${elasticsearch.timeout}")
    private int timeout;

    /**
     * Creates the High Level REST Client bean, registered under the name {@code highLevelClient}.
     *
     * @return a client talking to {@code scheme://host:port} with the configured socket timeout
     */
    @Bean(name = "highLevelClient")
    public RestHighLevelClient restHighLevelClient() {
        // RestClient.builder is a varargs method, so several HttpHost instances may be passed.
        RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, scheme));
        // Only the socket (read) timeout is customised; the value is passed through unchanged.
        builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setSocketTimeout(timeout));
        return new RestHighLevelClient(builder);
    }

    /**
     * Creates the legacy {@link TransportClient} bean.
     *
     * <p>TransportClient is deprecated in Elasticsearch 7; it is kept only because the
     * recorded sample code still uses it and can be dropped on 7.x.
     *
     * <p>NOTE(review): this bean reuses the same {@code elasticsearch.port} as the REST client.
     * The transport protocol normally listens on a different port than HTTP, so confirm that
     * the configured port is correct for whichever client is actually used.
     *
     * @return a transport client connected to the configured host and port
     * @throws IllegalStateException if the client cannot be set up (for example the host
     *                               cannot be resolved); the half-built client is closed first
     */
    @Bean
    public TransportClient transportClient() {
        Settings settings = Settings.builder().put("cluster.name", "elasticsearch")
                .put("client.transport.sniff", true)
                .build();
        TransportClient transportClient = new PreBuiltTransportClient(settings);
        try {
            TransportAddress address = new TransportAddress(InetAddress.getByName(host), port);
            transportClient.addTransportAddress(address);
            return transportClient;
        } catch (Exception e) {
            // Fail fast instead of registering a null bean, and release the client's resources.
            transportClient.close();
            throw new IllegalStateException(
                    "Failed to create Elasticsearch TransportClient for " + host + ":" + port, e);
        }
    }
}

工具类

es 6.x中在存储或修改时不能再只传一个json字符串了(6.x 起使用json字符串时需要同时传入 XContentType,例如 setSource(json, XContentType.JSON)),另一种做法是使用XContentBuilder来对字段进行处理,以下是一个处理的工具类

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Map;

/**
 * Helpers for preparing documents before they are sent to Elasticsearch.
 *
 * @Author lidai
 * @Date 2020/1/7 18:14
 */
public class ElasticsearchUtils {

    /**
     * Writes the JSON representation of {@code object} into {@code xContentBuilder} as a
     * single object, one field per top-level property.
     *
     * <p>The builder must be fresh: each call emits a complete {@code startObject()} /
     * {@code endObject()} pair, so a builder must not be reused for a second document.
     *
     * @param xContentBuilder the builder to write into
     * @param object          the entity to convert; must serialise to a JSON object
     * @return the same {@code xContentBuilder}, now containing the document
     * @throws IllegalArgumentException     if {@code object} does not serialise to a JSON object
     *                                      (for example it is {@code null})
     * @throws java.io.UncheckedIOException if writing to the builder fails
     */
    public static XContentBuilder objectToXContentBuilder(XContentBuilder xContentBuilder, Object object) {
        JSONObject jsonObject = JSON.parseObject(JSON.toJSONString(object));
        if (jsonObject == null) {
            throw new IllegalArgumentException("object cannot be converted to a JSON object: " + object);
        }
        try {
            xContentBuilder.startObject();
            for (Map.Entry<String, Object> entry : jsonObject.entrySet()) {
                xContentBuilder.field(entry.getKey(), entry.getValue());
            }
            xContentBuilder.endObject();
        } catch (IOException e) {
            // Never hand back a half-written document: surface the failure to the caller.
            throw new java.io.UncheckedIOException("Failed to write object into XContentBuilder", e);
        }
        return xContentBuilder;
    }
}

示例代码

以下为增删改查的Java示例代码,目前代码都是es6.x版本,7.x版本有时间再加上去。值得注意的是es7.x中 TransportClient类已被标记为废弃(deprecated)。所以以下示例代码中凡是使用TransportClient的示例代码均为 elasticsearch 6.x 版本的写法。

import com.alibaba.fastjson.JSONObject;
import com.demo.interview.es.domain.ElasticsearchPage;
import com.demo.interview.es.domain.Person;
import com.demo.interview.es.utils.ElasticsearchUtils;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.analyze.AnalyzeRequest;
import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * Sample create / update / bulk / search / analyze operations written against the
 * 6.x-style {@link TransportClient} API.
 *
 * @Author lidai
 * @Date 2020/1/7 18:12
 */
@Slf4j
@RestController
@RequestMapping("/elasticsearch")
public class ElasticsearchController {

    static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    static DateTimeFormatter formatter2 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    static Person person = new Person(1L, "test", 20, 0, LocalDate.parse("2020-01-01").format(formatter), "备注", LocalDateTime.now().format(formatter2));
    final String DEMO_INDEX = "demo_index";
    final String DEMO_TYPE = "demo_type";

    /** Page number used by {@link #searchDemo(Map)} when the caller supplies none. */
    private static final int DEFAULT_PAGE_NUM = 1;
    /** Page size used by {@link #searchDemo(Map)} when the caller supplies none. */
    private static final int DEFAULT_PAGE_SIZE = 10;

    @Autowired
    @Qualifier(value = "highLevelClient")
    private RestHighLevelClient highLevelClient;

    @Autowired
    private TransportClient transportClient;

    /**
     * Indexes a single person, using the person's id as the document id.
     *
     * @param person the document to store; its id must not be null
     * @throws Exception if the document cannot be built or the request fails
     */
    public void createIndex(Person person) throws Exception {
        XContentBuilder source = ElasticsearchUtils.objectToXContentBuilder(XContentFactory.jsonBuilder(), person);
        transportClient.prepareIndex(DEMO_INDEX, DEMO_TYPE, person.getId().toString())
                .setSource(source)
                // Make the document searchable immediately; drop this when not required.
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
                .execute().actionGet();
    }

    /**
     * Partially updates the document whose id equals the person's id.
     *
     * @param person the new field values; its id must not be null
     * @throws IOException if the update document cannot be built
     */
    public void updateIndex(Person person) throws IOException {
        XContentBuilder doc = ElasticsearchUtils.objectToXContentBuilder(XContentFactory.jsonBuilder(), person);
        transportClient.prepareUpdate(DEMO_INDEX, DEMO_TYPE, person.getId().toString())
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
                .setDoc(doc).get();
    }

    /**
     * Sets {@code deleted = 1} on every document listed in {@code ids} with one bulk request.
     *
     * @param ids ids of the documents to update; {@code null} or empty is a no-op
     * @throws IOException            if an update document cannot be built
     * @throws ElasticsearchException if any item of the bulk request failed
     */
    public void batchUpdate(String[] ids) throws IOException {
        if (ids == null || ids.length == 0) {
            // A bulk request without actions is rejected by the client, so there is nothing to send.
            return;
        }
        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
        for (String id : ids) {
            bulkRequestBuilder.add(transportClient.prepareUpdate(DEMO_INDEX, DEMO_TYPE, id)
                    .setDoc(XContentFactory.jsonBuilder().startObject().field("deleted", 1).endObject()));
        }
        BulkResponse response = bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
        if (response.hasFailures()) {
            throw new ElasticsearchException(response.buildFailureMessage());
        }
    }

    /**
     * Indexes all given persons with one bulk request.
     *
     * @param persons documents to store; {@code null} or empty is a no-op
     * @throws IOException            if a document cannot be built
     * @throws ElasticsearchException if any item of the bulk request failed
     */
    public void batchInsert(List<Person> persons) throws IOException {
        if (persons == null || persons.isEmpty()) {
            return;
        }
        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
        for (Person person : persons) {
            // Each document needs its own builder: objectToXContentBuilder writes a complete
            // root object, so sharing one builder would append a second root object to it.
            // Note (from the original author): null data causes an exception in bulk operations.
            XContentBuilder source = ElasticsearchUtils.objectToXContentBuilder(XContentFactory.jsonBuilder(), person);
            bulkRequestBuilder.add(transportClient.prepareIndex(DEMO_INDEX, DEMO_TYPE, person.getId().toString())
                    .setSource(source));
        }
        BulkResponse responses = bulkRequestBuilder.get();
        if (responses.hasFailures()) {
            throw new ElasticsearchException(responses.buildFailureMessage());
        }
    }

    /**
     * Search example combining keyword, status, column category and year filters with paging.
     *
     * <p>Recognised keys in {@code params}: {@code keyword} (String), {@code news} (Integer,
     * selects which field(s) the keyword is matched against), {@code status} (Integer),
     * {@code columnCategory} (Integer), {@code year} (String, see {@link #addYearRange}),
     * {@code pageSize} and {@code pageNum} (Integer).
     *
     * @param params the search parameters; must not be null
     * @return one page of matching persons together with the total hit count
     * @throws NumberFormatException if {@code year} is present but contains no parsable year
     */
    public ElasticsearchPage<Person> searchDemo(Map<String, Object> params) {
        BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
        boolQueryBuilder.must(QueryBuilders.termQuery("deleted", 0));
        String keyword = (String) params.get("keyword");
        Integer news = (Integer) params.get("news");
        Integer status = (Integer) params.get("status");
        Integer columnCategory = (Integer) params.get("columnCategory");
        String year = (String) params.get("year");
        Integer pageSize = (Integer) params.get("pageSize");
        Integer pageNum = (Integer) params.get("pageNum");
        // Missing or invalid paging values fall back to defaults instead of failing on unboxing.
        int page = (pageNum == null || pageNum < 1) ? DEFAULT_PAGE_NUM : pageNum;
        int size = (pageSize == null || pageSize < 0) ? DEFAULT_PAGE_SIZE : pageSize;
        String[] messageColumn = {"messageTitle", "messageSource", "content", "tags", "issuer"};
        // Keyword search: "news" selects the field(s) to match.
        if (news != null && StringUtils.hasText(keyword)) {
            switch (news) {
                case 1:
                    boolQueryBuilder.must(QueryBuilders.multiMatchQuery(keyword, messageColumn));
                    break;
                case 2:
                    boolQueryBuilder.must(QueryBuilders.matchQuery("messageTitle", keyword));
                    break;
                case 3:
                    boolQueryBuilder.must(QueryBuilders.matchQuery("content", keyword));
                    break;
                case 4:
                    boolQueryBuilder.must(QueryBuilders.matchQuery("messageSource", keyword));
                    break;
                case 5:
                    boolQueryBuilder.must(QueryBuilders.matchQuery("tags", keyword));
                    break;
                case 6:
                    boolQueryBuilder.must(QueryBuilders.matchQuery("issuer", keyword));
                    break;
                default:
                    break;
            }
        }
        // Publication status: 1 matches everything, 2 and 3 filter on the exact status value.
        if (status != null) {
            switch (status) {
                case 1:
                    boolQueryBuilder.must(QueryBuilders.matchAllQuery());
                    break;
                case 2:
                    boolQueryBuilder.must(QueryBuilders.termQuery("status", 2));
                    break;
                case 3:
                    boolQueryBuilder.must(QueryBuilders.termQuery("status", 3));
                    break;
                default:
                    break;
            }
        }
        // Column category: 1 matches everything, 2 and 3 filter on the exact category value.
        if (columnCategory != null) {
            switch (columnCategory) {
                case 1:
                    boolQueryBuilder.must(QueryBuilders.matchAllQuery());
                    break;
                case 2:
                    boolQueryBuilder.must(QueryBuilders.termQuery("columnCategory", 2));
                    break;
                case 3:
                    boolQueryBuilder.must(QueryBuilders.termQuery("columnCategory", 3));
                    break;
                default:
                    break;
            }
        }
        if (StringUtils.hasText(year)) {
            addYearRange(boolQueryBuilder, year);
        }
        SearchResponse searchResponse = transportClient.prepareSearch(DEMO_INDEX)
                .setTypes(DEMO_TYPE)
                .setQuery(boolQueryBuilder)
                .setFrom((page - 1) * size)
                .setSize(size)
                .addSort("status", SortOrder.DESC)
                .addSort("updateDate", SortOrder.DESC)
                .addSort("id", SortOrder.DESC)
                .setExplain(true)
                .execute()
                .actionGet();
        SearchHits hits = searchResponse.getHits();
        int totalHits = (int) hits.getTotalHits().value;
        SearchHit[] searchHits = hits.getHits();
        List<Person> persons = new ArrayList<>(searchHits.length);
        for (SearchHit searchHit : searchHits) {
            persons.add(JSONObject.parseObject(searchHit.getSourceAsString(), Person.class));
        }
        return ElasticsearchPage.<Person>of(page, size, totalHits, persons);
    }

    /**
     * Adds an {@code updateDate} range covering the earliest through the latest requested year.
     *
     * <p>Accepts a single year ({@code 2019}), a comma separated list ({@code 2016,2017}) and
     * the bracketed, quoted list form ({@code {"2016","2017","2018"}}).
     *
     * @param boolQueryBuilder the query to extend
     * @param year             the raw year parameter; must contain text
     * @throws NumberFormatException if a token is not a number or no year is present
     */
    private void addYearRange(BoolQueryBuilder boolQueryBuilder, String year) {
        // Strip braces, quotes and whitespace so every remaining token is a bare number.
        String[] tokens = year.replaceAll("[{}\"\\s]", "").split(",");
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        boolean found = false;
        for (String token : tokens) {
            if (token.isEmpty()) {
                continue;
            }
            int value = Integer.parseInt(token);
            min = Math.min(min, value);
            max = Math.max(max, value);
            found = true;
        }
        if (!found) {
            throw new NumberFormatException("No year found in: " + year);
        }
        // Zero-padded dates: the default strict date formats reject values such as "2020-1-1".
        // NOTE(review): assumes updateDate uses a yyyy-MM-dd compatible mapping - confirm.
        boolQueryBuilder.must(QueryBuilders.rangeQuery("updateDate").gte(min + "-01-01").lte(max + "-12-31"));
    }

    /**
     * Runs the {@code ik_max_word} analyzer on {@code str} and joins the resulting terms
     * with {@code |}.
     *
     * @param str the text to analyze
     * @return the terms joined by {@code |} (empty when the analyzer produced no terms), or
     *         {@code str} unchanged when it is null or blank
     */
    public String queryAnalyzer(String str) {
        if (!StringUtils.hasText(str)) {
            return str;
        }
        AnalyzeRequest analyzeRequest = new AnalyzeRequest(DEMO_INDEX).text(str)
                .analyzer("ik_max_word");
        List<AnalyzeResponse.AnalyzeToken> tokens = this.transportClient.admin().indices().analyze(analyzeRequest)
                .actionGet().getTokens();
        List<String> terms = new ArrayList<>(tokens.size());
        for (AnalyzeResponse.AnalyzeToken token : tokens) {
            terms.add(token.getTerm());
        }
        // String.join copes with an empty token list, unlike trimming a trailing separator.
        return String.join("|", terms);
    }

}

以上代码为求方便并未测试(后续有时间会测试),有问题欢迎指出,谢谢。

7.2.0版本DEMO示例

package com.learn.admin.es.controller;

import cn.hutool.core.map.MapUtil;
import com.alibaba.fastjson.JSON;
import com.learn.admin.es.model.Person;
import lombok.SneakyThrows;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.web.bind.annotation.*;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;

/**
 * Demo endpoints showing index management, document CRUD, bulk indexing and search with
 * the 7.2.0 {@link RestHighLevelClient}. Every endpoint prints its result to standard output.
 *
 * @author LD
 * @date 2021/5/20 16:51
 */
@RestController
@RequestMapping("/es")
public class ESController {

    private final RestHighLevelClient restHighLevelClient;

    public ESController(RestHighLevelClient restHighLevelClient) {
        this.restHighLevelClient = restHighLevelClient;
    }

    /** Creates the {@code test_index} index and prints the name reported by the response. */
    @GetMapping("/create_index")
    public void test() throws Exception {
        CreateIndexResponse created = restHighLevelClient.indices()
                .create(new CreateIndexRequest("test_index"), RequestOptions.DEFAULT);
        System.out.println(created.index());
    }

    /** Prints whether the {@code test_index} index exists. */
    @GetMapping("/index_exists")
    public void test2() throws Exception {
        boolean indexPresent = restHighLevelClient.indices()
                .exists(new GetIndexRequest("test_index"), RequestOptions.DEFAULT);
        System.out.println(indexPresent);
    }

    /** Deletes the {@code test_index} index and prints whether the request was acknowledged. */
    @GetMapping("/delete_index")
    public void test3() throws Exception {
        AcknowledgedResponse ack = restHighLevelClient.indices()
                .delete(new DeleteIndexRequest("test_index"), RequestOptions.DEFAULT);
        System.out.println(ack.isAcknowledged());
    }

    /** Indexes one sample person under id {@code 1}. */
    @PostMapping("/add")
    public void test4() throws Exception {
        String json = JSON.toJSONString(new Person("小明就是小明", 20));
        IndexRequest indexRequest = new IndexRequest("test_index");
        indexRequest.id("1");
        indexRequest.source(json, XContentType.JSON);

        // Two equivalent ways of setting the timeout: a TimeValue or its string form.
        indexRequest.timeout(TimeValue.timeValueSeconds(3));
        indexRequest.timeout("3s");

        IndexResponse indexed = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        System.out.println(indexed.toString());
    }

    /** Fetches document {@code 1} and prints its source both as a map and as a string. */
    @GetMapping("/get")
    public void test5() throws Exception {
        GetResponse fetched = restHighLevelClient.get(new GetRequest("test_index", "1"), RequestOptions.DEFAULT);
        System.out.println(fetched.getSourceAsMap());
        System.out.println(fetched.getSourceAsString());
    }


    /** Prints whether document {@code 1} exists. */
    @GetMapping("/exists")
    public void test6() throws Exception {
        GetRequest probe = new GetRequest("test_index", "1");
        // Existence check only: do not fetch the _source fields.
        probe.fetchSourceContext(new FetchSourceContext(false));

        System.out.println(restHighLevelClient.exists(probe, RequestOptions.DEFAULT));
    }


    /** Partially updates document {@code 1} with a person that only carries an age. */
    @PutMapping("/update")
    @SneakyThrows
    public void test7() {
        // Only the age is supplied in the partial document.
        String partialDoc = JSON.toJSONString(new Person(999));
        UpdateRequest updateRequest = new UpdateRequest("test_index", "1");
        updateRequest.doc(partialDoc, XContentType.JSON);
        UpdateResponse updated = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
        System.out.println(updated.toString());
    }

    /** Deletes document {@code 3}, printing a notice when it did not exist. */
    @DeleteMapping("/delete")
    @SneakyThrows
    public void test8() {
        DeleteResponse deleted = restHighLevelClient.delete(new DeleteRequest("test_index", "3"), RequestOptions.DEFAULT);

        boolean nothingDeleted = deleted.status() == RestStatus.NOT_FOUND;
        if (nothingDeleted) {
            System.out.println("没有数据要删除");
        }
        System.out.println(deleted.toString());
    }

    /** Indexes three sample persons with one bulk request. */
    @PostMapping("/bulk_add")
    @SneakyThrows
    public void test9() {
        List<Person> samples = List.of(
                new Person("2", "德玛西亚之力", 33),
                new Person("3", "艾瑞利亚", 18),
                new Person("4", "诺克萨斯之手", 40));
        // The index is set once on the bulk request; the item requests only carry id and source.
        BulkRequest bulk = new BulkRequest("test_index");
        bulk.timeout(TimeValue.timeValueSeconds(10));
        samples.forEach(sample -> bulk.add(
                new IndexRequest().id(sample.getId()).source(JSON.toJSONString(sample), XContentType.JSON)));
        BulkResponse outcome = restHighLevelClient.bulk(bulk, RequestOptions.DEFAULT);
        if (outcome.hasFailures()) {
            System.out.println(outcome.buildFailureMessage());
        }
        System.out.println(outcome.toString());
    }


    /**
     * Runs a match query on {@code name}, sorted by age, returning only name and age, and
     * prints each hit with the name replaced by its highlighted form when one is available.
     */
    @GetMapping("/search")
    @SneakyThrows
    public void test10() {
        SearchSourceBuilder source = new SearchSourceBuilder();

        // Full-text match on the name field, first page of ten, youngest first.
        source.query(QueryBuilders.matchQuery("name", "小明"));
        source.from(0).size(10);
        source.sort("age", SortOrder.ASC);

        // Only the name and age fields are returned.
        source.fetchSource(new String[]{"name", "age"}, Strings.EMPTY_ARRAY);

        source.timeout(TimeValue.timeValueSeconds(10));

        // Highlight matches in the name field with a red span.
        HighlightBuilder highlighter = new HighlightBuilder();
        highlighter.field("name");
        highlighter.preTags("<span style='color:red'>");
        highlighter.postTags("</span>");
        source.highlighter(highlighter);

        SearchRequest searchRequest = new SearchRequest("test_index");
        searchRequest.source(source);

        SearchResponse found = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        SearchHits hits = found.getHits();
        System.out.println(hits.getTotalHits().value);
        for (SearchHit hit : hits) {
            Map<String, Object> fields = hit.getSourceAsMap();
            HighlightField highlightedName = hit.getHighlightFields().get("name");

            // Replace the plain name with the concatenated highlight fragments, if any.
            if (highlightedName != null) {
                Optional<String> joined = Stream.of(highlightedName.fragments())
                        .map(Text::string)
                        .reduce(String::concat);
                fields.put("name", joined.orElse(MapUtil.getStr(fields, "name")));
            }

            System.out.println(fields);
        }

    }
}

突然发现 7.x(自 7.15 起)中的 RestHighLevelClient 也被标记为废弃了,官方推荐使用新的 Java API Client(ElasticsearchClient),于是写了几个小demo

<!-- Officially recommended client to migrate to from the High Level REST Client. -->
<dependency>
   <groupId>co.elastic.clients</groupId>
   <artifactId>elasticsearch-java</artifactId>
   <version>7.16.2</version>
</dependency>

使用方式也很简单,这个api采用了链式调用和lambda相结合的方式调用。虽然看起来调用简单了很多,但是并不推荐全部采用lambda,写起来爽了,后面看起来相当费力。demo如下:

/**
 * Spring configuration that builds the {@link ElasticsearchClient} from
 * {@link ElasticsearchProperties}, authenticating with the configured username and password.
 */
@Configuration
@EnableConfigurationProperties(value = ElasticsearchProperties.class)
public class ElasticsearchClientConfig {

    @Autowired
    private ElasticsearchProperties properties;

    /**
     * Creates the client: a low-level {@link RestClient} carrying the credentials, wrapped in
     * a {@link RestClientTransport} that uses Jackson for JSON mapping.
     *
     * @return the Elasticsearch API client
     */
    @Bean
    public ElasticsearchClient elasticsearchClient() {
        HttpHost node = new HttpHost(properties.getHost(), properties.getPort());

        RestClient lowLevelClient = RestClient.builder(node)
                .setHttpClientConfigCallback(httpClientBuilder -> {
                    // Register one username/password pair for every auth scope.
                    UsernamePasswordCredentials credentials =
                            new UsernamePasswordCredentials(properties.getUsername(), properties.getPassword());
                    CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                    credentialsProvider.setCredentials(AuthScope.ANY, credentials);
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    return httpClientBuilder;
                })
                .build();

        JacksonJsonpMapper jsonMapper = new JacksonJsonpMapper();
        ElasticsearchTransport transport = new RestClientTransport(lowLevelClient, jsonMapper);
        return new ElasticsearchClient(transport);
    }

}

package com.it.learn.boooom.service;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.BulkIndexByScrollFailure;
import co.elastic.clients.elasticsearch._types.FieldSort;
import co.elastic.clients.elasticsearch._types.FieldValue;
import co.elastic.clients.elasticsearch._types.Script;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.aggregations.Aggregate;
import co.elastic.clients.elasticsearch._types.aggregations.Aggregation;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch._types.query_dsl.RangeQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.TermQuery;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.UpdateByQueryResponse;
import co.elastic.clients.elasticsearch.core.UpdateResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.json.JsonData;
import com.alibaba.fastjson2.JSON;
import com.google.common.collect.ImmutableList;
import com.it.learn.boooom.entity.Users;
import com.it.learn.boooom.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;

/**
 * Demo tests for the Elasticsearch Java API Client ({@link ElasticsearchClient}): indexing,
 * update, update-by-query, bulk indexing and search with sorting and aggregation.
 *
 * <p>Each test logs the response; I/O failures are logged rather than rethrown.
 *
 * @author lidai5
 * @date 2022/9/6 14:21
 */
@Slf4j
@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class ES8QueryTest {

    @Autowired
    private ElasticsearchClient esClient;

    private static final String USER_INDEX = "user_info_test";

    /** Indexes one random user, building the request with an explicit builder. */
    @Test
    public void testIndex() {
        Users users = randomUsers(1).get(0);
        IndexRequest.Builder<Users> builder = new IndexRequest.Builder<>();
        try {
            IndexResponse res = esClient.index(builder.index(USER_INDEX).document(users).build());
            log.info("res = {}", JSON.toJSONString(res));
        } catch (IOException e) {
            log.error("user index exception", e);
        }
    }

    /** Indexes one random user, building the request with the lambda form. */
    @Test
    public void testIndex2() {
        Users users = randomUsers(1).get(0);
        try {
            IndexResponse res = esClient.index(builder -> builder.index(USER_INDEX).document(users));
            log.info("res = {}", JSON.toJSONString(res));
        } catch (IOException e) {
            log.error("user index exception", e);
        }
    }

    /** Partially updates a fixed document id with the fields of a random user. */
    @Test
    public void update() {
        Users users = randomUsers(1).get(0);
        try {
            UpdateResponse<Users> updateRes = esClient.update(builder -> builder.index(USER_INDEX).id("sXuaEYMBpjrRVTcwS680").doc(users), Users.class);
            log.info("res = {}", JSON.toJSONString(updateRes));
        } catch (IOException e) {
            log.error("user update exception", e);
        }
    }

    /** Rewrites the username of every user older than 40 whose userId is 123, via a script. */
    @Test
    public void updateByQuery() {
        try {
            Query age = new RangeQuery.Builder().field("age").gt(JsonData.of(40)).build()._toQuery();
            Query userId = new TermQuery.Builder().field("userId").value(FieldValue.of(123)).build()._toQuery();
            Script script = new Script.Builder().inline(inline -> inline.source("ctx._source.username = 'update by query'")).build();
            UpdateByQueryResponse updateByQuery = esClient.updateByQuery(builder -> builder.index(USER_INDEX)
                    .query(query -> query.bool(bool -> bool.filter(ImmutableList.of(age, userId))))
                    .script(script));
            List<BulkIndexByScrollFailure> failures = updateByQuery.failures();
            // Per-document failures are reported in the response body, not thrown as exceptions.
            if (failures != null && !failures.isEmpty()) {
                log.warn("update by query finished with {} failure(s)", failures.size());
            }
            log.info(JSON.toJSONString(updateByQuery));
        } catch (IOException e) {
            log.error("user update exception", e);
        }
    }

    /** Bulk-indexes ten random users, using each user's id as the document id. */
    @Test
    public void testBulk() {
        List<Users> users = randomUsers(10);
        List<BulkOperation> bulkOperations = users.stream()
                .map(user -> BulkOperation.of(
                        builder -> builder.index(index -> index.document(user).id(String.valueOf(user.getUserId())))))
                .collect(Collectors.toList());
        BulkRequest bulkRequest = new BulkRequest.Builder().index(USER_INDEX).operations(bulkOperations).build();
        try {
            BulkResponse bulkResponse = esClient.bulk(bulkRequest);
            // Item-level failures are only reported through the response's error flag.
            if (bulkResponse.errors()) {
                log.error("user bulk finished with item errors: {}", JSON.toJSONString(bulkResponse));
            } else {
                log.info("user bulk success. items = {}", bulkResponse.items().size());
            }
        } catch (IOException e) {
            log.error("user bulk exception", e);
        }
    }

    /**
     * Searches users created after a fixed time whose username matches a keyword, sorted by
     * age ascending, and computes the average age of the matches.
     */
    @Test
    public void testSearch() {

        SearchRequest request = new SearchRequest.Builder()
                .index(USER_INDEX)
                .from(0).size(20)
                // One must(...) call per clause: a single Query.Builder holds exactly one query
                // variant, so calling range(...) and then match(...) on the same builder would
                // keep only the last one.
                .query(query -> query.bool(bool -> bool
                        .must(must -> must.range(range -> range.field("create_time").gt(JsonData.of("2022-09-08 00:00:00"))))
                        .must(must -> must.match(match -> match.field("username").query(FieldValue.of("诸铌痦"))))))
                // Documents without an age sort as if their age were 0.
                .sort(sort -> sort.field(FieldSort.of(fs -> fs.field("age").order(SortOrder.Asc).missing(FieldValue.of(0)))))
                // Average age. Documents without an age are ignored by the avg aggregation by
                // default, so no additional filter is needed; an Aggregation.Builder also holds
                // only one variant, so a filter set on the same builder would be overwritten.
                .aggregations("avgAge", Aggregation.of(agg -> agg.avg(avg -> avg.field("age"))))
                .build();
        System.out.println(request.toString());

        try {
            SearchResponse<Users> search = esClient.search(request, Users.class);
            List<Hit<Users>> hits = search.hits().hits();
            List<Users> users = hits.stream().map(Hit::source).collect(Collectors.toList());
            // total() is absent when the response does not carry a total hit count.
            long totalCount = search.hits().total() == null ? 0L : search.hits().total().value();
            Map<String, Aggregate> aggregations = search.aggregations();
            Aggregate avgAge = aggregations.get("avgAge");
            double value = avgAge.avg().value();
            log.info("search success. total = {},users = {},ageValue = {}", totalCount, JSON.toJSONString(users), value);
        } catch (IOException e) {
            log.error("users search error", e);
        }

    }


    /**
     * Generates {@code count} users with random id, name, age (0-99) and password, and the
     * current time as creation and update time.
     *
     * @param count number of users to create
     * @return a mutable list containing {@code count} users
     */
    public List<Users> randomUsers(int count) {
        List<Users> usersList = new ArrayList<>();
        // One generator for the whole batch instead of a new instance per user.
        Random random = new Random();
        for (int i = 0; i < count; i++) {
            Users u = new Users();
            // The id is the 32-bit hash of a random UUID, so it may be negative.
            u.setUserId((long) UUID.randomUUID().toString().hashCode());
            u.setUsername(RandomUtil.randomUserName());
            u.setAge(random.nextInt(100));
            u.setPassword(RandomUtil.randomPW(12));
            u.setCreateTime(new Date());
            u.setUpdateTime(new Date());
            usersList.add(u);
        }
        return usersList;
    }
}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐