第一课 三分钟入门spring-kafka

第一课 三分钟入门spring-kafka

前提

  1. 已经安装好了kafka,kafka的安装和使用不在本教程的范围内,请自行安装。
  2. 了解spirng-boot,文章假设读者是使用spring-boot项目。

正式内容

  1. 引入spring-kafka的依赖,不需要指定版本。当使用 Spring Boot 时,省略版本,Spring Boot将自动引入与您的Spring Boot 版本兼容的正确版本。

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    
  2. 增加主题,代码如下所示。为了简单起见,我们将代码直接加到了启动类Application。我们使用@Bean定义了一个bean。这个bean就是我们的kafka主题。使用TopicBuilder可以帮助我们生成一个Topic。与此同时,我们指定了Topic的名称为topic1,分区数为10以及重复数为1。

    @SpringBootApplication
    public class Application {
    
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("topic1")
                    .partitions(10)
                    .replicas(1)
                    .build();
        }
    }
    
  3. 增加生产者。同样在Application类中,我们增加如下一段代码。我们定义了一个ApplicationRunner类型的Bean。这个Bean在项目启动的时候就会自动执行run方法。这个代码的意识就是,在项目启动时,自动向kafka的topic1主题发送一个消息。

        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, String> template) {
            return args -> {
                template.send("topic1", "test");
            };
        }
    
  4. 增加消费者。使用注解@KafkaListener表示这个方法监听kafka的topic1主题。每当kafka的topic1主题里有消息要消费时,就会触发这个方法。

        @KafkaListener(id = "myId", topics = "topic1")
        public void listen(String in) {
            System.out.println(in);
        }
    
  5. 编辑配置文件application.yml,加入kafka的配置。配置要按照你自己的kafka的安装来。

    spring:
      kafka:
        bootstrap-servers:
          - 127.0.0.1:9092
    
  6. 执行程序,发现test被输出到命令行了,表示给kafka发送消息和接受消息都已经成功了。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐