BulkRequest和BulkProcessor都可以用来批量处理请求,但二者的使用流程和设计理念有所不同

BulkRequest

主要使用流程

RestHighLevelClient restHighLevelClient = new RestHighLevelClient();//创建一个es客户端,用于执行请求

BulkRequest bulkRequest = new BulkRequest(); //创建一个批量请求

IndexRequest indexRequest1 = new IndexRequest("index_name"); //创建第一个“添加文档”的请求
indexRequest1.id("1") .source(XContentType.JSON,"field_name", "foo1");//设置添加的文档信息
IndexRequest indexRequest2 = new IndexRequest("index_name"); //创建第二个“添加文档”的请求
indexRequest2.id("1") .source(XContentType.JSON,"field_name", "foo2");//设置添加的文档信息

bulkRequest.add(indexRequest1); //将第一个“添加文档”请求加入到批量请求中
bulkRequest.add(indexRequest2); //将第二个“添加文档”请求加入到批量请求中

BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);//执行批量请求,返回执行结果

BulkProcessor

主要使用流程

//创建一个BulkProcessor.Listener监听器
BulkProcessor.Listener listener = new BulkProcessor.Listener() { 
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        //BulkRequest每次被执行之前都会调用该方法,executionId为此次执行的标识
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        //BulkRequest每次被执行之后都会调用该方法,executionId为此次执行的标识
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            Throwable failure) {
        //若此次BulkRequest执行发生错误,会调用该方法
    }
};

//构建一个BulkProcessor对象,需要的组件:一个请求客户端RestHighLevelClient(client),一个BulkProcessor.Listener监听器(listener)
//用BulkProcessor.builder构建BulkProcessor.Builder对象,需要传入两个参数:
//第一个是BiConsumer<BulkRequest, ActionListener<BulkResponse>>函数式接口,我们需要提供一个请求的处理过程(函数式编程实现消费者方法),消费者方法被执行时会将BulkProcessor维护的BulkRequest对象属性(request)和内部构建的ActionListener<BulkResponse>对象(bulkListener)传入
//第二个是BulkProcessor.Listener监听器,可以在请求执行前后处理相关逻辑
BulkProcessor bulkProcessor = BulkProcessor.builder(
        (request, bulkListener) ->client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        listener
        ).build();

//创建两个“添加文档”的请求
IndexRequest one = new IndexRequest("index_name").id("1")
        .source(XContentType.JSON, "title",
                "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("index_name").id("2")
        .source(XContentType.JSON, "title",
                "Current status and upcoming changes in Elasticsearch");

//将两个请求添加到批量请求中
bulkProcessor.add(one);
bulkProcessor.add(two);

//等待所有的请求均处理完毕或者到达指定的等待时间后停止处理并释放占用资源,若在指定时间到达前所有请求均被处理,则返回true;若时间到了后请求没有处理完,则返回false
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);

BulkProcessor源码分析

BulkProcessor里维护了一个BulkRequest对象属性和其他重要的对象属性BulkProcessor重要的对象属性

BulkProcessor.builder(BiConsumer<BulkRequest, ActionListener> consumer, Listener listener)方法创建BulkProcessor.Builder对象,并将comsumer和listener作为构造函数的参数传入
在这里插入图片描述
BulkProcessor.Builder重要属性的初始化值
BulkProcessor.Builder重要属性的初始化值
BulkProcessor.Builder.build()方法构建一个BulkProcessor对象
BulkProcessor.Builder.build()方法构建一个BulkProcessor对象
BulkProcessor构造函数
BulkProcessor构造函数
BulkProcessor.add添加请求(IndexRequest,UpdateRequest,DeleteRequest…)
BulkProcessor.add添加请求(IndexRequest,UpdateRequest,DeleteRequest......)
BulkProcessor.newBulkRequestIfNeeded方法
BulkProcessor.newBulkRequestIfNeeded方法
BulkProcessor.execute(BulkRequest bulkRequest, long executionId )将要执行的bulkRequest对象和id传入给bulkRequestHandler作为执行参数
BulkProcessor.execute(BulkRequest bulkRequest, long executionId )将要执行的bulkRequest对象和id传入给bulkRequestHandler作为执行参数
BulkRequestHandler.execute(BulkRequest bulkRequest, long executionId)执行请求
BulkRequestHandler.execute(BulkRequest bulkRequest, long executionId)执行请求
Retry.withBackoff(BiConsumer<BulkRequest, ActionListener> consumer, BulkRequest bulkRequest, ActionListener listener)方法
Retry.withBackoff(BiConsumer<BulkRequest, ActionListener> consumer, BulkRequest bulkRequest, ActionListener listener)方法
RetryHandler实现了ActionListener<BulkResponse>接口,所以RetryHandler也是一个监听器
RetryHandler实现了ActionListener<BulkResponse>接口
其从构造函数接收了listener监听器对象作为其对象属性,在其实现的两个监听方法里调用listener对象属性的方法
在其实现的两个监听方法里调用listener对象属性的方法
finishHim方法
RetryHandler.execute(BulkRequest bulkRequest)方法
RetryHandler.execute(BulkRequest bulkRequest)方法
BulkProcessor.awaitClose调用此方法将当前所有的请求都执行并释放占用的资源
BulkProcessor.awaitClose调用此方法将当前所有的请求都执行并释放占用的资源
BulkProcessor.execute()方法执行请求
BulkProcessor.execute方法执行请求
BulkRequestHandler.execute(BulkRequest bulkRequest, long executionId)执行请求,同上边提过的

总结起来:利用consumer和listener构建BulkProcessor,将请求add入BulkProcessor,当bulkRequest里的请求个数或者存储容量达到阈值,就会立即自动开始执行当前bulkRequest请求,之后重新生成一个空的bulkRequest,循环这个过程,直到调用awaitClose手动执行当前bulkRequest请求并释放占用的资源。

BulkProcessor设计模式

BulkProcessor将创建bulkRequest对象的过程和时机以及批量执行请求的过程和时机封装了起来,我们不必手动去调用client.bulk()来执行批量请求,只需要将请求add到BulkProcessor中(BulkProcessor中维护一个bulkRequest),BulkProcessor“满了”就自动执行请求然后重新创建一个bulkRequest,以此循环往复,最后手动调用awaitClose执行所有请求并释放资源。整个执行请求的过程和时机对于用户来说是完全透明的,我们不必关心什么时候执行请求以及具体怎么执行。

BulkProcessor.builder使用了构建者模式,将consumer和listener作为“原料”投入后调用build来定制一个BulkProcessor。
BulkRequest使用了类似于模板方法的模式理念,将创建BulkRequest和客户端执行请求这两步封装了起来,用户只需要将请求add到BulkProcessor中即可。

BulkRequest,BulkProcessor对比总结

BulkRequest整个使用过程“循规蹈矩”,从创建BulkRequest到add请求到客户端执行请求按顺序走。
BulkProcessor中维护了一个BulkRequest对象。总的来说,BulkProcessor将创建BulkRequest和客户端执行请求这两步封装了起来,用户只需要将请求add到BulkProcessor中即可,最后手动调用关闭完成整个请求。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐