package com.amarsoft.code.es.service;

import com.amarsoft.code.mybatis.entity.User;
import com.amarsoft.code.mybatis.mapper.UserMapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.util.JSONPObject;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.util.List;

@Service
public class EsClientService {

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    @Autowired
    UserMapper userMapper;

    ObjectMapper om=new ObjectMapper();


    /**
     * 数据迁移api
     * @throws Exception
     */
    public void bulkInit() throws Exception{

        GetIndexRequest getIndexRequest = new GetIndexRequest("db_test");
        boolean exists = restHighLevelClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
        if(!exists){
            CreateIndexRequest createIndexRequest = new CreateIndexRequest("db_test");
            createIndexRequest.settings(Settings.builder()
                    .put("index.number_of_shards",3)
                    .put("index.number_of_replicas",2)
            );
            CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
        }
        //db查询数据
        List<User> sels = userMapper.Sels();

        BulkRequest bulkRequest = new BulkRequest("db_test");
        for (User user:sels) {
            bulkRequest.add(new IndexRequest().id(user.getUserName()).source(om.writeValueAsString(user),XContentType.JSON));
        }
        BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        System.out.println("成功插入数量:"+bulk.getItems().length);
    }

    /**
     * 查询,分页,排序,过滤
     * @throws IOException
     */
    public void Search() throws IOException {
        //创建搜索对象
        SearchRequest searchRequest = new SearchRequest("db_test");
        //搜索构建对象
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

        /**
         * 全文检索  match_phrase
         * 精准匹配 term
         * 模糊查询 fuzzy
         * 短语匹配  match_phrase   slop
         *
         */
        searchSourceBuilder.query(QueryBuilders.matchAllQuery())//执行查询条件
                .from(0)//起始条数
                .size(10)//每页展示记录
                .query(QueryBuilders.matchQuery("userName","yuan")) //过滤条件
                .sort("id", SortOrder.DESC);//排序

        //创建搜索请求
        searchRequest.source(searchSourceBuilder);

        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

        System.out.println("符合条件的文档总数: "+searchResponse.getHits().getTotalHits());
        System.out.println("符合条件的文档最大得分: "+searchResponse.getHits().getMaxScore());
        SearchHit[] hits = searchResponse.getHits().getHits();
        for (SearchHit hit : hits) {
            System.out.println(hit.getSourceAsMap());
        }
    }


    public void scroll(String scrollId) throws IOException{
        SearchRequest searchRequest = new SearchRequest("db_test");
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

        searchSourceBuilder.size(1);
        searchRequest.source(searchSourceBuilder);
        searchRequest.scroll(TimeValue.timeValueDays(1));//设置过期事件

        SearchResponse searchResponse  =StringUtils.isEmpty(scrollId)
                ? restHighLevelClient.search(searchRequest,RequestOptions.DEFAULT)
                :restHighLevelClient.scroll(new SearchScrollRequest(scrollId),RequestOptions.DEFAULT);

        System.out.println(searchResponse.getScrollId());
        System.out.println("符合条件的文档总数: "+searchResponse.getHits().getTotalHits());
        SearchHit[] hits = searchResponse.getHits().getHits();
        for (SearchHit hit : hits) {
            System.out.println(hit.getSourceAsMap());
        }
    }

    public void bulk() throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        // 添加
        IndexRequest indexRequest = new IndexRequest("db_test");
        User sel = userMapper.Sel(1);
        indexRequest.id(sel.getUserName()+"test").source(om.writeValueAsString(sel), XContentType.JSON);
        bulkRequest.add(indexRequest);

        // 删除
        DeleteRequest deleteRequest01 = new DeleteRequest("db_test","pYAtG3kBRz-Sn-2fMFjj");
        DeleteRequest deleteRequest02 = new DeleteRequest("db_test","uhTyGHkBExaVQsl4F9Lj");
        DeleteRequest deleteRequest03 = new DeleteRequest("db_test","C8zCGHkB5KgTrUTeLyE_");
        bulkRequest.add(deleteRequest01);
        bulkRequest.add(deleteRequest02);
        bulkRequest.add(deleteRequest03);

        // 修改
        UpdateRequest updateRequest = new UpdateRequest("db_test","10");
        updateRequest.doc("{\"username\":\"炼石补天的女娲\"}",XContentType.JSON);
        bulkRequest.add(updateRequest);

        BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        BulkItemResponse[] items = bulkResponse.getItems();
        for (BulkItemResponse item : items) {
            System.out.println(item.status());
        }
    }
}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐