前言

ElasticSearch是Elastic公司出品的一款功能强大的搜索引擎,被广泛的应用于各大IT公司,它的代码位于 https://github.com/elastic/elasticsearch,目前是一个开源项目。ElasticSearch公司的另外两个开源产品 Logstash 、 Kibana与ElasticSearch构成了著名的ELK技术栈。。他们三个共同形成了一个强大的生态圈。简单地说,Logstash 负责数据的采集,处理(丰富数据,数据转换等),Kibana 负责数据展示,分析,管理,监督及应用。Elasticsearch 处于最核心的位置,它可以帮我们对数据进行快速地搜索及分析。
ElasticSearch的使用及介绍,读者可以自行在网上搜索相关资料学习,今天要介绍的是Spring Boot如何集成ElasticSearch,再结合使用ES记录网关日志的实践,总结一下集成常见的几种方式。


一、Elasticsearch版本介绍

  • ES的版本管理机制:主版本.次版本.修正版本
    • 主版本一年一发,主要是重大升级和改进,涉及底层Lucence版本升级、性能改进、重大特性发布
    • 次版本2~3个月发一次,主要是少量特性功能发布(有阻断变化和废弃旧特性)及BUG修复
    • 修正版本2周左右发一次,不添加新功能,只修复BUG
  • 大版本主要功能特性差异,详细参见ES发展史
    • 6.8.0/7.1.0后xpack 开源,基础版不需要授权免费使用,包括: 安全、监控等
    • 多type支持:
      • 5.6.X 默认支持多type,可以通过配置只支持一个type
      • 6.X 默认支持单一type,可以通过配置打开支持多type
      • 7.X 只允许单一type,即_doc

比如Elasticsearch-V7.2.1,7是主版本、2是次版本,1是修正版本,每个版本实现的功能、特性和解决的bug可以在官方的release-notes中查看。

二、客户端种类

Elasticsearch目前最新版本为8.4.1,下载链接https://www.elastic.co/cn/downloads/enterprise-search,客户端非常多,各种语言基本上都有对应的客户端,我收集了一下java常用的客户端,有些客户端已经弃用,仅供参考,如下:

  • Elastic Node Client

ES从V0.9~V2.2 版本提供的客户端,集成该客户端的应用会以一个node节点的方式加入到ES集群,以集群节点的身份与ES通信。从V2.3版本开始移除,不提供这种类型的客户端,该类型的客户端在后续的ES版本中不可用。

  • Elastic Transport Client

ES从V0.9~V7.X 版本提供的客户端,该客户端使用transport模块(序列化协议)和远程的ES集群通信,相比与node客户端,transport client不加入ES集群,只通过transport模块简单的获取各节点的状态和绑定的transport地址,通过轮询的方式和集群内的节点通信。从V7.0开始官方不建议使用,V8.0开始正式移除。官方推荐使用 Elastic Java High Level Rest Client 来代替。

  • Elastic Rest Client

ES从V5.0~V5.4 版本提供的官方的轻量级的客户端,使用restful API(http协议)和远程ES集群通信,具有轻量、引入依赖少、兼容所有服务端ES版本的特点,只提供了最基本的API

  • Elastic Java Low Level Rest Client

ES从V5.5 版本开始Rest Client 分为了两类,原来的 Elastic Recst Client 改名叫作 Elastic Java Low Level Rest Client,基于HTTP协议通过restful API来和远程ES通信,只提供了最基本最简单的API,但是和之前一样兼容所有的ES版本

  • Elastic Java High Level Rest Client

ES从V5.5 版本开始Rest Client 分为了两类,基于 Low Level Client 提供了更高层次得封装,提供了更高级得API且和Elastic Transport Cliet 接口及参数保持一致。

  • JestClient

开源社区提供得基于http协议的Rest 客户端,目前支持ES版本V1.0~V6.X,基于http协议,官方宣称接口及代码设计比ES官方提供的Rest客户端更简洁、更合理,更好用,具有一定的ES服务端版本兼容性,但是更新速度不是很快,目前ES版本已经出到V7.9,但是JestClient只支持到V6.X

三、 客户端与版本兼容性

客户端类型兼容性
Elasticsearch Node Client只支持ES 服务端V0.9~V2.2 ,且客户端的版本需要和ES集群的服务版本完全一致,不具备跨版本的兼容性
Elastic Transport Client支持ES服务端 V0.9~V7.X ,从V8.0开始移除,不支持该客户端。客户端的主版本号必须和ES集群的主版本号 保持一致,次版本号可以不一致,但是次版本号不一致可能导致一些API不兼容。比如 Elastic Transport Client V5.x版本只能连接V5.X的ES集群,不能连接V6.X或者V7.X的集群
Elastic Rest Client 和 Elastic Java Low Level Rest Client从 V5.0开始提供,理论上兼容所有服务端ES版本
Elastic Java High Level Rest Client从V5.5开始提供,和Elastic Transport Client 一样,客户端的主版本号必须和ES集群的主版本号 保持一致,次版本号可以不一致,但是次版本号不一致可能导致一些API不兼容。
JestClient没用过不是很了解,目前支持ES V1.0~V6.X,根据官方提供的兼容情况见表1:

表1

Jest VersionElasticsearch Version
>= 6.0.06
>= 5.0.05
>= 2.0.02
0.1.0 - 1.0.01
<= 0.0.6< 1

四、引入Elasticsearch依赖包

Elasticsearch的java客户端也有多种,按照个人喜好、使用习惯和实际业务需求选择接行了,下面大概介绍一下这些客户端。

  • elasticsearch-rest-client
    这是一种使用HttpClient方式连接Elasticsearch的客户端,依赖如下:
<dependency>
	<groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.15.2</version>
</dependency>   

客户端配置如下,参数可以自行配置在yml文件:

RestClient restClient = RestClient.builder(
    new HttpHost("localhost", 9200, "http"),
    new HttpHost("localhost", 9201, "http")).build();
restClient.close();
  • elasticsearch-rest-high-level-client
    这是官方提供的高级客户端,还有两种读者可自行查阅,引入依赖包:
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.3.9</version>
</dependency>

客户端配置如下,参数可自行配置在yml文件:

    @Bean
    public RestHighLevelClient esRestClient(){
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                		new HttpHost("192.168.77.130", 9200, "http"))
                		// 多个host直接在这后面拼接
                	);
        return  client;
    }
  • x-pack-sql-jdbc
    这是一种使用jdbc驱动连接的方式,和连接普通关系数据库一样,Elasticsearch也提供了驱动程序,6.4及6.4之后版本引入依赖如下:
<dependency>
   <groupId>org.elasticsearch.plugin</groupId>
   <artifactId>x-pack-sql-jdbc</artifactId>
   <version>6.7.1</version>
</dependency>

使用方式示例:

 
 
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
 
 
public class ESJdbcTest {
 
//虽然程序未直接使用驱动,但是不保证在其他地方用不到
//    6.3  driver = "org.elasticsearch.xpack.sql.jdbc.jdbc.JdbcDriver";
//    6.3之后  driver = "org.elasticsearch.xpack.sql.jdbc.EsDriver";
 
    public static void main(String[] args) {
        try  {
            Connection connection = DriverManager.getConnection("jdbc:es://http://ES-IP:9200");//将ES-IP换位ES服务器的IP
            Statement statement = connection.createStatement();
            ResultSet results = statement.executeQuery(
                    "select * from rep  LIMIT 5");
          while(results.next()){
              System.out.println(results.getString("request"));
          }
        }catch (Exception e){
              e.printStackTrace();
        }
 
    }
 
}

不过Elasticsearch提供的驱动不是免费的,白金版支持JDBC连接,基础版是不支持的,你可以申请一个白金版,自学玩玩的话可以破解x-pack-core,但是没有什么实际意义。

  • 因为java语言现在大多数微服务都是基于spring boot构建的,所以使用spring生态的Template风格的客户端会多一些,也更简洁方便,我个人也是比较喜欢这种客户端,使用简单逻辑清晰。

目前使用最多的的Elasticsearch版本是7.x,我们项目使用的是7.9.3,java客户端一般Elastic Java High Level Rest Client比较广泛一些,所以今天介绍的就是集成Elasticsearch的高级客户端,此外我个人比较喜欢使用Spring Boot的各种template客户端,所以项目中使用的是ElasticsearchRestTemplate。

<dependency>
	<groupId>org.springframework.data</groupId>
	<artifactId>spring-data-elasticsearch</artifactId>
	<version>7.9.3</version>
</dependency>

也可以直接加入Spring Boot的starter,效果是一样的,区别就是加入这个starter以后不用再引入spring-boot-starter了,我们看到网上有些资料引入的是这个jar,其实是一样的

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
	<version>2.4.5</version>
</dependency>

引入Elasticsearch客户端的时候,要注意和Spring、Spring Boot与Elasticsearch的版本对应,以下是官方提供的版本对应表:
在这里插入图片描述

五、客户端配置

总的来说客户端配置方式有两种,一种是采用Spring Boot提供自动化默认配置,另外一种是通过配置类手动代码配置,我个人比较偏爱代码配置,作为一个程序员,看的见的可控制的代码,毕竟要比一些看不见的抽象变量要踏实的多。

  1. YAML配置

由于我们的项目是API网关,使用的是Spring Cloud Gateway,用过这个框架的朋友都知道,他的核心是webflux、reactor和netty,所以template风格客户端配置方式要复杂些,多一种反应式客户端,这样就有三种模板客户端,ElasticsearchTemplate、ElasticsearchRestTemplate和ReactiveElasticsearchTemplate,其中ElasticsearchTemplate在Elasticsearch7之后就废弃了不建议使用,取而代之的是ElasticsearchRestTemplate,yml配置方式也跟着变化,看很多以前的资料yml配置方式是这样:

spring:
 data:
  elasticsearch:
   cluster-nodes:localhost:9300
   cluster-name:elasticsearch
   username:elastic
   passwoed:123456

新的配置方式,如果使用的是普通template客户端连接Elasticsearch,如下:

spring:
 elasticsearch:
    rest:
      uris: http://localhost:9200
      username: elastic
      password: 123456
      connection-timeout: 5000

如果使用的是反应式template方式,如下:

spring:
 data:
  elasticsearch:
    client:
      reactive:
        endpoints: ${integration_gateway_es_address:localhost:9200}
        username: ${integration_gateway_es_username:elastic}
        password: ${integration_gateway_es_password:123456}
        socket-timeout: ${integration_gateway_es_timeout:5000}
  1. 配置类
    ymal参数配置,使用代码配置参数为自定义:
elasticsearch:
  address: ${integration_gateway_es_address:https://localhost:9200}
  username: ${integration_gateway_es_username:elastic}
  password: ${integration_gateway_es_password:elastic+2022}
  timeout: ${integration_gateway_es_timeout:5000}

Java配置类:

/**
 * @author Mr.bin
 * @version 1.0.0
 * @ClassName ElasticSearchConfig.java
 * @Description ElasticSearch客户端配置
 * @createTime 2022年08月20日 10:35:00
 */
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
@Data
public class ElasticSearchConfig extends AbstractElasticsearchConfiguration {

    private String[] address;
    private String username;
    private String password;
    private long timeout = 5000;

    @Override
    @Bean
    public RestHighLevelClient elasticsearchClient() {
        HttpHost[] httpHosts = createHosts();
        RestClientBuilder.RequestConfigCallback requestConfigCallback =
                requestConfigBuilder -> requestConfigBuilder.setSocketTimeout((int) timeout);
        RestClientBuilder restClientBuilder = RestClient.builder(httpHosts).
                setRequestConfigCallback(requestConfigCallback).
                setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @SneakyThrows
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        // 配置账号密码登录
                        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                        credentialsProvider.setCredentials(AuthScope.ANY,
                                new UsernamePasswordCredentials(username, password));
                        // 配置https登录为信任所有证书
                        SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() {
                            // 信任所有
                            @Override
                            public boolean isTrusted(X509Certificate[] chain, String authType) throws CertificateException {
                                return true;
                            }
                        }).build();
                        SSLIOSessionStrategy sessionStrategy =
                                new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE);
                        httpClientBuilder.setSSLStrategy(sessionStrategy);
                        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                        return httpClientBuilder;
                    }
                });
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(restClientBuilder);
        return restHighLevelClient;
    }

    private HttpHost[] createHosts() {
        HttpHost[] httpHosts = new HttpHost[address.length];
        for (int i = 0; i < address.length; i++) {
            String hostStr = address[i];
            String schema = hostStr.substring(0, hostStr.indexOf(":"));
            String port = hostStr.substring(hostStr.lastIndexOf(":") + 1);
            String ip = hostStr.substring(hostStr.indexOf("//") + 2, hostStr.lastIndexOf(":"));
            httpHosts[i] = new HttpHost(ip, Integer.valueOf(port), schema);
        }
        return httpHosts;
    }

    @Bean
    public ElasticsearchRestTemplate elasticsearchRestTemplate(RestHighLevelClient restHighLevelClient) {
        return new ElasticsearchRestTemplate(restHighLevelClient);
    }

}

注:

  1. Elasticsearch7以后增加了安全认证,需要配置用户名和密码认证,如果是集群,需要配置SSL证书校验,否则会连接失败。
  2. 如果是采用配置类方式配置客户端,服务启动会报异常,Spring Boot的actuator自动化配置,会检查反应式模板客户端的健康情况,虽然不会影响启动,如果不使用反应式模板客户端,也不会对功能有啥影响,但是报异常终究看着不舒服,对于有强迫症的程序员来说,肯定是不能容忍的。
    解决方法有两种,一种是在ymal里面增加前面第1小节介绍的连接配置即可,既然是自定义配置,这种方式纯粹是多于,那么就要采用第二种方式了,直接关闭反应式客户端的健康检查,除非你会使用反应式客户端,那么你可以增加这种配置。

启动异常信息:

[unit][2022-09-05 11:05:57.215][spms-integration-service-api-gateway] ERROR [221736][ctor-http-nio-2][]-[][reactor.core.publisher.Operators ][314]Operator called default onErrorDropped
 org.springframework.data.elasticsearch.client.NoReachableHostException: Host 'localhost:9200' not reachable. Cluster state is offline.
	at org.springframework.data.elasticsearch.client.reactive.SingleNodeHostProvider.lambda$lookupActiveHost$4(SingleNodeHostProvider.java:109)
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:102)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2397)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2193)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onSubscribe(FluxOnErrorResume.java:74)
	at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2062)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:392)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onError(FluxMap.java:259)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106)
	at reactor.core.publisher.Operators.error(Operators.java:197)
	at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:71)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:221)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:221)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:221)
	at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:204)
	at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
	at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:224)
	at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:273)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:413)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:250)
	at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
	at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
	at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:97)
	at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
	at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:189)
	at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:189)
	at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect$ClientTransportSubscriber.onError(HttpClientConnect.java:305)
	at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:189)
	at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onError(DefaultPooledConnectionProvider.java:167)
	at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.fail(AbstractPool.java:418)
	at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.lambda$drainLoop$7(SimpleDequePool.java:390)
	at reactor.core.publisher.FluxDoOnEach$DoOnEachSubscriber.onError(FluxDoOnEach.java:186)
	at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:189)
	at reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.onError(DefaultPooledConnectionProvider.java:549)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192)
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259)
	at reactor.netty.transport.TransportConnector$MonoChannelPromise.tryFailure(TransportConnector.java:495)
	at reactor.netty.transport.TransportConnector$MonoChannelPromise.setFailure(TransportConnector.java:449)
	at reactor.netty.transport.TransportConnector.lambda$doConnect$3(TransportConnector.java:184)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
	at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
	at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
	at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:262)
	at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
	at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:164)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

解决方法:

spring:
 autoconfigure:
    exclude:
      - org.springframework.boot.actuate.autoconfigure.elasticsearch.ElasticSearchReactiveHealthContributorAutoConfiguration
      - org.springframework.boot.autoconfigure.data.elasticsearch.ReactiveElasticsearchRepositoriesAutoConfiguration
      - org.springframework.boot.autoconfigure.data.elasticsearch.ReactiveElasticsearchRestClientAutoConfiguration

六、Elasticsearch使用

经过以上的配置,就可以愉快的使用Elasticsearch了,使用方式也很简单,为了完善本文,下面我大概介绍一下,更多使用方式读者可以自行查阅相关文档。

  1. 注入ElasticsearchRestTemplate对象
@Autowired
private ElasticsearchRestTemplate elasticsearchTemplate;
  1. 使用客户端完成各种操作
public <T> String save(String indexName, T entity) {
	IndexQuery indexQuery = new IndexQueryBuilder().withId(getId(entity)).withObject(entity).build();
    return elasticsearchTemplate.index(indexQuery, IndexCoordinates.of(indexName));
}
  1. 我个人偏爱,喜欢将一些通用操作封装成工具类,最后分享一下我封装的工具,解释一下,由于这个项目需要搜集网关访问日志,而日志可能是散布在各处,所以做了一个缓存用来统一收集日志信息,完成之后再入库,还有一个功能是批量入库,这是为了提高系统性能,日志一般是辅助功能不需要事务控制,收集到一定数量之后统一入库要比一条一条插入好得多,减少连接交互数可以提高效率改善性能。
/**
 * @author Mr.bin
 * @version 1.0.0
 * @ClassName ElasticSearchUtil.java
 * @Description ElasticSearch常用操作工具类
 * @createTime 2022年08月18日 09:19:00
 */
@Component
@Slf4j
public class ElasticSearchUtil {

    /**
     * 缓存。
     * key:  indexName
     * value: 要打包提交的 List
     */
    public ConcurrentHashMap<String, List<Object>> cacheMap;

    // 自动打包提交触发条件
    public int cacheMaxNumber = 10;

    private static ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();

    @Autowired
    private ElasticsearchRestTemplate elasticsearchTemplate;

    public ElasticsearchRestTemplate getClient() {
        return elasticsearchTemplate;
    }

    public <T> boolean createIndex(Class<T> index) {
        String indexName = getIndexName(index);
        if (StringUtils.isEmpty(indexName)) {
            log.error("the class " + index + " has not define @Document annotation and index name.");
            return false;
        }
        IndexOperations indexOperations = elasticsearchTemplate.indexOps(index);
        if (!indexOperations.exists()) {
            String jsonStr = BeanUtils.readJsonFromFile("templates" + File.separator + "EsIndexSettings.json");
            Document document = Document.parse(jsonStr);
            indexOperations.create(document);
            indexOperations.putMapping(index);
        }
        return true;
    }

    public <T> String save(String indexName, T entity) {
        IndexQuery indexQuery = new IndexQueryBuilder().withId(getId(entity)).withObject(entity).build();
        return elasticsearchTemplate.index(indexQuery, IndexCoordinates.of(indexName));
    }

    public <T> List<IndexedObjectInformation> saveBatch(String indexName, List<T> entitys) {
        if (entitys.isEmpty()) {
            return Collections.emptyList();
        }
        List<IndexQuery> indexQueries = entitys.stream().map(item -> {
            IndexQuery indexQuery = new IndexQueryBuilder().withId(getId(item)).withObject(item).build();
            return indexQuery;
        }).collect(Collectors.toList());
        return elasticsearchTemplate.bulkIndex(indexQueries, IndexCoordinates.of(indexName));
    }

    /**
     * 删除
     *
     * @return
     */
    public <T> String delete(T entity) {
        return elasticsearchTemplate.delete(getId(entity));
    }

    public <T> UpdateResponse update(String indexName, T entity) {
        JSONObject jsonObject = JSON.parseObject(JSON.toJSONString(entity));
        Document document = jsonObject.entrySet().stream().map(entry -> {
            Document doc = Document.create();
            if (Objects.nonNull(entry.getValue())) {
                // 更新后的内容
                doc.putIfAbsent(entry.getKey(), entry.getValue());
            }
            return doc;
        }).collect(Collectors.toList()).get(0);
        UpdateQuery updateQuery = UpdateQuery.builder(getId(entity)).
                withDocument(document).
                // 不加默认false。true表示更新时不存在就插入
                withDocAsUpsert(true).
                build();
        return elasticsearchTemplate.update(updateQuery, IndexCoordinates.of(indexName));
    }

    /**
     * 将之提交到缓存Cache中。这里不同意put,put是直接提交到ElasticSearch中,而这个只是提交到Java缓存中,等积累到一定条数之后,
     * 在一起将Java缓存中的打包一次性提交到 Elasticsearch中
     * <p>默认同一个indexName索引中,缓存最大条数是100条,达到100条会自动提交到 elasticsearch。 这个最大条数,可以通过
     *
     * @param param     要增加的数据,key-value形式。 其中map.value 支持的类型有 String、int、long、float、double、boolean
     * @param indexName 索引名字,类似数据库的表,是将数据添加进哪个表
     */
    public synchronized void cache(Object param, String indexName) {
        List<Object> list = cacheMap.get(indexName);
        if (list == null) {
            list = new LinkedList<>();
        }
        list.add(param);

        if (list.size() >= this.cacheMaxNumber) {
            // 提交
            boolean submit = cacheSubmit(indexName);
            if (submit) {
                // 提交成功,那么清空indexName的list
                list.clear();
            }
        }
        // 重新赋予cacheMap
        cacheMap.put(indexName, list);
    }

    /**
     * 将当前缓存中某个索引中的数据提交到elasticsearch中
     *
     * @param indexName 索引名字,类似数据库的表,是将数据添加进哪个表
     * @return true:成功;  false:提交失败
     */
    public boolean cacheSubmit(String indexName) {
        List<Object> list = cacheMap.get(indexName);
        if (list == null) {
            return true;
        }
        List<IndexedObjectInformation> rsp = saveBatch(indexName, list);
        if (rsp == null || rsp.isEmpty()) {
            // 出现错误,那么不清空list
            return false;
        } else {
            // 成功,那么清空缓存中这个索引的数据
            list.clear();
            cacheMap.put(indexName, list);
            return true;
        }
    }

    public static <T> void cache(String key, T entity) {
        if (Objects.isNull(cache.get(key))) {
            cache.put(key, entity);
        } else {
            T oldEntity = (T) cache.get(key);
            T updateEntity = BeanUtils.updateBean(oldEntity, entity);
            cache.put(key, updateEntity);
        }
    }

    public boolean cacheSubmit(String indexName, String key) {
        Object entity = cache.get(key);
        if (Objects.isNull(entity)) {
            return true;
        }
        String rsp = save(indexName, entity);
        if (StringUtils.isEmpty(rsp)) {
            return false;
        } else {
            // 成功,那么清空缓存中这个索引的数据
            cache.remove(key);
            return true;
        }
    }

    public static <T> T getCache(String key) {
        return (T) cache.get(key);
    }

    public static void removeCache(String key) {
        if (cache.containsKey(key)) {
            cache.remove(key);
        }
        if (!cache.isEmpty()) {
            cache.clear();
        }
    }

    public <T> String getId(T t) {
        try {
            Class<? extends Object> tClass = t.getClass();
            //得到所有属性
            Field[] fields = tClass.getDeclaredFields();
            String idName = Arrays.stream(fields).filter(f -> Objects.nonNull(f.getAnnotation(Id.class))).
                    map(i -> {
                        i.setAccessible(true);
                        return i.getName();
                    }).collect(Collectors.joining());
            // 整合出 getId() 属性这个方法
            PropertyDescriptor propertyDescriptor = new PropertyDescriptor(idName, tClass);
            Method method = propertyDescriptor.getReadMethod();
            // 调用这个整合出来的get方法,强转成自己需要的类型
            return String.valueOf(method.invoke(t));
        } catch (Exception e) {
            throw new BusinessException(ReturnCodeEnum.DATA_FAILED,
                    "get document id failed, id is null or model has error.");
        }
    }

    public <T> String getIndexName(Class<T> index) {
        boolean hasDoc = index.isAnnotationPresent(org.springframework.data.elasticsearch.annotations.Document.class);
        if (hasDoc) {
            org.springframework.data.elasticsearch.annotations.Document doc =
                    index.getAnnotation(org.springframework.data.elasticsearch.annotations.Document.class);
            return doc.indexName();
        } else {
            return "";
        }
    }

}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐