flink-streaming in practice: parsing a JSON array
All of my production work so far has been in blink-sql mode and I had not yet written a job in streaming mode, so this simple example is recorded here.
Our downstream is ODPS, but there is no jar for it yet; that part will be added once it is working.
Job description:
The upstream is Kafka and the raw data is a JSON array; the job splits it into individual JSON objects and prints them to the local log. The source connection follows Alibaba Cloud's example.
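Before the full Flink job, here is a minimal, self-contained sketch of the core Gson parse-and-split step it relies on; the class name and sample payload below are made up purely for illustration.

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;

public class JsonArraySplitSketch {
    public static void main(String[] args) {
        // Hypothetical payload: a JSON array containing two objects.
        String raw = "[{\"id\":1,\"name\":\"a\"},{\"id\":2,\"name\":\"b\"}]";
        JsonArray array = new JsonParser().parse(raw).getAsJsonArray();
        // Emit each element as its own JSON object string,
        // which is exactly what the flatMap in the job below does via Collector.collect().
        for (JsonElement element : array) {
            System.out.println(element.getAsJsonObject().toString());
        }
    }
}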
package com.alibaba.blink.datastreaming;

import com.google.gson.JsonArray;
import com.google.gson.JsonParser;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.io.Serializable;
import java.util.Properties;

public class AliKafkaConsumerDemo implements Serializable {

    public static void main(String[] args) throws Exception {
        AliKafkaConsumerDemo aliKafkaConsumerDemo = new AliKafkaConsumerDemo();
        aliKafkaConsumerDemo.runExample();
    }

    public void runExample() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the global parallelism.
        env.setParallelism(1);
        // Event time is declared here but no watermarks are assigned; it only matters once time-based operators are added.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Load kafka.properties (bootstrap.servers, topic, group.id) via the helper from Alibaba Cloud's Kafka demo.
        Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        // Tune according to the actual consumption rate and client version; the default is 30s.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        // Maximum number of records per poll.
        // Do not set this too high: if a batch cannot be consumed before the next poll,
        // a rebalance is triggered and the consumer stalls.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        // Consumer group of this instance; create it in the console before filling it in.
        // Instances in the same group share the partitions and balance the load.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));

        FlinkKafkaConsumer010<String> kafkaConsumer =
                new FlinkKafkaConsumer010<String>(kafkaProperties.getProperty("topic"), new SimpleStringSchema(), props);
        DataStreamSource<String> stringDataStream = env.addSource(kafkaConsumer);

        // Parse each raw message into a Gson JsonArray.
        DataStream<JsonArray> mapStream = stringDataStream.map(new MapFunction<String, JsonArray>() {
            @Override
            public JsonArray map(String value) throws Exception {
                return new JsonParser().parse(value).getAsJsonArray();
            }
        });

        // Split the array: emit every element as its own JSON object string.
        SingleOutputStreamOperator<String> flatMapStream = mapStream.flatMap(new FlatMapFunction<JsonArray, String>() {
            @Override
            public void flatMap(JsonArray value, Collector<String> out) throws Exception {
                for (int i = 0; i < value.size(); i++) {
                    String field = value.get(i).getAsJsonObject().toString();
                    out.collect(field);
                }
            }
        });

        // Print locally.
        flatMapStream.print();

        // Write the split JSON objects back to Kafka.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "ip:port");
        flatMapStream.addSink(new FlinkKafkaProducer010<String>("dflink_sink_tst", new SimpleStringSchema(), properties));

        env.execute("alikafkaconsumerdemo");
    }
}
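The JavaKafkaConfigurer used above is part of Alibaba Cloud's Kafka Java demo and is not reproduced in this post. A minimal sketch, assuming it only loads kafka.properties (bootstrap.servers, topic, group.id) from the classpath, might look like the following; the official helper may do more, so treat this purely as a stand-in.

package com.alibaba.blink.datastreaming;

import java.util.Properties;

// Sketch only: assumed stand-in for the helper from Alibaba Cloud's Kafka demo.
// Assumed behavior: load kafka.properties from the classpath once and cache it.
public class JavaKafkaConfigurer {

    private static Properties properties;

    public static synchronized Properties getKafkaProperties() {
        if (properties == null) {
            Properties props = new Properties();
            try {
                props.load(JavaKafkaConfigurer.class.getClassLoader()
                        .getResourceAsStream("kafka.properties"));
            } catch (Exception e) {
                throw new RuntimeException("Cannot load kafka.properties from the classpath", e);
            }
            properties = props;
        }
        return properties;
    }
}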
Result:
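(As an illustration only, not the author's actual log: with the hypothetical two-element payload sketched earlier, the local print output would be one JSON object per line, i.e. {"id":1,"name":"a"} followed by {"id":2,"name":"b"}.)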