前言

flink官网有连接es的案例,但是官网案例没有带认证,几经辗转终于找到带认证的sink到es的方式。需要可以参考。

代码

maven依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
    <version>1.13.0</version>
</dependency>

Java代码

//启动flink拓扑
    public static void startFlinkJob() {
        try {
            logger.info("flink job.....................");
			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            SingleOutputStreamOperator<JSONObject> stream = env.addSource(new JavaQueueSource()).setParallelism(2)
                    .name("java queue source")
                    .map(msg -> {
                        Thread.sleep(100);
                        JSONObject json = new JSONObject();
                        json.put("curtime",msg.split(",")[1]);
                        return json;
                    }).setParallelism(3);
            stream.addSink(getESSinkBuilder().build());

            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
	
	//获取ESSinkBuilder
    public static ElasticsearchSink.Builder<JSONObject> getESSinkBuilder() {
        List<HttpHost> httpHosts = new ArrayList<>();
        HttpHost http = new HttpHost("192.168.10.101", 9200, "http");
        List<InetSocketAddress> addresses = new ArrayList<>();
        InetSocketAddress socketAddress = new InetSocketAddress("192.168.10.101", 9200);
        addresses.add(socketAddress);
        httpHosts.add(http);
        ElasticsearchSink.Builder<JSONObject> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new ElasticsearchSinkFunction<JSONObject>() {
                    public IndexRequest createIndexRequest(JSONObject element) {
                        Map<String, JSONObject> json = new HashMap<>();
                        json.put("data", element);

                        return Requests.indexRequest()
                                .index("zhy-test-index")
                                .source(json);
                    }

                    @Override
                    public void process(JSONObject element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(element));
                    }
                }
        );
        //设置用户名密码
        esSinkBuilder.setRestClientFactory(
                restClientBuilder -> {
                    restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                        @Override
                        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
                            CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic","admin"));
                            return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                        }
                    });
                }
        );
        // configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
        esSinkBuilder.setBulkFlushMaxActions(1);
        return esSinkBuilder;
    }

注意

如果使用过程中报NoSuchMethodError的错误,一般是因为导入的jar包不对导致的。实在不相信可以在idea的这个地方找一下看是否有对应方法。如果没有需要换一下依赖,如果实在解决不了,可以留言。
在这里插入图片描述

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐