Spring整合Kafka

在这里插入图片描述

一、引入依赖

<!--Spring整合Kafka的依赖-->
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

二、配置kafka

#KafkaProperties
##配端口号
spring.kafka.bootstrap-servers=localhost:9092
##配消费者的组id
spring.kafka.consumer.group-id=community-consumer-group
##是否自动提交 消费者的偏移量
spring.kafka.consumer.enable-auto-commit=true
##自动提交的频率,也就是多久提交一次——3000ms-3s  
spring.kafka.consumer.auto-commit-interval=3000

其中:
在这里插入图片描述需进行改动(改不改无所谓)

在这里插入图片描述

在这里插入图片描述

三、测试代码–如何用kafka

先启动zookeeper和kafka
在这里插入图片描述

重点:生产者发消息,使我们主动去调的;消费者接收消息,是自动的,只需要在方法上加一个@KafkaListener(topics = {“xxxx”}),就能监听xxxx主体的生产者发来的消息,并且接收到!

在这里插入图片描述

3.1 KafkaTests

@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {

    @Autowired
    private kafkaProducer kafkaProducer;

    @Test
    public void testKafka(){
        kafkaProducer.sendMessage("test","你好!");
        kafkaProducer.sendMessage("test","你收到信息了吗?");

        try {
            Thread.sleep(1000*10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

//为了方便,直接在这写两个类,而不去去外面重新新建
@Component
class kafkaProducer{//生产者

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void sendMessage(String topic,String context){
        kafkaTemplate.send(topic,context);
    }
}@Component
class kafkaConsumer{//消费者

    @KafkaListener(topics = {"test"})
    public void handleMessage(ConsumerRecord record){
        System.out.println(record.value());
    }

}

3.2 测试结果

在这里插入图片描述

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐