1.自定义序列化器与反序列化器

1.1 定义Order实体类

public class Order implements Serializable {

      private Integer OrderId;

      private String title;

    public Order() {
    }

    public Order(Integer orderId, String title) {
        OrderId = orderId;
        this.title = title;
    }
   // 省略必要的get与set方法
}

1.2 定义Order序列化类

由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要先将数据序列化为字节数组。 序列化器的作用就是用于序列化要发送的消息的。

Kafka使用 org.apache.kafka.common.serialization.Serializer 接口用于定义序列化器,将 泛型指定类型的数据转换为字节数组。

Kafka提供了如下常用类型的序列化类:

在这里插入图片描述

自定义序列化器需要实现org.apache.kafka.common.serialization.Serializer接口,并实现其中 的 serialize 方法。

public class OrderSerializer implements Serializer<Order> {

    @Override
    public byte[] serialize(String topic, Order data) {
        try {
            if (data == null) 
                return null;
            Integer orderId = data.getOrderId();
            String title = data.getTitle();
            int length = 0;
            byte[] bytes = null;
            if (null != title) {
                bytes = title.getBytes("utf-8");
                length = bytes.length;
            }
            //前4个字节保存orderId,
            //第二个4个字节保存title字段的长度
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
            buffer.putInt(orderId);
            buffer.putInt(length);
            buffer.put(bytes);
            return buffer.array();
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("序列化数据异常");
        }
    }
}

1.3 生产者代码

package com.warybee.c1;

import com.warybee.model.Order;
import com.warybee.serializer.OrderSerializer;
import org.apache.kafka.clients.producer.*;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class KafkaProducerDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        Map<String, Object> configs = new HashMap<>();
        // 设置连接Kafka的初始连接用到的服务器地址
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
        // 设置key的序列化类
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        // 设置自定义的序列化类
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, OrderSerializer.class);
        configs.put(ProducerConfig.ACKS_CONFIG,"all");
        KafkaProducer<Integer, Order> kafkaProducer=new KafkaProducer<Integer, Order>(configs);
        //定义order
        Order order=new Order();
        order.setOrderId(1);
        order.setTitle("iphone13 pro 256G");
        ProducerRecord<Integer,Order> producerRecord=new ProducerRecord<Integer,Order>
        (       "test_order_topic",
                0,
                order.getOrderId(),
                order);
        //消息的异步确认
        kafkaProducer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                if (exception==null){
                    System.out.println("消息的主题:"+recordMetadata.topic());
                    System.out.println("消息的分区:"+recordMetadata.partition());
                    System.out.println("消息的偏移量:"+recordMetadata.offset());
                }else {
                    System.out.println("发送消息异常");
                }
            }
        });


        // 关闭生产者
        kafkaProducer.close();
    }
}

1.4. 定义Order反序列化器

自定义反序列化类,需要实现 org.apache.kafka.common.serialization.Deserializer 接 口,并且实现其中的deserialize方法。

package com.warybee.deserializer;

import com.warybee.model.Order;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.ByteBuffer;

/**
 * @author joy
 * @description Order反序列化类
 */
public class OrderDeserializer implements Deserializer<Order> {

    @Override
    public Order deserialize(String topic, byte[] data) {
        ByteBuffer allocate = ByteBuffer.allocate(data.length);
        allocate.put(data);
        allocate.flip();
        Integer orderId = allocate.getInt();
        int length = allocate.getInt();
        String title = new String(data, 8, length);
        return new Order(orderId, title);
    }
}

1.5 消费者代码

package com.warybee.c1;

import com.warybee.deserializer.OrderDeserializer;
import com.warybee.model.Order;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author joy
 */
public class KafkaConsumerDemo {

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        // 设置连接Kafka的初始连接用到的服务器地址
        // 如果是集群,则可以通过此初始连接发现集群中的其他broker
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
        //KEY反序列化类
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        //自定义value反序列化类
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, OrderDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo-3");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //创建消费者对象
        KafkaConsumer<Integer, Order> consumer = new KafkaConsumer<Integer, Order>(configs);

        List<String> topics = new ArrayList<>();
        topics.add("test_order_topic");
        //消费者订阅主题
        consumer.subscribe(topics);
        while (true){
            //批量拉取主题消息,每3秒拉取一次
            ConsumerRecords<Integer, Order> records = consumer.poll(3000);
            //变量消息
            for (ConsumerRecord<Integer, Order> record : records) {
                System.out.println(record.topic() + "\t"
                        + record.partition() + "\t"
                        + record.offset() + "\t"
                        + record.key() + "\t"
                        + record.value().getTitle());

            }
        }
    }
}

上面实现的序列化与反序列化类,定义繁琐,不具有通用性,一不小心就会BUG满天飞,不适合在实际项目中使用,只做了解原理即可。接下来使用Apache Avro来实现序列化和反序列化

2. 使用Avro序列化和反序列化

2.1 Apache Avro介绍

Apache Avro是一种与编程语言无关的序列化格式。提供了一种共享数据文件的方式。Avro 数据通过与语言无关的 schema 来定义。schema 通过 JSON 来描述,数据被序列化成二进制文件或 JSON 文件,不过一般会使用二进制文件。Avro 在读写文件时需要用到 schema,schema 一般会被内嵌在数据文件里。

Apache Avro官网文档https://avro.apache.org/docs/current/gettingstartedjava.html

2.2 创建Maven项目

  • 引入依赖

        <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>3.0.0</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
                <version>1.2.7</version>
            </dependency>
           <!--Apache Avro 依赖-->
            <dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
                <version>1.11.0</version>
            </dependency>
    
  • Avro 插件

    <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.avro</groupId>
                    <artifactId>avro-maven-plugin</artifactId>
                    <version>1.11.0</version>
                    <executions>
                        <execution>
                            <phase>generate-sources</phase>
                            <goals>
                                <goal>schema</goal>
                            </goals>
                            <configuration>
                                <sourceDirectory>${project.basedir}/src/main/java/com/warybee/avro/schema/</sourceDirectory>
                                <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    

Avro插件参数说明:

  • sourceDirectory:schema 文件所在目录
  • outputDirectory:根据schema 文件生成的类文件到哪个目录

2.3 创建schema 文件

Apache Avro schema 是使用 JSON 定义的。详细的介绍,可以参考官网文档:https://avro.apache.org/docs/current/gettingstartedjava.html

定义一个order.avsc文件(文件所在目录要与上一步配置的sourceDirectory保持一致),内容如下:

{"namespace": "com.warybee.avro",
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "orderId", "type": "int"},
    {"name": "title",  "type": "string"},
    {"name": "num", "type": "int"}
  ]
}

IDEA话,可以安装一个Apache Avro IDL Schema Support插件,安装插件后编写schema 有智能提示。

2.4.Avro生成entity

上面配置了Avro 插件,通过maven命令,生成即可。

mvn install

或者IDEA右键->RUN Maven->install

在这里插入图片描述

2.5 生产者代码

package com.warybee.avro;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.producer.*;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

/**
 * @author joy
 */
public class KafkaProducerDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        Map<String, Object> configs = new HashMap<>();
        // 设置连接Kafka的初始连接用到的服务器地址
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
        // 设置key的序列化类
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        // 设置value的序列化类
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        configs.put(ProducerConfig.ACKS_CONFIG,"all");
        KafkaProducer<Integer,byte[]> producer=new KafkaProducer<Integer, byte[]>(configs);
        //发送100条消息
        for (int i = 0; i < 100; i++) {
             Order order=Order.newBuilder()
                     .setOrderId(i+1)
                     .setTitle("订单: "+(i+1)+" iphone 13 pro 256G")
                     .setNum(1)
                     .build();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, (BinaryEncoder)null);
            SpecificDatumWriter writer = new SpecificDatumWriter(order.getSchema());
            try {
                writer.write(order, encoder);
                encoder.flush();
                out.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
           ProducerRecord<Integer,byte[]> record=new ProducerRecord<>(
                   "test_avro_topic",
                   0,
                   order.getOrderId(),
                   out.toByteArray());
            //发送消息
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                      if (exception==null){
                          System.out.println("消息的主题:"+metadata.topic());
                          System.out.println("消息的分区:"+metadata.partition());
                          System.out.println("消息的偏移量:"+metadata.offset());
                      }else {
                          System.out.println("发送消息异常");
                      }
                }
            });
        }
        // 关闭生产者
        producer.close();
    }
}

2.6 消费者代码

package com.warybee.avro;

import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author joy
 */
public class KafkaConsumerDemo {

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        // 设置连接Kafka的初始连接用到的服务器地址
        // 如果是集群,则可以通过此初始连接发现集群中的其他broker
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
        //KEY反序列化类
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        //value反序列化类
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        //消费者所在的组ID
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo.avro");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //创建消费者对象
        KafkaConsumer<Integer,  byte[]> consumer = new KafkaConsumer<Integer,  byte[]>(configs);

        List<String> topics = new ArrayList<>();
        topics.add("test_avro_topic");
        //消费者订阅主题
        consumer.subscribe(topics);

        SpecificDatumReader<Order> reader = new SpecificDatumReader<>(Order.getClassSchema());
        try {
            while (true){
                //批量拉取主题消息,每3秒拉取一次
                ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
                for (ConsumerRecord<Integer, byte[]> record : records) {
                    Decoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null);
                    Order order=null;
                    try {
                        order=reader.read(null,decoder);
                        System.out.println("订单ID:"+order.getOrderId()+"\t"
                                +"订单标题:"+order.getTitle()+"\t"
                                +"数量:"+order.getNum());
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }finally {
            consumer.close();
        }
    }
}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐