Study Notes: Reading and Writing Kafka Data with Flink
Add the dependency:
<!-- fastjson, Alibaba's JSON library -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>
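The code below also needs Lombok and the Flink Kafka 0.11 connector (which provides FlinkKafkaConsumer011 and FlinkKafkaProducer011) on the classpath. A minimal Maven sketch; the version numbers are assumptions and should be aligned with your Flink distribution and Scala build:
<!-- Flink Kafka 0.11 connector (version is an assumption; align it with your Flink version) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
<!-- Lombok, used for the @Data / constructor annotations (version is an assumption) -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.8</version>
    <scope>provided</scope>
</dependency>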
Reading Data from Kafka
package kafka.Source_Kafka;
import lombok.Data;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;
public class Kafka1_FlinkKafkaConsumer {
public static void main(String[] args) throws Exception {
// ToDo 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ToDo 1.source
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092"); // Kafka brokers的地址列表
properties.setProperty("group.id", "consumer-group"); // 消费组ID
DataStream<String> dataInput = env.addSource(new FlinkKafkaConsumer011<>("student-read", new SimpleStringSchema(), properties));
// ToDo 2.transformation
DataStream<Student> dataPojo = dataInput.map(new MapFunction<String, Student>() {
@Override
public Student map(String value) throws Exception {
JSONObject object = JSONObject.parseObject(value);
Integer id = object.getInteger("id");
String name = object.getString("name");
Integer age = object.getInteger("age");
return new Student(id, name, age); // convert the JSON record into a POJO
}
});
// ToDo 3.sink
dataPojo.print();
// ToDo 4.execute
env.execute();
}
@Data // class-level annotation: generates getters/setters plus equals(), hashCode(), and toString()
@AllArgsConstructor // class-level annotation: generates an all-args constructor, parameters in field order
@NoArgsConstructor // class-level annotation: generates a no-args constructor
public static class Student {
private Integer id;
private String name;
private Integer age;
}
}
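The job expects JSON records shaped like {"id":3,"name":"Linda","age":20} on the student-read topic. Note that FlinkKafkaConsumer011 starts from the offsets committed for the consumer group by default, so re-running the job continues where the group left off. To re-read the topic from the beginning during testing, the consumer can be configured before being added as a source; a minimal sketch:
FlinkKafkaConsumer011<String> consumer =
        new FlinkKafkaConsumer011<>("student-read", new SimpleStringSchema(), properties);
consumer.setStartFromEarliest(); // ignore committed group offsets and read from the earliest record
DataStream<String> dataInput = env.addSource(consumer);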
Writing Data to Kafka
package kafka.Sink_Kafka;
import lombok.Data;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import java.util.Arrays;
public class Kafka1_FlinkKafkaProducer {
public static void main(String[] args) throws Exception {
// ToDo 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ToDo 1.source
DataStream<Student> dataInput = env.fromCollection(
Arrays.asList(
new Student(3, "Linda", 20),
new Student(6, "Allen", 19),
new Student(8, "Marcia", 20),
new Student(9, "Helen", 18)
)
);
// ToDo 2.transformation
DataStream<String> dataJson = dataInput.map(new MapFunction<Student, String>() {
@Override
public String map(Student value) throws Exception {
JSONObject object = new JSONObject();
object.put("id", value.getId());
object.put("name", value.getName());
object.put("age", value.getAge());
return object.toJSONString(); // convert the POJO into a JSON string
}
});
// ToDo 3.sink
dataJson.addSink(new FlinkKafkaProducer011<>("localhost:9092", "student-write", new SimpleStringSchema()));
// ToDo 4.execute
env.execute();
System.out.println("Kafka写入成功!");
}
@Data // class-level annotation: generates getters/setters plus equals(), hashCode(), and toString()
@AllArgsConstructor // class-level annotation: generates an all-args constructor, parameters in field order
@NoArgsConstructor // class-level annotation: generates a no-args constructor
public static class Student {
private Integer id;
private String name;
private Integer age;
}
}
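The constructor used in the sink step only takes a broker list. FlinkKafkaProducer011 also offers a constructor that accepts a Properties object, which is handy when the producer needs more configuration than just the broker address; a minimal sketch:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("acks", "all"); // wait for all in-sync replicas to acknowledge each record
dataJson.addSink(new FlinkKafkaProducer011<>("student-write", new SimpleStringSchema(), props));
To verify the output, the read example above can simply be pointed at the student-write topic.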
Writing to Kafka from a DataSet
package kafka.Sink_Kafka;
import lombok.Data;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import java.util.Arrays;
public class Kafka2_OutputFormat {
public static void main(String[] args) throws Exception {
// ToDo 0.env
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// ToDo 1.source
DataSet<Student> dataInput = env.fromCollection(
Arrays.asList(
new Student(5, "Nancy", 22),
new Student(6, "Emma", 20),
new Student(7, "Sophia", 19)
)
);
// ToDo 2.transformation
DataSet<String> dataJson = dataInput.map(new MapFunction<Student, String>() {
@Override
public String map(Student value) throws Exception {
JSONObject object = new JSONObject();
object.put("id", value.getId());
object.put("name", value.getName());
object.put("age", value.getAge());
return object.toJSONString(); // convert the POJO into a JSON string
}
});
// ToDo 3.sink
dataJson.output(KafkaOutputFormat.buildKafkaOutputFormat()
.setBootstrapServers("localhost:9092")
.setTopic("student-write")
.finish());
// ToDo 4.execute
env.execute();
System.out.println("Kafka写入成功!");
}
@Data // class-level annotation: generates getters/setters plus equals(), hashCode(), and toString()
@AllArgsConstructor // class-level annotation: generates an all-args constructor, parameters in field order
@NoArgsConstructor // class-level annotation: generates a no-args constructor
public static class Student {
private Integer id;
private String name;
private Integer age;
}
}
KafkaOutputFormat
package kafka.Sink_Kafka;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.IOException;
import java.util.Properties;
/**
* Flink DataSet Sink to Kafka
*/
public class KafkaOutputFormat extends RichOutputFormat<String> {
private String bootstrapServers;
private String topic;
private Producer<String, String> producer;
@Override
public void configure(Configuration configuration) {
}
@Override
public void open(int taskNumber, int numTasks) throws IOException {
Properties props = new Properties();
props.setProperty("bootstrap.servers", this.bootstrapServers);
// acks=all: a record is only acknowledged once all in-sync replicas (ISR) have replicated it from the leader
props.setProperty("acks", "all");
// total memory buffer available to the producer; the default is 32 MB
props.setProperty("buffer.memory", "33554432");
// maximum size of a single batch; the default is 16 KB
props.setProperty("batch.size", "16384");
// how long to wait for more records to join a batch before sending it; Kafka's default is 0 ms
props.setProperty("linger.ms", "100");
// number of times the producer retries a failed send (set to 2 here)
props.setProperty("retries", "2");
// serializers for record keys and values
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
}
@Override
public void writeRecord(String record) throws IOException {
producer.send(new ProducerRecord<>(this.topic, record));
}
@Override
public void close() throws IOException {
producer.close();
}
public static KafkaOutputFormatBuilder buildKafkaOutputFormat() {
return new KafkaOutputFormatBuilder();
}
public static class KafkaOutputFormatBuilder {
private final KafkaOutputFormat format;
public KafkaOutputFormatBuilder() {
this.format = new KafkaOutputFormat();
}
public KafkaOutputFormatBuilder setBootstrapServers(String bootstrapServers) {
format.bootstrapServers = bootstrapServers;
return this;
}
public KafkaOutputFormatBuilder setTopic(String topic) {
format.topic = topic;
return this;
}
public KafkaOutputFormat finish() {
return format;
}
}
}
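One caveat with this class: KafkaProducer.send() is asynchronous, so a send that fails inside writeRecord() would otherwise go unnoticed. A minimal sketch of one way to surface such failures, using a send callback and an extra field; this illustrates the pattern and is not part of the class above:
private volatile Exception sendException; // last asynchronous send failure, if any

@Override
public void writeRecord(String record) throws IOException {
    if (sendException != null) {
        throw new IOException("A previous Kafka send failed", sendException);
    }
    producer.send(new ProducerRecord<>(this.topic, record),
            (metadata, exception) -> {
                if (exception != null) {
                    sendException = exception; // remember the failure; report it on the next write
                }
            });
}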