flink elasticsearch sink
flink elasticsearch sink — my machine is short on resources at the moment, so writes to the Elasticsearch VM are slow and the data that lands in ES shows some discrepancies against the input (see the note after the code).
1. data.txt sample data
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718209,32.8
sensor_1,1547718212,37.1
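Each line maps to one SensorReading: an id, an epoch-second timestamp and a temperature. The post doesn't show that class, so here is a minimal sketch of it, with field names assumed from the getters used in the sink code below:

// simple POJO for one sensor record: id, epoch-second timestamp, temperature
public class SensorReading {
    private String id;
    private Long timestamp;
    private Double temperature;

    // Flink POJO serialization needs a no-arg constructor
    public SensorReading() {}

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() { return id; }
    public Long getTimestamp() { return timestamp; }
    public Double getTemperature() { return temperature; }

    public void setId(String id) { this.id = id; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
    public void setTemperature(Double temperature) { this.temperature = temperature; }
}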
2. pom.xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.10.1</version>
</dependency>
3. Java code
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;

public class sink_es {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // read the sample file and parse each line into a SensorReading
        DataStreamSource<String> txtSource = env.readTextFile("D:\\ideaProject\\flink-java\\flink-java-api\\src\\main\\resources\\data.txt");
        DataStream<SensorReading> streamOperator = txtSource.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String line) throws Exception {
                String[] split = line.split(",");
                return new SensorReading(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
            }
        });
        streamOperator.print();

        // define the Elasticsearch HTTP hosts
        ArrayList<HttpHost> httpHosts = new ArrayList<HttpHost>();
        httpHosts.add(new HttpHost("192.168.174.204", 9200));
        httpHosts.add(new HttpHost("192.168.174.205", 9200));

        ElasticsearchSink<SensorReading> readingElasticsearchSink =
                new ElasticsearchSink.Builder<SensorReading>(httpHosts, new ElasticsearchSinkFunction<SensorReading>() {
                    @Override
                    public void process(SensorReading sensorReading, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                        // build the document source to write
                        HashMap<String, String> source = new HashMap<String, String>();
                        source.put("id", sensorReading.getId());
                        source.put("temp", sensorReading.getTemperature().toString());
                        source.put("ts", sensorReading.getTimestamp().toString());

                        // create the ES index request (ES 6.x still requires a type)
                        IndexRequest indexRequest = Requests.indexRequest().index("book").type("serson").source(source);

                        // hand the request to the indexer, which batches and sends it
                        requestIndexer.add(indexRequest);
                    }
                }).build();

        streamOperator.addSink(readingElasticsearchSink);
        env.execute();
    }
}
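About the slow writes and the apparent data discrepancy: the Elasticsearch connector buffers index requests and sends them in bulk, so on a tiny test file some records may not be visible in ES until a flush happens. One way to rule that out while debugging is to lower the bulk-flush threshold on the builder. A minimal sketch, reusing the httpHosts list from the code above; MySensorSinkFunction here is a hypothetical stand-in for the anonymous ElasticsearchSinkFunction shown earlier:

// keep the Builder in a variable instead of calling .build() immediately,
// so bulk-flush options can be tuned
ElasticsearchSink.Builder<SensorReading> esSinkBuilder =
        new ElasticsearchSink.Builder<SensorReading>(httpHosts, new MySensorSinkFunction()); // hypothetical sink function class

// flush after every single action: handy while debugging slow or "missing" writes,
// raise it again for real workloads, since per-record flushes hurt throughput
esSinkBuilder.setBulkFlushMaxActions(1);

streamOperator.addSink(esSinkBuilder.build());

setBulkFlushMaxActions is a standard option on ElasticsearchSink.Builder in the flink-connector-elasticsearch6 1.10 connector; without it the sink relies on the bulk processor's default batching behaviour.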