已解决 Magic v1 does not support record headers
Magic v1 does not support record headers
·
Kafka生产数据报错
- 异常原因:Magic v1 does not support record headers
- 服务端 kafka版本: 0.10.1
- 客户端 kafka-client版本: 2.6.0
- 原因: 因为java客户端消费不受影响,只是生产数据的时候,服务端kafka版本过老,不兼容该程序,通过debug发现 报错原因如下
- 通过源码跟踪 定位如下:
因为kafka需要链路跟踪,而我这个程序只是一个简单的 producer,consumer,消费数据和生产数据而已。
解决方案一
- 项目启动时,忽略 TraceMessagingAutoConfiguration.class
@SpringBootApplication(exclude ={ TraceMessagingAutoConfiguration.class})
因为这个类是自动配置消息链路跟踪的 忽略了即可。
private KafkaTemplate<String, Object> kafkaTemplate;
kafkaTemplate.send(topic, json);
解决方案二
如果以上方案不可行 请使用以下办法 ,以兼容 <=0.10.2 的kafaka server 发送消息。
- 添加kafka配置类和生产者服务
package cn.telltao.kafka.producer.config;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.List;
import java.util.Properties;
/**
* @author telltao@qq.com
* 为了兼容kafka servier版本小于 0.10.2所做的兼容生产者发送消息模式
* 因 kafka-clients发送消息时会增加heads数据 而旧版不允许发送这个命令 所以发送通道改为旧版
* @Date 2022/8/09 12:13
*/
@Configuration
public class KafkaProducerConfig {
@Value("#{'${spring.kafka.bootstrap-servers}'.split(',')}")
private List<String> bootstrapServers;
@Bean
public KafkaProducer<String, String> initKafkaTemplate() {
Properties props = new Properties();
//设置接入点,请通过控制台获取对应Topic的接入点
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
//Kafka消息的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
//请求的最长等待时间
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
//设置客户端内部重试次数
props.put(ProducerConfig.RETRIES_CONFIG, 5);
//设置客户端内部重试间隔
props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
return producer;
}
}
- 添加Kafka生产者服务
package cn.telltao.kafka.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
import java.util.concurrent.Future;
/**
* @param
* @author telltao@qq.com
* Kafka 生产者 向kafka发送消息
* 高版本客户端向低版本发送消息
* @date 2022/8/09 12:13
* @return
*/
@Slf4j
@Component
public class KafkaProducerService {
@Resource
private KafkaProducer kafkaProducer;
/**
* 消息发送至 kafka
*
* @param topic 消息主题
* @param json 消息json
*/
public void send(String topic, String json) {
if (StringUtils.isBlank(topic) || StringUtils.isBlank(json)) {
log.error("[向kafka生产数据] 未获取到 topic 或 data 本次请求已返回");
throw new IllegalStateException("topic 或 data参数缺失");
}
log.info("[向kafka生产数据] 当前请求topic为:{},json为:{}", topic, json);
try {
ProducerRecord<String, String> kafkaMessage = new ProducerRecord<>(topic, json);
Future<RecordMetadata> metadataFuture = kafkaProducer.send(kafkaMessage);
RecordMetadata recordMetadata = metadataFuture.get();
log.info("[向kafka生产数据] 数据发送成功,返回消息为:{}", recordMetadata.toString());
} catch (Exception e) {
log.error("[向kafka生产数据] 数据发送失败,异常原因为:{}", e.toString());
}
}
/**
* 消息发送至 kafka分区
*
* @param topic
* @param partition 分区
* @param json
*/
public void sendByPartition(String topic, Integer partition, String json) {
if (StringUtils.isBlank(topic) || StringUtils.isBlank(json)) {
log.error("[向kafka生产数据] 未获取到 topic 或 data 本次请求已返回");
throw new IllegalStateException("topic 或 data参数缺失");
}
if (Objects.isNull(partition)) {
log.error("[向kafka生产数据] 未获取到 偏移量信息,已按照默认请求发送");
send(topic, json);
}
try {
ProducerRecord<String, String> kafkaMessage = new ProducerRecord<>(topic, partition, null, json);
Future<RecordMetadata> metadataFuture = kafkaProducer.send(kafkaMessage);
RecordMetadata recordMetadata = metadataFuture.get();
log.info("[向kafka生产数据] 数据发送成功,返回消息为:{}", recordMetadata.toString());
} catch (Exception e) {
log.error("[向kafka生产数据] 数据发送失败,异常原因为:{}", e.toString());
}
}
}
Maven依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
搞定
更多推荐
已为社区贡献1条内容
所有评论(0)