1. Java连接Elasticsearch8

本文内容均以 Elasticsearch 8.1 版本为例(8.x 各小版本之间差异不大,示例代码可直接使用),下文统一用 es8 指代 Elasticsearch 8.1。

1.1 依赖

<!-- Maven (pom.xml) dependencies for Elasticsearch 8 -->
<!-- Elasticsearch core artifact, with every log4j artifact excluded
     (presumably to avoid clashing with the application's own logging setup; confirm) -->
<dependency>
	<groupId>org.elasticsearch</groupId>
	<artifactId>elasticsearch</artifactId>
	<version>8.1.0</version>
	<exclusions>
		<exclusion>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>*</artifactId>
		</exclusion>
	</exclusions>
</dependency>

<!-- Elasticsearch Java API client (provides the co.elastic.clients.* classes used below) -->
<dependency>
	<groupId>co.elastic.clients</groupId>
	<artifactId>elasticsearch-java</artifactId>
	<version>8.1.0</version>
</dependency>
<!-- Jackson databind, needed by the JacksonJsonpMapper used when building the transport -->
<dependency>
	<groupId>com.fasterxml.jackson.core</groupId>
	<artifactId>jackson-databind</artifactId>
	<version>2.12.3</version>
</dependency>
<!-- Jakarta JSON Binding API -->
<dependency>
	<groupId>jakarta.json.bind</groupId>
	<artifactId>jakarta.json.bind-api</artifactId>
	<version>2.0.0</version>
</dependency>

1.2 普通连接

elastic官网文档:https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/8.1/connecting.html

// Address of the Elasticsearch node to talk to
HttpHost host = new HttpHost("localhost", 9200);

// Low-level REST client that owns the HTTP connections
RestClient restClient = RestClient.builder(host).build();

// Transport layer: the REST client plus a Jackson-based JSON mapper
JacksonJsonpMapper jsonMapper = new JacksonJsonpMapper();
ElasticsearchTransport transport = new RestClientTransport(restClient, jsonMapper);

// Typed API client built on top of the transport
ElasticsearchClient client = new ElasticsearchClient(transport);

// Release the connection: transport first, then the low-level client
transport.close();
restClient.close();

1.3 连接池

连接池基于 Apache commons-pool2 实现(需要额外引入 org.apache.commons:commons-pool2 依赖,示例中的日志还用到了 slf4j,二者均未包含在 1.1 的依赖列表中)。网上已有很多相关内容,这里仅针对 es8 做了微调。

EsClientPoolFactory.java

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * commons-pool2 factory that creates and destroys pooled {@link ElasticsearchClient} instances.
 *
 * <p>Each pooled client owns its own low-level {@link RestClient}; the factory therefore closes
 * the client's transport when the pool destroys the object, so connections and I/O threads are
 * released instead of leaking.
 *
 * @author wei
 * @version 1.0
 * @date 2022/5/13 9:43
 */
public class EsClientPoolFactory implements PooledObjectFactory<ElasticsearchClient> {

    /** Comma-separated {@code host:port} list. Placeholder value: replace with real addresses. */
    private static final String ES_SERVER_HOSTS = "ip:port,ip:port";

    /** URI scheme used for every configured host. */
    private static final String SCHEME = "http";

    /**
     * Builds a new client (REST client, Jackson-based transport, API client) and wraps it for the pool.
     *
     * @return the new client wrapped in a {@link DefaultPooledObject}
     * @throws IllegalArgumentException if the configured host list is malformed
     */
    @Override
    public PooledObject<ElasticsearchClient> makeObject() throws Exception {
        List<HttpHost> httpHosts = parseHosts(ES_SERVER_HOSTS);

        // Low-level REST client over all configured nodes
        RestClient restClient = RestClient.builder(httpHosts.toArray(new HttpHost[0])).build();

        // Transport layer using the Jackson JSON mapper
        ElasticsearchTransport transport = new RestClientTransport(
                restClient, new JacksonJsonpMapper()
        );

        return new DefaultPooledObject<>(new ElasticsearchClient(transport));
    }

    /**
     * Releases the resources held by a client the pool is discarding.
     *
     * @param p the pooled wrapper whose client is being destroyed
     * @throws Exception if closing the transport fails
     */
    @Override
    public void destroyObject(PooledObject<ElasticsearchClient> p) throws Exception {
        ElasticsearchClient elasticsearchClient = p.getObject();
        if (elasticsearchClient != null) {
            // Closing the transport is expected to close the RestClient it wraps as well
            // (see RestClientTransport#close), releasing HTTP connections and I/O threads.
            elasticsearchClient._transport().close();
        }
    }

    /**
     * Always reports the client as valid; no health check is performed.
     *
     * @param p the pooled wrapper to validate
     * @return {@code true} unconditionally
     */
    @Override
    public boolean validateObject(PooledObject<ElasticsearchClient> p) {
        return true;
    }

    /** No-op: nothing is done to a client when it is borrowed. */
    @Override
    public void activateObject(PooledObject<ElasticsearchClient> p) throws Exception {
        // intentionally empty
    }

    /** No-op: nothing is done to a client when it is returned. */
    @Override
    public void passivateObject(PooledObject<ElasticsearchClient> p) throws Exception {
        // intentionally empty
    }

    /**
     * Parses a comma-separated {@code host:port} list into {@link HttpHost} instances.
     * Surrounding whitespace is trimmed and empty entries are skipped.
     *
     * @param hosts comma-separated {@code host:port} entries
     * @return the parsed hosts, in input order
     * @throws IllegalArgumentException if an entry has no host or no port part
     * @throws NumberFormatException if a port is not a valid integer
     */
    private static List<HttpHost> parseHosts(String hosts) {
        List<HttpHost> result = new ArrayList<>();
        for (String rawEntry : hosts.split(",")) {
            String entry = rawEntry.trim();
            if (entry.isEmpty()) {
                continue;
            }
            int separator = entry.indexOf(':');
            if (separator <= 0 || separator == entry.length() - 1) {
                throw new IllegalArgumentException(
                        "Invalid Elasticsearch host entry (expected host:port): " + entry);
            }
            String hostName = entry.substring(0, separator);
            int port = Integer.parseInt(entry.substring(separator + 1).trim());
            result.add(new HttpHost(hostName, port, SCHEME));
        }
        return result;
    }
}

ESClientPool.java

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static holder for a shared commons-pool2 pool of {@link ElasticsearchClient} instances.
 *
 * <p>Callers borrow a client with {@link #getClient()} and must hand it back with
 * {@link #returnClient(ElasticsearchClient)}, ideally from a {@code finally} block.
 *
 * @author wei
 * @version 1.0
 * @date 2022/5/16 15:47
 */
public class ESClientPool {

    private static final Logger logger = LoggerFactory.getLogger(ESClientPool.class);

    // Pool configuration; optional, the library defaults apply to anything not set here.
    // NOTE(review): kept as a raw type because the generic form needs commons-pool2 2.6+.
    private static final GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();

    static {
        // At most 20 clients exist at once (the library default would be 8).
        poolConfig.setMaxTotal(20);
        // The idle cap matches maxTotal; the previous value of 200 could never be reached.
        poolConfig.setMaxIdle(20);
        // Minimum idle time (3 s) before a client becomes eligible for eviction.
        // NOTE(review): this only matters once an evictor run interval is configured, which
        // is not done here, so with the library defaults no eviction happens. Confirm intent.
        poolConfig.setMinEvictableIdleTimeMillis(1000L * 3L);
    }

    // Factory that creates and destroys the pooled clients
    private static final EsClientPoolFactory esClientPoolFactory = new EsClientPoolFactory();

    // The pool itself, built from the factory and the configuration above
    private static final GenericObjectPool<ElasticsearchClient> clientPool =
            new GenericObjectPool<>(esClientPoolFactory, poolConfig);


    /**
     * Borrows a client from the pool.
     *
     * @return a client that must later be passed to {@link #returnClient(ElasticsearchClient)}
     * @throws Exception if the pool cannot provide a client
     */
    public static ElasticsearchClient getClient() throws Exception {
        ElasticsearchClient client = clientPool.borrowObject();
        logger.info("从池中取一个对象{}", client);
        return client;
    }

    /**
     * Returns a previously borrowed client to the pool.
     *
     * @param client the client to return; {@code null} is ignored so callers can invoke this
     *               unconditionally from a {@code finally} block
     * @throws Exception if the pool rejects the returned object
     */
    public static void returnClient(ElasticsearchClient client) throws Exception {
        if (client == null) {
            return;
        }
        logger.info("使用完毕之后,归还对象{}", client);
        clientPool.returnObject(client);
    }

}

1.4 使用

// Borrow a client from the pool
ElasticsearchClient client = ESClientPool.getClient();
Boolean acknowledged;
try {
    // Create the index and record whether the request was acknowledged
    acknowledged = client.indices().create(c -> c.index(indexName)).acknowledged();
} finally {
    // Always hand the client back, even if the request throws; otherwise the pool
    // permanently loses one of its clients
    ESClientPool.returnClient(client);
}

下一章节继续分享具体的封装使用~ Java封装Elasticsearch8常用接口方法(二)索引、别名、文档等操作

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐