Flink自带的Source源算子以及自定义数据源Source
文章目录Flink的DataStream API(基础篇)Source源算子从集合中读取数据从文件中读取数据从Scoket中读取数据从Kafka中读取数据自定义SourceFlink的DataStream API(基础篇)Flink程序主要是分为Source -> Transform -> Sink本篇文章主要介绍的是Flink的源算子Source源算子POJO类的定义:POJO类定义
Flink的DataStream API(基础篇)
Flink程序主要是分为
Source -> Transform -> Sink
本篇文章主要介绍的是Flink的源算子
Source源算子
POJO类的定义:
POJO类定义为一个数据类型,Flink会把这样的类作为一个特殊的POJO数据类型,方便数据的解析和序列化
POJO的规范:
- 是公有的
- 有一个无参的构造方法
- 所有的属性都是公有的
- 所有属性的数据类型都是可以序列化的
从集合中读取数据
package com.dcit.chacpter01;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
public class StreamSource {
public static void main(String[] args) throws Exception {
/**
* 从集合中读取数据
*/
// 1.创建Flink运行时环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ArrayList<Event> source = new ArrayList<>();
// Event是自定义的POJO数据类 url user timestamp
source.add(new Event("./home","chaochao",1000L));
source.add(new Event("./home","lisi",2000L));
// 从集合中直接读取数据
DataStreamSource<Event> eventDataStreamSource = env.fromCollection(source);
// 打印
eventDataStreamSource.print();
// 执行
env.execute();
}
}
从文件中读取数据
env.readTextFile(“xxxx.txt”)
从Scoket中读取数据
DataStream<String> stream = env.socketTextStream("hadoop102",8888);
从Kafka中读取数据
想要以 Kafka 作为数据源获取数据,我们只需要引入 Kafka 连接器的依赖。Flink 官
方提供的是一个通用的 Kafka 连接器,它会自动跟踪最新版本的 Kafka 客户端。目前最新版本
只支持 0.10.0 版本以上的 Kafka,读者使用时可以根据自己安装的 Kafka 版本选定连接器的依
赖版本。这里我们需要导入的依赖如下。
需要添加依赖
然后调用 env.addSource(),传入 FlinkKafkaConsumer 的对象实例就可以了。
pom.xml文件
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","hadoop102:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
DataStreamSource<String> DS = env.addSource(
new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties)
);
创建 FlinkKafkaConsumer 时需要传入三个参数:
- 第一个参数 topic,定义了从哪些主题中读取数据。可以是一个 topic,也可以是 topic
列表,还可以是匹配所有想要读取的 topic 的正则表达式。当从多个 topic 中读取数据
时,Kafka 连接器将会处理所有 topic 的分区,将这些分区的数据放到一条流中去。
- 第二个参数是一个 DeserializationSchema 或者 KeyedDeserializationSchema。Kafka 消
息被存储为原始的字节数据,所以需要反序列化成 Java 或者 Scala 对象。上面代码中
使用的 SimpleStringSchema,是一个内置的 DeserializationSchema,它只是将字节数
组简单地反序列化成字符串。DeserializationSchema 和 KeyedDeserializationSchema 是
公共接口,所以我们也可以自定义反序列化逻辑。
- 第三个参数是一个 Properties 对象,设置了 Kafka 客户端的一些属性。
自定义Source
env.addSource(new SourceFunction())
定义一个ClickSource 实现SourceFunction接口 实现run方法和canel方法
然后在main方法里的addSource(new CliceSource()) 将ClicksSouce传入即可
package com.dcit.chacpter01;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import sun.security.util.Length;
import java.util.Calendar;
import java.util.Random;
public class ClickSource implements SourceFunction<Event> {
public volatile boolean running = true;
@Override
public void run(SourceContext<Event> sourceContext) throws Exception {
//随机数据
Random random = new Random();
//定义选取的数据
String [] users = {"Mary","Alice","Bob","Cary"};
String[] urls = {"./home", "./cart", "./fav", "./prod?id=1",
"./prod?id=2"};
while (running){
Thread.sleep(2000);
long time = Calendar.getInstance().getTimeInMillis();
sourceContext.collect(
new Event(urls[random.nextInt(urls.length)],users[random.nextInt(users.length)]
,time
)
)
;
}
}
@Override
public void cancel() {
running = false;
}
}
DataStreamSource<Event> ds = env.addSource(new ClickSource());
注意:实现的SourceFunction这个接口是不能设置并行度的
如果需要调整并行度那么要继承 ParallelSourceFunction 接口 里面的重写方法和之前那个一样
更多推荐
所有评论(0)