Flink的DataStream API(基础篇)

Flink程序主要是分为
Source -> Transform -> Sink
本篇文章主要介绍的是Flink的源算子

Source源算子

POJO类的定义:

POJO类定义为一个数据类型,Flink会把这样的类作为一个特殊的POJO数据类型,方便数据的解析和序列化

POJO的规范:

  • 是公有的
  • 有一个无参的构造方法
  • 所有的属性都是公有的
  • 所有属性的数据类型都是可以序列化的
从集合中读取数据
package com.dcit.chacpter01;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;

public class StreamSource {
    public static void main(String[] args) throws Exception {
        /**
         * 从集合中读取数据
         */
        // 1.创建Flink运行时环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ArrayList<Event> source = new ArrayList<>();
        // Event是自定义的POJO数据类    url  user timestamp
        source.add(new Event("./home","chaochao",1000L));
        source.add(new Event("./home","lisi",2000L));
	   // 从集合中直接读取数据
        DataStreamSource<Event> eventDataStreamSource = env.fromCollection(source);
        // 打印
        eventDataStreamSource.print();
        // 执行
        env.execute();
    }
}
从文件中读取数据

env.readTextFile(“xxxx.txt”)

从Scoket中读取数据
DataStream<String> stream = env.socketTextStream("hadoop102",8888);
从Kafka中读取数据

想要以 Kafka 作为数据源获取数据,我们只需要引入 Kafka 连接器的依赖。Flink 官

方提供的是一个通用的 Kafka 连接器,它会自动跟踪最新版本的 Kafka 客户端。目前最新版本

只支持 0.10.0 版本以上的 Kafka,读者使用时可以根据自己安装的 Kafka 版本选定连接器的依

赖版本。这里我们需要导入的依赖如下。

需要添加依赖

然后调用 env.addSource(),传入 FlinkKafkaConsumer 的对象实例就可以了。

pom.xml文件

<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
</dependency>
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","hadoop102:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");



        DataStreamSource<String> DS = env.addSource(
                new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties)
        );

创建 FlinkKafkaConsumer 时需要传入三个参数:

  • 第一个参数 topic,定义了从哪些主题中读取数据。可以是一个 topic,也可以是 topic

列表,还可以是匹配所有想要读取的 topic 的正则表达式。当从多个 topic 中读取数据

时,Kafka 连接器将会处理所有 topic 的分区,将这些分区的数据放到一条流中去。

  • 第二个参数是一个 DeserializationSchema 或者 KeyedDeserializationSchema。Kafka 消

息被存储为原始的字节数据,所以需要反序列化成 Java 或者 Scala 对象。上面代码中

使用的 SimpleStringSchema,是一个内置的 DeserializationSchema,它只是将字节数

组简单地反序列化成字符串。DeserializationSchema 和 KeyedDeserializationSchema 是

公共接口,所以我们也可以自定义反序列化逻辑。

  • 第三个参数是一个 Properties 对象,设置了 Kafka 客户端的一些属性。
自定义Source

env.addSource(new SourceFunction())

定义一个ClickSource 实现SourceFunction接口 实现run方法和canel方法

然后在main方法里的addSource(new CliceSource()) 将ClicksSouce传入即可

package com.dcit.chacpter01;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import sun.security.util.Length;

import java.util.Calendar;
import java.util.Random;

public class ClickSource implements SourceFunction<Event> {
    public volatile boolean running = true;
    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        //随机数据
        Random random = new Random();
        //定义选取的数据
        String [] users = {"Mary","Alice","Bob","Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=1",
                "./prod?id=2"};
        while (running){
            Thread.sleep(2000);
            long time = Calendar.getInstance().getTimeInMillis();
            sourceContext.collect(
                    new Event(urls[random.nextInt(urls.length)],users[random.nextInt(users.length)]
                    ,time
                    )
            )
            ;
        }

    }

    @Override
    public void cancel() {
        running = false;
    }
}

    DataStreamSource<Event> ds = env.addSource(new ClickSource());

注意:实现的SourceFunction这个接口是不能设置并行度的

​ 如果需要调整并行度那么要继承 ParallelSourceFunction 接口 里面的重写方法和之前那个一样

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐