Spring Boot 使用 Kafka
kafka
·
pom 文件添加依赖
<!-- Spring for Apache Kafka: supplies the KafkaTemplate and @KafkaListener types used by the examples. -->
<!-- NOTE(review): no <version> is declared; presumably it is managed by the Spring Boot parent/BOM — confirm. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
application.yml 文件添加配置
bootstrap-servers 是 broker 的地址;如果是集群,则多个地址之间用逗号分隔
spring:
  kafka:
    # Broker address list; for a cluster, separate the host:port entries with commas.
    bootstrap-servers: 192.168.140.xxx:xxxx,192.168.140.xxx:xxxx,192.168.140.xxx:xxxx
生产者
import com.njc.java.entity.api.resp.TestInfoRespDTO;
import com.njc.java.entity.base.NjcResponseEntity;
import com.njc.java.property.KafkaProperty;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST controller that publishes a message to Kafka on request.
 */
@RestController
public class KafkaProducerController extends BaseController {

    /** Template used to publish records; parameterized so key and value types are checked at compile time. */
    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    /** Project Kafka settings; injected here but not read by any code in this class. */
    @Autowired
    KafkaProperty kafkaProperty;

    /**
     * Publishes the given message to the Kafka topic named {@code "topic"}.
     *
     * <p>The value returned by {@code send} is discarded, so this method does not observe
     * whether delivery succeeded.
     *
     * <p>NOTE(review): the listeners in {@code KafkaConsumer} subscribe to
     * {@code ${spring.kafka.topicDemo}}; confirm that property resolves to {@code "topic"}.
     *
     * @param msg message payload taken from the {@code {msg}} path segment
     * @return the response built by {@code super.success} with a {@code null} payload
     */
    @ApiOperation(value = "卡夫卡推送", notes = "卡夫卡推送")
    @PostMapping(value = "/kafka/{msg}")
    NjcResponseEntity<TestInfoRespDTO> sendKafka(@PathVariable("msg") String msg) {
        kafkaTemplate.send("topic", msg);
        return super.success("成功发送kafka", null);
    }
}
消费者
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
 * Kafka listeners that log the value of every record received on the topic
 * configured by the {@code spring.kafka.topicDemo} property.
 *
 * <p>Two listeners subscribe to the same topic under different consumer group ids
 * ({@code "a"} and {@code "b"}).
 *
 * @author luozhanfeng
 * @version 1.0
 * @date 2022/5/30 9:59
 */
@Slf4j
@Component
public class KafkaConsumer {

    /**
     * Listener for consumer group {@code "a"}; logs the record value at INFO level.
     * Records whose value is {@code null} are ignored.
     *
     * @param record the consumed record
     */
    @KafkaListener(topics = "#{'${spring.kafka.topicDemo}'}", groupId = "a")
    public void ProductInsertEvent1(ConsumerRecord<String, String> record) {
        final String payload = record.value();
        if (payload == null) {
            return;
        }
        log.info("KafkaConsumer1 kafka value:{}", payload);
    }

    /**
     * Listener for consumer group {@code "b"}; logs the record value at INFO level.
     * Records whose value is {@code null} are ignored.
     *
     * @param record the consumed record
     */
    @KafkaListener(topics = "#{'${spring.kafka.topicDemo}'}", groupId = "b")
    public void ProductInsertEvent2(ConsumerRecord<String, String> record) {
        final String payload = record.value();
        if (payload == null) {
            return;
        }
        log.info("KafkaConsumer2 kafka value:{}", payload);
    }
}
监听同一个 topic 的多个 @KafkaListener 需要用不同的 groupId 区分;如果既没有在配置中设置 group.id,也没有在 @KafkaListener 上指定 groupId,启动时会报如下错误:
No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.
更多推荐
已为社区贡献8条内容
所有评论(0)