Kafka生产数据报错

  1. 异常原因:Magic v1 does not support record headers
  2. 服务端 kafka版本: 0.10.1
  3. 客户端 kafka-client版本: 2.6.0
  4. 原因: 因为java客户端消费不受影响,只是生产数据的时候,服务端kafka版本过老,不兼容该程序,通过debug发现 报错原因如下
    错误原因
  5. 通过源码跟踪 定位如下:
    因为kafka需要链路跟踪,而我这个程序只是一个简单的 producer,consumer,消费数据和生产数据而已。
    在这里插入图片描述

解决方案一

  1. 项目启动时,忽略 TraceMessagingAutoConfiguration.class
@SpringBootApplication(exclude ={ TraceMessagingAutoConfiguration.class})

因为这个类是自动配置消息链路跟踪的 忽略了即可。

 private KafkaTemplate<String, Object> kafkaTemplate;
 kafkaTemplate.send(topic, json);

解决方案二

如果以上方案不可行 请使用以下办法 ,以兼容 <=0.10.2 的kafaka server 发送消息。
  1. 添加kafka配置类和生产者服务
package cn.telltao.kafka.producer.config;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.List;
import java.util.Properties;
/**
 * @author telltao@qq.com
 *  为了兼容kafka servier版本小于 0.10.2所做的兼容生产者发送消息模式
 *  因 kafka-clients发送消息时会增加heads数据 而旧版不允许发送这个命令 所以发送通道改为旧版
 * @Date 2022/8/09 12:13
 */

@Configuration
public class KafkaProducerConfig {

    @Value("#{'${spring.kafka.bootstrap-servers}'.split(',')}")
    private List<String> bootstrapServers;

    @Bean
    public KafkaProducer<String, String> initKafkaTemplate() {
        Properties props = new Properties();
        //设置接入点,请通过控制台获取对应Topic的接入点
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        //Kafka消息的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        //请求的最长等待时间
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        //设置客户端内部重试次数
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        //设置客户端内部重试间隔
        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        return producer;
    }
}
  1. 添加Kafka生产者服务
package cn.telltao.kafka.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Objects;
import java.util.concurrent.Future;


/**
 * @param
 * @author telltao@qq.com
 * Kafka 生产者 向kafka发送消息
 * 高版本客户端向低版本发送消息
 * @date 2022/8/09 12:13
 * @return
 */
@Slf4j
@Component
public class KafkaProducerService {

    @Resource
    private KafkaProducer kafkaProducer;

    /**
     * 消息发送至 kafka
     *
     * @param topic 消息主题
     * @param json  消息json
     */
    public void send(String topic, String json) {

        if (StringUtils.isBlank(topic) || StringUtils.isBlank(json)) {
            log.error("[向kafka生产数据] 未获取到 topic 或 data 本次请求已返回");
            throw new IllegalStateException("topic 或 data参数缺失");
        }

        log.info("[向kafka生产数据] 当前请求topic为:{},json为:{}", topic, json);
        try {
            ProducerRecord<String, String> kafkaMessage = new ProducerRecord<>(topic, json);
            Future<RecordMetadata> metadataFuture = kafkaProducer.send(kafkaMessage);
            RecordMetadata recordMetadata = metadataFuture.get();
            log.info("[向kafka生产数据]  数据发送成功,返回消息为:{}", recordMetadata.toString());
        } catch (Exception e) {
            log.error("[向kafka生产数据] 数据发送失败,异常原因为:{}", e.toString());
        }
    }

    /**
     * 消息发送至 kafka分区
     *
     * @param topic
     * @param partition 分区
     * @param json
     */
    public void sendByPartition(String topic, Integer partition, String json) {

        if (StringUtils.isBlank(topic) || StringUtils.isBlank(json)) {
            log.error("[向kafka生产数据] 未获取到 topic 或 data 本次请求已返回");
            throw new IllegalStateException("topic 或 data参数缺失");
        }
        if (Objects.isNull(partition)) {
            log.error("[向kafka生产数据] 未获取到 偏移量信息,已按照默认请求发送");
            send(topic, json);
        }
        try {
            ProducerRecord<String, String> kafkaMessage = new ProducerRecord<>(topic, partition, null, json);
            Future<RecordMetadata> metadataFuture = kafkaProducer.send(kafkaMessage);
            RecordMetadata recordMetadata = metadataFuture.get();
            log.info("[向kafka生产数据]  数据发送成功,返回消息为:{}", recordMetadata.toString());
        } catch (Exception e) {
            log.error("[向kafka生产数据] 数据发送失败,异常原因为:{}", e.toString());
        }
    }
}
Maven依赖
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.6.0</version>
    </dependency>

搞定

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐