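Our fund business needed a fund search feature, but there are few posts online covering the 7.x versions of Elasticsearch. This post therefore summarizes, based on my own production code, how to implement search with the ES Java client.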

Add the dependency

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.3.2</version>
</dependency>

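Client connection

This configuration class connects to the ES service. The cluster addresses, account, and password are read from constants.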

/**
 * Class {@code ESClientConfig}
 *
 * <p> ES client connection configuration
 *
 * @author hequan
 * Date 2021/2/5
 * Time 10:08
 */
@Configuration
@Slf4j
public class ESClientConfig {

    /**
     * RestHighLevelClient builder
     *
     * @return RestHighLevelClient
     */
    @Bean(destroyMethod = "close")
    public RestHighLevelClient restHighLevelClient() {

        log.info("Initializing RestHighLevelClient.");

        String[] nodes = Constants.ES_NODES_HOST.split(",");

        log.info("Configured ES nodes: {}", Constants.ES_NODES_HOST);
        List<HttpHost> hostList = new ArrayList<>();
        for (String node : nodes) {
            // Each entry is expected in host:port form
            String[] split = node.trim().split(":");
            HttpHost httpHost = new HttpHost(split[0], Integer.parseInt(split[1]), "http");
            hostList.add(httpHost);
        }
        HttpHost[] httpHosts = hostList.toArray(new HttpHost[0]);
        RestClientBuilder builder = RestClient.builder(httpHosts);

        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
            new UsernamePasswordCredentials(Constants.ES_NODES_ACCOUNT, Constants.ES_NODES_PASSWORD));
        builder.setHttpClientConfigCallback(httpClientBuilder ->
            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
                .setMaxConnPerRoute(10)
                .setMaxConnTotal(30));
        builder.setRequestConfigCallback(requestConfigBuilder ->
            requestConfigBuilder.setConnectTimeout(3000)
                .setConnectionRequestTimeout(3000)
                .setSocketTimeout(5000));
        // The RestHighLevelClient instance is built from the REST low-level client builder
        return new RestHighLevelClient(builder);

    }
}
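
The loop above assumes every entry in Constants.ES_NODES_HOST has the form host:port, and it fails with an ArrayIndexOutOfBoundsException or NumberFormatException otherwise. As a minimal sketch of a more defensive parser, the helper below uses only the JDK. The names EsNodeParser and Node are hypothetical and not part of the original code, and falling back to port 9200 (the default ES HTTP port) when no port is given is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: parses "host1:9200,host2:9201" into host/port pairs. */
final class EsNodeParser {

    /** Simple value holder for one node address. */
    static final class Node {
        final String host;
        final int port;

        Node(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Assumed fallback when an entry has no explicit port. */
    static final int DEFAULT_PORT = 9200;

    static List<Node> parse(String nodes) {
        List<Node> result = new ArrayList<>();
        if (nodes == null) {
            return result;
        }
        for (String raw : nodes.split(",")) {
            String entry = raw.trim();
            if (entry.isEmpty()) {
                continue; // tolerate stray commas and whitespace
            }
            int colon = entry.lastIndexOf(':');
            if (colon < 0) {
                result.add(new Node(entry, DEFAULT_PORT));
                continue;
            }
            String host = entry.substring(0, colon);
            if (host.isEmpty()) {
                throw new IllegalArgumentException("Missing host in ES node entry: " + entry);
            }
            int port;
            try {
                port = Integer.parseInt(entry.substring(colon + 1));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid port in ES node entry: " + entry, e);
            }
            result.add(new Node(host, port));
        }
        return result;
    }
}
```

Each Node could then be turned into a `new HttpHost(node.host, node.port, "http")` inside the bean method.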

Some common APIs

The main search method is queryFundInfo().
It supports searching by fund name and by fund code, and returns the hits with highlighting.
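
For the fund code, the class below builds the highlight by hand, wrapping the searched text in a span. When doing this with String.replaceFirst(), keep in mind that the first argument is a regular expression and the second a replacement pattern, so user input containing characters such as `.`, `(` or `$` needs quoting. The sketch below shows one way to do that with the JDK only. The class name FundCodeHighlighter is hypothetical; the tags are the ones used in this post.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: wraps the first literal occurrence of a term in highlight tags. */
final class FundCodeHighlighter {

    static final String PRE_TAG = "<span style='color:#FF7622'>";
    static final String POST_TAG = "</span>";

    static String highlightFirst(String fundCode, String term) {
        if (fundCode == null || term == null || term.isEmpty()) {
            return fundCode; // nothing to highlight
        }
        // Quote the term so regex metacharacters are matched literally
        String literalTerm = Pattern.quote(term);
        // Quote the replacement so '$' and '\' in the term are not treated as group references
        String replacement = Matcher.quoteReplacement(PRE_TAG + term + POST_TAG);
        return fundCode.replaceFirst(literalTerm, replacement);
    }
}
```

For example, `FundCodeHighlighter.highlightFirst("ZH000359", "ZH0")` wraps the leading `ZH0` and leaves `00359` untouched. If the term can contain HTML, it should also be escaped before it reaches the page.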


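/**
 * Class {@code RestHighLevelClientUtil}
 *
 * <p> ES search utility class, covering index creation, index deletion, template creation, queries, and so on.
 *
 *     Notes:
 *     1: For performance reasons ES is near real time: there is a delay of about 1 second between writing data and being able to read it.
 *     2: For batch writes, prefer the BulkProcessor, and tune its parameters to the volume of data written.
 *     3: Do not operate on the index directly. Reads go through the read alias (created in the template), writes go through the write alias (created when the alias is set).
 *
 *
 * @author hequan
 * Date 2021/2/4
 * Time 19:49
 */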
@Component
@Slf4j
public class RestHighLevelClientUtil {

    @Autowired
    private RestHighLevelClient restHighLevelClient;


    private BulkProcessor bulkProcessor;

    // Initialize the BulkProcessor used for bulk document operations
    @PostConstruct
    public void init() {
        this.bulkProcessor = BulkProcessor.builder((request, bulkListener) ->
            restHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                request.timeout(TimeValue.timeValueMillis(ESConstants.ES_BULK_TIMEOUT));
                log.info("execution id:{}, request number:{}", executionId, request.numberOfActions());
            }
            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    log.warn("execution id:{}, failure:{}", executionId, response.buildFailureMessage());
                } else {
                    log.info("execution id:{}, time took:{}", executionId, response.getTook().getMillis());
                }
            }
            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // The whole bulk request failed, so log the stack trace as well
                log.error("execution id:{}, bulk request failed", executionId, failure);
            }
        }).setBulkActions(ESConstants.ES_BULK_ACTIONS)  // flush once this many actions have been added
            .setBulkSize(new ByteSizeValue(ESConstants.ES_BULK_SIZE, ByteSizeUnit.MB))   // flush once the buffered data reaches this size in MB
            .setFlushInterval(TimeValue.timeValueSeconds(ESConstants.ES_BULK_FLUSHINTERVAL))    // flush at least once per interval, in seconds
            .build();
    }

    /**
     * Add a document through the bulk processor
     *
     * @param indexRequest index request to queue
     */
    public void insert(IndexRequest indexRequest) {
        this.bulkProcessor.add(indexRequest);
        log.info("bulkProcess add: {}", indexRequest);
    }

    /**
     * Update a document through the bulk processor
     *
     * @param updateRequest update request to queue
     */
    public void update(UpdateRequest updateRequest) {
        bulkProcessor.add(updateRequest);
        log.info("bulkProcess add: {}", updateRequest);
    }

    /**
     * Delete a document through the bulk processor
     *
     * @param deleteRequest delete request to queue
     */
    public void delete(DeleteRequest deleteRequest) {
        bulkProcessor.add(deleteRequest);
        log.info("bulkProcess add: {}", deleteRequest);
    }

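    // Release resources
    @PreDestroy
    public void destroy() {
        try {
            // Flush pending requests and wait up to 10 seconds for in-flight bulks
            boolean finished = bulkProcessor.awaitClose(10, TimeUnit.SECONDS);
            log.info("bulkProcessor closed, all bulk requests completed: {}", finished);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("bulkProcessor close interrupted", e);
        }
        // The RestHighLevelClient bean is closed by its own definition (destroyMethod = "close"),
        // so it is not closed a second time here.
    }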


    /**
     * Create an index template
     *
     * @param indexTemplateName template name
     * @param source JSON holding the index pattern, mapping, aliases, shard count, and other settings
     */
    public AcknowledgedResponse createIndexTemplate(String indexTemplateName, String source) {
        AcknowledgedResponse putTemplateResponse = null;
        try {
            PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest(indexTemplateName);
            putIndexTemplateRequest.source(source, XContentType.JSON);

            if (!restHighLevelClient.indices()
                .existsTemplate(new IndexTemplatesExistRequest(indexTemplateName), RequestOptions.DEFAULT)) {
                putTemplateResponse = restHighLevelClient.indices()
                    .putTemplate(putIndexTemplateRequest, RequestOptions.DEFAULT);

                log.info("index template created:{}, acknowledged:{}", indexTemplateName, putTemplateResponse.isAcknowledged());

            } else {
                log.warn("index template already exists:{}", indexTemplateName);
            }
        } catch (Exception e) {
            log.error("index template:{}, creation failed: ", indexTemplateName, e);
        }
        return putTemplateResponse;
    }

    /**
     * Delete an index template
     *
     * @param indexTemplateName template name
     */
    public AcknowledgedResponse deleteIndexTemplate(String indexTemplateName) {

        AcknowledgedResponse deleteTemplateResponse = null;

        DeleteIndexTemplateRequest request = new DeleteIndexTemplateRequest(indexTemplateName);
        try {
            deleteTemplateResponse = restHighLevelClient.indices()
                .deleteTemplate(request, RequestOptions.DEFAULT);
            log.info("index template deleted:{}, acknowledged:{}", indexTemplateName, deleteTemplateResponse.isAcknowledged());
        } catch (Exception e) {
            log.error("index template:{}, deletion failed", indexTemplateName, e);
        }
        return deleteTemplateResponse;
    }

    /**
     * Create an index
     *
     * @param indexName index name
     */
    public CreateIndexResponse createIndex(String indexName) {
        CreateIndexResponse createIndexResponse = null;
        try {
            CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
            // Synchronous execution
            if (!restHighLevelClient.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) {
                createIndexResponse = restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);

                log.info("index created:{}, acknowledged:{}, shardsAcknowledged:{}", indexName,
                    createIndexResponse.isAcknowledged(), createIndexResponse.isShardsAcknowledged());
            } else {
                log.warn("index already exists:{}", indexName);
            }
        } catch (Exception e) {
            log.error("index:{}, creation failed: ", indexName, e);
        }
        return createIndexResponse;
    }

    /**
     * Delete an index (asynchronously)
     */
    public void deleteIndexAsync(String indexName) {
        try {
            DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
            deleteIndexRequest.timeout(TimeValue.timeValueMinutes(2));
            deleteIndexRequest.masterNodeTimeout(TimeValue.timeValueMinutes(1));
            deleteIndexRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
            // The existence check is synchronous; the delete itself runs asynchronously
            if (restHighLevelClient.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) {
                restHighLevelClient.indices().deleteAsync(deleteIndexRequest, RequestOptions.DEFAULT,
                    new ActionListener<AcknowledgedResponse>() {
                        @Override
                        public void onResponse(AcknowledgedResponse deleteIndexResponse) {
                            log.info("index deleted:{}, acknowledged:{}", indexName, deleteIndexResponse.isAcknowledged());
                        }

                        @Override
                        public void onFailure(Exception e) {
                            log.warn("index:{}, deletion failed: ", indexName, e);
                        }
                    });
            } else {
                log.warn("index:{} does not exist, nothing to delete", indexName);
            }
        } catch (Exception e) {
            log.error("index:{}, deletion raised an exception: ", indexName, e);
        }
    }

    /**
     * Close an index (asynchronously)
     */
    public void closeIndexAsync(String indexName) {
        try {
            CloseIndexRequest closeIndexRequest = new CloseIndexRequest(indexName);
            closeIndexRequest.timeout(TimeValue.timeValueMinutes(2));
            closeIndexRequest.masterNodeTimeout(TimeValue.timeValueMinutes(1));
            closeIndexRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
            // The existence check is synchronous; the close itself runs asynchronously
            if (restHighLevelClient.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) {
                restHighLevelClient.indices().closeAsync(closeIndexRequest, RequestOptions.DEFAULT,
                    new ActionListener<AcknowledgedResponse>() {
                        @Override
                        public void onResponse(AcknowledgedResponse closeIndexResponse) {
                            log.info("index closed:{}, acknowledged:{}", indexName, closeIndexResponse.isAcknowledged());
                        }

                        @Override
                        public void onFailure(Exception e) {
                            log.warn("index:{}, close failed: ", indexName, e);
                        }
                    });
            } else {
                log.warn("index:{} does not exist, cannot close it", indexName);
            }
        } catch (Exception e) {
            log.error("index:{}, close raised an exception: ", indexName, e);
        }
    }

    /**
     * Add an alias to an index, so that later operations can go through the alias
     *
     * @param indexName index name
     * @param aliasName index alias
     */
    public AcknowledgedResponse createIndixAliasName(String indexName, String aliasName) {
        AcknowledgedResponse indicesAliasesResponse = null;
        try {
            IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
            IndicesAliasesRequest.AliasActions aliasAction =
                new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
                    .index(indexName)
                    .alias(aliasName);
            indicesAliasesRequest.addAliasAction(aliasAction);
            indicesAliasesResponse = restHighLevelClient.indices()
                .updateAliases(indicesAliasesRequest, RequestOptions.DEFAULT);
            log.info("index:{}, alias:{} set, acknowledged:{}", indexName, aliasName, indicesAliasesResponse.isAcknowledged());
        } catch (Exception e) {
            log.error("setting alias:{} failed", aliasName, e);
        }
        return indicesAliasesResponse;
    }

    /**
     * Add a single document; for bulk writes prefer the BulkProcessor
     */
    public IndexResponse insertDocById(String indexName, String id, String json) {
        IndexResponse indexResponse = null;
        IndexRequest indexRequest = new IndexRequest(indexName);
        indexRequest.id(id).source(json, XContentType.JSON);
        try {
            indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
            log.info("document added, id:{}", id);
        } catch (ElasticsearchException e) {
            if (e.status() == RestStatus.CONFLICT) {
                log.warn("document version conflict: ", e);
            } else {
                log.warn("adding document failed: ", e);
            }
        } catch (Exception e) {
            log.error("adding document raised an exception: ", e);
        }
        return indexResponse;
    }

    /**
     * Query by DSL. Pass only the query body, without the outer {"query": } wrapper
     */
    public SearchResponse queryByDSL(String indexName, String dslJson) {
        SearchRequest searchRequest = new SearchRequest(indexName);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(QueryBuilders.wrapperQuery(dslJson));
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = null;
        try {
            searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            log.info("query took: {}s", searchResponse.getTook().getSecondsFrac());
            log.info("total hits: {}", searchResponse.getHits().getTotalHits());
        } catch (Exception e) {
            log.error("DSL query failed: ", e);
        }
        return searchResponse;
    }

    /**
     * Exact-match query on a single field and value
     *
     * @param field field to query
     * @param value value to match
     * @param indexName index name
     */
    public List<Map<String, Object>> queryByFiled(String indexName, String field, String value) {
        SearchRequest searchRequest = new SearchRequest(indexName);

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        // Exact match: a term query does not analyze the value, so it works best on keyword fields
        TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(field, value);

        searchSourceBuilder.query(termQueryBuilder);
        searchSourceBuilder.timeout(new TimeValue(2, TimeUnit.SECONDS));

        searchRequest.source(searchSourceBuilder);

        SearchResponse searchResponse = null;
        try {
            searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            log.info("search took: {}s", searchResponse.getTook().getSecondsFrac());
            log.info("total hits: {}", searchResponse.getHits().getTotalHits());
        } catch (Exception e) {
            log.error("search failed: ", e);
        }

        List<Map<String, Object>> list = new ArrayList<>();

        // The search may have thrown, in which case there is no response to read
        if (null == searchResponse) {
            log.info("search returned no response");
            return list;
        }

        for (SearchHit documentFields : searchResponse.getHits().getHits()) {

            list.add(documentFields.getSourceAsMap());
        }
        log.info("search result list:{}", JSON.toJSONString(list));
        return list;

    }

    /**
     * Fund search query
     *
     * @param indexName index name
     * @param value search value
     * @param pageNo page number, assumed to start at 1
     * @param pageSize number of hits per page
     */
    public List<ESFundDTO> queryFundInfo(String indexName, String value, int pageNo, int pageSize) {

        SearchRequest searchRequest = new SearchRequest(indexName);

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        // Query timeout
        searchSourceBuilder.timeout(new TimeValue(2, TimeUnit.SECONDS));

        // 1: multi-field match query
        QueryBuilder queryBuilder = QueryBuilders
            .multiMatchQuery(value, ESConstants.SEARCH_FUND_CODE, ESConstants.SEARCH_FUND_NAME)
            .operator(Operator.AND);
        searchSourceBuilder.query(queryBuilder);

        // 2: highlighting
        HighlightBuilder highlightBuilder = new HighlightBuilder();

        // Fields to highlight
        highlightBuilder.field(ESConstants.SEARCH_FUND_CODE);
        highlightBuilder.field(ESConstants.SEARCH_FUND_NAME);

        // true: only highlight fields the query actually matched; false would highlight every listed field
        highlightBuilder.requireFieldMatch(true);

        // Tags wrapped around each highlighted fragment
        highlightBuilder.preTags("<span style='color:#FF7622'>");
        highlightBuilder.postTags("</span>");
        searchSourceBuilder.highlighter(highlightBuilder);

        // 3: pagination. from() takes the offset of the first hit, not a page number,
        // so the page number (assumed to start at 1) is converted to an offset
        searchSourceBuilder.from(Math.max(pageNo - 1, 0) * pageSize);
        searchSourceBuilder.size(pageSize);

        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = null;
        try {
            searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            log.info("search took: {}s", searchResponse.getTook().getSecondsFrac());

        } catch (Exception e) {
            log.error("search failed: ", e);
        }

        List<ESFundDTO> list = new ArrayList<>();

        if (null == searchResponse) {
            log.info("search returned no response");
            return list;
        }

        for (SearchHit documentFields : searchResponse.getHits().getHits()) {

            Map<String, HighlightField> highlightFields = documentFields.getHighlightFields();

            HighlightField fundName = highlightFields.get(ESConstants.SEARCH_FUND_NAME);
            HighlightField fundCode = highlightFields.get(ESConstants.SEARCH_FUND_CODE);
            log.debug("fundName:{},fundCode:{}", fundName, fundCode);
            Map<String, Object> sourceAsMap = documentFields.getSourceAsMap();

            log.debug("source of the hit, sourceAsMap:{}", JSON.toJSONString(sourceAsMap));

            if (null != fundName) {
                Text[] fragments = fundName.getFragments();
                StringBuilder newFundName = new StringBuilder();
                for (Text text : fragments) {
                    newFundName.append(text);
                }
                // Replace the original value with the highlighted one
                sourceAsMap.put(ESConstants.SEARCH_FUND_NAME, newFundName.toString());

                // Original fund code, used by the front end for navigation
                String queryFundCode = (String) sourceAsMap.get(ESConstants.SEARCH_FUND_CODE);
                sourceAsMap.put("realFundCode", queryFundCode);
            }

            // Highlighting does not work well with the custom analyzer, so the fund code is highlighted by hand
            if (null != fundCode) {
                // Stored value, normally something like ZH000359
                String queryFundCode = (String) sourceAsMap.get(ESConstants.SEARCH_FUND_CODE);
                if (null != queryFundCode) {
                    // Searched value, e.g. ZH0, which needs highlighting
                    String highValue = "<span style='color:#FF7622'>" + value + "</span>";

                    // replaceFirst() takes a regex and a replacement pattern, so both are quoted
                    // (java.util.regex.Pattern and Matcher) to treat the user input literally.
                    // Result, e.g.: <span style='color:#FF7622'>ZH0</span>00359
                    String newFundCode = queryFundCode.replaceFirst(
                        Pattern.quote(value), Matcher.quoteReplacement(highValue));
                    // Replace the original value with the highlighted one
                    sourceAsMap.put(ESConstants.SEARCH_FUND_CODE, newFundCode);

                    // The front end needs the original fund code to jump to the detail page
                    sourceAsMap.put("realFundCode", queryFundCode);
                }
            }

            list.add(JSONObject.parseObject(JSONObject.toJSONString(sourceAsMap), ESFundDTO.class));
        }
        log.debug("search result list:{}", JSON.toJSONString(list));
        return list;

    }

    /**
     * Check whether a document exists
     *
     * @param indexName index name
     * @param id document id
     * @return true if the document exists; false if it does not or the check failed
     */
    public boolean existsDocById(String indexName, String id) {
        GetRequest getRequest = new GetRequest(indexName, id);
        // Only existence matters, so skip fetching _source and stored fields
        getRequest.fetchSourceContext(new FetchSourceContext(false));
        getRequest.storedFields("_none_");
        boolean exists = false;
        try {
            exists = restHighLevelClient.exists(getRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            log.error("document existence check failed: ", e);
        }
        log.info("document exists:{}", exists);
        return exists;
    }

    /**
     * Create an index by scanning annotations for its properties.
     * An index template is used at the moment and all field definitions live in the template,
     * so nothing needs to be done here; this method may be enabled later as an extension.
     */
    @Deprecated
    public void createIndexByClass(Class<?> clazz) {

        ESIndexMetaData metaData = clazz.getAnnotation(ESIndexMetaData.class);
        // Index name
        String indexName = metaData.indexName();
        // Index alias
        String indexNameAlias = metaData.indexNameAlias();
        // Number of shards and replicas
        int numberOfShards = metaData.numberOfShards();
        int numberOfReplicas = metaData.numberOfReplicas();

        // Check whether the index already exists
        if (existsIndex(clazz)) {

            log.info("ES index already exists");
            return;
        }

        CreateIndexRequest request = new CreateIndexRequest(indexName);

        // Set the alias
        request.alias(new Alias(indexNameAlias));

        // Build the index mapping
        MappingData[] mappingDates = CommonUtils.getMappingDataByClass(clazz);

        XContentBuilder xContentBuilder = CommonUtils.getMappingIndex(mappingDates);

        request.settings(
            Settings.builder()
                // Shard and replica counts; other settings such as the refresh interval can go here too
                .put("index.number_of_shards", numberOfShards)
                .put("index.number_of_replicas", numberOfReplicas)
                .build());

        request.mapping(xContentBuilder);

        CreateIndexResponse response;
        // Synchronous execution
        try {
            if (!restHighLevelClient.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) {
                response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
                log.info("index:{}, alias:{} created", indexName, indexNameAlias);
                log.info("acknowledged :{}, shardsAcknowledged:{}", response.isAcknowledged(),
                    response.isShardsAcknowledged());
            } else {
                log.warn("index:{} already exists, do not create it again", indexName);
            }
        } catch (Exception e) {
            log.error("index:{}, creation failed", indexName, e);
        }
    }

    /**
     * Check whether the index exists
     *
     * @return true: exists (also returned when the check itself fails), false: does not exist
     */
    public boolean existsIndex(Class<?> clazz) {

        String indexName = clazz.getAnnotation(ESIndexMetaData.class).indexName();

        boolean exists = true;
        try {
            exists = restHighLevelClient.indices()
                .exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT);
        } catch (Exception e) {
            log.error("index existence check failed, indexName:{}", indexName, e);
        }
        return exists;
    }

}
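
A note on pagination: SearchSourceBuilder.from() expects the offset of the first hit to return, not a page number, so a page number has to be converted before it is passed in. The sketch below shows the conversion with plain JDK code. The class name PageOffsets is hypothetical, and treating page numbers as starting at 1 is an assumption about how the caller counts pages.

```java
/** Hypothetical helper: converts a 1-based page number into a search offset. */
final class PageOffsets {

    /**
     * @param pageNo   page number, assumed to start at 1 (smaller values are treated as page 1)
     * @param pageSize number of hits per page, must be positive
     * @return offset of the first hit on that page
     */
    static int from(int pageNo, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive: " + pageSize);
        }
        int page = Math.max(pageNo, 1);
        // Compute in long so that very large page numbers cannot silently overflow
        long offset = (long) (page - 1) * pageSize;
        if (offset > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("page offset too large: " + offset);
        }
        return (int) offset;
    }
}
```

With this helper, page 3 at 10 hits per page starts at offset 20. Note that ES rejects requests where from + size exceeds index.max_result_window (10000 by default), so deep pages need a different approach such as search_after.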

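Constants and configuration classes

The searchable fields are defined through an index template. Searching by fund code, for example fundCode = AD002374, is supported through a custom analyzer. All writes go through the write alias, and all reads go through the read alias.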


/**
 * Class {@code ESConstants}
 *
 * <p>  ES constants
 *
 * @author hequan
 * Date 2021/2/5
 * Time 9:56
 */
public final class ESConstants {

    /**
     * Index name, comparable to a 'database name'
     */
    public final static String INDEX_NAME = "test_demo_v20210222";

    /**
     * Read alias, defined in the index template; queries go through it
     */
    public final static String INDEX_NAME_ALIAS_READ = "test_demo_read";

    /**
     * Write alias, defined when the index alias is set; writes go through it
     */
    public final static String INDEX_NAME_ALIAS_WRITE = "test_demo_write";

    /**
     * Index template name
     */
    public final static String INDEX_TEMPLATE_NAME = "ftest_demo_template";

    /**
     * Search field: fund code
     */
    public final static String SEARCH_FUND_CODE = "fundCode";

    /**
     * Search field: fund name
     */
    public final static String SEARCH_FUND_NAME = "fundName";

//    public static final String ES_NODES_HOST;
//    public static final String ES_NODES_ACCOUNT;
//    public static final String ES_NODES_PASSWORD;
    /**
     * Bulk request timeout in milliseconds
     */
    public static final long ES_BULK_TIMEOUT = 3000L;

    /**
     * Action count: a bulk is executed once this many actions have been added
     */
    public static final int ES_BULK_ACTIONS = 3000;

    /**
     * Data size: a bulk is executed once this many MB have been buffered
     */
    public static final long ES_BULK_SIZE = 10L;

    /**
     * Flush interval: a bulk is executed at least once every this many seconds
     */
    public static final long ES_BULK_FLUSHINTERVAL = 5L;

    /**
     * Template configuration
     */
    public static final String TEMPLATE_SOURCE = "{\n"
        + "    \"order\": 0,\n"
        + "    \"index_patterns\": \"test_demo_v*\",\n"
        + "    \"aliases\": {\n"
        + "        \"test_demo_read\": {}\n"
        + "    },\n"
        + "    \"mappings\": {\n"
        + "        \"properties\": {\n"
        + "            \"fundType\": {\n"
        + "                \"type\": \"keyword\",\n"
        + "                \"ignore_above\": 256\n"
        + "            },\n"
        + "            \"productType\": {\n"
        + "                \"type\": \"keyword\",\n"
        + "                \"ignore_above\": 256\n"
        + "            },\n"
        + "            \"yearProfitRate\": {\n"
        + "                \"type\": \"keyword\",\n"
        + "                \"ignore_above\": 256\n"
        + "            },\n"
        + "            \"fundName\": {\n"
        + "                \"type\": \"text\",\n"
        + "                \"search_analyzer\": \"ik_smart\",\n"
        + "                \"analyzer\": \"ik_max_word\"\n"
        + "            },\n"
        + "            \"fundCode\": {\n"
        + "                \"type\": \"text\",\n"
        + "                \"search_analyzer\": \"keyword\",\n"
        + "                \"analyzer\": \"autocomplete\"\n"
        + "            }\n"
        + "        }\n"
        + "    },\n"
        + "    \"settings\": {\n"
        + "        \"index\": {\n"
        + "            \"number_of_shards\": \"3\",\n"
        + "            \"number_of_replicas\": \"2\"\n"
        + "        },\n"
        + "        \"analysis\": {\n"
        + "            \"filter\": {\n"
        + "                \"autocomplete_filter\": {\n"
        + "                    \"type\": \"edge_ngram\",\n"
        + "                    \"min_gram\": 1,\n"
        + "                    \"max_gram\": 10\n"
        + "                }\n"
        + "            },\n"
        + "            \"analyzer\": {\n"
        + "                \"autocomplete\": {\n"
        + "                    \"type\": \"custom\",\n"
        + "                    \"tokenizer\": \"standard\",\n"
        + "                    \"filter\": [\n"
        + "                        \"autocomplete_filter\"\n"
        + "                    ]\n"
        + "                }\n"
        + "            }\n"
        + "        }\n"
        + "    }\n"
        + "}\n";

}
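
The autocomplete analyzer in TEMPLATE_SOURCE combines the standard tokenizer with an edge_ngram filter (min_gram 1, max_gram 10), while the search side uses the keyword analyzer. In effect, every prefix of a fund code is indexed and the user's input is matched against those prefixes as a whole. The following sketch is a plain-Java emulation of that prefix generation, not Elasticsearch code. The class name EdgeNgramDemo is hypothetical, and it assumes the fund code is a single ASCII token, which is what the standard tokenizer produces for a value like ZH000359.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demo: emulates the prefixes an edge_ngram filter emits for one token. */
final class EdgeNgramDemo {

    static List<String> edgeNgrams(String token, int minGram, int maxGram) {
        List<String> grams = new ArrayList<>();
        // Prefixes never grow beyond max_gram or beyond the token itself
        int longest = Math.min(maxGram, token.length());
        for (int len = minGram; len <= longest; len++) {
            grams.add(token.substring(0, len));
        }
        return grams;
    }
}
```

For ZH000359 this yields the eight prefixes Z, ZH, ZH0 and so on up to the full code, so an input such as ZH0 matches an indexed term exactly. Two consequences follow from the settings: input longer than max_gram (10 characters) cannot match any indexed prefix, and since the analyzer has no lowercase filter, matching is case-sensitive.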

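Initializing the index and other ES configuration

This can also be done with a batch job, since it only needs to run once, or directly with commands in the Kibana console (which we could not use because we lacked the permissions).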


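/**
 * Class {@code CreateESIndex}
 *
 * <p> ES index initialization.
 *
 * 1: Create the index template. Indices whose names match the template pick up its configuration; new indices could later use custom annotation scanning to generate their properties instead.
 * 2: Create the index.
 * 3: Set the index alias. Later operations all go through aliases: reads through the read alias, writes through the write alias.
 * 4: After it has run once this class can be disabled; once Apollo is integrated it can be re-enabled behind a switch.
 *
 * @author hequan
 * Date 2021/2/4
 * Time 20:02
 */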
@Configuration
@Slf4j
public class CreateESIndex implements ApplicationListener<ContextRefreshedEvent>, ApplicationContextAware {


    @Autowired
    private RestHighLevelClientUtil restHighLevelClientUtil;

    private ApplicationContext applicationContext;

    // Listen only for ContextRefreshedEvent; an untyped listener would run on every application event
    @Override
    public void onApplicationEvent(ContextRefreshedEvent applicationEvent) {

        try {
            // Index template source
            String createIndexTemplateSource = ESConstants.TEMPLATE_SOURCE;

            // Index name
            String indexName = ESConstants.INDEX_NAME;
            // Index alias used for write operations
            String indexNameAlias = ESConstants.INDEX_NAME_ALIAS_WRITE;
            // Index template name
            String indexTemplateName = ESConstants.INDEX_TEMPLATE_NAME;

            // 1: create the index template
            restHighLevelClientUtil.createIndexTemplate(indexTemplateName, createIndexTemplateSource);

            // 2: create the index
            restHighLevelClientUtil.createIndex(indexName);

            // 3: set the write alias; the read alias is already defined in the template
            restHighLevelClientUtil.createIndixAliasName(indexName, indexNameAlias);

        } catch (Exception e) {
            log.error("creating the ES index failed", e);
        }
        }
     
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;

    }
}
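
The class comment notes that the initialization only needs to run once, yet an application listener can be invoked more than once during the lifetime of an application. One way to make the listener body safe to call repeatedly is a small run-once guard. The sketch below uses only the JDK; the class name OneTimeInitializer is hypothetical and not part of the original code.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical guard: runs the wrapped task on the first call only. */
final class OneTimeInitializer {

    private final AtomicBoolean started = new AtomicBoolean(false);
    private final Runnable task;

    OneTimeInitializer(Runnable task) {
        this.task = task;
    }

    /**
     * @return true if this call ran the task, false if an earlier call already claimed it
     */
    boolean runOnce() {
        // compareAndSet lets exactly one caller through, even under concurrent calls
        if (!started.compareAndSet(false, true)) {
            return false;
        }
        task.run();
        return true;
    }
}
```

In the listener, the three initialization steps would go into the Runnable and onApplicationEvent would just call runOnce(). The helper methods already check whether the template and index exist, so the guard mainly saves the repeated round trips to the cluster.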

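Finally, don't forget the bean that holds the query results

Store the data returned by ES in your own bean, and a simple search feature is essentially in place. The code here comes from our production fund business and implements search by fund name and by fund code, with highlighted results. We ran into many problems along the way, and there is still room for optimization.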
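
The ESFundDTO bean itself is not shown in this post. As a rough guess at its shape, the sketch below declares the fields that appear in the template mapping plus the realFundCode entry that queryFundInfo() adds. The String types and the exact set of fields are assumptions; the real class may differ.

```java
/** Assumed shape of the result bean; field names follow the template mapping in this post. */
public class ESFundDTO {

    private String fundCode;       // may contain highlight tags
    private String fundName;       // may contain highlight tags
    private String realFundCode;   // original fund code, used for navigation
    private String fundType;
    private String productType;
    private String yearProfitRate; // mapped as keyword, so kept as a String here

    public String getFundCode() { return fundCode; }
    public void setFundCode(String fundCode) { this.fundCode = fundCode; }

    public String getFundName() { return fundName; }
    public void setFundName(String fundName) { this.fundName = fundName; }

    public String getRealFundCode() { return realFundCode; }
    public void setRealFundCode(String realFundCode) { this.realFundCode = realFundCode; }

    public String getFundType() { return fundType; }
    public void setFundType(String fundType) { this.fundType = fundType; }

    public String getProductType() { return productType; }
    public void setProductType(String productType) { this.productType = productType; }

    public String getYearProfitRate() { return yearProfitRate; }
    public void setYearProfitRate(String yearProfitRate) { this.yearProfitRate = yearProfitRate; }
}
```

Plain getters and setters are enough for the JSON round trip used in queryFundInfo() to populate the bean.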
