1. Check the Flink version

First, determine the Flink and Scala versions of your installation. For example, mine is:

flink-1.13.2-bin-scala_2.12.tgz

Flink version:   1.13.2

Scala version:   2.12
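If you also want to double-check which Flink version your job code actually compiles and runs against (rather than relying only on the tarball name), a minimal sketch is to print the version reported by Flink's EnvironmentInformation utility. This class lives in flink-runtime, which the flink-clients dependency below should pull in transitively; treat this as an optional sanity check, not part of the job itself.

import org.apache.flink.runtime.util.EnvironmentInformation;

public class PrintFlinkVersion {
	public static void main(String[] args) {
		// Prints the Flink version found on the classpath, e.g. "1.13.2".
		System.out.println("Flink version: " + EnvironmentInformation.getVersion());
	}
}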

2. pom.xml

<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
		<flink.version>1.13.2</flink.version>
		<scala.binary.version>2.12</scala.binary.version>
	</properties>

	<dependencies>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-core</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>

	</dependencies>
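Note: the custom serializer below uses fastjson's JSONObject, which is not covered by the dependencies above. If you follow that example, you also need a fastjson dependency along these lines (the version shown here is only an assumption; use whatever your project already standardizes on):

		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.75</version>
		</dependency>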

Custom serializer:

Note that older versions of flink-connector-kafka do not support KafkaSerializationSchema.

package com.sink;

import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.alibaba.fastjson.JSONObject;

/**
 * Serializes a JSONObject into a Kafka ProducerRecord targeting a fixed topic.
 */
public class CustomKafkaSerializationSchema implements KafkaSerializationSchema<JSONObject> {

	private static final long serialVersionUID = 8497940668660042203L;

	private final String topic;

	public CustomKafkaSerializationSchema(final String topic) {
		this.topic = topic;
	}

	@Override
	public ProducerRecord<byte[], byte[]> serialize(final JSONObject element, final Long timestamp) {
		// No record key is set; the value is the JSON string encoded as UTF-8 bytes.
		return new ProducerRecord<byte[], byte[]>(topic, element.toJSONString().getBytes(StandardCharsets.UTF_8));
	}

}
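Wiring this serializer into a sink might look like the sketch below. The topic name "my-topic", the properties object, and the jsonStream variable are placeholders you would define yourself; the FlinkKafkaProducer constructor taking a KafkaSerializationSchema and a Semantic is the one available in the 1.13.x connector. The demo that follows shows the same idea with an anonymous KafkaSerializationSchema<String> instead.

// "my-topic", properties and jsonStream are placeholders assumed to be defined elsewhere.
FlinkKafkaProducer<JSONObject> producer = new FlinkKafkaProducer<>(
		"my-topic",                                        // default topic
		new CustomKafkaSerializationSchema("my-topic"),    // the serializer defined above
		properties,                                        // Kafka producer config (bootstrap.servers, ...)
		FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);        // or EXACTLY_ONCE if transactions are configured

jsonStream.addSink(producer);                              // jsonStream is a DataStream<JSONObject>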

Demo:

// topic, properties and stream are assumed to be defined elsewhere.
KafkaSerializationSchema<String> kafkaSerializationSchema = new KafkaSerializationSchema<String>() {
	@Override
	public ProducerRecord<byte[], byte[]> serialize(final String element, final Long timestamp) {
		return new ProducerRecord<byte[], byte[]>(topic, element.getBytes());
	}
};

FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>("", kafkaSerializationSchema, properties,
		FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

// FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>("",
// 		new CustomSerializationSchema(),
// 		properties, Optional.of(new FlinkFixedPartitioner<IN>()));
stream.addSink(myProducer);
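For an older flink-connector-kafka that lacks KafkaSerializationSchema (the situation mentioned above), the commented-out constructor takes the older SerializationSchema interface instead. A minimal sketch of such a schema, assuming you simply want the UTF-8 bytes of the string as the record value, could be:

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.SerializationSchema;

// Old-style schema: it only produces the record value; topic and partitioning are decided by the producer.
public class CustomSerializationSchema implements SerializationSchema<String> {

	private static final long serialVersionUID = 1L;

	@Override
	public byte[] serialize(String element) {
		return element.getBytes(StandardCharsets.UTF_8);
	}
}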

Custom deserializer:

1. The deserializer:

package com.hadoop.ljs.flink110.kafka;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * Deserializes the raw byte[] key/value of a Kafka record into Strings, keeping the full
 * ConsumerRecord (topic, partition, offset, timestamp, ...) available downstream.
 */
public class MyKafkaDeserializationSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {

    private static String encoding = "UTF8";

    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
        return false;
    }

    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
       /* System.out.println("Record--partition::"+record.partition());
        System.out.println("Record--offset::"+record.offset());
        System.out.println("Record--timestamp::"+record.timestamp());
        System.out.println("Record--timestampType::"+record.timestampType());
        System.out.println("Record--checksum::"+record.checksum());
        System.out.println("Record--key::"+record.key());
        System.out.println("Record--value::"+record.value());*/
        return new ConsumerRecord<>(record.topic(),
                record.partition(),
                record.offset(),
                record.timestamp(),
                record.timestampType(),
                record.checksum(),
                record.serializedKeySize(),
                record.serializedValueSize(),
                /* No null check on key/value here; be sure to handle nulls in production. */
                new String(record.key(), encoding),
                new String(record.value(), encoding));
    }

    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>(){});
    }
}
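As the comment inside deserialize() warns, record.key() (and, depending on the producer, record.value()) can be null, and a NullPointerException there combined with checkpointing means endless restarts. A minimal null-safe sketch of that conversion, reusing the names from the class above, could be:

// Null-tolerant conversion of the raw key/value bytes before building the ConsumerRecord.
String key = record.key() == null ? null : new String(record.key(), encoding);
String value = record.value() == null ? "" : new String(record.value(), encoding);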

2. The main class:

package com.hadoop.ljs.flink110.kafka;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-04-25 18:41
 * @version: v1.0
 * @description: com.hadoop.ljs.flink110.kafka
 */
public class KafkaDeserializerSchemaTest {
    public static void main(String[] args) throws Exception {
        /* Initialize the execution environment */
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        /* Enable checkpointing. The deserializer does not null-check the key/value, so with
           checkpointing enabled a null key or value will make the job fail and restart indefinitely. */
        senv.enableCheckpointing(2000);
        /* If topic3 does not exist, Kafka will auto-create it with a single partition (partition 0). */
        FlinkKafkaConsumer<ConsumerRecord<String, String>> myConsumer = new FlinkKafkaConsumer<ConsumerRecord<String, String>>("topic3", new MyKafkaDeserializationSchema(), getKafkaConfig());

        /* Specify the start offsets */
        Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
        /* Start from the first record of partition 0 of topic3 */
        specificStartOffsets.put(new KafkaTopicPartition("topic3", 0), 0L);
        myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

        DataStream<ConsumerRecord<String, String>> source = senv.addSource(myConsumer);
        DataStream<String> keyValue = source.map(new MapFunction<ConsumerRecord<String, String>, String>() {
            @Override
            public String map(ConsumerRecord<String, String> message) throws Exception {
                return "key:" + message.key() + "  value:" + message.value();
            }
        });
        /* Print the results */
        keyValue.print();
        /* Start execution */
        senv.execute();
    }

    public static Properties getKafkaConfig() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "worker1.hadoop.ljs:6667,worker2.hadoop.ljs:6667");
        props.setProperty("group.id", "topic_group2");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("auto.offset.reset", "latest");
        return props;
    }
}
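Besides setStartFromSpecificOffsets, the 1.13 FlinkKafkaConsumer offers the usual start-position helpers; which one fits depends on whether you want to resume from committed group offsets or read from a fixed point in the topic:

// Alternative start positions (mutually exclusive; the last call before addSource wins):
myConsumer.setStartFromEarliest();       // read every partition from the beginning
myConsumer.setStartFromLatest();         // read only records arriving after the job starts
myConsumer.setStartFromGroupOffsets();   // default: resume from offsets committed for group.id
myConsumer.setStartFromTimestamp(0L);    // start from the first record with timestamp >= the given epoch millis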