第一课 三分钟入门spring-kafka
三分钟入门spirng-kafka增加主题,代码如下所示。为了简单起见,我们将代码直接加到了启动类。我们使用定义了一个bean。这个bean就是我们的kafka主题。使用可以帮助我们生成一个Topic。与此同时,我们指定了Topic的名称为topic1,分区数为10以及重复数为1。增加生产者。同样在类中,我们增加如下一段代码。我们定义了一个类型的Bea......
第一课 三分钟入门spring-kafka
第一课 三分钟入门spring-kafka
前提
- 已经安装好了kafka,kafka的安装和使用不在本教程的范围内,请自行安装。
- 了解spirng-boot,文章假设读者是使用spring-boot项目。
正式内容
-
引入spring-kafka的依赖,不需要指定版本。当使用 Spring Boot 时,省略版本,Spring Boot将自动引入与您的Spring Boot 版本兼容的正确版本。
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
-
增加主题,代码如下所示。为了简单起见,我们将代码直接加到了启动类
Application
。我们使用@Bean
定义了一个bean。这个bean就是我们的kafka主题。使用TopicBuilder
可以帮助我们生成一个Topic。与此同时,我们指定了Topic的名称为topic1,分区数为10以及重复数为1。@SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } @Bean public NewTopic topic() { return TopicBuilder.name("topic1") .partitions(10) .replicas(1) .build(); } }
-
增加生产者。同样在
Application
类中,我们增加如下一段代码。我们定义了一个ApplicationRunner
类型的Bean。这个Bean在项目启动的时候就会自动执行run
方法。这个代码的意识就是,在项目启动时,自动向kafka的topic1主题发送一个消息。@Bean public ApplicationRunner runner(KafkaTemplate<String, String> template) { return args -> { template.send("topic1", "test"); }; }
-
增加消费者。使用注解
@KafkaListener
表示这个方法监听kafka的topic1主题。每当kafka的topic1主题里有消息要消费时,就会触发这个方法。@KafkaListener(id = "myId", topics = "topic1") public void listen(String in) { System.out.println(in); }
-
编辑配置文件
application.yml
,加入kafka的配置。配置要按照你自己的kafka的安装来。spring: kafka: bootstrap-servers: - 127.0.0.1:9092
-
执行程序,发现
test
被输出到命令行了,表示给kafka发送消息和接受消息都已经成功了。
更多推荐
所有评论(0)