Flink in Practice (Java): custom serializer (KafkaSerializationSchema) and deserializer (KafkaDeserializationSchema)
1. Check your Flink version
First, determine the Flink and Scala versions you are running. For example, my installation is:
flink-1.13.2-bin-scala_2.12.tgz
Flink version: 1.13.2
Scala version: 2.12
2. pom.xml
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <flink.version>1.13.2</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
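The custom serializer below uses fastjson's JSONObject, which none of the Flink artifacts pull in, so the pom also needs a fastjson dependency. A minimal addition (the version here is only an example; use whatever your project already standardizes on):

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <!-- example version; adjust to your project -->
        <version>1.2.75</version>
    </dependency>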
Custom serializer:
Note that older versions of flink-connector-kafka do not support KafkaSerializationSchema.
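For those older connector versions, the plain SerializationSchema interface is the fallback; it only produces the value bytes, so you give up control over the key, partition, and timestamp. A minimal sketch (the class name is illustrative, not from the original code):

    import com.alibaba.fastjson.JSONObject;
    import org.apache.flink.api.common.serialization.SerializationSchema;

    public class LegacyJsonSerializationSchema implements SerializationSchema<JSONObject> {

        private static final long serialVersionUID = 1L;

        @Override
        public byte[] serialize(final JSONObject element) {
            // Only the value bytes are produced; the target topic is configured on the producer itself.
            return element.toJSONString().getBytes();
        }
    }

With the 1.13.2 connector used in this article, KafkaSerializationSchema can be implemented directly and gives access to the full ProducerRecord: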
package com.sink;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

/** Serializes a JSONObject into the value bytes of a ProducerRecord for the configured topic. */
public class CustomKafkaSerializationSchema implements KafkaSerializationSchema<JSONObject> {

    private static final long serialVersionUID = 8497940668660042203L;

    private final String topic;

    public CustomKafkaSerializationSchema(final String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(final JSONObject element, final Long timestamp) {
        // No key is set; the JSON string becomes the record value.
        return new ProducerRecord<>(topic, element.toJSONString().getBytes());
    }
}
Demo (topic, properties, and stream are assumed to be defined earlier in the job):

// Inline anonymous serializer for a String stream; topic must be an (effectively) final variable in scope.
KafkaSerializationSchema<String> kafkaSerializationSchema = new KafkaSerializationSchema<String>() {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(final String element, final Long timestamp) {
        return new ProducerRecord<>(topic, element.getBytes());
    }
};

// The first argument is the default topic; it is not used here because the schema sets the topic explicitly.
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>("", kafkaSerializationSchema, properties,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
// Legacy-style alternative with an explicit partitioner:
// FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>("",
//         new CustomSerializationSchema(),
//         properties, Optional.of(new FlinkFixedPartitioner<String>()));
stream.addSink(myProducer);
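The anonymous schema above handles plain strings; to reuse the CustomKafkaSerializationSchema defined earlier on a JSONObject stream, the wiring looks like this (a sketch: jsonStream, topic, and properties are assumed to exist in your job):

    // jsonStream is a DataStream<JSONObject>; topic and properties are the same kind of values as in the demo above.
    FlinkKafkaProducer<JSONObject> jsonProducer = new FlinkKafkaProducer<>(
            topic,                                        // default topic
            new CustomKafkaSerializationSchema(topic),    // the serializer defined above
            properties,
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    jsonStream.addSink(jsonProducer);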
Custom deserializer:
1. The deserializer:
package com.hadoop.ljs.flink110.kafka;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/** Decodes the raw byte[] key/value into Strings while keeping the full ConsumerRecord metadata. */
public class MyKafkaDeserializationSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {

    private static final String encoding = "UTF8";

    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
        return false;
    }

    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        /* Uncomment to inspect the raw record:
        System.out.println("Record--partition::" + record.partition());
        System.out.println("Record--offset::" + record.offset());
        System.out.println("Record--timestamp::" + record.timestamp());
        System.out.println("Record--timestampType::" + record.timestampType());
        System.out.println("Record--checksum::" + record.checksum());
        System.out.println("Record--key::" + record.key());
        System.out.println("Record--value::" + record.value()); */
        return new ConsumerRecord<>(record.topic(),
                record.partition(),
                record.offset(),
                record.timestamp(),
                record.timestampType(),
                record.checksum(),
                record.serializedKeySize(),
                record.serializedValueSize(),
                /* No null check here: a null key or value will throw. Be sure to handle this in production. */
                new String(record.key(), encoding),
                new String(record.value(), encoding));
    }

    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {});
    }
}
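The ConsumerRecord-valued schema above is convenient when downstream operators need the full metadata, but the ConsumerRecord constructor and checksum() it relies on are deprecated in newer kafka-clients versions. If all you need are the key and value, a leaner variant (a sketch, not part of the original post) can emit a Tuple2 and also guard against null keys/values:

    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public class KeyValueDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {

        private static final long serialVersionUID = 1L;

        @Override
        public boolean isEndOfStream(Tuple2<String, String> nextElement) {
            return false;
        }

        @Override
        public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
            // Kafka keys and values may be null, so guard before decoding.
            String key = record.key() == null ? null : new String(record.key(), "UTF-8");
            String value = record.value() == null ? null : new String(record.value(), "UTF-8");
            return Tuple2.of(key, value);
        }

        @Override
        public TypeInformation<Tuple2<String, String>> getProducedType() {
            return TypeInformation.of(new TypeHint<Tuple2<String, String>>() {});
        }
    }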
2. The main class:
package com.hadoop.ljs.flink110.kafka;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-04-25 18:41
 * @version: v1.0
 * @description: com.hadoop.ljs.flink110.kafka
 */
public class KafkaDeserializerSchemaTest {

    public static void main(String[] args) throws Exception {
        /* Initialize the environment. */
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        /* Enable checkpointing. Note: the deserializer does not null-check the message key/value,
           so with checkpointing enabled a bad message will make the job restart endlessly. */
        senv.enableCheckpointing(2000);
        /* If topic3 does not exist, Kafka will auto-create it with a single partition (partition 0). */
        FlinkKafkaConsumer<ConsumerRecord<String, String>> myConsumer =
                new FlinkKafkaConsumer<>("topic3", new MyKafkaDeserializationSchema(), getKafkaConfig());
        /* Specify the start offsets: consume topic3, partition 0, from the first record. */
        Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
        specificStartOffsets.put(new KafkaTopicPartition("topic3", 0), 0L);
        myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

        DataStream<ConsumerRecord<String, String>> source = senv.addSource(myConsumer);
        DataStream<String> keyValue = source.map(new MapFunction<ConsumerRecord<String, String>, String>() {
            @Override
            public String map(ConsumerRecord<String, String> message) throws Exception {
                return "key:" + message.key() + " value:" + message.value();
            }
        });

        /* Print the result. */
        keyValue.print();
        /* Start the job. */
        senv.execute();
    }

    public static Properties getKafkaConfig() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "worker1.hadoop.ljs:6667,worker2.hadoop.ljs:6667");
        props.setProperty("group.id", "topic_group2");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("auto.offset.reset", "latest");
        return props;
    }
}