使用Java操作Elasticsearch的所有方法

13.1 Elasticsearch简介

	Elasticsearch是基于Lucene开发的分布式全文检索框架。无论是向Elasticsearch中存储数据,还是从Elasticsearch中查询数据,使用的格式都是JSON。

a)、索引index,相当于数据库中的database。

b)、类型type相当于数据库中的table。

c)、主键id相当于数据库中记录的主键,是唯一的。

d)、文档 document (相当于一条数据)

文档是ElasticSearch的基本单位。在Es中文档以JSON格式来表示

向es中的index下面的type中存储json类型的数据。

e) 、字段是文档中的field 属性,需要对每一个属性定义索引和被搜索的方式

Elasticsearch是RestFul风格的api,通过http的请求形式(注意,参数是url拼接还是请求的json形式哦),发送请求,对Elasticsearch进行操作。查询:get。删除:delete。添加:put/post。修改:put/post。
RESTFul接口url的格式:http://ip:port/index/type/<[id]>。其中index、type是必须提供的。id是可以选择的,不提供es会自动生成,index、type将信息进行分层,利于管理。

设置setting:

  // Client-level settings (TransportClient): cluster name and node sniffing.
  // Shard/replica counts are index-level settings; they belong in the index-creation request
  // (prepareCreate(...).setSettings(...)), not in the client Settings. The 5.x transport client is
  // expected to reject index.* keys in its own settings — confirm for your ES version.
  Settings settings = Settings.builder()
      .put("cluster.name", "elasticsearch") // cluster name
      .put("client.transport.sniff", true)  // sniffing: discover the other cluster nodes
      .build();

  // Index-level settings: pass these when creating an index.
  Settings indexSettings = Settings.builder()
      .put("index.number_of_shards", 2)   // number of primary shards
      .put("index.number_of_replicas", 1) // replicas per primary shard
      .build();

13.2 索引、映射、文档CRUD

13.2.1 pom依赖

<dependencies>
        <!-- elasticsearch的客户端 -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>5.4.3</version>
        </dependency>
        <!-- elasticsearch依赖2.x的log4j -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <!-- junit单元测试 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>
</project>

13.2.2 索引

13.2.2.1 创建索引

prepareCreate

/**
 * Creates the index "blog5" with default settings.
 * <p>
 * The client is opened in try-with-resources so it is closed even if the request throws.
 */
@Test
public void testCreateIndex() throws UnknownHostException {
    try (TransportClient transportClient = new PreBuiltTransportClient(Settings.EMPTY).addTransportAddress(
            new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))) {
        // create index blog5
        CreateIndexResponse indexResponse = transportClient.admin().indices().prepareCreate("blog5").get();

        System.out.println(indexResponse.isAcknowledged()); // expected output: true
        System.out.println(indexResponse.isShardsAcked());  // true
    }
}
13.2.2.2 删除索引

两种方法:.prepareDelete( index... )(可变参数,每个参数都是一个索引名,可一次删除多个索引;第二个参数并不是type)、delete(new DeleteIndexRequest( index ))

      /**
       * Deletes the index "blog5".
       * <p>
       * Two ways to issue the request:
       * {@code prepareDelete(String... indices)} is the builder form; every argument is an index name,
       * so {@code prepareDelete("blog5", "blog")} deletes the two indices blog5 and blog.
       * {@code delete(new DeleteIndexRequest(index))} is the request-object form used below.
       * The client is closed by try-with-resources even if the request throws.
       */
        @Test
        public void testDelIndex() throws Exception {
            try (TransportClient transportClient = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))) {
                // request-object form: delete the single index blog5
                DeleteIndexResponse deleteIndexResponse = transportClient.admin().indices()
                        .delete(new DeleteIndexRequest("blog5")).get();
                System.out.println(deleteIndexResponse.isAcknowledged()); // true
            }
        }

13.2.3 映射

13.2.3.1 创建映射

putMapping

      /**
       * Creates index "blog5" if it does not exist yet, then puts the mapping for type "article".
       * <p>
       * Mapping that is sent (title/content use the analyzer "ik_max_word"):
       * <pre>
       * "article": {
       *   "properties": {
       *     "id":      { "type": "long", "store": true },
       *     "title":   { "type": "text", "store": true, "analyzer": "ik_max_word" },
       *     "content": { "type": "text", "store": true, "analyzer": "ik_max_word" }
       *   }
       * }
       * </pre>
       * The client is closed by try-with-resources even if a request throws.
       */
    @Test
    public void testCreateMapping() throws Exception {
        try (TransportClient transportClient = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))) {
            // exists(): check whether the index is already there
            IndicesExistsResponse indicesExistsResponse = transportClient.admin().indices()
                    .exists(new IndicesExistsRequest("blog5")).get();
            System.out.println("索引存在:" + indicesExistsResponse.isExists()); // e.g. 索引存在:false
            if (!indicesExistsResponse.isExists()) {
                // create index blog5 only when it is missing
                CreateIndexResponse createIndexResponse = transportClient.admin().indices().prepareCreate("blog5").get();
                System.out.println("创建索引:" + createIndexResponse.isAcknowledged()); // 创建索引:true
            }

            // mapping body: { "article": { "properties": { id, title, content } } }
            XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
                    .startObject()
                    .startObject("article")
                    .startObject("properties")
                    .startObject("id").field("store", true).field("type", "long").endObject()
                    .startObject("title").field("store", true).field("type", "text").field("analyzer", "ik_max_word").endObject()
                    .startObject("content").field("store", true).field("type", "text").field("analyzer", "ik_max_word").endObject()
                    .endObject()
                    .endObject()
                    .endObject();

            // put the mapping onto index blog5, type article
            PutMappingRequest putMappingRequest = Requests.putMappingRequest("blog5").type("article").source(xContentBuilder);
            PutMappingResponse putMappingResponse = transportClient.admin().indices().putMapping(putMappingRequest).get();
            System.out.println(putMappingResponse.isAcknowledged()); // true
        }
    }

13.2.4 文档

13.2.4.1 建立文档

1、XContentBuilder

prepareIndex( index , type , id ).setSource(xContentBuilder)

    /**
     * Indexes one document with id "1" into index "blog5", type "article", building the JSON
     * source with XContentBuilder.
     * <p>
     * The index name was previously misspelled "bolg5". That silently auto-created a different
     * index, which the mapping and the search examples (all on "blog5") never see.
     */
    @Test
    public void testCreateDocument() throws Exception {
        try (TransportClient transportClient = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))) {
            // document body
            XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
                    .startObject()
                    .field("id", 1L)
                    .field("title", "ElasticSearch是一个基于Lucene的搜索服务器")
                    .field("content", "它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是" + "用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎")
                    .endObject();
            // arguments: index "blog5", type "article", document id "1"
            IndexResponse indexResponse = transportClient.prepareIndex("blog5", "article", "1").setSource(xContentBuilder).get();
            System.out.println(indexResponse.status()); // CREATED
        }
    }

2、Java对象转JSON(示例代码实际使用的是fastjson的 JSON.toJSON(...),需要额外引入fastjson依赖;也可以改用Jackson的ObjectMapper)

1)Article实体

public class Article {
    private Long id;
    private String title;
    private String content;

    //getter and setter
}

2)代码实现

prepareIndex( index , type , id ).setSource( json )

    /**
     * Indexes an {@link Article} bean (id 2) into blog5/article, converting it to a JSON string first.
     * <p>
     * Uses index "blog5"; the previous "bolg5" typo wrote to a different, auto-created index.
     */
    @Test
    public void testCreateDocumentByBean() throws Exception {
        try (TransportClient transportClient = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))) {
            // JSON shape: {id:xxx, title:xxx, content:xxx}
            Article article = new Article();
            article.setId(2L);
            article.setTitle("ElasticSearch是一个基于Lucene的搜索服务器22");
            article.setContent("它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口22");
            // bean -> JSON string
            String jsonStr = JSON.toJSON(article).toString();
            // the document id is the article id
            IndexResponse indexResponse = transportClient.prepareIndex("blog5", "article", String.valueOf(article.getId())).setSource(jsonStr).get();
            System.out.println(indexResponse.status()); // CREATED
        }
    }
13.2.4.2 批量建立
     /**
      * Indexes 100 sample articles (ids 0..99) into blog5/article.
      * <p>
      * Each iteration sends its own index request, i.e. one network round trip per document.
      * For large volumes, a bulk request built with {@code transportClient.prepareBulk()} is the
      * usual alternative.
      */
    @Test
    public void addDocBatch() throws Exception {
        try (TransportClient transportClient = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))) {
            for (int i = 0; i < 100; i++) {
                Article article = new Article();
                article.setId((long) i);
                article.setTitle("ElasticSearch是一个基于Lucene的搜索服务器" + i);
                article.setContent("它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口" + i);
                String jsonStr = JSON.toJSON(article).toString();
                // the document id is the article id
                transportClient.prepareIndex("blog5", "article", String.valueOf(article.getId())).setSource(jsonStr).get();
            }
        }
    }
13.2.4.3 修改文档

1、 prepareUpdate、prepareIndex

.prepareUpdate( index , type , id ).setDoc(jsonStr)

/**
 * Partially updates document 2 in blog5/article with prepareUpdate(...).setDoc(json).
 * <p>
 * Fixes two defects. The index name was the "bolg5" typo. The content literal contained a pasted
 * editor placeholder ("`在这里插入代码片`") that contradicted the expected JSON noted below.
 */
@Test
public void testUpdateDocumentByUpdate() throws Exception {
    try (TransportClient transportClient = new PreBuiltTransportClient(Settings.EMPTY)
            .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))) {
        Article article = new Article();
        article.setId(2L);
        article.setTitle("Edsfsdf基于Lucene的搜索服务器222244");
        article.setContent("基胜多负少的水电费于RESTful web接口22334");

        String jsonStr = JSON.toJSON(article).toString();
        // jsonStr={"id":2,"title":"Edsfsdf基于Lucene的搜索服务器222244","content":"基胜多负少的水电费于RESTful web接口22334"}
        System.out.println("jsonStr=" + jsonStr);

        // merge the given fields into the existing document
        UpdateResponse updateResponse = transportClient.prepareUpdate("blog5", "article", String.valueOf(article.getId())).setDoc(jsonStr).get();
        System.out.println(updateResponse.status()); // OK
    }
}

再一次赋值:

      /**
       * "Updates" document 2 by re-indexing it: prepareIndex with an existing index/type/id
       * replaces the whole stored document.
       * <p>
       * Uses index "blog5" (previously the "bolg5" typo).
       */
        @Test
        public void testUpdateDocument() throws Exception {
            try (TransportClient transportClient = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))) {
                Article article = new Article();
                article.setId(2L);
                article.setTitle("再一次赋值");
                article.setContent("再一次赋值的内容");
                String jsonStr = JSON.toJSON(article).toString();

                // re-index the same index/type/id, replacing the stored document
                IndexResponse indexResponse = transportClient.prepareIndex("blog5", "article", String.valueOf(article.getId())).setSource(jsonStr).get();
                System.out.println(indexResponse.status()); // OK
            }
        }

2、 update

.update(new UpdateRequest( index , type , id ).doc( json ))

    /**
     * Partially updates document 2 in blog5/article using the request-object form
     * {@code update(new UpdateRequest(index, type, id).doc(json))}.
     * <p>
     * Uses index "blog5" (previously the "bolg5" typo).
     */
    @Test
    public void testUpdateDocumentByUpdateRequest() throws Exception {
        try (TransportClient transportClient = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))) {
            Article article = new Article();
            article.setId(2L);
            article.setTitle("update标题");
            article.setContent("update内容");
            String jsonStr = JSON.toJSON(article).toString();

            // update via an UpdateRequest object
            UpdateResponse updateResponse = transportClient.update(new UpdateRequest("blog5", "article", String.valueOf(article.getId())).doc(jsonStr)).get();
            System.out.println(updateResponse.status()); // OK
        }
    }
13.2.4.4 删除文档

1、 prepareDelete

.prepareDelete( index , type , id )

     /**
      * Deletes document 2 from blog5/article with {@code prepareDelete(index, type, id)} and prints
      * the response status.
      * <p>
      * Uses index "blog5" (previously the "bolg5" typo).
      */
    @Test
    public void testDeleteDocument() throws Exception {
        try (TransportClient transportClient = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))) {
            DeleteResponse deleteResponse = transportClient.prepareDelete("blog5", "article", "2").get();
            System.out.println(deleteResponse.status());
        }
    }

2、 delete

.delete(new DeleteRequest( index , type , id ))

      /**
       * Deletes document 1 from blog5/article using the request-object form
       * {@code delete(new DeleteRequest(index, type, id))} and prints the response status.
       * <p>
       * Uses index "blog5" (previously the "bolg5" typo).
       */
        @Test
        public void testDeleteDocumentByDelRequest() throws Exception {
            try (TransportClient transportClient = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))) {
                DeleteResponse deleteResponse = transportClient.delete(new DeleteRequest("blog5", "article", "1")).get();
                System.out.println(deleteResponse.status());
            }
        }

3、根据条件删除

DeleteByQueryAction.INSTANCE.newRequestBuilder( client )

.filter(QueryBuilders.matchQuery( field, XXX))

.source( index )

     	/**
         * Deletes every document in index "people" whose "username" matches "王五五", then prints
         * how many documents were deleted.
         * <p>
         * matchQuery analyzes the query text (standard analyzer by default), so a document matches
         * when its analyzed "username" shares tokens with the text.
         * The request now runs on the locally created client, which is closed afterwards. Previously
         * an unrelated {@code client} was used and the local one leaked. {@code throws Exception}
         * was added because InetAddress.getByName throws a checked exception.
         */
        @Test
        public void elasticsearchDeleteByQuery() throws Exception {
            try (TransportClient transportClient = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))) {
                BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(transportClient)
                        // delete-by-query condition
                        .filter(QueryBuilders.matchQuery("username", "王五五"))
                        // target index
                        .source("people")
                        .get();
                // number of deleted documents, e.g. 1
                long deleted = response.getDeleted();
                System.out.println(deleted);
            }
        }

4、异步删除

    /**
     * Asynchronous delete-by-query on index "people" for documents whose "sex" matches "男".
     * <p>
     * execute(listener) returns immediately; the listener prints the deleted count (or the
     * failure) once the server has finished. Instead of sleeping a fixed 10 seconds, the test
     * thread waits on a latch that both callbacks count down. It waits at most 10 seconds and
     * returns as soon as the callback ran.
     */
        @Test
        public void elasticsearchDeleteByQueryAsync() {
            final java.util.concurrent.CountDownLatch done = new java.util.concurrent.CountDownLatch(1);
            DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
                    .filter(QueryBuilders.matchQuery("sex", "男"))
                    .source("people")
                    .execute(new ActionListener<BulkByScrollResponse>() {
                        // called after the deletion has completed
                        @Override
                        public void onResponse(BulkByScrollResponse response) {
                            try {
                                long deleted = response.getDeleted();
                                System.out.println("数据删除完毕!");
                                System.out.println("数据删除的个数: " + deleted);
                            } finally {
                                done.countDown();
                            }
                        }

                        @Override
                        public void onFailure(Exception e) {
                            try {
                                e.printStackTrace();
                            } finally {
                                done.countDown();
                            }
                        }
                    });

            // Normally printed before the callback output, because execute() does not block.
            System.out.println("异步删除操作!");
            try {
                // Keep the test thread alive until the callback has run (bounded wait).
                if (!done.await(10, java.util.concurrent.TimeUnit.SECONDS)) {
                    System.err.println("delete-by-query did not finish within 10 seconds");
                }
            } catch (InterruptedException e) {
                // restore the interrupt flag instead of swallowing it
                Thread.currentThread().interrupt();
            }
        }

13.3 简单查询

13.3.1 查询全部

QueryBuilders.matchAllQuery()

       /**
         * Match-all query on blog5/article: prints the total hit count and the source of every
         * returned hit (the default page size applies).
         * The client is closed by try-with-resources even if the search throws.
         */
        @Test
        public void testFindAll() throws Exception {
            try (TransportClient transportClient = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))) {
                // QueryBuilders.matchAllQuery(): match every document
                SearchResponse response = transportClient.prepareSearch("blog5").setTypes("article")
                        .setQuery(QueryBuilders.matchAllQuery())
                        .get();
                SearchHits hits = response.getHits();
                System.out.println(hits.getTotalHits());
                for (SearchHit hit : hits.getHits()) {
                    System.out.println(hit.getSourceAsString());
                }
            }
        }

13.3.2 字符串查询

QueryBuilders.queryStringQuery(“XXX”)

        /**
         * query_string query for "是个" on blog5/article; prints the hit count and each hit's source.
         * <p>
         * No field is named, so the text is parsed with query_string syntax and searched against the
         * index's default field(s). Which analyzer applies therefore depends on that field's mapping
         * and is not necessarily IK. NOTE(review): in ES 5.x the default field is {@code _all} unless
         * {@code index.query.default_field} is set; confirm for your setup.
         */
        @Test
        public void testQueryStirng() throws Exception {
            try (TransportClient transportClient = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))) {
                SearchResponse response = transportClient.prepareSearch("blog5").setTypes("article")
                        .setQuery(QueryBuilders.queryStringQuery("是个"))
                        .get();
                SearchHits hits = response.getHits();
                System.out.println(hits.getTotalHits());
                for (SearchHit hit : hits.getHits()) {
                    System.out.println(hit.getSourceAsString());
                }
            }
        }

13.3.3 词条查询

QueryBuilders.termQuery(“content”, “XXX”)

        /**
         * Term query: documents in blog5/article whose "content" field contains the exact term "创建".
         * <p>
         * The term is not analyzed, so it must equal a token produced when "content" was indexed
         * (here by ik_max_word). The index is now "blog5", like every other example; the previous
         * version queried "blog".
         */
        @Test
        public void testQueryTerm() throws Exception {
            try (TransportClient transportClient = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))) {
                SearchResponse response = transportClient.prepareSearch("blog5").setTypes("article")
                        .setQuery(QueryBuilders.termQuery("content", "创建"))
                        .get();

                SearchHits hits = response.getHits();
                System.out.println(hits.getTotalHits());
                for (SearchHit hit : hits.getHits()) {
                    System.out.println(hit.getSourceAsString());
                }
            }
        }

13.3.4 模糊查询

QueryBuilders.wildcardQuery(“title”," xxx ")

       /**
        * Wildcard query: "title" terms matching "*基于*" in blog5/article.
        * <p>
        * The pattern is matched against indexed terms, not the raw title string. A leading "*" forces
        * a scan over the term dictionary, so keep such patterns for small indices.
        * The client is closed by try-with-resources even if the search throws.
        */
        @Test
        public void testWildcard() throws Exception {
            try (TransportClient transportClient = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))) {
                SearchResponse response = transportClient.prepareSearch("blog5").setTypes("article")
                        .setQuery(QueryBuilders.wildcardQuery("title", "*基于*"))
                        .get();

                SearchHits hits = response.getHits();
                System.out.println(hits.getTotalHits());
                for (SearchHit hit : hits.getHits()) {
                    System.out.println(hit.getSourceAsString());
                }
            }
        }

13.3.5 分页查询和排序

searchRequestBuilder.setFrom( 0)
searchRequestBuilder.setSize( 3)
searchRequestBuilder.addSort(“id”, SortOrder.DESC)

       /**
         * Paging and sorting: the first page (3 hits) of a match-all query on blog5/article,
         * sorted by "id" descending.
         * <p>
         * setFrom(n) is the 0-based offset of the first hit; setSize(n) is the page size.
         * The client is closed by try-with-resources even if the search throws.
         */
        @Test
        public void testQueryPageAndSort() throws Exception {
            try (TransportClient transportClient = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))) {
                SearchRequestBuilder searchRequestBuilder = transportClient.prepareSearch("blog5")
                        .setTypes("article").setQuery(QueryBuilders.matchAllQuery());

                // page 1, 3 hits per page
                searchRequestBuilder.setFrom(0);
                searchRequestBuilder.setSize(3);
                // sort by id, descending
                searchRequestBuilder.addSort("id", SortOrder.DESC);

                SearchResponse searchResponse = searchRequestBuilder.get();
                SearchHits hits = searchResponse.getHits();
                // total number of matching documents (not just this page)
                System.out.println(hits.getTotalHits());
                for (SearchHit hit : hits) {
                    System.out.println(hit.getSourceAsString());
                }
            }
        }

输出:

13.3.6 多条查询

.prepareMultiGet() .add( index , type , id )

     /**
         * Multi-get: fetches several ids across indices/types in one request and prints the source
         * of each document that exists.
         * <p>
         * An item whose index does not exist comes back as a failed item without a GetResponse.
         * Calling isExists() on it would throw, so failed items are reported and skipped.
         * The request runs on the locally created client, which is closed afterwards.
         */
        @Test
        public void elasticsearchMultiGet() throws IOException {
            try (TransportClient transportClient = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))) {
                MultiGetResponse multiGetItemResponses = transportClient.prepareMultiGet()
                        .add("people", "student", "1")
                        .add("people", "student", "2", "3")
                        .add("people", "teacher", "1")
                        // index "news" may not exist: this item then fails instead of returning a document
                        .add("news", "fulltext", "1")
                        .get();

                for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
                    if (itemResponse.isFailed()) {
                        System.out.println(itemResponse.getFailure().getMessage());
                        continue;
                    }
                    GetResponse response = itemResponse.getResponse();
                    // print only documents that were actually found
                    if (response.isExists()) {
                        System.out.println(response.getSourceAsString());
                    }
                }
            }
        }

13.3.7 范围查找

rangeQuery( field ).from( XX ).to( XX ).includeLower( boolean ).includeUpper( boolean )

        /**
         * Range search on index "people": documents whose "age" lies in the half-open interval
         * [21, 24). includeLower(true) keeps the lower bound and includeUpper(false) drops the
         * upper bound. The whole search response is printed.
         */
        @Test
        public void elasticsearchRange() {
            QueryBuilder ageFrom21To24 = rangeQuery("age")
                    .from(21)
                    .to(24)
                    .includeLower(true)
                    .includeUpper(false);

            SearchResponse searchResult = client
                    .prepareSearch("people")
                    .setQuery(ageFrom21To24)
                    .get();

            System.out.println(searchResult);
        }

13.3.11 查询结果高亮

       /**
         * Highlighting: runs a wildcard query ("*索") on "content" in blog5/article (offset 3,
         * 3 hits, sorted by id descending). It wraps the matched fragments of "content" in
         * {@code <font class='red'>...</font>} and prints each hit's source plus its highlighted fragments.
         * <p>
         * A hit can come back without a "content" highlight entry. Such hits are skipped instead of
         * dereferencing null. The client is closed by try-with-resources.
         */
        @Test
        public void testHighLight() throws Exception {
            try (TransportClient transportClient = new PreBuiltTransportClient(Settings.EMPTY)
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))) {
                // wildcard search on content
                SearchRequestBuilder searchRequestBuilder = transportClient.prepareSearch("blog5").setTypes("article")
                        .setQuery(QueryBuilders.wildcardQuery("content", "*索"));
                searchRequestBuilder.setFrom(3);
                searchRequestBuilder.setSize(3);
                searchRequestBuilder.addSort("id", SortOrder.DESC);

                // highlight settings: tags wrapped around matched fragments of "content"
                HighlightBuilder highLighter = new HighlightBuilder();
                highLighter.preTags("<font class='red'>");
                highLighter.field("content");
                highLighter.postTags("</font>");
                searchRequestBuilder.highlighter(highLighter);

                SearchResponse searchResponse = searchRequestBuilder.get();
                SearchHits hits = searchResponse.getHits();
                System.out.println(hits.getTotalHits());
                for (SearchHit hit : hits) {
                    System.out.println("SourceAsString内容:" + hit.getSourceAsString());
                    if (hit.getHighlightFields().containsKey("content")) {
                        Text[] texts = hit.getHighlightFields().get("content").getFragments();
                        for (Text text : texts) {
                            System.out.println("content内容:" + text.toString());
                        }
                    }
                    System.out.println("--------------------------------------");
                }
            }
        }

13.3.12 简单查询

.prepareGet( index , type , id).execute().actionGet()

        /**
         * Fetches document "14" from blog5/article by id with prepareGet(...) and prints the response.
         * <p>
         * The client connects to cluster "elasticsearch" on the transport port 9300 (9200 is the
         * REST port). It is closed by try-with-resources, also when the request throws; errors are
         * printed.
         */
        @Test
        public void testSelect() {
            // cluster name, set through the Settings builder
            Settings settings = Settings.builder()
                    .put("cluster.name", "elasticsearch")
                    .build();

            // Single-node setup: only one address is listed. For a real cluster, add the master-eligible nodes.
            try (TransportClient client = new PreBuiltTransportClient(settings).addTransportAddresses(
                    new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))) {
                GetResponse response = client.prepareGet("blog5", "article", "14").execute().actionGet();
                System.out.println(response);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

查询的结果如下所示:

13.4 设置

是否创建索引、是否存储、是否既分词又建立索引(analyzed)、是否只建索引不分词(not_analyzed)等等。(注:analyzed/not_analyzed 是旧版 string 类型的写法,在5.x中分别对应 text 和 keyword 类型,见下方输出中的映射结果。)

13.4.1 设置集群、索引

        // Shared client for the tests in this class; (re)created by init() before every test.
        // It is not initialized here: the initializer previously opened a second client that was
        // immediately replaced and never closed. Also, InetAddress.getByName throws the checked
        // UnknownHostException, which a field initializer cannot propagate without an explicit constructor.
        // NOTE(review): nothing visible here closes the client; consider an @After method calling client.close().
        private TransportClient client;

        /**
         * Runs before each test method: builds a client for cluster "elasticsearch" with sniffing
         * enabled, connected to 127.0.0.1:9300.
         */
        @SuppressWarnings("resource")
        @Before
        public void init() throws Exception {
            Settings settings = Settings.builder()
                    .put("cluster.name", "elasticsearch")
                    // sniffing: discover the other cluster nodes through the given node
                    .put("client.transport.sniff", true)
                    .build();
            // Listing two or more nodes is recommended in a real cluster, e.g.
            // new InetSocketTransportAddress(InetAddress.getByName("<host2>"), 9300)
            client = new PreBuiltTransportClient(settings).addTransportAddresses(
                    new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
        }
    
        /**
         * Creates index "food" through the admin API with explicit index settings:
         * 5 primary shards and 1 replica per shard.
         * <p>
         * The replica count does not include the primary itself: with 1 replica, every shard's
         * data is stored twice.
         */
        @Test
        public void createIndexWithSettings() throws UnknownHostException {
            Settings.Builder foodIndexSettings = Settings.builder()
                    .put("index.number_of_shards", 5)    // number of primary shards
                    .put("index.number_of_replicas", 1); // replicas per primary (primary not counted)

            client.admin()
                    .indices()
                    .prepareCreate("food")
                    .setSettings(foodIndexSettings)
                    .get();
        }

13.4.2 设置字段

       /**
         * Creates index "player" (4 shards, 1 replica) with an explicit mapping for type "basketball".
         * <p>
         * Uses the 5.x field types instead of the legacy {@code string} type with
         * {@code index: analyzed / not_analyzed}, which 5.x only converts with deprecation warnings:
         * <ul>
         *   <li>{@code keyword}: indexed as one term, not analyzed (old {@code not_analyzed});</li>
         *   <li>{@code text}: analyzed and indexed (old {@code analyzed}).</li>
         * </ul>
         * {@code store} is given as a real boolean instead of "yes"/"no".
         */
        @Test
        public void elasticsearchSettingsPlayerMappings() throws IOException {
            // 1: index settings
            HashMap<String, Object> settingsMap = new HashMap<String, Object>(2);
            settingsMap.put("number_of_shards", 4);   // 4 primary shards
            settingsMap.put("number_of_replicas", 1); // 1 replica

            // 2: mapping for type "basketball"
            XContentBuilder builder = XContentFactory.jsonBuilder()
                    .startObject().field("dynamic", "true")
                    .startObject("properties")
                    // stored in the index in addition to _source
                    .startObject("id").field("type", "integer").field("store", true).endObject()
                    // indexed, not analyzed
                    .startObject("name").field("type", "keyword").endObject()
                    .startObject("age").field("type", "integer").endObject()
                    .startObject("salary").field("type", "integer").endObject()
                    // indexed, not analyzed
                    .startObject("team").field("type", "keyword").endObject()
                    // indexed, not analyzed
                    .startObject("position").field("type", "keyword").endObject()
                    // analyzed and indexed, not stored separately
                    .startObject("description").field("type", "text").field("store", false)
                    .field("analyzer", "ik_smart").endObject()
                    // analyzed, indexed and stored
                    .startObject("addr").field("type", "text").field("store", true)
                    .field("analyzer", "ik_smart").endObject()
                    .endObject()
                    .endObject();

            CreateIndexRequestBuilder prepareCreate = client.admin().indices().prepareCreate("player");
            prepareCreate.setSettings(settingsMap).addMapping("basketball", builder).get();
        }

展示:

{
    "state":"open",
    "settings":{
        "index":{
            "creation_date":"1645075089681",
            "number_of_shards":"4",
            "number_of_replicas":"1",
            "uuid":"b2LXw7UGSFuEE4-hcRgTAg",
            "version":{
                "created":"5060899"
            },
            "provided_name":"player"
        }
    },
    "mappings":{
        "basketball":{
            "dynamic":"true",
            "properties":{
                "name":{
                    "type":"keyword"
                },
                "description":{
                    "analyzer":"ik_smart",
                    "store":true,
                    "type":"text"
                },
                "id":{
                    "store":true,
                    "type":"integer"
                },
                "position":{
                    "type":"keyword"
                },
                "team":{
                    "type":"keyword"
                },
                "addr":{
                    "analyzer":"ik_smart",
                    "store":true,
                    "type":"text"
                },
                "salary":{
                    "type":"integer"
                },
                "age":{
                    "type":"integer"
                }
            }
        }
    },
    "aliases":[

    ],
    "primary_terms":{
        "0":1,
        "1":1,
        "2":1,
        "3":1
    },
    "in_sync_allocations":{
        "0":[
            "ceEjSFckSs-X2PcIe1EsEw"
        ],
        "1":[
            "lDikbTUXRkmgkWJyjnf6GA"
        ],
        "2":[
            "VBINrKxrRAGebgYMHwNd4g"
        ],
        "3":[
            "TCf-1AWQSl-gKyfFjX4M_A"
        ]
    }
}

13.5 聚合统计

聚合查询,可以进行分组统计数量,分组统计最大值,分组统计平均值,等等统计

    private static TransportClient client  = new PreBuiltTransportClient( Settings.builder() .put("cluster.name", "elasticsearch")
                        // 自动感知的功能(可以通过当前指定的节点获取所有es节点的信息)
                        .put("client.transport.sniff", true).build())
                        .addTransportAddresses(new InetSocketTransportAddress(InetAddress.getByName( "127.0.0.1"), 9300));
       
        /**
         * Counts players per team in player/basketball, equivalent to
         * {@code select team, count(*) as team_count from player group by team;}.
         * "team_count" is the aggregation's name (alias), used to look it up in the response.
         * <p>
         * The result is read through the {@code Terms} interface and {@code getKeyAsString()}
         * instead of casting to {@code StringTerms}. That way the code keeps working if "team"
         * is ever mapped as a non-string field.
         */
        @Test
        public void elasticsearchAgg1() {
            // index and type to search
            SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball");
            // group by "team"; no sub-aggregation, so each bucket only carries a document count
            TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_count").field("team");
            builder.addAggregation(teamAgg);
            SearchResponse response = builder.execute().actionGet();

            // aggregations by name
            Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
            for (String key : aggMap.keySet()) {
                System.out.println("key: " + key);
            }

            System.out.println("");
            Terms terms = (Terms) aggMap.get("team_count");
            // for-each over the buckets: group key and number of documents in the group
            for (Terms.Bucket bucket : terms.getBuckets()) {
                String team = bucket.getKeyAsString();
                long count = bucket.getDocCount();
                System.out.println(team + ": " + count);
            }

            System.out.println("");

            // same output, iterating explicitly with an Iterator
            Iterator<? extends Terms.Bucket> teamBucketIt = terms.getBuckets().iterator();
            while (teamBucketIt.hasNext()) {
                Terms.Bucket bucket = teamBucketIt.next();
                String team = bucket.getKeyAsString();
                long count = bucket.getDocCount();
                System.out.println(team + ": " + count);
            }
        }
    
        /**
         * Counts players for every (team, position) pair.
         *
         * <p>Equivalent SQL:
         * {@code select team, position, count(*) as pos_count from player group by team, position;}
         * It is built as a "position" terms aggregation nested inside a "team" terms aggregation,
         * and prints one line per pair in the form {@code team: position: count}.
         */
        @Test
        public void elasticsearchAgg2() {
            // Inner grouping (position) is attached beneath the outer grouping (team).
            TermsAggregationBuilder byTeam = AggregationBuilders.terms("team_name").field("team");
            TermsAggregationBuilder byPosition = AggregationBuilders.terms("pos_count").field("position");
            SearchResponse searchResponse = client.prepareSearch("player")
                    .setTypes("basketball")
                    .addAggregation(byTeam.subAggregation(byPosition))
                    .execute()
                    .actionGet();

            StringTerms teamTerms = (StringTerms) searchResponse.getAggregations().getAsMap().get("team_name");
            for (Terms.Bucket teamBucket : teamTerms.getBuckets()) {
                String teamName = (String) teamBucket.getKey();
                // Every team bucket carries its own "pos_count" sub-aggregation.
                StringTerms positionTerms = (StringTerms) teamBucket.getAggregations().getAsMap().get("pos_count");
                for (Terms.Bucket positionBucket : positionTerms.getBuckets()) {
                    String positionName = (String) positionBucket.getKey();
                    System.out.println(teamName + ": " + positionName + ": " + positionBucket.getDocCount());
                }
            }
        }
    
        /**
         * Finds the maximum age in each team (groups by team, then applies max on age).
         *
         * <p>Equivalent SQL: {@code select team, max(age) as max_age from player group by team;}
         * Prints one line per team in the form {@code team: maxAge}.
         */
        @Test
        public void elasticsearchAgg3() {
            TermsAggregationBuilder byTeam = AggregationBuilders.terms("team_name").field("team");
            // Metric evaluated inside every team bucket.
            MaxAggregationBuilder maxAgeMetric = AggregationBuilders.max("max_age").field("age");

            SearchRequestBuilder request = client.prepareSearch("player").setTypes("basketball");
            request.addAggregation(byTeam.subAggregation(maxAgeMetric));
            Map<String, Aggregation> topLevel = request.execute().actionGet().getAggregations().getAsMap();

            StringTerms teamTerms = (StringTerms) topLevel.get("team_name");
            for (Terms.Bucket bucket : teamTerms.getBuckets()) {
                String teamName = (String) bucket.getKey();
                InternalMax maxAge = (InternalMax) bucket.getAggregations().getAsMap().get("max_age");
                System.out.println(teamName + ": " + maxAge.getValue());
            }
        }
    
        /**
         * Computes, for every team, the average age and the total salary.
         *
         * <p>Equivalent SQL:
         * {@code select team, avg(age) as avg_age, sum(salary) as total_salary from player group by team;}
         * Unlike the earlier examples, no {@code .setTypes(...)} call is made on this search.
         * Prints one line per team in the form {@code team: avgAge: totalSalary}.
         */
        @Test
        public void elasticsearchAgg4() {
            TermsAggregationBuilder byTeam = AggregationBuilders.terms("team_name").field("team");
            // Two metrics evaluated side by side inside each team bucket.
            AvgAggregationBuilder avgAgeMetric = AggregationBuilders.avg("avg_age").field("age");
            SumAggregationBuilder salarySumMetric = AggregationBuilders.sum("total_salary").field("salary");

            SearchRequestBuilder request = client.prepareSearch("player");
            request.addAggregation(byTeam.subAggregation(avgAgeMetric).subAggregation(salarySumMetric));
            SearchResponse searchResponse = request.execute().actionGet();

            StringTerms teamTerms = (StringTerms) searchResponse.getAggregations().getAsMap().get("team_name");
            for (Terms.Bucket bucket : teamTerms.getBuckets()) {
                String teamName = (String) bucket.getKey();
                Map<String, Aggregation> metrics = bucket.getAggregations().getAsMap();
                InternalAvg ageAverage = (InternalAvg) metrics.get("avg_age");
                InternalSum salaryTotal = (InternalSum) metrics.get("total_salary");
                double avgAge = ageAverage.getValue();
                double totalSalary = salaryTotal.getValue();
                System.out.println(teamName + ": " + avgAge + ": " + totalSalary);
            }
        }
    
        /**
         * Sums the salaries of each team and lists the teams by that total, highest first.
         *
         * <p>Equivalent SQL:
         * {@code select team, sum(salary) as total_salary from player group by team order by total_salary desc;}
         * Prints one line per team in the form {@code team totalSalary}.
         */
        @Test
        public void elasticsearchAgg5() {
            // Name of the sum sub-aggregation. The terms order below refers to it by this exact
            // string, so both places share one variable (no stray whitespace can creep in).
            String totalSalaryName = "total_salary";
            SearchRequestBuilder builder = client.prepareSearch("player");
            // Group by team and order the buckets by the summed salary; asc=false yields the
            // descending order stated in the SQL above.
            TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team")
                    .order(Terms.Order.aggregation(totalSalaryName, false));
            // Metric evaluated inside each team bucket.
            SumAggregationBuilder sumAgg = AggregationBuilders.sum(totalSalaryName).field("salary");
            // Group by team first, then sum salary within each team.
            builder.addAggregation(termsAgg.subAggregation(sumAgg));
            SearchResponse response = builder.execute().actionGet();
            // Top-level aggregations keyed by name.
            Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
            StringTerms teams = (StringTerms) aggMap.get("team_name");
            for (Terms.Bucket teamBucket : teams.getBuckets()) {
                String team = (String) teamBucket.getKey();
                // Sub-aggregations of this team bucket, keyed by name.
                Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
                InternalSum totalSalary = (InternalSum) subAggMap.get(totalSalaryName);
                double totalSalaryValue = totalSalary.getValue();
                System.out.println(team + " " + totalSalaryValue);
            }
        }
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐