Spring Boot集成ElasticSearach
本文介绍的是Spring Boot如何集成ElasticSearch,再结合使用ElasticSearch记录API网关日志的实践,总结一下集成常见的几种方式
前言
ElasticSearch是Elastic公司出品的一款功能强大的搜索引擎,被广泛的应用于各大IT公司,它的代码位于 https://github.com/elastic/elasticsearch,目前是一个开源项目。ElasticSearch公司的另外两个开源产品 Logstash 、 Kibana与ElasticSearch构成了著名的ELK技术栈。。他们三个共同形成了一个强大的生态圈。简单地说,Logstash 负责数据的采集,处理(丰富数据,数据转换等),Kibana 负责数据展示,分析,管理,监督及应用。Elasticsearch 处于最核心的位置,它可以帮我们对数据进行快速地搜索及分析。
ElasticSearch的使用及介绍,读者可以自行在网上搜索相关资料学习,今天要介绍的是Spring Boot如何集成ElasticSearch,再结合使用ES记录网关日志的实践,总结一下集成常见的几种方式。
一、Elasticsearch版本介绍
- ES的版本管理机制:主版本.次版本.修正版本
- 主版本一年一发,主要是重大升级和改进,涉及底层Lucence版本升级、性能改进、重大特性发布
- 次版本2~3个月发一次,主要是少量特性功能发布(有阻断变化和废弃旧特性)及BUG修复
- 修正版本2周左右发一次,不添加新功能,只修复BUG
- 大版本主要功能特性差异,详细参见ES发展史
- 6.8.0/7.1.0后xpack 开源,基础版不需要授权免费使用,包括: 安全、监控等
- 多type支持:
- 5.6.X 默认支持多type,可以通过配置只支持一个type
- 6.X 默认支持单一type,可以通过配置打开支持多type
- 7.X 只允许单一type,即_doc
比如Elasticsearch-V7.2.1,7是主版本、2是次版本,1是修正版本,每个版本实现的功能、特性和解决的bug可以在官方的release-notes中查看。
二、客户端种类
Elasticsearch目前最新版本为8.4.1,下载链接https://www.elastic.co/cn/downloads/enterprise-search,客户端非常多,各种语言基本上都有对应的客户端,我收集了一下java常用的客户端,有些客户端已经弃用,仅供参考,如下:
- Elastic Node Client
ES从V0.9~V2.2 版本提供的客户端,集成该客户端的应用会以一个node节点的方式加入到ES集群,以集群节点的身份与ES通信。从V2.3版本开始移除,不提供这种类型的客户端,该类型的客户端在后续的ES版本中不可用。
- Elastic Transport Client
ES从V0.9~V7.X 版本提供的客户端,该客户端使用transport模块(序列化协议)和远程的ES集群通信,相比与node客户端,transport client不加入ES集群,只通过transport模块简单的获取各节点的状态和绑定的transport地址,通过轮询的方式和集群内的节点通信。从V7.0开始官方不建议使用,V8.0开始正式移除。官方推荐使用 Elastic Java High Level Rest Client 来代替。
- Elastic Rest Client
ES从V5.0~V5.4 版本提供的官方的轻量级的客户端,使用restful API(http协议)和远程ES集群通信,具有轻量、引入依赖少、兼容所有服务端ES版本的特点,只提供了最基本的API
- Elastic Java Low Level Rest Client
ES从V5.5 版本开始Rest Client 分为了两类,原来的 Elastic Recst Client 改名叫作 Elastic Java Low Level Rest Client,基于HTTP协议通过restful API来和远程ES通信,只提供了最基本最简单的API,但是和之前一样兼容所有的ES版本
- Elastic Java High Level Rest Client
ES从V5.5 版本开始Rest Client 分为了两类,基于 Low Level Client 提供了更高层次得封装,提供了更高级得API且和Elastic Transport Cliet 接口及参数保持一致。
- JestClient
开源社区提供得基于http协议的Rest 客户端,目前支持ES版本V1.0~V6.X,基于http协议,官方宣称接口及代码设计比ES官方提供的Rest客户端更简洁、更合理,更好用,具有一定的ES服务端版本兼容性,但是更新速度不是很快,目前ES版本已经出到V7.9,但是JestClient只支持到V6.X
三、 客户端与版本兼容性
客户端类型 | 兼容性 |
---|---|
Elasticsearch Node Client | 只支持ES 服务端V0.9~V2.2 ,且客户端的版本需要和ES集群的服务版本完全一致,不具备跨版本的兼容性 |
Elastic Transport Client | 支持ES服务端 V0.9~V7.X ,从V8.0开始移除,不支持该客户端。客户端的主版本号必须和ES集群的主版本号 保持一致,次版本号可以不一致,但是次版本号不一致可能导致一些API不兼容。比如 Elastic Transport Client V5.x版本只能连接V5.X的ES集群,不能连接V6.X或者V7.X的集群 |
Elastic Rest Client 和 Elastic Java Low Level Rest Client | 从 V5.0开始提供,理论上兼容所有服务端ES版本 |
Elastic Java High Level Rest Client | 从V5.5开始提供,和Elastic Transport Client 一样,客户端的主版本号必须和ES集群的主版本号 保持一致,次版本号可以不一致,但是次版本号不一致可能导致一些API不兼容。 |
JestClient | 没用过不是很了解,目前支持ES V1.0~V6.X,根据官方提供的兼容情况见表1: |
表1
Jest Version | Elasticsearch Version |
---|---|
>= 6.0.0 | 6 |
>= 5.0.0 | 5 |
>= 2.0.0 | 2 |
0.1.0 - 1.0.0 | 1 |
<= 0.0.6 | < 1 |
四、引入Elasticsearch依赖包
Elasticsearch的java客户端也有多种,按照个人喜好、使用习惯和实际业务需求选择接行了,下面大概介绍一下这些客户端。
- elasticsearch-rest-client
这是一种使用HttpClient方式连接Elasticsearch的客户端,依赖如下:
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>7.15.2</version>
</dependency>
客户端配置如下,参数可以自行配置在yml文件:
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 9200, "http"),
new HttpHost("localhost", 9201, "http")).build();
restClient.close();
- elasticsearch-rest-high-level-client
这是官方提供的高级客户端,还有两种读者可自行查阅,引入依赖包:
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.3.9</version>
</dependency>
客户端配置如下,参数可自行配置在yml文件:
@Bean
public RestHighLevelClient esRestClient(){
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("192.168.77.130", 9200, "http"))
// 多个host直接在这后面拼接
);
return client;
}
- x-pack-sql-jdbc
这是一种使用jdbc驱动连接的方式,和连接普通关系数据库一样,Elasticsearch也提供了驱动程序,6.4及6.4之后版本引入依赖如下:
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>x-pack-sql-jdbc</artifactId>
<version>6.7.1</version>
</dependency>
使用方式示例:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
public class ESJdbcTest {
//虽然程序未直接使用驱动,但是不保证在其他地方用不到
// 6.3 driver = "org.elasticsearch.xpack.sql.jdbc.jdbc.JdbcDriver";
// 6.3之后 driver = "org.elasticsearch.xpack.sql.jdbc.EsDriver";
public static void main(String[] args) {
try {
Connection connection = DriverManager.getConnection("jdbc:es://http://ES-IP:9200");//将ES-IP换位ES服务器的IP
Statement statement = connection.createStatement();
ResultSet results = statement.executeQuery(
"select * from rep LIMIT 5");
while(results.next()){
System.out.println(results.getString("request"));
}
}catch (Exception e){
e.printStackTrace();
}
}
}
不过Elasticsearch提供的驱动不是免费的,白金版支持JDBC连接,基础版是不支持的,你可以申请一个白金版,自学玩玩的话可以破解x-pack-core,但是没有什么实际意义。
- 因为java语言现在大多数微服务都是基于spring boot构建的,所以使用spring生态的Template风格的客户端会多一些,也更简洁方便,我个人也是比较喜欢这种客户端,使用简单逻辑清晰。
目前使用最多的的Elasticsearch版本是7.x,我们项目使用的是7.9.3,java客户端一般Elastic Java High Level Rest Client比较广泛一些,所以今天介绍的就是集成Elasticsearch的高级客户端,此外我个人比较喜欢使用Spring Boot的各种template客户端,所以项目中使用的是ElasticsearchRestTemplate。
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-elasticsearch</artifactId>
<version>7.9.3</version>
</dependency>
也可以直接加入Spring Boot的starter,效果是一样的,区别就是加入这个starter以后不用再引入spring-boot-starter了,我们看到网上有些资料引入的是这个jar,其实是一样的
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
<version>2.4.5</version>
</dependency>
引入Elasticsearch客户端的时候,要注意和Spring、Spring Boot与Elasticsearch的版本对应,以下是官方提供的版本对应表:
五、客户端配置
总的来说客户端配置方式有两种,一种是采用Spring Boot提供自动化默认配置,另外一种是通过配置类手动代码配置,我个人比较偏爱代码配置,作为一个程序员,看的见的可控制的代码,毕竟要比一些看不见的抽象变量要踏实的多。
- YAML配置
由于我们的项目是API网关,使用的是Spring Cloud Gateway,用过这个框架的朋友都知道,他的核心是webflux、reactor和netty,所以template风格客户端配置方式要复杂些,多一种反应式客户端,这样就有三种模板客户端,ElasticsearchTemplate、ElasticsearchRestTemplate和ReactiveElasticsearchTemplate,其中ElasticsearchTemplate在Elasticsearch7之后就废弃了不建议使用,取而代之的是ElasticsearchRestTemplate,yml配置方式也跟着变化,看很多以前的资料yml配置方式是这样:
spring:
data:
elasticsearch:
cluster-nodes:localhost:9300
cluster-name:elasticsearch
username:elastic
passwoed:123456
新的配置方式,如果使用的是普通template客户端连接Elasticsearch,如下:
spring:
elasticsearch:
rest:
uris: http://localhost:9200
username: elastic
password: 123456
connection-timeout: 5000
如果使用的是反应式template方式,如下:
spring:
data:
elasticsearch:
client:
reactive:
endpoints: ${integration_gateway_es_address:localhost:9200}
username: ${integration_gateway_es_username:elastic}
password: ${integration_gateway_es_password:123456}
socket-timeout: ${integration_gateway_es_timeout:5000}
- 配置类
ymal参数配置,使用代码配置参数为自定义:
elasticsearch:
address: ${integration_gateway_es_address:https://localhost:9200}
username: ${integration_gateway_es_username:elastic}
password: ${integration_gateway_es_password:elastic+2022}
timeout: ${integration_gateway_es_timeout:5000}
Java配置类:
/**
* @author Mr.bin
* @version 1.0.0
* @ClassName ElasticSearchConfig.java
* @Description ElasticSearch客户端配置
* @createTime 2022年08月20日 10:35:00
*/
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
@Data
public class ElasticSearchConfig extends AbstractElasticsearchConfiguration {
private String[] address;
private String username;
private String password;
private long timeout = 5000;
@Override
@Bean
public RestHighLevelClient elasticsearchClient() {
HttpHost[] httpHosts = createHosts();
RestClientBuilder.RequestConfigCallback requestConfigCallback =
requestConfigBuilder -> requestConfigBuilder.setSocketTimeout((int) timeout);
RestClientBuilder restClientBuilder = RestClient.builder(httpHosts).
setRequestConfigCallback(requestConfigCallback).
setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@SneakyThrows
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
// 配置账号密码登录
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(username, password));
// 配置https登录为信任所有证书
SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() {
// 信任所有
@Override
public boolean isTrusted(X509Certificate[] chain, String authType) throws CertificateException {
return true;
}
}).build();
SSLIOSessionStrategy sessionStrategy =
new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE);
httpClientBuilder.setSSLStrategy(sessionStrategy);
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
return httpClientBuilder;
}
});
RestHighLevelClient restHighLevelClient = new RestHighLevelClient(restClientBuilder);
return restHighLevelClient;
}
private HttpHost[] createHosts() {
HttpHost[] httpHosts = new HttpHost[address.length];
for (int i = 0; i < address.length; i++) {
String hostStr = address[i];
String schema = hostStr.substring(0, hostStr.indexOf(":"));
String port = hostStr.substring(hostStr.lastIndexOf(":") + 1);
String ip = hostStr.substring(hostStr.indexOf("//") + 2, hostStr.lastIndexOf(":"));
httpHosts[i] = new HttpHost(ip, Integer.valueOf(port), schema);
}
return httpHosts;
}
@Bean
public ElasticsearchRestTemplate elasticsearchRestTemplate(RestHighLevelClient restHighLevelClient) {
return new ElasticsearchRestTemplate(restHighLevelClient);
}
}
注:
- Elasticsearch7以后增加了安全认证,需要配置用户名和密码认证,如果是集群,需要配置SSL证书校验,否则会连接失败。
- 如果是采用配置类方式配置客户端,服务启动会报异常,Spring Boot的actuator自动化配置,会检查反应式模板客户端的健康情况,虽然不会影响启动,如果不使用反应式模板客户端,也不会对功能有啥影响,但是报异常终究看着不舒服,对于有强迫症的程序员来说,肯定是不能容忍的。
解决方法有两种,一种是在ymal里面增加前面第1小节介绍的连接配置即可,既然是自定义配置,这种方式纯粹是多于,那么就要采用第二种方式了,直接关闭反应式客户端的健康检查,除非你会使用反应式客户端,那么你可以增加这种配置。
启动异常信息:
[unit][2022-09-05 11:05:57.215][spms-integration-service-api-gateway] ERROR [221736][ctor-http-nio-2][]-[][reactor.core.publisher.Operators ][314]Operator called default onErrorDropped
org.springframework.data.elasticsearch.client.NoReachableHostException: Host 'localhost:9200' not reachable. Cluster state is offline.
at org.springframework.data.elasticsearch.client.reactive.SingleNodeHostProvider.lambda$lookupActiveHost$4(SingleNodeHostProvider.java:109)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:102)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2397)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2193)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onSubscribe(FluxOnErrorResume.java:74)
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2062)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:392)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onError(FluxMap.java:259)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106)
at reactor.core.publisher.Operators.error(Operators.java:197)
at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:71)
at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:221)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:221)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:221)
at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:204)
at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:224)
at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:273)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:413)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:250)
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:97)
at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:189)
at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:189)
at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect$ClientTransportSubscriber.onError(HttpClientConnect.java:305)
at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:189)
at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onError(DefaultPooledConnectionProvider.java:167)
at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.fail(AbstractPool.java:418)
at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.lambda$drainLoop$7(SimpleDequePool.java:390)
at reactor.core.publisher.FluxDoOnEach$DoOnEachSubscriber.onError(FluxDoOnEach.java:186)
at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:189)
at reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.onError(DefaultPooledConnectionProvider.java:549)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259)
at reactor.netty.transport.TransportConnector$MonoChannelPromise.tryFailure(TransportConnector.java:495)
at reactor.netty.transport.TransportConnector$MonoChannelPromise.setFailure(TransportConnector.java:449)
at reactor.netty.transport.TransportConnector.lambda$doConnect$3(TransportConnector.java:184)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:262)
at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
解决方法:
spring:
autoconfigure:
exclude:
- org.springframework.boot.actuate.autoconfigure.elasticsearch.ElasticSearchReactiveHealthContributorAutoConfiguration
- org.springframework.boot.autoconfigure.data.elasticsearch.ReactiveElasticsearchRepositoriesAutoConfiguration
- org.springframework.boot.autoconfigure.data.elasticsearch.ReactiveElasticsearchRestClientAutoConfiguration
六、Elasticsearch使用
经过以上的配置,就可以愉快的使用Elasticsearch了,使用方式也很简单,为了完善本文,下面我大概介绍一下,更多使用方式读者可以自行查阅相关文档。
- 注入ElasticsearchRestTemplate对象
@Autowired
private ElasticsearchRestTemplate elasticsearchTemplate;
- 使用客户端完成各种操作
public <T> String save(String indexName, T entity) {
IndexQuery indexQuery = new IndexQueryBuilder().withId(getId(entity)).withObject(entity).build();
return elasticsearchTemplate.index(indexQuery, IndexCoordinates.of(indexName));
}
- 我个人偏爱,喜欢将一些通用操作封装成工具类,最后分享一下我封装的工具,解释一下,由于这个项目需要搜集网关访问日志,而日志可能是散布在各处,所以做了一个缓存用来统一收集日志信息,完成之后再入库,还有一个功能是批量入库,这是为了提高系统性能,日志一般是辅助功能不需要事务控制,收集到一定数量之后统一入库要比一条一条插入好得多,减少连接交互数可以提高效率改善性能。
/**
* @author Mr.bin
* @version 1.0.0
* @ClassName ElasticSearchUtil.java
* @Description ElasticSearch常用操作工具类
* @createTime 2022年08月18日 09:19:00
*/
@Component
@Slf4j
public class ElasticSearchUtil {
/**
* 缓存。
* key: indexName
* value: 要打包提交的 List
*/
public ConcurrentHashMap<String, List<Object>> cacheMap;
// 自动打包提交触发条件
public int cacheMaxNumber = 10;
private static ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();
@Autowired
private ElasticsearchRestTemplate elasticsearchTemplate;
public ElasticsearchRestTemplate getClient() {
return elasticsearchTemplate;
}
public <T> boolean createIndex(Class<T> index) {
String indexName = getIndexName(index);
if (StringUtils.isEmpty(indexName)) {
log.error("the class " + index + " has not define @Document annotation and index name.");
return false;
}
IndexOperations indexOperations = elasticsearchTemplate.indexOps(index);
if (!indexOperations.exists()) {
String jsonStr = BeanUtils.readJsonFromFile("templates" + File.separator + "EsIndexSettings.json");
Document document = Document.parse(jsonStr);
indexOperations.create(document);
indexOperations.putMapping(index);
}
return true;
}
public <T> String save(String indexName, T entity) {
IndexQuery indexQuery = new IndexQueryBuilder().withId(getId(entity)).withObject(entity).build();
return elasticsearchTemplate.index(indexQuery, IndexCoordinates.of(indexName));
}
public <T> List<IndexedObjectInformation> saveBatch(String indexName, List<T> entitys) {
if (entitys.isEmpty()) {
return Collections.emptyList();
}
List<IndexQuery> indexQueries = entitys.stream().map(item -> {
IndexQuery indexQuery = new IndexQueryBuilder().withId(getId(item)).withObject(item).build();
return indexQuery;
}).collect(Collectors.toList());
return elasticsearchTemplate.bulkIndex(indexQueries, IndexCoordinates.of(indexName));
}
/**
* 删除
*
* @return
*/
public <T> String delete(T entity) {
return elasticsearchTemplate.delete(getId(entity));
}
public <T> UpdateResponse update(String indexName, T entity) {
JSONObject jsonObject = JSON.parseObject(JSON.toJSONString(entity));
Document document = jsonObject.entrySet().stream().map(entry -> {
Document doc = Document.create();
if (Objects.nonNull(entry.getValue())) {
// 更新后的内容
doc.putIfAbsent(entry.getKey(), entry.getValue());
}
return doc;
}).collect(Collectors.toList()).get(0);
UpdateQuery updateQuery = UpdateQuery.builder(getId(entity)).
withDocument(document).
// 不加默认false。true表示更新时不存在就插入
withDocAsUpsert(true).
build();
return elasticsearchTemplate.update(updateQuery, IndexCoordinates.of(indexName));
}
/**
* 将之提交到缓存Cache中。这里不同意put,put是直接提交到ElasticSearch中,而这个只是提交到Java缓存中,等积累到一定条数之后,
* 在一起将Java缓存中的打包一次性提交到 Elasticsearch中
* <p>默认同一个indexName索引中,缓存最大条数是100条,达到100条会自动提交到 elasticsearch。 这个最大条数,可以通过
*
* @param param 要增加的数据,key-value形式。 其中map.value 支持的类型有 String、int、long、float、double、boolean
* @param indexName 索引名字,类似数据库的表,是将数据添加进哪个表
*/
public synchronized void cache(Object param, String indexName) {
List<Object> list = cacheMap.get(indexName);
if (list == null) {
list = new LinkedList<>();
}
list.add(param);
if (list.size() >= this.cacheMaxNumber) {
// 提交
boolean submit = cacheSubmit(indexName);
if (submit) {
// 提交成功,那么清空indexName的list
list.clear();
}
}
// 重新赋予cacheMap
cacheMap.put(indexName, list);
}
/**
* 将当前缓存中某个索引中的数据提交到elasticsearch中
*
* @param indexName 索引名字,类似数据库的表,是将数据添加进哪个表
* @return true:成功; false:提交失败
*/
public boolean cacheSubmit(String indexName) {
List<Object> list = cacheMap.get(indexName);
if (list == null) {
return true;
}
List<IndexedObjectInformation> rsp = saveBatch(indexName, list);
if (rsp == null || rsp.isEmpty()) {
// 出现错误,那么不清空list
return false;
} else {
// 成功,那么清空缓存中这个索引的数据
list.clear();
cacheMap.put(indexName, list);
return true;
}
}
public static <T> void cache(String key, T entity) {
if (Objects.isNull(cache.get(key))) {
cache.put(key, entity);
} else {
T oldEntity = (T) cache.get(key);
T updateEntity = BeanUtils.updateBean(oldEntity, entity);
cache.put(key, updateEntity);
}
}
public boolean cacheSubmit(String indexName, String key) {
Object entity = cache.get(key);
if (Objects.isNull(entity)) {
return true;
}
String rsp = save(indexName, entity);
if (StringUtils.isEmpty(rsp)) {
return false;
} else {
// 成功,那么清空缓存中这个索引的数据
cache.remove(key);
return true;
}
}
public static <T> T getCache(String key) {
return (T) cache.get(key);
}
public static void removeCache(String key) {
if (cache.containsKey(key)) {
cache.remove(key);
}
if (!cache.isEmpty()) {
cache.clear();
}
}
public <T> String getId(T t) {
try {
Class<? extends Object> tClass = t.getClass();
//得到所有属性
Field[] fields = tClass.getDeclaredFields();
String idName = Arrays.stream(fields).filter(f -> Objects.nonNull(f.getAnnotation(Id.class))).
map(i -> {
i.setAccessible(true);
return i.getName();
}).collect(Collectors.joining());
// 整合出 getId() 属性这个方法
PropertyDescriptor propertyDescriptor = new PropertyDescriptor(idName, tClass);
Method method = propertyDescriptor.getReadMethod();
// 调用这个整合出来的get方法,强转成自己需要的类型
return String.valueOf(method.invoke(t));
} catch (Exception e) {
throw new BusinessException(ReturnCodeEnum.DATA_FAILED,
"get document id failed, id is null or model has error.");
}
}
public <T> String getIndexName(Class<T> index) {
boolean hasDoc = index.isAnnotationPresent(org.springframework.data.elasticsearch.annotations.Document.class);
if (hasDoc) {
org.springframework.data.elasticsearch.annotations.Document doc =
index.getAnnotation(org.springframework.data.elasticsearch.annotations.Document.class);
return doc.indexName();
} else {
return "";
}
}
}
更多推荐
所有评论(0)