添加依赖

<dependencies>
    <!-- Elasticsearch core and the high-level REST client; keep both on the same version. -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>7.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.8.0</version>
    </dependency>

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.9</version>
    </dependency>

    <!-- Log4j 2.8.2 is affected by Log4Shell (CVE-2021-44228); 2.17.1 contains the fixes. -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>2.17.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.17.1</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.6</version>
    </dependency>

    <!-- fastjson releases before 1.2.83 are affected by autoType deserialization bypasses. -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
</dependencies>

代码（注意：尚未自测，使用前请自行验证）

package com.lh.es.common.util;


import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import sun.misc.BASE64Encoder;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

/**
 * ES 工具类
 **/
public class EsUtil {


    private static final Map<String, RestHighLevelClient> clientMap = new HashMap<String, RestHighLevelClient>();
    private static final Map<String, RestClientBuilder> restBuilderMap = new HashMap<String, RestClientBuilder>();
    
    /**
     * 创建不带权限的客户端
     * @param ip
     * @param port
     * @return
     */
    public static RestHighLevelClient getClient(String ip, int port) {
        String clientKey = ip + port;
        try {
            RestHighLevelClient clientSingle = clientMap.get(clientKey);


            if (clientSingle == null) {
                RestClientBuilder builder;
                if (restBuilderMap.get(clientKey) == null) {
                    builder = RestClient.builder(getHttpHosts(ip, port));
                    restBuilderMap.put(clientKey, builder);
                } else {
                    builder = restBuilderMap.get(clientKey);
                }

                RestHighLevelClient client = new RestHighLevelClient(builder);
                clientMap.put(clientKey, client);
                return client;
            } else {
                return clientSingle;
            }

        } catch (Exception e) {
            e.printStackTrace();
            return null;

        }
    }
    
    /**
     * 创建带权限的客户端
     * @param ip
     * @param port
     * @param user
     * @param key
     * @return
     */
    public static RestHighLevelClient getAuthClient(String ip, int port, String user, String key) {
        String clientKey = ip + port + user;
        try {
            RestHighLevelClient clientSingle = clientMap.get(clientKey);
            if (clientSingle == null) {
                RestClientBuilder builder;
                if (restBuilderMap.get(clientKey) == null) {
                    builder = RestClient.builder(getHttpHosts(ip, port));
                    restBuilderMap.put(clientKey, builder);
                } else {
                    builder = restBuilderMap.get(clientKey);
                }
                String auth = new BASE64Encoder().encode((user + ":" + key).getBytes());
                builder.setDefaultHeaders(new BasicHeader[]{new BasicHeader("Authorization", "Basic" + auth)});
                RestHighLevelClient client = new RestHighLevelClient(builder);
                clientMap.put(clientKey, client);
                return client;
            } else {
                return clientSingle;
            }

        } catch (Exception e) {
            e.printStackTrace();
            return null;

        }
    }


    public static HttpHost[] getHttpHosts(String host, int port) {

        String[] split = host.split(",");

        HttpHost[] httpHosts = new HttpHost[split.length];
        for (int i = 0; i < split.length; i++) {

            httpHosts[i] = new HttpHost(split[i], port, "http");

        }
        return httpHosts;
    }


    /**
     * 分页查询
     *
     * @param client
     * @param indexName
     * @param docType
     * @param queryBuilders 条件构造器
     * @param from          页数
     * @param size          每页大小
     * @return
     */

    public static List<Map<String, Object>> queryPage(RestHighLevelClient client, String indexName, String docType, QueryBuilder queryBuilders, int from, int size) {

        List<Map<String, Object>> resList = new ArrayList<>();

        //根据页数计算偏移量
        from = (from - 1) * size;
        try {
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(queryBuilders);
            searchSourceBuilder.from(from);
            searchSourceBuilder.size(size);

            SearchRequest searchRequest = new SearchRequest();
            searchRequest.indices(indexName);
            searchRequest.types(docType); //9.0版本后没有docType概念
            searchRequest.source(searchSourceBuilder);
            SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

            SearchHit[] hits = searchResponse.getHits().getHits();
            if (!CollectionUtils.isEmpty(hits)) {
                for (SearchHit hit : hits) {
                    resList.add(hit.getSourceAsMap());
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
        return resList;
    }

    /**
     * @param client
     * @param indexName
     * @param docType
     * @param queryBuilders
     * @param exFields
     * @param fields
     * @param size          每次滚动返回的条数
     * @param timeout       滚动时间间隔 时间间隔内当前条数没返回完会往下滚动
     * @return
     */

    public static List<Map<String, Object>> findWithScroll(RestHighLevelClient client,
                                                           String indexName,
                                                           String docType,
                                                           QueryBuilder queryBuilders,
                                                           String exFields,
                                                           String fields,
                                                           int size,
                                                           int timeout) {

        List<Map<String, Object>> resList = new ArrayList<>();


        try {

            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

            searchSourceBuilder.size(size);
            searchSourceBuilder.query(queryBuilders);

            SearchRequest searchRequest = new SearchRequest(indexName);
            searchRequest.source(searchSourceBuilder);
            searchRequest.types(docType);
            if (StringUtils.isNullOrEmpty(fields) && !StringUtils.isNullOrEmpty(exFields)) {
                searchSourceBuilder.fetchSource(fields.split(","), exFields.split(","));
            } else if (StringUtils.isNullOrEmpty(fields)) {
                searchSourceBuilder.fetchSource(null, exFields.split(","));

            } else if (StringUtils.isNullOrEmpty(exFields)) {
                searchSourceBuilder.fetchSource(fields.split(","), null);

            }
            //初始化scroll
            Scroll scroll = new Scroll(TimeValue.timeValueMinutes(timeout));
            searchRequest.scroll(scroll);


            //第一次游标返回的结果中拿到当前scrollId
            SearchResponse search = client.search(searchRequest, RequestOptions.DEFAULT);
            String firstScrollId = search.getScrollId();
            SearchHit[] hits = search.getHits().getHits();
            if (CollectionUtils.isEmpty(hits)) {
                return new ArrayList<>();
            }
            //将第一次返回的结果收集
            for (SearchHit hit : hits) {
                resList.add(hit.getSourceAsMap());
            }

            //收集之后的结果 注意可以加入长度限制 避免OOM
            while (!CollectionUtils.isEmpty(hits)) {
                SearchScrollRequest searchScrollRequest = new SearchScrollRequest(firstScrollId);
                searchScrollRequest.scroll(scroll);
                SearchResponse scroll1 = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
                firstScrollId =   scroll1.getScrollId();
                for (SearchHit hit : hits) {
                    resList.add(hit.getSourceAsMap());
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
        return resList;
    }


    /**
     * 根据ID查找
     * @param client
     * @param indexName
     * @param type
     * @param id
     * @return
     */
    public static List<Map<String, Object>> queryById(RestHighLevelClient client, String indexName, String type, String id) {
        List<Map<String, Object>> resList = new ArrayList<Map<String, Object>>();

        try {
            SearchRequest searchRequest = new SearchRequest();
            searchRequest.indices(indexName);
            searchRequest.types(type);
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(QueryBuilders.idsQuery().addIds(id.split(",")));

            searchRequest.source(searchSourceBuilder);
            SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
            for (SearchHit hit : searchResponse.getHits().getHits()) {
                resList.add(hit.getSourceAsMap());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return resList;

    }


    /**
     * 批量插入
     * @param client
     * @param indexName
     * @param type
     * @param list
     * @param timeOut
     * @return
     */
    public static boolean bulkInsertMap(RestHighLevelClient client, String indexName, String type, List<Map<String, Object>> list, int timeOut) {
        boolean res = true;


        try {

            BulkProcessor bulkProcessor = getBulkProcessor(client);
            for (Map<String, Object> msg : list) {
                //9.0之后去去掉type就好
                bulkProcessor.add(new IndexRequest(indexName, type).source(msg, XContentType.JSON).id(String.valueOf(msg.get("_id"))));

            }
            //
            res = bulkProcessor.awaitClose(timeOut, TimeUnit.SECONDS);
            // 立即退出会导致剩余的文档不会插入
            // 比如 10100 在 1000条完成插入后 此时BulkProcessor的awaitClose会被标记为已完成 这时候时候close的话 剩下的文档不会被插入。
            //  bulkProcessor.close();

        } catch (Exception e) {
            e.printStackTrace();
            res = false;
        }


        return res;
    }

    /**
     * 插入单条数据
     *
     * @param client
     * @param indexName
     * @param indexType
     * @param record
     * @return
     */
    public static boolean insertOne(RestHighLevelClient client, String indexName, String indexType, Map<String, Object> record) {

        boolean res = true;
        IndexRequest indexRequest;


        try {
            Object id = record.get("_id");

            if (id != null) {
                record.remove("_id");
                indexRequest = new IndexRequest(indexName, indexType, id.toString()).source(record);

            } else {
                indexRequest = new IndexRequest(indexName, indexType).source(record);

            }


            client.index(indexRequest, RequestOptions.DEFAULT);


        } catch (Exception e) {

            res = false;

        }
        return res;
    }

    /**
     * 删除数据
     *
     * @param client
     * @param indexName
     * @param indexType
     * @param queryBuilder 条件构造器
     * @return
     */
    public static boolean delete(RestHighLevelClient client, String indexName, QueryBuilder queryBuilder, String indexType) {

        boolean res = true;

        try {

            DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
            deleteByQueryRequest.indices(indexName);
            deleteByQueryRequest.types(indexType);
            deleteByQueryRequest.setQuery(queryBuilder);


            client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);


        } catch (Exception e) {

            res = false;

        }
        return res;
    }

    /**
     * 根据id删除数据
     *
     * @param client
     * @param indexName
     * @param indexType
     * @param id
     * @return
     */
    public static boolean deleteById(RestHighLevelClient client, String indexName, String id, String indexType) {

        boolean res = true;
        DeleteRequest deleteRequest = new DeleteRequest();

        try {
            deleteRequest.index(indexName);
            deleteRequest.type(indexType);
            deleteRequest.id(id);
            client.delete(deleteRequest, RequestOptions.DEFAULT);


        } catch (Exception e) {

            res = false;

        }
        return res;
    }


    public static BulkProcessor getBulkProcessor(RestHighLevelClient client) {

        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long l, BulkRequest bulkRequest) {

            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {

            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {

            }
        };


        BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> client
                .bulkAsync(request, RequestOptions.DEFAULT, bulkListener);


        return BulkProcessor.builder(bulkConsumer, listener)
                .setBulkActions(500)
                .setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB))
                .setConcurrentRequests(10)
                .setFlushInterval(TimeValue.timeValueSeconds(100L))
                .build();

    }


}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐