We had been writing data to Kafka as plain strings; once the data volume grew large, throughput suffered noticeably. Switching to Avro serialization can greatly improve efficiency.

The pom.xml the environment depends on:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    ...
    <dependencies>
        ...
        <!-- flink START -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
        </dependency>
        <!-- flink END -->

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
        </dependency>
        ...
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.2</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Avro provides support in the following five areas:
1> Rich data structures
2> A compact, fast, binary data format
3> A container file for storing persistent data
4> Remote procedure call (RPC)
5> Simple integration with dynamic languages. Code generation is not required to read or write data files, nor to use or implement RPC protocols; for statically typed languages, code generation is an optional optimization worth implementing

Advantages of Avro:
1> Binary messages: good performance and high efficiency
2> Schemas are described in JSON
3> Schema and data are stored together, so messages are self-describing and no stub code generation is required (IDL-based generation is supported)
4> RPC calls exchange schema definitions during the handshake
5> Ships with a complete client/server stack, so RPC services can be built quickly
6> Supports both synchronous and asynchronous communication
7> Supports dynamic messages
8> The schema can define the ordering of the data (serialization follows this order)
9> Provides servers based on the Jetty kernel as well as on Netty

The Avro schema (JSON) format:

UserBehavior.avsc:

{
    "namespace": "com.renren.stream.avro.bean",
    "type": "record",
    "name": "UserBehavior",
    "fields": [
        {"name": "userId", "type": "long"},
        {"name": "itemId", "type": "long"},
        {"name": "categoryId", "type": "int"},
        {"name": "behavior", "type": "string"},
        {"name": "timestamp", "type": "long"}
    ]
}

namespace : the package the generated class is placed in
type : the Avro type; use record here
name : the name of the generated class
fields : the record's fields
Note: the schema file's extension must be .avsc

With the avro-maven-plugin configured (see the <build> section of the pom above), compiling the project automatically generates the UserBehavior class.

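For reference, here is a minimal sketch of constructing the generated class by hand (it assumes the default code produced by avro-maven-plugin, which includes an all-args constructor and a builder):

// Build a record via the generated builder; field names follow the avsc above
UserBehavior ub = UserBehavior.newBuilder()
        .setUserId(543462L)
        .setItemId(1715L)
        .setCategoryId(1464116)
        .setBehavior("pv")
        .setTimestamp(1511658000L)
        .build();
System.out.println(ub); // specific records print as JSON-like text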

Custom Avro serialization to Kafka with plain Java

1> Prepare the test data:

UserBehavior.csv:

543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000
315321,942195,4339722,pv,1511658000
625915,1162383,570735,pv,1511658000

2> Custom Avro serialization and deserialization: implement Kafka's Serializer and Deserializer interfaces, which handle serialization and deserialization respectively.

import com.renren.stream.avro.bean.UserBehavior;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;

/**
 * Custom Kafka serializer and deserializer for UserBehavior, using Avro binary encoding
 */
public class SimpleAvroSchemaJava implements Serializer<UserBehavior>, Deserializer<UserBehavior> {
    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public byte[] serialize(String s, UserBehavior userBehavior) {
        // Create an Avro writer for the record's schema
        SpecificDatumWriter<UserBehavior> writer = new SpecificDatumWriter<>(userBehavior.getSchema());
        // Stream that collects the serialized bytes
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Create a binary encoder that writes to the stream
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        try {
            // Write the record into the stream
            writer.write(userBehavior, encoder);
        } catch (IOException e) {
            e.printStackTrace();
        }

        return out.toByteArray();
    }

    @Override
    public void close() {
    }

    @Override
    public UserBehavior deserialize(String s, byte[] bytes) {
        // Kafka may hand over a null payload (e.g. a tombstone record)
        if (bytes == null) {
            return null;
        }
        // Holds the decoded result
        UserBehavior userBehavior = new UserBehavior();
        // Input stream over the received bytes
        ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(bytes);
        // Create an Avro reader for the record's schema
        SpecificDatumReader<UserBehavior> stockSpecificDatumReader = new SpecificDatumReader<>(userBehavior.getSchema());
        // Create a binary decoder that reads from the stream
        BinaryDecoder binaryDecoder = DecoderFactory.get().directBinaryDecoder(arrayInputStream, null);
        try {
            // Decode the record
            userBehavior = stockSpecificDatumReader.read(null, binaryDecoder);
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Return the result
        return userBehavior;
    }
}
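Before wiring the class into Kafka, a local round trip is a quick sanity check. A minimal sketch, assuming the generated UserBehavior class (with its all-args constructor) is on the classpath:

import com.renren.stream.avro.bean.UserBehavior;

public class SimpleAvroSchemaJavaTest {
    public static void main(String[] args) {
        SimpleAvroSchemaJava schema = new SimpleAvroSchemaJava();
        // Sample record; values taken from the first CSV row above
        UserBehavior in = new UserBehavior(543462L, 1715L, 1464116, "pv", 1511658000L);
        // Serialize, then deserialize, and confirm the fields survive the round trip
        byte[] bytes = schema.serialize("UserBehaviorKafka", in);
        UserBehavior out = schema.deserialize("UserBehaviorKafka", bytes);
        System.out.println(out.getUserId() + "," + out.getBehavior()); // expect: 543462,pv
    }
}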

3> Create the producer (serialization side):

import com.renren.stream.avro.bean.UserBehavior;
import com.renren.stream.avro.util.SimpleAvroSchemaJava;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class UserBehaviorProducerKafka {
    public static void main(String[] args) throws InterruptedException {
        // Load the test data
        List<UserBehavior> data = getData();
        // Producer configuration
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value.serializer must point to the custom Avro serializer class, otherwise it takes no effect
        props.setProperty("value.serializer", SimpleAvroSchemaJava.class.getName());
        // Create the Kafka producer
        KafkaProducer<String, UserBehavior> userBehaviorProducer = new KafkaProducer<>(props);
        // Send the records one by one
        for (UserBehavior userBehavior : data) {
            ProducerRecord<String, UserBehavior> producerRecord = new ProducerRecord<>("UserBehaviorKafka", userBehavior);
            userBehaviorProducer.send(producerRecord);
            System.out.println("数据写入成功" + data);
            Thread.sleep(1000);
        }
    }

    public static List<UserBehavior> getData() {
        ArrayList<UserBehavior> userBehaviors = new ArrayList<>();
        try (BufferedReader br = new BufferedReader(new FileReader(new File("UserBehavior.csv")))) {
            String line;
            while ((line = br.readLine()) != null) {
                String[] split = line.split(",");
                userBehaviors.add(new UserBehavior(Long.parseLong(split[0]), Long.parseLong(split[1]),
                        Integer.parseInt(split[2]), split[3], Long.parseLong(split[4])));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return userBehaviors;
    }
}
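Note that KafkaProducer.send() is asynchronous; the Thread.sleep above only slows things down for the demo. If you want per-record delivery confirmation, a Callback can be passed to send(). A sketch of a drop-in replacement for the send call in the loop above:

userBehaviorProducer.send(producerRecord, (metadata, exception) -> {
    if (exception != null) {
        // The send failed (e.g. broker unreachable or record rejected)
        exception.printStackTrace();
    } else {
        System.out.println("Record sent to " + metadata.topic() + "-"
                + metadata.partition() + " @ offset " + metadata.offset());
    }
});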

4> Create the consumer (deserialization side):

import com.renren.stream.avro.bean.UserBehavior;
import com.renren.stream.avro.util.SimpleAvroSchemaJava;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class UserBehaviorConsumer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        prop.put("group.id", "UserBehavior");
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Set value.deserializer to the custom Avro deserializer class
        prop.put("value.deserializer", SimpleAvroSchemaJava.class.getName());

        KafkaConsumer<String, UserBehavior> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singletonList("UserBehaviorKafka"));
        while (true) {
            // The Duration overload of poll() requires kafka-clients 2.0+
            ConsumerRecords<String, UserBehavior> poll = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, UserBehavior> record : poll) {
                System.out.println(record.value());
            }
        }
    }
}
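The loop above never closes the consumer. A common pattern is to wake a blocked poll() from a JVM shutdown hook and close cleanly; a sketch of how the poll loop could be wrapped:

// Register before the poll loop: wakeup() makes a blocked poll() throw WakeupException
Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
try {
    while (true) {
        ConsumerRecords<String, UserBehavior> poll = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, UserBehavior> record : poll) {
            System.out.println(record.value());
        }
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // Expected on shutdown; fall through to close()
} finally {
    consumer.close();
}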

5> Run it: create the Kafka topic and start a console consumer.

Run the Kafka commands:

# Create the topic
# ./kafka-topics.sh --create --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --replication-factor 2 --partitions 3 --topic UserBehaviorKafka

# Start a console consumer
# ./kafka-console-consumer.sh --from-beginning --topic UserBehaviorKafka --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181

Implementing custom Avro serialization to Kafka with Flink

1> Create Flink's custom Avro serialization and deserialization: the Kafka Serializer/Deserializer classes above cannot be plugged into the Flink Kafka connector. Looking at the source of Flink's built-in string schema (SimpleStringSchema) shows that it implements DeserializationSchema and SerializationSchema, so we implement those two interfaces instead.

import com.renren.stream.avro.bean.UserBehavior;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
 * Custom Flink serialization and deserialization schema for UserBehavior, using Avro binary encoding
 */
public class SimpleAvroSchemaFlink implements DeserializationSchema<UserBehavior>, SerializationSchema<UserBehavior> {
    @Override
    public byte[] serialize(UserBehavior userBehavior) {
        // Create an Avro writer for the record's schema
        SpecificDatumWriter<UserBehavior> writer = new SpecificDatumWriter<>(userBehavior.getSchema());
        // Stream that collects the serialized bytes
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Create a binary encoder that writes to the stream
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        try {
            // Write the record into the stream
            writer.write(userBehavior, encoder);
        } catch (IOException e) {
            e.printStackTrace();
        }

        return out.toByteArray();
    }

    @Override
    public TypeInformation<UserBehavior> getProducedType() {
        return TypeInformation.of(UserBehavior.class);
    }

    @Override
    public UserBehavior deserialize(byte[] bytes) throws IOException {
        // Holds the decoded result
        UserBehavior userBehavior = new UserBehavior();
        // Input stream over the received bytes
        ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(bytes);
        // Create an Avro reader for the record's schema
        SpecificDatumReader<UserBehavior> stockSpecificDatumReader = new SpecificDatumReader<>(userBehavior.getSchema());
        // Create a binary decoder that reads from the stream
        BinaryDecoder binaryDecoder = DecoderFactory.get().directBinaryDecoder(arrayInputStream, null);
        try {
            // Decode the record
            userBehavior = stockSpecificDatumReader.read(null, binaryDecoder);
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Return the result
        return userBehavior;
    }

    @Override
    public boolean isEndOfStream(UserBehavior userBehavior) {
        return false;
    }
}
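As with the Kafka version, a local round trip is an easy sanity check. A minimal sketch, under the same assumptions as the earlier test:

import com.renren.stream.avro.bean.UserBehavior;

import java.io.IOException;

public class SimpleAvroSchemaFlinkTest {
    public static void main(String[] args) throws IOException {
        SimpleAvroSchemaFlink schema = new SimpleAvroSchemaFlink();
        UserBehavior in = new UserBehavior(543462L, 1715L, 1464116, "pv", 1511658000L);
        // serialize() and deserialize() should be inverses of each other
        UserBehavior out = schema.deserialize(schema.serialize(in));
        System.out.println(out); // expect the same field values as `in`
    }
}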

2> Create the Flink consumer (deserialization):

import com.renren.stream.avro.bean.UserBehavior;
import com.renren.stream.avro.util.SimpleAvroSchemaFlink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class UserBehaviorConsumerFlink {
    public static void main(String[] args) throws Exception {
        // 1. Build the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 makes local testing easier to follow
        env.setParallelism(1);

        // 2. Kafka configuration
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        prop.put("group.id", "UserBehavior");
        // Note: key/value deserializer properties would be ignored here anyway; the
        // DeserializationSchema passed to FlinkKafkaConsumer below does the decoding

        // 3. Build the Kafka consumer connector
        FlinkKafkaConsumer<UserBehavior> consumer = new FlinkKafkaConsumer<>("UserBehaviorKafka", new SimpleAvroSchemaFlink(), prop);

        // 4. Start consuming from the latest offsets
        consumer.setStartFromLatest();

        // 5. Build the data stream from Kafka
        DataStream<UserBehavior> data = env.addSource(consumer);

        // 6. Print the results
        data.print();

        // 7. Execute the job
        env.execute("UserBehaviorConsumerFlink");
    }
}
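setStartFromLatest() is just one of the start-position options FlinkKafkaConsumer offers; the common alternatives are sketched below:

consumer.setStartFromEarliest();       // read the topic from the beginning
consumer.setStartFromLatest();         // only new records (used above)
consumer.setStartFromGroupOffsets();   // resume from committed group.id offsets (the default)
consumer.setStartFromTimestamp(1511658000000L); // start from a given epoch-millis timestamp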

3> Create the Flink producer (serialization):

import com.renren.stream.avro.bean.UserBehavior;
import com.renren.stream.avro.util.SimpleAvroSchemaFlink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class UserBehaviorProducerFlink {
    public static void main(String[] args) throws Exception {
        // 1. Build the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 makes local testing easier to follow
        env.setParallelism(1);

        // 2. Set up the Flink data source
        DataStreamSource<String> value = env.readTextFile("/UserBehavior.csv");
        DataStream<UserBehavior> users = value.map(row -> {
            String[] arr = row.split(",");
            UserBehavior behavior = new UserBehavior();
            behavior.setUserId(Long.valueOf(arr[0]));
            behavior.setItemId(Long.valueOf(arr[1]));
            behavior.setCategoryId(Integer.valueOf(arr[2]));
            behavior.setBehavior(arr[3]);
            behavior.setTimestamp(Long.valueOf(arr[4]));
            return behavior;
        });

        // 3. Kafka configuration
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");

        // 4. Connect the Kafka sink
        FlinkKafkaProducer<UserBehavior> producer = new FlinkKafkaProducer<>("UserBehaviorKafka", new SimpleAvroSchemaFlink(), prop);

        // 5. Write the data into Kafka
        users.addSink(producer);

        // 6. Execute the job
        env.execute("UserBehaviorProducerFlink");
    }
}
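One caveat: FlinkKafkaProducer defaults to at-least-once delivery, and that guarantee only holds when checkpointing is enabled, since the sink flushes pending records on each checkpoint. A minimal sketch (the 5-second interval is an arbitrary example value):

// Enable checkpointing so the Kafka sink flushes on every checkpoint;
// without it, records buffered in the producer can be lost on failure.
env.enableCheckpointing(5000);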