Springboot与Kafka整合消费
一、pom文件<!--Kafka 依赖--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>二、配置文件在application.yml中添加配置文件s
·
一、pom文件
<!--Kafka 依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
二、配置文件
在application.yml中添加配置文件
spring:
#kafka集群配置
kafka:
bootstrap-servers: 10.0.40.11:9092
#初始化生产者配置
producer:
#重试次数
retries: 0
#应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
acks: 1
#批量大小
batch-size: 16384
properties:
#当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
#linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
linger.ms: 1
#生产端缓冲区大小
buffer-memory: 33554432
#Kafka提供的序列化和反序列化类
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
#初始化消费者配置
consumer:
#默认的消费组ID
group-id: iosConsumerGroup
#默认的消费主题
topic-name: test,test2
#是否自动提交offset
enable-auto-commit: false
#提交offset延时(接收到消息后多久提交offset)
# auto-commit-interval: 1000ms
#当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
auto-offset-reset: latest
properties:
#消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
session.timeout.ms: 15000
#消费请求超时时间
request.timeout.ms: 18000
#Kafka提供的序列化和反序列化类
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#批量消费每次最多消费多少条消息
#每次拉取一条,一条条消费,当然是具体业务状况设置
max-poll-records: 1
# 指定心跳包发送频率,即间隔多长时间发送一次心跳包,优化该值的设置可以减少Rebalance操作,默认时间为3秒;
heartbeat-interval: 6000
#监听器设置
listener:
#消费端监听的topic不存在时,项目启动会报错(关掉)
missing-topics-fatal: false
#设置消费类型 批量消费 batch,单条消费:single
#type: batch
#指定容器的线程数,提高并发量
#concurrency: 3
#手动提交偏移量
ack-mode: manual
三、Kafka配置类
3.1 读取配置类
@Data
@SpringBootConfiguration
public class KafkaTopicProperties implements Serializable {
/** 消费组id */
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
/**消费主题集合*/
@Value("${spring.kafka.consumer.topic-name}")
private String[] topicName;
}
3.2 主题配置类
@Configuration
public class KafkaTopicConfiguration {
private final KafkaTopicProperties properties;
public KafkaTopicConfiguration(KafkaTopicProperties properties) {
this.properties = properties;
}
@Bean
public String[] kafkaTopicName() {
return properties.getTopicName();
}
@Bean
public String topicGroupId() {
return properties.getGroupId();
}
}
四、Kafka生产者与消费者配置
4.1 消费者(监听器)
@Slf4j
@Service
public class KafkaConsumer {
@Autowired private KafkaHandleMessageService kafkaHandleMessageService;
@KafkaProcessLog //自定义AOP切面,可记录消费日志
@KafkaListener(topics = "#{kafkaTopicName}", groupId = "#{topicGroupId}")
public Result processMessage(ConsumerRecord<Integer, String> consumerRecord, Acknowledgment ack) {
Result result = new Result();
try {
// 根据业务消费消息
result = kafkaHandleMessageService.handleMessage(consumerRecord);
// 采用手动提交
ack.acknowledge();
return result;
} catch (Exception e) {
log.error("Kafka消息消费异常,错误原因为:{}", e.getMessage());
result.setStatus(Result.FAILURE);
result.setMsg("Kafka消息消费异常,错误原因为:" + e.getMessage());
return result;
} finally {
// 最终手动提交,防止异常捕获漏提交,重复消费
ack.acknowledge();
}
}
}
4.2 生产者
@Slf4j
@Service
public class KafkaProducer {
@Autowired private KafkaTemplate<Integer, String> kafkaTemplate;
public void sendMessage(String topic, String data) {
log.info("kafka sendMessage start");
ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(topic, data);
// 回调函数
future.addCallback(
new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onFailure(Throwable ex) {
log.error("kafka sendMessage error, ex = {}, topic = {}, data = {}", ex, topic, data);
}
@Override
public void onSuccess(SendResult<Integer, String> result) {
log.info("kafka sendMessage success topic = {}, data = {}", topic, data);
}
});
log.info("kafka sendMessage end");
}
五、AOP切面配置
5.1 自定义切面注解
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface KafkaProcessLog {
}
5.2 切面处理类
@Aspect
@Component
@Slf4j
public class KafkaProcessAspect {
@Autowired private KafkamessagelogService kafkamessagelogService;
@Pointcut("@annotation(cn.com.sinosoft.crs.common.aop.annotation.KafkaProcessLog)")
public void kafkaPointcut() {
// Do nothing
}
@Around("kafkaPointcut()")
public void around(ProceedingJoinPoint joinPoint) throws Throwable {
log.info("Kafka消息处理已启动...");
long beginTime = System.currentTimeMillis();
ConsumerRecord<Integer, String> consumerRecord;
// ==================================保存Kafka消息============================================
Object[] args = joinPoint.getArgs();
consumerRecord = (ConsumerRecord<Integer, String>) args[0];
// 1.消费前保存日志
Result result = new Result();
try {
result = kafkamessagelogService.saveKafkaMeessageLog(consumerRecord);
} catch (Exception e) {
log.error("Kafka消息保存失败,错误原因为{}", e.getMessage());
}
KafkamessagelogEntity kafkamessagelogEntity = (KafkamessagelogEntity) result.getData();
// ==================================输出Kafka消息============================================
log.info(
"Kafka本次消费内容如下,消费分区编码:{},偏移量:{},主题:{},表:{},表内容:{}",
consumerRecord.partition(),
consumerRecord.offset(),
consumerRecord.topic(),
consumerRecord.key(),
// 判断value值是否为空
StrUtil.isBlank(consumerRecord.value())
? ""
: JsonUtils.transToLowerObject(consumerRecord.value()).toJSONString());
// ==================================更新Kafka消息============================================
try {
// 2.执行方法
result = (Result) joinPoint.proceed();
// 3.更新日志信息
result = kafkamessagelogService.updateKafkaMeessageLog(kafkamessagelogEntity, result);
log.info("kafka消息更新{}", result.getStatus() ? "成功!" : "失败!");
} catch (Exception e) {
// 消费异常时更新日志
result.setStatus(Result.FAILURE);
result.setMsg(ExceptionUtils.ex2Str(e));
kafkamessagelogService.updateKafkaMeessageLog(kafkamessagelogEntity, result);
log.error("Kafka消息消费转换失败,错误原因为{}", e.getMessage());
}
// 4.消费后更新日志
// 执行时长(毫秒)
long time = System.currentTimeMillis() - beginTime;
log.info("Kafka消费处理已结束,共用时{}毫秒!", time);
}
}
以上,请参考,谢谢!
更多推荐
已为社区贡献5条内容
所有评论(0)