import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.json.JSONConfig;
import cn.hutool.json.JSONObject;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com..core.elasticsearch.utils.bean.ScrollPageBean;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
 * @description: TODO es 工具类包含,分页查询,高亮查询,游标查询,新增修改删除等操作
 * @author: 
 * @date: 2022/1/7 14:42
 * @version: 1.0
 */
@Slf4j
@ConditionalOnClass({RestHighLevelClient.class})
@Component
public class ElasticsearchUtil {


	@Autowired
	private RestHighLevelClient esClient;


	/**
	 * @description: 插入数据
	 * @param: index 索引名称
	 * @param:data 数据
	 * @param:id 主键id
	 * @param: dateFormat 日期格式 默认  yyyy-MM-dd HH:mm:ss
	 * @param: immediate 是否立马同步 ,默认false  插入1秒后可查询
	 * @return: void
	 * @author:
	 * @date: 2022/1/7 14:48
	 */
	@SneakyThrows
	public void saveWithId(String index, Object data, String id, String dateFormat, boolean immediate) {
		if (dateFormat == null) {
			dateFormat = DatePattern.NORM_DATETIME_PATTERN;
		}
		IndexRequest request = new IndexRequest(index, index, id);
		if (immediate) {
			request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
		}
		request.source(new JSONObject(data, new JSONConfig().setDateFormat(dateFormat)).toString()
			, XContentType.JSON);
		IndexResponse response = esClient.index(request, RequestOptions.DEFAULT);
		if (log.isInfoEnabled()) {
			log.info("ES-带id添加,index: {}, type: {}, id: {}, result: {}", index, index, id, response.getResult());
		}
	}


	/**
	 * @description: 通过主键修改数据
	 * @param: index 索引名称
	 * @param:data 数据
	 * @param:id 主键id
	 * @param: dateFormat 日期格式 默认  yyyy-MM-dd HH:mm:ss
	 * @param: immediate 是否立马同步 ,默认false  1秒后可查询
	 * @return: void
	 * @author:
	 * @date: 2022/1/7 14:48
	 */
	@SneakyThrows
	public void updateById(String index, Object data, String id, String dateFormat, boolean immediate) {
		if (dateFormat == null) {
			dateFormat = DatePattern.NORM_DATETIME_PATTERN;
		}
		UpdateRequest request = new UpdateRequest(index, index, id);
		if (immediate) {
			request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
		}
		request.doc(new JSONObject(data, new JSONConfig().setDateFormat(dateFormat)).toString()
			, XContentType.JSON);
		UpdateResponse response = esClient.update(request, RequestOptions.DEFAULT);
		if (log.isInfoEnabled()) {
			log.info("ES-根据id更新,index: {}, type: {}, id: {}, result: {}", index, index, id, response.getResult());
		}
	}


	/**
	 * @description: 批量插入`
	 * @param: index 索引名称
	 * @param:dataMap 数据 map 接口 id ,数据对象
	 * @param:id 主键id
	 * @param: dateFormat 日期格式 默认  yyyy-MM-dd HH:mm:ss
	 * @param: immediate 是否立马同步 ,默认false  1秒后可查询
	 * @author:
	 * @date: 2022/1/7 14:55
	 */
	@SneakyThrows
	public void batchSaveWithId(String index, Map<String, Object> dataMap, boolean immediate) {
		BulkRequest bulkRequest = new BulkRequest();
		dataMap.forEach((id, data) -> bulkRequest.add(new IndexRequest(index, index, id).source(JSON.toJSONString(data), XContentType.JSON)));
		if (immediate) {
			bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
		}
		esClient.bulk(bulkRequest, RequestOptions.DEFAULT);
		if (log.isInfoEnabled()) {
			log.info("ES-带id批量添加,index: {}, type: {}, ids:{}", index, index, dataMap.keySet());
		}
	}


	/**
	 * 根据id删除
	 *
	 * @param immediate 是否立马同步 ,默认false  1秒后可查询
	 * @param index
	 * @param id
	 */
	@SneakyThrows
	public void deleteById(String index, String id, boolean immediate) {
		DeleteRequest request = new DeleteRequest(index, index, id);
		if (immediate) {
			request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
		}
		DeleteResponse response = esClient.delete(request, RequestOptions.DEFAULT);
		if (log.isInfoEnabled()) {
			log.info("ES-根据id删除,index: {}, type: {}, id: {}, result: {}", index, index, id, response.getResult());
		}
	}


	/**
	 * 根据id查询
	 *
	 * @param index 索引名称
	 * @param id    主键iD
	 * @param clazz 返回对象
	 * @param <T>
	 * @return T
	 */
	@SneakyThrows
	public <T> T getById(String index, String id, Class<T> clazz) {
		T result = null;
		GetRequest request = new GetRequest(index, index, id);
		GetResponse response = esClient.get(request, RequestOptions.DEFAULT);
		if (response.isExists()) {
			result = JSON.parseObject(response.getSourceAsString(), clazz);
		}
		if (log.isInfoEnabled()) {
			log.info("ES-根据id查询,index: {}, type: {}, id: {},  isExists: {}", index, index, id, response.isExists());
		}
		return result;
	}


	/**
	 * @description: 普通查询
	 * @param: index  索引名称
	 * @param: searchSourceBuilder 查询条件构建
	 * @param: resultClass 类
	 * @param: currentPage 当前页 分页的页码,不是es 的
	 * @param: size 每页显示数据
	 * @return: java.util.List<T>
	 * @author: 
	 * @date: 2022/1/6 16:41
	 */
	public <T> IPage<T> page(String index, SearchSourceBuilder searchSourceBuilder, Class<T> resultClass, int currentPage, int size, List<String> highFields) {
		SearchRequest request = new SearchRequest(index);
		// 高亮字段设置
		if (CollectionUtil.isNotEmpty(highFields)) {
			buildHighLight(searchSourceBuilder, highFields);
		}
		request.source(searchSourceBuilder);
		SearchResponse response = null;

		try {
			response = esClient.search(request, RequestOptions.DEFAULT);
		} catch (IOException e) {
			e.printStackTrace();
		}

		return analysisResponse(response, resultClass, currentPage, size, highFields);
	}


	/**
	 * @description: 解析es 查询结果
	 * @param: response
	 * @param: resultClass 转换成对象类
	 * @return: java.util.List<T>
	 * @author: 
	 * @date: 2022/1/7 16:13
	 */
	private <T> IPage<T> analysisResponse(SearchResponse response, Class<T> resultClass, int currentPage, int size, List<String> highFields) {
		SearchHit[] searchHits = response.getHits().getHits();
		List<T> retList = new ArrayList<>(searchHits.length);
		for (SearchHit searchHit : searchHits) {
			String strJson = searchHit.getSourceAsString();
			T t = JSON.parseObject(strJson, resultClass);
			try {
				setId(resultClass, t, searchHit.getId());
			} catch (Exception e) {
				log.info("es 查询数据设置主键id值异常", e);
			}

			/**
			 * 高亮字段设置后,组织结果,es 结果建议与java 对象 名称一直,基本要求
			 */
			if (CollectionUtil.isNotEmpty(highFields)) {
				Map<String, HighlightField> highlightFieldMap = searchHit.getHighlightFields();
				HighlightField highlightField;
				for (String field : highFields) {
					highlightField = highlightFieldMap.get(field);
					if (highlightField != null) {
						// 获取指定字段的高亮片段
						Text[] fragments = highlightField.getFragments();
						// 将这些高亮片段拼接成一个完整的高亮字段
						StringBuilder builder = new StringBuilder();
						for (Text text : fragments) {
							builder.append(text);
						}
						// 设置到实体类中
						setValue(resultClass, t, builder.toString(), field);
					}

				}
			}

			retList.add(t);
		}

		long totalNum = response.getHits().getTotalHits();
		IPage<T> pageVo = new Page<>(currentPage, size, totalNum);
		pageVo.setRecords(retList);
		return pageVo;
	}


	/**
	 * @description: 批量删除
	 * @param: index 索引名称
	 * @param: idList id 集合 默认long 类型
	 * @return: org.elasticsearch.action.bulk.BulkResponse
	 * @author: 
	 * @date: 2022/1/7 10:21
	 */
	public <T> BulkResponse batchDelete(String index, Collection<T> idList, boolean immediate) {
		BulkRequest request = new BulkRequest();
		for (T t : idList) {
			request.add(new DeleteRequest(index, index, t.toString()));
		}

		if (immediate) {
			request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
		}

		BulkResponse response = null;
		try {
			response = esClient.bulk(request, RequestOptions.DEFAULT);
		} catch (IOException e) {
			e.printStackTrace();
		}
		return response;

	}


	/**
	 * @description: 对象id 为空时,给id 赋值
	 * @param: resultClass 类
	 * @param: t 对象
	 * @param: id 主键id 的值
	 * @return: void
	 * @author: 
	 * @date: 2022/1/6 16:40
	 */
	@SneakyThrows
	private <T> void setId(Class<T> resultClass, T t, String id) {
		Field field = ReflectionUtils.findField(resultClass, "id");
		if (null != field) {
			field.setAccessible(true);
			Object object = ReflectionUtils.getField(field, t);
			if (object == null) {
				Method method = null ;
				Object value = null;
				try {
					if (NumberUtil.isLong(id)) {
						method = resultClass.getMethod("setId", Long.class);
						value = Long.valueOf(id);
					} else {
						method = resultClass.getMethod("setId", String.class);
						value = id;
					}
				}catch (NoSuchMethodException e){
					throw  new NoSuchMethodException("id 未找到对应的setId()方法,赋值失败");
				}

				ReflectionUtils.invokeMethod(method, t,value);
			}
		}
	}


	/**
	 * @description: 指定字段赋值
	 * @param: resultClass 类
	 * @param:t 对象
	 * @param:fieldValue 字段名
	 * @param: fieldName 字段值
	 * @return: void
	 * @author: 
	 * @date: 2022/1/10 13:53
	 */
	@SneakyThrows
	private <T> void setValue(Class<T> resultClass, T t, Object fieldValue, String fieldName) {
		Field field = ReflectionUtils.findField(resultClass, fieldName);
		if (null != field) {
			field.setAccessible(true);
			String methodName = "set".concat(captureName(fieldName));
			Method method = null;
			try {
				method = resultClass.getMethod(methodName, String.class);
			} catch (NoSuchMethodException e) {
				throw  new NoSuchMethodException(fieldName+"非普通的set方法,赋值失败");
			}

			ReflectionUtils.invokeMethod(method, t, fieldValue);
		}
	}


	/**
	 * @description: 进行字母的ascii编码前移,效率要高于截取字符串进行转换的操作
	 * @param: str
	 * @return: java.lang.String
	 * @author: 
	 * @date: 2022/1/10 13:46
	 */
	private String captureName(String str) {
		char[] cs = str.toCharArray();
		cs[0] -= 32;
		return String.valueOf(cs);
	}


	/**
	 * @description: 游标查询允许我们 先做查询初始化,然后再批量地拉取结果。
	 * 游标查询会取某个时间点的快照数据。 查询初始化之后索引上的任何变化会被它忽略。 它通过保存旧的数据文件来实现这个特性,
	 * 结果就像保留初始化时的索引 视图 一样。
	 * 启用游标查询可以通过在查询的时候设置参数 scroll 的值为我们期望的游标查询的过期时间。 游标查询的过期时间会在每次做查询的时候刷新,
	 * 所以这个时间只需要足够处理当前批的结果就可以了,而不是处理查询结果的所有文档的所需时间。
	 * 这个过期时间的参数很重要,因为保持这个游标查询窗口需要消耗资源,所以我们期望如果不再需要维护这种资源就该早点儿释放掉。
	 * 设置这个超时能够让 Elasticsearch 在稍后空闲的时候自动释放这部分资源。
	 * @param: indexName 索引名称
	 * @param:searchSourceBuilder 查询条件构建
	 * @param: scrollId 游标查询每次返回一个新字段 _scroll_id。每次我们做下一次游标查询, 我们必须把前一次查询返回的字段 _scroll_id 传递进去。
	 * @param: highFields 设置高亮的字段
	 * @param: resultClass 返回类
	 * @param:size 显示数据量,没页码的概念
	 * @param:minutes 游标查询的过期时间
	 * @return: java.util.Map<java.lang.String, java.lang.Object>
	 * @author: 
	 * @date: 2022/1/10 10:29
	 */
	public <T> ScrollPageBean scrollPage(String indexName, SearchSourceBuilder searchSourceBuilder, String scrollId,
										 Class<T> resultClass, int size, int minutes, List<String> highFields) throws IOException {
		SearchResponse searchResponse = null;

		/**
		 *  游标查询的过期时间
		 *  第一次查询,不带scroll_id,所以要设置scroll超时时间
		 * 超时时间不要设置太短,否则会出现异常
		 * 第二次查询,SearchSrollRequest
		 */
		if (minutes == 0) {
			minutes = 3;
		}

		if (scrollId == null) {
			SearchRequest searchRequest = new SearchRequest(indexName);
			// 高亮字段设置
			if (CollectionUtil.isNotEmpty(highFields)) {
				buildHighLight(searchSourceBuilder, highFields);
			}
			// 调用SearchRequest.source将查询条件设置到检索请求
			searchRequest.source(searchSourceBuilder);
			// 设置scroll查询
			searchRequest.scroll(TimeValue.timeValueMinutes(minutes));

			searchResponse = esClient.search(searchRequest, RequestOptions.DEFAULT);
		} else {
			// 第二次查询的时候,直接通过scroll id查询数据
			SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
			searchScrollRequest.scroll(TimeValue.timeValueMinutes(minutes));
			// 使用RestHighLevelClient发送scroll请求
			searchResponse = esClient.scroll(searchScrollRequest, RequestOptions.DEFAULT);
		}

		IPage<T> scrollPage = analysisResponse(searchResponse, resultClass, 0, size, highFields);
		return new ScrollPageBean(searchResponse.getScrollId(), scrollPage);
	}


	/**
	 * @description: 设置高亮
	 * @param: searchSourceBuilder
	 * @param: fields
	 * @return: void
	 * @author: 
	 * @date: 2022/1/10 11:16
	 */
	private void buildHighLight(SearchSourceBuilder searchSourceBuilder, List<String> fields) {
		// 设置高亮
		HighlightBuilder highlightBuilder = new HighlightBuilder();
		fields.forEach(field -> {

			highlightBuilder.field(field);
		});
		highlightBuilder.preTags("<font color='red'>");
		highlightBuilder.postTags("</font>");

		// 给请求设置高亮
		searchSourceBuilder.highlighter(highlightBuilder);

	}

}

pom es 6.8.3 版本

   <dependencies>
        <!-- ES的高阶的客户端API begin -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.elasticsearch</groupId>
                    <artifactId>elasticsearch</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.elasticsearch.client</groupId>
                    <artifactId>elasticsearch-rest-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
        </dependency>

   

        <!--Mybatis-->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus</artifactId>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.3</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
    </dependencies>

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐