读取 Kafka 数据(使用 FlinkKafkaConsumer011 作为 Source;注意:示例类名为 Kafka1_FlinkKafkaProducer,但实际执行的是从 Kafka 读取)

package kafka.Source_Kafka;

import lombok.Data;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

/**
 * Flink streaming job that consumes JSON strings from the Kafka topic
 * {@code student-read}, converts each message into a {@link Student} and prints it.
 *
 * <p>Despite the class name, this job only reads from Kafka (it uses
 * {@code FlinkKafkaConsumer011} as its source).
 */
public class Kafka1_FlinkKafkaProducer {

    public static void main(String[] args) throws Exception {

        // Step 0: obtain the streaming execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Step 1: Kafka source emitting each message value as a String.
        final FlinkKafkaConsumer011<String> kafkaSource =
                new FlinkKafkaConsumer011<>("student-read", new SimpleStringSchema(), consumerProperties());

        // Steps 2-3: parse every JSON message into a Student, then print the result.
        env.addSource(kafkaSource)
                .map(new JsonToStudent())
                .print();

        // Step 4: submit the job.
        env.execute();
    }

    /** Builds the Kafka consumer settings used by the source. */
    private static Properties consumerProperties() {
        final Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");  // Kafka broker address list
        kafkaProps.setProperty("group.id", "consumer-group");  // consumer group id
        return kafkaProps;
    }

    /** Parses a JSON string with the keys {@code id}, {@code name} and {@code age} into a {@link Student}. */
    private static final class JsonToStudent implements MapFunction<String, Student> {
        @Override
        public Student map(String value) throws Exception {
            final JSONObject json = JSONObject.parseObject(value);
            final Integer studentId = json.getInteger("id");
            final String studentName = json.getString("name");
            final Integer studentAge = json.getInteger("age");
            return new Student(studentId, studentName, studentAge);
        }
    }

    /** Student record carried through the pipeline. */
    @Data  // Lombok: generates getters/setters plus equals(), hashCode() and toString()
    @AllArgsConstructor  // Lombok: all-args constructor; parameter order follows field declaration order
    @NoArgsConstructor  // Lombok: no-args constructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}

请添加图片描述


写入 Kafka 数据(使用 FlinkKafkaProducer011 作为 Sink;注意:示例类名为 Kafka1_FlinkKafkaConsumer,但实际执行的是向 Kafka 写入)

package kafka.Sink_Kafka;

import lombok.Data;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import java.util.Arrays;

/**
 * Flink streaming job that turns a fixed collection of {@link Student} records into
 * JSON strings and writes them to the Kafka topic {@code student-write}.
 *
 * <p>Despite the class name, this job only writes to Kafka (it uses
 * {@code FlinkKafkaProducer011} as its sink).
 */
public class Kafka1_FlinkKafkaConsumer {

    public static void main(String[] args) throws Exception {

        // Step 0: obtain the streaming execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Steps 1-2: in-memory source of students, each serialized to a JSON string.
        final DataStream<String> jsonLines = env
                .fromCollection(Arrays.asList(
                        new Student(3, "Linda", 20),
                        new Student(6, "Allen", 19),
                        new Student(8, "Marcia", 20),
                        new Student(9, "Helen", 18)))
                .map(new StudentToJson());

        // Step 3: Kafka sink (broker list, target topic, String serialization).
        final FlinkKafkaProducer011<String> kafkaSink =
                new FlinkKafkaProducer011<>("localhost:9092", "student-write", new SimpleStringSchema());
        jsonLines.addSink(kafkaSink);

        // Step 4: run the job, then report completion on stdout.
        env.execute();
        System.out.println("Kafka写入成功!");
    }

    /** Serializes a {@link Student} into a JSON string with the keys {@code id}, {@code name} and {@code age}. */
    private static final class StudentToJson implements MapFunction<Student, String> {
        @Override
        public String map(Student value) throws Exception {
            final JSONObject json = new JSONObject();
            json.put("id", value.getId());
            json.put("name", value.getName());
            json.put("age", value.getAge());
            return json.toJSONString();
        }
    }

    /** Student record carried through the pipeline. */
    @Data  // Lombok: generates getters/setters plus equals(), hashCode() and toString()
    @AllArgsConstructor  // Lombok: all-args constructor; parameter order follows field declaration order
    @NoArgsConstructor  // Lombok: no-args constructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}

请添加图片描述


DataSet 写入 Kafka 数据

package kafka.Sink_Kafka;

import lombok.Data;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

import java.util.Arrays;

/**
 * Flink DataSet (batch) job that turns a fixed collection of {@link Student} records
 * into JSON strings and writes them to the Kafka topic {@code student-write} through
 * {@code KafkaOutputFormat}.
 */
public class Kafka2_OutputFormat {

    public static void main(String[] args) throws Exception {

        // Step 0: obtain the batch execution environment.
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Steps 1-2: in-memory source of students, each serialized to a JSON string.
        final DataSet<String> jsonLines = env
                .fromCollection(Arrays.asList(
                        new Student(5, "Nancy", 22),
                        new Student(6, "Emma", 20),
                        new Student(7, "Sophia", 19)))
                .map(new StudentToJson());

        // Step 3: configure the Kafka output format and attach it as the sink.
        final KafkaOutputFormat kafkaFormat = KafkaOutputFormat.buildKafkaOutputFormat()
                .setBootstrapServers("localhost:9092")
                .setTopic("student-write")
                .finish();
        jsonLines.output(kafkaFormat);

        // Step 4: run the job, then report completion on stdout.
        env.execute();
        System.out.println("Kafka写入成功!");
    }

    /** Serializes a {@link Student} into a JSON string with the keys {@code id}, {@code name} and {@code age}. */
    private static final class StudentToJson implements MapFunction<Student, String> {
        @Override
        public String map(Student value) throws Exception {
            final JSONObject json = new JSONObject();
            json.put("id", value.getId());
            json.put("name", value.getName());
            json.put("age", value.getAge());
            return json.toJSONString();
        }
    }

    /** Student record carried through the pipeline. */
    @Data  // Lombok: generates getters/setters plus equals(), hashCode() and toString()
    @AllArgsConstructor  // Lombok: all-args constructor; parameter order follows field declaration order
    @NoArgsConstructor  // Lombok: no-args constructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}

KafkaOutputFormat(自定义 OutputFormat,供上面的 DataSet 作业写入 Kafka 使用)

package kafka.Sink_Kafka;

import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.IOException;
import java.util.Properties;

/**
 * Flink DataSet sink that writes every String record to a Kafka topic.
 *
 * <p>Instances are created through {@link #buildKafkaOutputFormat()}. A Kafka producer is
 * created in {@link #open(int, int)} and released in {@link #close()}.
 *
 * <p>Kafka sends are asynchronous. Delivery failures reported by the producer are recorded
 * and rethrown as an {@link IOException} from the next {@link #writeRecord(String)} call or
 * from {@link #close()}, so that a failed write is not silently ignored.
 */
public class KafkaOutputFormat extends RichOutputFormat<String> {

    private static final long serialVersionUID = 1L;

    private String bootstrapServers;
    private String topic;

    // Created in open(); transient because the producer is a runtime resource that must not
    // be serialized together with the format.
    private transient Producer<String, String> producer;

    // First delivery failure reported by a send callback. Volatile because the callback may
    // run on a different thread than writeRecord()/close().
    private transient volatile Exception asyncSendError;

    @Override
    public void configure(Configuration configuration) {
        // No configuration is read from the Flink Configuration object.
    }

    /**
     * Creates the Kafka producer used by this sink instance.
     *
     * @param taskNumber index of this parallel instance (unused)
     * @param numTasks   number of parallel instances (unused)
     */
    @Override
    public void open(int taskNumber, int numTasks) throws IOException {

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", this.bootstrapServers);

        // A send only counts as complete once all in-sync replicas have acknowledged it.
        props.setProperty("acks", "all");
        // Total memory the producer may use to buffer unsent records (32 MB).
        props.setProperty("buffer.memory", "33554432");
        // Upper bound, in bytes, of a single record batch (16 KB).
        props.setProperty("batch.size", "16384");
        // Wait up to 100 ms for more records to join a batch before sending it.
        // This is an explicit choice, not the Kafka default.
        props.setProperty("linger.ms", "100");
        // Number of times the producer retries a failed request (explicit choice).
        props.setProperty("retries", "2");
        // Keys and values are serialized as Strings.
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        asyncSendError = null;
        producer = new KafkaProducer<>(props);
    }

    /**
     * Sends one record (without a key) to the configured topic.
     *
     * @param record the message value to send
     * @throws IOException if an earlier asynchronous send has failed
     */
    @Override
    public void writeRecord(String record) throws IOException {

        // Fail fast instead of continuing to send after a delivery failure.
        rethrowAsyncSendError();

        producer.send(new ProducerRecord<>(this.topic, record), (metadata, exception) -> {
            // Keep only the first failure; it is the most useful one to report.
            if (exception != null && asyncSendError == null) {
                asyncSendError = exception;
            }
        });
    }

    /**
     * Closes the producer and reports any delivery failure that was recorded by a send callback.
     *
     * @throws IOException if any asynchronous send has failed
     */
    @Override
    public void close() throws IOException {

        // open() may have failed before the producer was created, or close() may be called
        // more than once; in both cases there is nothing to release.
        if (producer == null) {
            return;
        }

        try {
            producer.close();
        } finally {
            producer = null;
        }

        rethrowAsyncSendError();
    }

    /** Throws the recorded asynchronous send failure, if any, wrapped in an IOException. */
    private void rethrowAsyncSendError() throws IOException {
        Exception failure = asyncSendError;
        if (failure != null) {
            asyncSendError = null;
            throw new IOException("Failed to send record to Kafka topic '" + topic + "'", failure);
        }
    }

    /** Returns a new builder for configuring a {@link KafkaOutputFormat}. */
    public static KafkaOutputFormatBuilder buildKafkaOutputFormat() {
        return new KafkaOutputFormatBuilder();
    }

    /** Fluent builder that sets the broker list and the target topic of a {@link KafkaOutputFormat}. */
    public static class KafkaOutputFormatBuilder {

        private final KafkaOutputFormat format;

        public KafkaOutputFormatBuilder() {

            this.format = new KafkaOutputFormat();
        }

        /**
         * @param bootstrapServers Kafka broker address list, e.g. {@code host:port}
         * @return this builder
         */
        public KafkaOutputFormatBuilder setBootstrapServers(String bootstrapServers) {

            format.bootstrapServers = bootstrapServers;
            return this;
        }

        /**
         * @param topic name of the Kafka topic to write to
         * @return this builder
         */
        public KafkaOutputFormatBuilder setTopic(String topic) {

            format.topic = topic;
            return this;
        }

        /**
         * Returns the configured format.
         *
         * @throws IllegalStateException if the bootstrap servers or the topic have not been set;
         *         without them the sink could only fail later, when the job is already running
         */
        public KafkaOutputFormat finish() {

            if (format.bootstrapServers == null) {
                throw new IllegalStateException("bootstrapServers must be set before calling finish()");
            }
            if (format.topic == null) {
                throw new IllegalStateException("topic must be set before calling finish()");
            }
            return format;
        }
    }
}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐