Flink Java 之 读取 kafka 数据与数据写入 kafka
序列与反序列化 kafka 数据Flink Kafka Consumer 需要知道如何将 Kafka 中的二进制数据转换为 Java 或者 Scala 对象。KafkaDeserializationSchema 允许用户指定这样的 schema,每条 Kafka 中的消息会反序列化。以上代码采用最简单的SimpleStringSchema。为了方便使用,Flink 提供了以下几种 schemas:
先上代码
package com.daidai.connectors;
import com.daidai.sink.domain.User;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
public class KafkaToKafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "daidai:9092");
properties.setProperty("group.id", "flink-test");
//kafkaproducer
FlinkKafkaProducer flinkKafkaProducer = new FlinkKafkaProducer("flink-test", new SimpleStringSchema(), properties);
DataStreamSource<String> dataStreamSource = env.fromElements(new User(1, "daidai", "admin").toString());
dataStreamSource.addSink(flinkKafkaProducer);
//kafkaconsumer
FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("flink-test", new SimpleStringSchema(), properties);
flinkKafkaConsumer.setStartFromLatest();
DataStreamSource<String> source = env.addSource(flinkKafkaConsumer);
source.print();
env.execute();
}
}
解析
依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.13.6</version>
</dependency>
序列与反序列化 kafka 数据
Flink Kafka Consumer 需要知道如何将 Kafka 中的二进制数据转换为 Java 或者 Scala 对象。KafkaDeserializationSchema 允许用户指定这样的 schema,每条 Kafka 中的消息会反序列化。
以上代码采用最简单的SimpleStringSchema。
为了方便使用,Flink 提供了以下几种 schemas:
TypeInformationSerializationSchema(和 TypeInformationKeyValueSerializationSchema) 基于 Flink 的 TypeInformation 创建 schema。 如果该数据的读和写都发生在 Flink 中,那么这将是非常有用的。此 schema 是其他通用序列化方法的高性能 Flink 替代方案。
JsonDeserializationSchema(和 JSONKeyValueDeserializationSchema)将序列化的 JSON 转化为 ObjectNode 对象,可以使用 objectNode.get(“field”).as(Int/String/…)() 来访问某个字段。 KeyValue objectNode 包含一个含所有字段的 key 和 values 字段,以及一个可选的"metadata"字段,可以访问到消息的 offset、partition、topic 等信息。
配置 Kafka Consumer 开始消费的位置
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.setStartFromEarliest(); // 尽可能从最早的记录开始
myConsumer.setStartFromLatest(); // 从最新的记录开始
myConsumer.setStartFromTimestamp(...); // 从指定的时间开始(毫秒)
myConsumer.setStartFromGroupOffsets(); // 默认的方法
DataStream<String> stream = env.addSource(myConsumer);
Flink Kafka Consumer 的所有版本都具有上述明确的起始位置配置方法。
- setStartFromGroupOffsets(默认方法):从 Kafka brokers 中的 consumer 组(consumer 属性中的 group.id 设置)提交的偏移量中开始读取分区。 如果找不到分区的偏移量,那么将会使用配置中的 auto.offset.reset 设置。
- setStartFromEarliest() 或者 setStartFromLatest():从最早或者最新的记录开始消费,在这些模式下,Kafka 中的 committed offset 将被忽略,不会用作起始位置。
- setStartFromTimestamp(long):从指定的时间戳开始。对于每个分区,其时间戳大于或等于指定时间戳的记录将用作起始位置。如果一个分区的最新记录早于指定的时间戳,则只从最新记录读取该分区数据。在这种模式下,Kafka 中的已提交 offset 将被忽略,不会用作起始位置。
- 你也可以为每个分区指定 consumer 应该开始消费的具体 offset:
Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
上面的例子中使用的配置是指定从 myTopic 主题的 0 、1 和 2 分区的指定偏移量开始消费。
offset 值是 consumer 应该为每个分区读取的下一条消息。
请注意:如果 consumer 需要读取在提供的 offset 映射中没有指定 offset 的分区,那么它将回退到该特定分区的默认组偏移行为(即 setStartFromGroupOffsets())。
请注意:当 Job 从故障中自动恢复或使用 savepoint 手动恢复时,这些起始位置配置方法不会影响消费的起始位置。在恢复时,每个 Kafka 分区的起始位置由存储在 savepoint 或 checkpoint 中的 offset 确定。
更多推荐
所有评论(0)