运行环境：Flink standalone 模式

1、main 入口

package es;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.FileUtils;

import java.io.File;
import java.util.List;
import java.util.Map;

/**
 * Flink batch job that pages through Elasticsearch with a scroll cursor,
 * unions every page into one {@link DataSet}, sums field 2 grouped by field 0
 * and writes the result as text.
 */
public class EsReadTest {

    /** A page shorter than this is treated as the last page of the scroll. */
    private static final int PAGE_SIZE = 10000;

    /** Upper bound on the number of pages pulled into the job. */
    private static final int MAX_PAGES = 4;

    /** Scroll id value that tells the read loop to stop. */
    private static final String SCROLL_FINISHED = "none";

    private static final EsRestClientService esRestClientService = new EsRestClientService();

    /**
     * Entry point: reads the pages, builds the aggregation and runs the job.
     *
     * @param args unused
     * @throws Exception if deleting the old output or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        String scrollId = null;
        DataSet<Tuple3<String, String, Integer>> dataSet = null;

        for (int page = 0; page < MAX_PAGES && !SCROLL_FINISHED.equals(scrollId); page++) {

            Map<String, Object> map = esRestClientService.queryDeviceListPage(scrollId);

            // The list is scoped to this iteration so a malformed response can
            // never cause the previous page to be added a second time.
            Object tuples = map.get("tupleList");
            if (!(tuples instanceof List)) {
                break;
            }
            @SuppressWarnings("unchecked") // the service stores a List<Tuple3<String, String, Integer>> under this key
            List<Tuple3<String, String, Integer>> dataList = (List<Tuple3<String, String, Integer>>) tuples;
            if (dataList.isEmpty()) {
                // Nothing to add; fromCollection is not called with an empty collection.
                break;
            }

            // Add the page BEFORE deciding whether to stop, so the final
            // (short) page is part of the result instead of being dropped.
            DataSet<Tuple3<String, String, Integer>> dataSetTemp = env.fromCollection(dataList);
            dataSet = (dataSet == null) ? dataSetTemp : dataSet.union(dataSetTemp);

            Object nextScrollId = map.get("scrollId");
            if (nextScrollId == null || dataList.size() < PAGE_SIZE) {
                break;
            }
            scrollId = nextScrollId.toString();
        }

        if (dataSet == null) {
            // No page produced any rows: there is nothing to group, and calling
            // groupBy on a null DataSet would throw a NullPointerException.
            System.err.println("No documents were read from Elasticsearch; nothing to aggregate.");
            return;
        }

        // grouping / aggregation rule: sum field 2 per distinct value of field 0
        dataSet = dataSet.groupBy(0).sum(2);

        String output = "/opt/flink-data/esoutput2.txt";
        // remove the previous run's output so the sink can write to the same path
        FileUtils.deleteFileOrDirectory(new File(output));
        dataSet.writeAsText(output);

        env.execute("read es");
    }
}

2、使用游标（scroll）方式读取 ES

package es;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.*;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


/**
 * Reads documents from an Elasticsearch service (hosted on an Alibaba Cloud
 * server) page by page using the scroll API.
 *
 * <p>The REST client is created lazily on the first query. Call {@link #close()}
 * when the service is no longer needed to release the client.
 *
 * @author lizixian
 * @date 2020/3/16 10:41
 */
public class EsRestClientService implements java.io.Closeable {

    /** Number of hits requested per scroll page. */
    private static final int PAGE_SIZE = 10000;

    /** Keep-alive sent with every search / scroll request. */
    private static final String SCROLL_KEEP_ALIVE = "2m";

    /** Connect, socket, connection-request and max-retry timeout (10 minutes). */
    private static final int TIMEOUT_MILLIS = 10 * 60 * 1000;

    /** Scroll id returned when there is nothing more to read. */
    private static final String SCROLL_FINISHED = "none";

    private final String host = "172.168.0.138:9200";
    private final String scheme = "http";
    private final String index = "es_index";
    private final String type = "es_type";
    private RestClientBuilder builder = null;
    private RestHighLevelClient client = null;

    /**
     * Builds the REST client for the configured {@code host:port} with all
     * timeouts set to {@link #TIMEOUT_MILLIS}.
     */
    public void init() {
        String[] nodeIpInfos = host.split(":");
        builder = RestClient.builder(new HttpHost(nodeIpInfos[0], Integer.parseInt(nodeIpInfos[1]), scheme))
                .setRequestConfigCallback(requestConfigBuilder -> {
                    requestConfigBuilder.setConnectTimeout(TIMEOUT_MILLIS);
                    requestConfigBuilder.setSocketTimeout(TIMEOUT_MILLIS);
                    requestConfigBuilder.setConnectionRequestTimeout(TIMEOUT_MILLIS);
                    return requestConfigBuilder;
                }).setMaxRetryTimeoutMillis(TIMEOUT_MILLIS);
        client = new RestHighLevelClient(builder);
    }

    /**
     * Paged query of the device application install list, using a scroll cursor.
     *
     * @param scrollId {@code null} to start a new scroll, otherwise the id
     *                 returned by the previous call
     * @return a map with key {@code "scrollId"} (the id for the next call, or
     *         {@code "none"} when reading is finished or failed) and key
     *         {@code "tupleList"} (a {@code List<Tuple3<String, String, Integer>>}
     *         of name, city and the constant 1 for every hit on this page)
     * @author lizixian
     * @date 2020/5/10 18:01
     */
    public Map<String, Object> queryDeviceListPage(String scrollId) {
        // set the number of hits per page
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.size(PAGE_SIZE);

        // no clauses are added to the bool query, so no filter is applied
        BoolQueryBuilder bool = QueryBuilders.boolQuery();
        sourceBuilder.query(bool);

        return queryDeviceListPageResult(sourceBuilder, scrollId);
    }

    /**
     * Releases the scroll context held by the server for {@code scrollId}.
     * Useful for callers that stop paging before the scroll is exhausted.
     *
     * @param scrollId the scroll id to clear; {@code null} and {@code "none"} are ignored
     * @return {@code true} if the server reported the scroll as cleared
     */
    public boolean clearScroll(String scrollId) {
        if (scrollId == null || SCROLL_FINISHED.equals(scrollId) || client == null) {
            return false;
        }
        try {
            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            clearScrollRequest.addScrollId(scrollId);
            ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest);
            return clearScrollResponse.isSucceeded();
        } catch (IOException | RuntimeException e) {
            // Cleanup is best-effort: a failure here must not discard data that
            // was already read successfully.
            System.err.println("Failed to clear Elasticsearch scroll " + scrollId + ": " + e);
            return false;
        }
    }

    /**
     * Closes the underlying REST client, if one was created. A later query
     * creates a fresh client.
     *
     * @throws IOException if closing the client fails
     */
    @Override
    public void close() throws IOException {
        if (client != null) {
            try {
                client.close();
            } finally {
                client = null;
                builder = null;
            }
        }
    }

    /**
     * Runs the initial search or the scroll continuation and converts the hits.
     *
     * @param sourceBuilder query and page size, used only when starting a new scroll
     * @param scrollId      {@code null} to start a new scroll, otherwise the id to continue
     * @return the result map described on {@link #queryDeviceListPage(String)};
     *         both keys are always present
     */
    private Map<String, Object> queryDeviceListPageResult(SearchSourceBuilder sourceBuilder, String scrollId) {
        if (client == null) {
            init();
        }
        Map<String, Object> resultMap = new HashMap<>(5);
        List<Tuple3<String, String, Integer>> tupleList = new ArrayList<>();
        // Default to "finished" so every failure path hands the caller a
        // well-formed, terminal result instead of an empty map or a null id.
        String nextScrollId = SCROLL_FINISHED;
        try {
            SearchResponse response;
            if (scrollId != null) {
                SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId).scroll(SCROLL_KEEP_ALIVE);
                response = client.searchScroll(scrollRequest);
            } else {
                SearchRequest searchRequest = new SearchRequest(index)
                        .types(type)
                        .scroll(SCROLL_KEEP_ALIVE)
                        .source(sourceBuilder);
                response = client.search(searchRequest);
            }

            // Prefer the id from the response; fall back to the one we were given.
            String currentScrollId = response.getScrollId() != null ? response.getScrollId() : scrollId;

            boolean hasHits = false;
            if (response.status().getStatus() == RestStatus.OK.getStatus()) {
                SearchHit[] hits = response.getHits().getHits();
                System.out.println("*********************查询es结果");
                if (hits != null) {
                    hasHits = hits.length > 0;
                    for (SearchHit hit : hits) {
                        System.out.println("*********************查询es结果:" + hit.getSourceAsString());
                        JSONObject json = JSONObject.parseObject(hit.getSourceAsString());
                        if (json == null) {
                            // hit carries no source document; nothing to extract
                            continue;
                        }
                        tupleList.add(new Tuple3<>(json.getString("name"), json.getString("city"), 1));
                    }
                }
            }

            if (hasHits && currentScrollId != null) {
                nextScrollId = currentScrollId;
            } else {
                // Non-OK status or an empty page: the scroll is of no further
                // use, so release it now rather than waiting for the keep-alive.
                clearScroll(currentScrollId);
            }
        } catch (IOException e) {
            System.err.println("Elasticsearch query failed (scrollId=" + scrollId + ")");
            e.printStackTrace();
            clearScroll(scrollId);
        }
        resultMap.put("scrollId", nextScrollId);
        resultMap.put("tupleList", tupleList);
        return resultMap;
    }
}
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐