配置文件(pom.xml 依赖;下文的对象池代码使用了 org.apache.commons.pool2,因此还需要额外引入 org.apache.commons:commons-pool2)

        <dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
            <version>8.3.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.12.3</version>
        </dependency>
        <dependency>
            <groupId>jakarta.json.bind</groupId>
            <artifactId>jakarta.json.bind-api</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>jakarta.json</groupId>
            <artifactId>jakarta.json-api</artifactId>
            <version>2.0.1</version>
        </dependency>

1.建立工厂


import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * commons-pool2 factory that creates and destroys {@link ElasticsearchClient} instances.
 *
 * <p>Each pooled client owns its own low-level {@link RestClient} (wrapped in a
 * {@link RestClientTransport}). The transport is closed when the pool destroys the client,
 * so the underlying HTTP resources are released instead of leaking.
 */
public class EsClientPoolFactory implements PooledObjectFactory<ElasticsearchClient> {

    /** Comma-separated {@code host:port} list of the Elasticsearch nodes to connect to. */
    private static final String ES_SERVER_HOSTS = "127.0.0.1:9200";

    /** URI scheme used for every configured node. */
    private static final String SCHEME = "http";

    /**
     * Builds a new client: low-level REST client, Jackson-based transport, then the typed client.
     *
     * @return the new client wrapped for the pool
     * @throws IllegalArgumentException if a configured host entry is not of the form {@code host:port}
     */
    @Override
    public PooledObject<ElasticsearchClient> makeObject() throws Exception {
        RestClient restClient = RestClient.builder(parseHosts(ES_SERVER_HOSTS)).build();
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        return new DefaultPooledObject<>(new ElasticsearchClient(transport));
    }

    /**
     * Releases the resources held by a client that the pool is discarding.
     *
     * @param p the pooled wrapper whose client is being destroyed
     * @throws Exception if closing the transport fails
     */
    @Override
    public void destroyObject(PooledObject<ElasticsearchClient> p) throws Exception {
        ElasticsearchClient client = p.getObject();
        if (client != null) {
            // Closing the transport also shuts down the RestClient created in makeObject().
            client._transport().close();
        }
    }

    /**
     * Always reports the client as valid; no connectivity check is performed.
     *
     * @param p the pooled wrapper to validate
     * @return always {@code true}
     */
    @Override
    public boolean validateObject(PooledObject<ElasticsearchClient> p) {
        return true;
    }

    /** No per-borrow initialisation is needed. */
    @Override
    public void activateObject(PooledObject<ElasticsearchClient> p) throws Exception {
        // intentionally empty
    }

    /** No per-return clean-up is needed. */
    @Override
    public void passivateObject(PooledObject<ElasticsearchClient> p) throws Exception {
        // intentionally empty
    }

    /** Parses a comma-separated {@code host:port} list into {@link HttpHost}s, skipping blank entries. */
    private static HttpHost[] parseHosts(String hosts) {
        List<HttpHost> parsed = new ArrayList<>();
        for (String entry : hosts.split(",")) {
            String hostAndPort = entry.trim();
            if (hostAndPort.isEmpty()) {
                continue;
            }
            int colon = hostAndPort.lastIndexOf(':');
            if (colon <= 0 || colon == hostAndPort.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got '" + hostAndPort + "'");
            }
            String hostName = hostAndPort.substring(0, colon);
            int port = Integer.parseInt(hostAndPort.substring(colon + 1));
            parsed.add(new HttpHost(hostName, port, SCHEME));
        }
        return parsed.toArray(new HttpHost[0]);
    }
}

2.建立连接


import co.elastic.clients.elasticsearch.ElasticsearchClient;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static holder for a shared commons-pool2 pool of {@link ElasticsearchClient} instances.
 *
 * <p>Callers borrow a client with {@link #getClient()} and must hand it back with
 * {@link #returnClient(ElasticsearchClient)} (ideally in a {@code finally} block); a client
 * that is never returned permanently occupies one of the {@value #MAX_TOTAL} pool slots.
 */
public class ESClientPool {

    private static final Logger logger = LoggerFactory.getLogger(ESClientPool.class);

    /** Maximum number of clients (borrowed plus idle) the pool will create. */
    private static final int MAX_TOTAL = 20;

    /** Minimum time a client may sit idle before it becomes eligible for eviction. */
    private static final long MIN_EVICTABLE_IDLE_MILLIS = 1000L * 3L;

    // Pool configuration; anything not set below keeps the commons-pool2 default.
    private static final GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();

    static {
        poolConfig.setMaxTotal(MAX_TOTAL);
        // Idle clients can never outnumber MAX_TOTAL, so a larger maxIdle would be meaningless.
        poolConfig.setMaxIdle(MAX_TOTAL);
        // NOTE(review): no eviction-run interval is configured here; confirm that idle eviction
        // is actually active, otherwise this threshold has no effect.
        poolConfig.setMinEvictableIdleTimeMillis(MIN_EVICTABLE_IDLE_MILLIS);
    }

    // Factory that creates and destroys the pooled clients.
    private static final EsClientPoolFactory esClientPoolFactory = new EsClientPoolFactory();

    // The pool itself, built from the factory and the configuration above.
    private static final GenericObjectPool<ElasticsearchClient> clientPool =
            new GenericObjectPool<>(esClientPoolFactory, poolConfig);

    /**
     * Borrows a client from the pool.
     *
     * @return a client that the caller must later pass to {@link #returnClient(ElasticsearchClient)}
     * @throws Exception if the pool cannot supply a client
     */
    public static ElasticsearchClient getClient() throws Exception {
        ElasticsearchClient client = clientPool.borrowObject();
        logger.info("从池中取一个对象{}", client);
        return client;
    }

    /**
     * Returns a previously borrowed client to the pool.
     *
     * @param client the client obtained from {@link #getClient()}
     * @throws Exception if the pool rejects the returned client
     */
    public static void returnClient(ElasticsearchClient client) throws Exception {
        logger.info("使用完毕之后,归还对象{}", client);
        clientPool.returnObject(client);
    }

}

3.创建索引服务接口

    /**
     * Creates an index with default settings.
     *
     * <p>The borrowed client is returned to the pool in a {@code finally} block, so a failed
     * request can no longer leak a pool slot.
     *
     * @param indexName name of the index to create
     * @return {@code true} if Elasticsearch acknowledged the request; {@code false} if it was
     *         not acknowledged or any exception occurred (the exception is printed, not rethrown)
     */
    @ApiOperation(value = "创建索引服务接口")
    @PostMapping("/createIndex")
    public boolean createIndex(@RequestParam("indexName") String indexName) {
        try {
            ElasticsearchClient client = ESClientPool.getClient();
            try {
                // Create the index and read back the acknowledgement flag.
                CreateIndexResponse createIndexResponse = client
                        .indices()
                        .create(c -> c
                                .index(indexName)
                        );
                boolean acknowledged = createIndexResponse.acknowledged();
                System.out.println("acknowledged = " + acknowledged);
                return acknowledged;
            } finally {
                ESClientPool.returnClient(client);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

4.删除索引服务接口

    /**
     * Deletes an index if it exists.
     *
     * <p>The borrowed client is returned to the pool on every path: when the index does not
     * exist, when the delete succeeds, and when an exception is thrown.
     *
     * @param indexName name of the index to delete
     * @return {@code true} if the delete was acknowledged; {@code false} if the index does not
     *         exist, the delete was not acknowledged, or any exception occurred
     */
    @ApiOperation(value = "删除索引服务接口")
    @DeleteMapping("/deleteIndex/{indexName}")
    public boolean deleteIndex(@PathVariable("indexName") String indexName) {
        try {
            ElasticsearchClient client = ESClientPool.getClient();
            try {
                if (!exists(client, Arrays.asList(indexName))) {
                    // Nothing to delete.
                    return false;
                }
                DeleteIndexResponse deleteIndexResponse = client
                        .indices()
                        .delete(index -> index
                                .index(indexName)
                        );
                boolean acknowledged = deleteIndexResponse.acknowledged();
                System.out.println("acknowledged = " + acknowledged);
                return acknowledged;
            } finally {
                ESClientPool.returnClient(client);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Checks whether the given indices exist, using a client supplied by the caller.
     *
     * <p>This method neither borrows nor returns the client; the caller owns it.
     *
     * @param client    client used to issue the request
     * @param indexName names of the indices to check
     * @return the value reported by the exists request, or {@code false} if an
     *         {@link IOException} occurred (the exception is printed, not rethrown)
     */
    public boolean exists(ElasticsearchClient client, List<String> indexName) {
        boolean found;
        try {
            found = client.indices().exists(request -> request.index(indexName)).value();
        } catch (IOException ioFailure) {
            ioFailure.printStackTrace();
            return false;
        }
        System.out.println("indexexists:  " + found);
        return found;
    }

5.查看索引信息服务接口

    /**
     * Fetches the index definition and returns its string form.
     *
     * <p>The borrowed client is returned to the pool in a {@code finally} block, so a failed
     * request (for example, a missing index) can no longer leak a pool slot.
     *
     * @param indexName name of the index to look up
     * @return a map with a single {@code "data"} entry holding the {@code toString()} of the
     *         index-state map, or {@code null} if any exception occurred (the exception is
     *         printed, not rethrown)
     */
    @ApiOperation(value = "查看索引信息服务接口")
    @GetMapping("/getIndex")
    public Map<String, Object> getIndexMsg(@RequestParam("indexName") String indexName) {
        try {
            ElasticsearchClient client = ESClientPool.getClient();
            try {
                GetIndexResponse getIndexResponse = client
                        .indices()
                        .get(getIndex -> getIndex
                                .index(indexName)
                        );
                Map<String, IndexState> result = getIndexResponse.result();
                Map<String, Object> map = new HashMap<>();
                map.put("data", result.toString());
                return map;
            } finally {
                ESClientPool.returnClient(client);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

6.索引存在校验服务接口

    /**
     * Checks whether an index exists.
     *
     * <p>The borrowed client is returned to the pool in a {@code finally} block, so a failed
     * request can no longer leak a pool slot.
     *
     * @param indexName name of the index to check
     * @return the value reported by the exists request, or {@code false} if any exception
     *         occurred (the exception is printed, not rethrown)
     */
    @ApiOperation(value = "索引存在校验服务接口")
    @GetMapping("/checkIndex")
    public boolean exists(@RequestParam("indexName") String indexName) {
        try {
            ElasticsearchClient client = ESClientPool.getClient();
            try {
                boolean value = client
                        .indices()
                        .exists(e -> e
                                .index(indexName)
                        ).value();
                System.out.println("indexexists:  " + value);
                return value;
            } finally {
                ESClientPool.returnClient(client);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

7.添加文档信息(下面的示例接口只写入 userList 中的第一条数据)

//请求实体类

import com.es.esdemo.commons.entity.UserDO;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.io.Serializable;
import java.util.List;

/**
 * Request payload shared by the document endpoints.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated by Lombok's
 * {@code @Data}.
 */
@Data
public class UserRequestDTO implements Serializable {

    /** Name of the target index. */
    @ApiModelProperty("索引")
    private String index;

    /** User documents carried by the request. */
    @ApiModelProperty("数据集合")
    private List<UserDO> userList;

    /** Document ids carried by the request. */
    @ApiModelProperty("数据ids")
    private List<String> ids;
}


//用户实体类
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.io.Serializable;

/**
 * User document entity.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated by Lombok's
 * {@code @Data}.
 */
@Data
public class UserDO implements Serializable {

    /** User id. */
    @ApiModelProperty("用户id")
    private String id;

    /** Name. */
    @ApiModelProperty("姓名")
    private String name;

    /** Sex. */
    @ApiModelProperty("性别")
    private String sex;

    /** Age. */
    @ApiModelProperty("年龄")
    private Integer age;

    /** Index. */
    @ApiModelProperty("索引")
    private String index;

    /** Arbitrary nested object ("entity class" per the API label). */
    @ApiModelProperty("实体类")
    private Object obj;

}


    /**
     * Indexes a single document: the first element of {@code dto.getUserList()}.
     *
     * <p>Only the first list element is written; any further elements are ignored. The
     * document is resolved before a client is borrowed, and the borrowed client is returned
     * to the pool in a {@code finally} block, so a failed request can no longer leak a pool
     * slot.
     *
     * @param dto request carrying the target index and the user list
     * @return the version reported for the indexed document, or {@code 0} if any exception
     *         occurred (including a missing or empty user list); the exception is printed,
     *         not rethrown
     */
    @ApiOperation(value = "添加文档信息服务接口")
    @PostMapping("/addDocuments")
    public long createDocument(@RequestBody UserRequestDTO dto) {
        try {
            // Fail fast on a null/empty list before taking a client out of the pool.
            UserDO user = dto.getUserList().get(0);
            ElasticsearchClient client = ESClientPool.getClient();
            try {
                IndexResponse indexResponse = client
                        .index(x -> x
                                .index(dto.getIndex())
                                .document(user)
                        );
                return indexResponse.version();
            } finally {
                ESClientPool.returnClient(client);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return 0;
    }

8.更新文档信息服务接口

    /**
     * Partially updates one document using the first element of {@code dto.getUserList()}.
     *
     * <p>The element's id selects the document and the element itself is sent as the partial
     * document. The borrowed client is returned to the pool in a {@code finally} block, so a
     * failed request can no longer leak a pool slot.
     *
     * @param dto request carrying the target index and the user list
     * @return the version reported for the updated document, or {@code 0} if any exception
     *         occurred (including a missing or empty user list); the exception is printed,
     *         not rethrown
     */
    @ApiOperation(value = "更新文档信息服务接口")
    @PostMapping("/updateDocuments")
    public long updateDocument(@RequestBody UserRequestDTO dto) {
        try {
            // Fail fast on a null/empty list before taking a client out of the pool.
            UserDO user = dto.getUserList().get(0);
            ElasticsearchClient client = ESClientPool.getClient();
            try {
                UpdateResponse<UserDO> userUpdateResponse = client
                        .update(x -> x
                                .index(dto.getIndex())
                                .id(user.getId())
                                .doc(user), UserDO.class);
                return userUpdateResponse.version();
            } finally {
                ESClientPool.returnClient(client);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return 0;
    }

9.批量删除文档

 /**
     * Deletes the documents whose ids are listed in {@code dto.getIds()} with one bulk request.
     *
     * <p>The borrowed client is now always returned to the pool; previously it was never
     * returned, so every call permanently consumed one pool slot.
     *
     * @param dto request carrying the target index and the ids to delete
     * @return the per-operation results of the bulk request, or {@code null} if any exception
     *         occurred (the exception is printed, not rethrown)
     */
    @ApiOperation(value = "批量删除文档信息服务接口")
    @DeleteMapping("/batchDeleteDocuments")
    public List<BulkResponseItem> delDocByIds(UserRequestDTO dto) {
        try {
            // Build one delete operation per requested id before borrowing a client.
            List<BulkOperation> bulkOperations = new ArrayList<>();
            for (String id : dto.getIds()) {
                bulkOperations.add(BulkOperation.of(b -> b
                        .delete(d -> d
                                .index(dto.getIndex())
                                .id(id)
                        )
                ));
            }
            ElasticsearchClient client = ESClientPool.getClient();
            try {
                // Send all operations in a single bulk call and return the per-item results.
                BulkResponse response = client
                        .bulk(request -> request
                                .index(dto.getIndex())
                                .operations(bulkOperations));
                return response.items();
            } finally {
                ESClientPool.returnClient(client);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

10.高亮 自定义颜色

    /**
     * Runs a term query and highlights matches in the queried field with a coloured
     * {@code <font>} tag.
     *
     * <p>The borrowed client is returned to the pool in a {@code finally} block, so a failed
     * search can no longer leak a pool slot.
     *
     * @param index index to search
     * @param field field to match and highlight
     * @param value term value to match
     * @param color colour placed in the {@code <font color='...'>} pre-tag; {@code "yellow"} is
     *              used when empty
     * @return the {@code toString()} of the hit list
     * @throws Exception if borrowing/returning the client or the search itself fails
     */
    @ApiOperation(value = "自定义高亮查询 自定义颜色不填默认黄色")
    @PostMapping("/advancedQueryByHighLightColor")
    public Object advancedQueryByHighLightColor(String index, String field, String value, String color) throws Exception {
        // NOTE(review): color is request input and is inserted into the HTML tag unescaped;
        // consider validating it if the highlighted output is rendered in a browser.
        String highlightColor = StringUtils.isEmpty(color) ? "yellow" : color;
        ElasticsearchClient client = ESClientPool.getClient();
        try {
            SearchResponse<Object> searchResponse = client.search(e -> e
                            .index(index)
                            .query(q -> q
                                    .term(t -> t
                                            .field(field)
                                            .value(value)
                                    )
                            )
                            .highlight(h -> h
                                    .fields(field, f -> f
                                            .preTags("<font color='" + highlightColor + "'>")
                                            .postTags("</font>")
                                    )
                            )
                    , Object.class);

            Object source = searchResponse.hits().hits();
            return source.toString();
        } finally {
            ESClientPool.returnClient(client);
        }
    }

11.文档查询(相当于mysql in 方法)

    /**
     * Runs a terms query (the equivalent of SQL {@code IN}) and returns the first hit's source.
     *
     * <p>The borrowed client is returned to the pool in a {@code finally} block, and an empty
     * result yields {@code null} instead of an {@link IndexOutOfBoundsException}.
     *
     * @param index       index to search
     * @param field       field to match
     * @param fieldValues candidate values; a document matches if the field equals any of them
     * @return the source of the first hit, or {@code null} when nothing matched
     * @throws Exception if borrowing/returning the client or the search itself fails
     */
    @ApiOperation(value = "term匹配 多次匹配")
    @PostMapping("/advancedQueryByTerms")
    public Object advancedQueryByTerms(String index, String field, List<FieldValue> fieldValues) throws Exception {
        ElasticsearchClient client = ESClientPool.getClient();
        try {
            SearchResponse<Object> searchResponse = client.search(e -> e
                            .index(index)
                            .query(q -> q
                                    .terms(t -> t
                                            .field(field)
                                            .terms(terms -> terms
                                                    .value(fieldValues)
                                            )
                                    )
                            )
                    , Object.class);
            List<Hit<Object>> hits = searchResponse.hits().hits();
            if (hits.isEmpty()) {
                return null;
            }
            return hits.get(0).source();
        } finally {
            ESClientPool.returnClient(client);
        }
    }

12.分页查询

    /**
     * Runs a paged match-all query and returns the first hit's source.
     *
     * <p>The borrowed client is returned to the pool in a {@code finally} block, and an empty
     * page yields {@code null} instead of an {@link IndexOutOfBoundsException}.
     *
     * @param index index to search
     * @param from  value passed to the search request's {@code from}
     * @param size  value passed to the search request's {@code size}
     * @return the source of the first hit on the page, or {@code null} when the page is empty
     * @throws Exception if borrowing/returning the client or the search itself fails
     */
    @ApiOperation(value = "分页查询")
    @PostMapping("/advancedQueryByPage")
    public Object advancedQueryByPage(String index, Integer from, Integer size) throws Exception {
        ElasticsearchClient client = ESClientPool.getClient();
        try {
            SearchResponse<Object> searchResponse = client.search(e -> e
                            .index(index)
                            .query(q -> q
                                    .matchAll(m -> m)
                            )
                            .from(from)
                            .size(size)
                    , Object.class);

            if (searchResponse.hits().hits().isEmpty()) {
                return null;
            }
            return searchResponse.hits().hits().get(0).source();
        } finally {
            ESClientPool.returnClient(client);
        }
    }

13.matchPhrase 短语匹配(text 类型的字段会自动分词,keyword 类型的字段默认不分词)

/**
     * Runs a paged match-phrase query and returns the first hit's source.
     *
     * <p>Despite the endpoint name, the query issued is {@code matchPhrase}. The borrowed
     * client is returned to the pool in a {@code finally} block, and an empty result yields
     * {@code null} instead of an {@link IndexOutOfBoundsException}.
     *
     * @param indexName index to search
     * @param field     field to query
     * @param query     phrase to search for
     * @param slop      value passed to the match-phrase query's {@code slop}
     * @param from      value passed to the search request's {@code from}
     * @param size      value passed to the search request's {@code size}
     * @return the source of the first hit, or {@code null} when nothing matched
     * @throws Exception if borrowing/returning the client or the search itself fails
     */
    @ApiOperation(value = "中文分词查询 (文档类型为text会自动分词,默认keyword字段不支持分词)")
    @PostMapping("/queryMatch")
    public Object queryMatch(String indexName, String field, String query, Integer slop, Integer from, Integer size) throws Exception {
        ElasticsearchClient client = ESClientPool.getClient();
        try {
            SearchResponse<Object> searchResponse = client.search(e -> e
                            .index(indexName)
                            .query(q -> q
                                    .matchPhrase(m -> m
                                            .field(field)
                                            .query(query)
                                            .slop(slop)
                                    )
                            )
                            .from(from)
                            .size(size)
                    , Object.class);
            if (searchResponse.hits().hits().isEmpty()) {
                return null;
            }
            return searchResponse.hits().hits().get(0).source();
        } finally {
            ESClientPool.returnClient(client);
        }
    }

14.matchPhrasePrefix 短语前缀匹配

    /**
     * Runs a paged match-phrase-prefix query and returns the list of hits.
     *
     * <p>The borrowed client is returned to the pool in a {@code finally} block, so a failed
     * search can no longer leak a pool slot.
     *
     * @param indexName index to search
     * @param field     field to query
     * @param query     phrase prefix to search for
     * @param slop      value passed to the match-phrase-prefix query's {@code slop}
     * @param from      value passed to the search request's {@code from}
     * @param size      value passed to the search request's {@code size}
     * @return the hit list of the search response
     * @throws Exception if borrowing/returning the client or the search itself fails
     */
    @ApiOperation(value = "拼音分词查询 (索引需要加上 analysis 配置才能生效)")
    @PostMapping("/matchPhrasePrefix")
    public Object matchPhrasePrefix(String indexName, String field, String query, Integer slop,  Integer from, Integer size) throws Exception {
        ElasticsearchClient client = ESClientPool.getClient();
        try {
            SearchResponse<Object> searchResponse = client.search(e -> e
                            .index(indexName)
                            .query(q -> q
                                    .matchPhrasePrefix(m -> m
                                            .field(field)
                                            .query(query)
                                            .slop(slop)
                                    )
                            )
                            .from(from)
                            .size(size)
                    , Object.class);
            Object source = searchResponse.hits().hits();
            return source;
        } finally {
            ESClientPool.returnClient(client);
        }
    }

 /**
     * 需要使用此项配置才能生效拼音分词查询
     * PUT /milk
     * {
     *   "settings": {
     *     "analysis": {
     *       "filter": {
     *         "pinyin_filter":{
     *           "type":"pinyin",
     *           "keep_separate_first_letter" : false,
     *           "keep_full_pinyin" : true,
     *           "keep_original" : true,
     *           "limit_first_letter_length" : 16,
     *           "lowercase" : true,
     *           "remove_duplicated_term" : true
     * }
     *       },
     *       "analyzer": {
     *         "ik_pinyin_analyzer":{
     *           "tokenizer":"standard",
     *            "filter":["pinyin_filter"]
     *         }
     *       }
     *     }
     *   },
     *   "mappings": {
     *     "properties": {
     *       "brand":{
     *         "type": "text",
     *         "analyzer": "ik_pinyin_analyzer"
     *       },
     *       "series":{
     *         "type": "text",
     *         "analyzer": "ik_pinyin_analyzer"
     *       },
     *       "price":{
     *         "type": "float"
     *       }
     *     }
     *   }
     * }
     *
     * POST _bulk
     * {"index":{"_index":"milk", "_id":1}}
     * {"brand":"蒙牛", "series":"特仑苏", "price":60}
     * {"index":{"_index":"milk", "_id":2}}
     * {"brand":"蒙牛", "series":"真果粒", "price":40}
     * {"index":{"_index":"milk", "_id":3}}
     * {"brand":"华山牧", "series":"华山牧", "price":49.90}
     * {"index":{"_index":"milk", "_id":4}}
     * {"brand":"伊利", "series":"安慕希", "price":49.90}
     * {"index":{"_index":"milk", "_id":5}}
     * {"brand":"伊利", "series":"金典", "price":49.90}
     *
     * POST milk/_search
     * {
     *
     *   "query": {
     *     "match_phrase_prefix": {
     *       "series": "l"
     *     }
     *   }
     * }
     */

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐