Listening to Multiple Topics with Kafka
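Method 1: Use the topicPattern attribute of @KafkaListener with the wildcard .* (the value is a regular expression, so do not omit the ".")
1. Add the dependency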
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2. Configure the YAML file
spring:
  kafka:
    # Custom property read by the listener: a regex matched against topic names
    topics: test.*
3. Consumer listener
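import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaConsumer {
    // Subscribes to every topic whose name matches the regex in spring.kafka.topics.
    // The Acknowledgment argument requires a manual ack mode on the listener container.
    @KafkaListener(topicPattern = "${spring.kafka.topics}", groupId = "${spring.kafka.consumer.group-id}")
    public void consumer(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        try {
            Optional<?> message = Optional.ofNullable(record.value());
            if (message.isPresent()) {
                String msg = String.valueOf(message.get());
                log.info("topic:{} consumed successfully, msg:{}", topic, msg);
            }
        } catch (Exception e) {
            log.error("topic:{} consume error:{}", topic, e.getMessage());
        } finally {
            // Acknowledge even on failure, so the offset is committed and the record is not consumed again
            ack.acknowledge();
        }
    }
}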
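Why the "." matters in Method 1: the pattern is a Java regular expression, so .* means "any characters", while a bare * only repeats the character before it. The following is a minimal standalone sketch of that difference using java.util.regex; the class name TopicPatternDemo, the helper matches, and the sample topic names are made up for illustration and are not part of spring-kafka.

```java
import java.util.List;
import java.util.regex.Pattern;

public class TopicPatternDemo {

    // Returns true only when the whole topic name matches the regex
    static boolean matches(String regex, String topic) {
        return Pattern.compile(regex).matcher(topic).matches();
    }

    public static void main(String[] args) {
        // Hypothetical topic names, used only to compare the two patterns
        List<String> topics = List.of("test", "test1", "test.orders", "testtt", "prod1");
        for (String topic : topics) {
            System.out.printf("%-12s test.*=%-5b test*=%b%n",
                    topic, matches("test.*", topic), matches("test*", topic));
        }
    }
}
```

With test.* every name starting with "test" matches. With test* only "tes" followed by zero or more "t" matches, so topics such as test1 would be silently skipped.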
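Method 2: Use the topics attribute of @KafkaListener. Here the configured value is a list of topic names, separated by ASCII (half-width) commas
1. Add the dependency (same as Method 1)
2. Configure the YAML file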
spring:
  kafka:
    # Custom property read by the listener: comma-separated topic names, no spaces
    topics: test1,test2,test3
3. Consumer listener
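import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaConsumer {
    // The SpEL expression splits the comma-separated property into an array of topic names.
    // The Acknowledgment argument requires a manual ack mode on the listener container.
    @KafkaListener(topics = "#{'${spring.kafka.topics}'.split(',')}", groupId = "${spring.kafka.consumer.group-id}")
    public void consumer(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        try {
            Optional<?> message = Optional.ofNullable(record.value());
            if (message.isPresent()) {
                String msg = String.valueOf(message.get());
                log.info("Received topic:{}, msg:{}", topic, msg);
            }
        } catch (Exception e) {
            log.error("topic:{} consume error:{}", topic, e.getMessage());
        } finally {
            // Acknowledge even on failure, so the offset is committed and the record is not consumed again
            ack.acknowledge();
        }
    }
}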
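The split(',') in the SpEL expression of Method 2 is an ordinary String.split call, so it does not trim whitespace. The sketch below reproduces that behavior in plain Java to show why the list should be written without spaces; the class TopicListSplitDemo and the splitTrimmed helper are hypothetical and only illustrate one possible way to tolerate spaces.

```java
import java.util.Arrays;

public class TopicListSplitDemo {

    // Same operation the SpEL expression performs: a plain split on ","
    static String[] splitPlain(String value) {
        return value.split(",");
    }

    // Hypothetical variant: also swallows whitespace around each comma
    static String[] splitTrimmed(String value) {
        return value.trim().split("\\s*,\\s*");
    }

    public static void main(String[] args) {
        // Well-formed list: three clean topic names
        System.out.println(Arrays.toString(splitPlain("test1,test2,test3")));
        // A space after the comma survives the plain split as part of the name
        System.out.println(Arrays.toString(splitPlain("test1, test2")));
        // The trimmed variant removes it
        System.out.println(Arrays.toString(splitTrimmed("test1, test2")));
    }
}
```

Keeping the property free of spaces, as in test1,test2,test3, avoids depending on how surrounding whitespace in a topic name is handled downstream.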