Spring整合Kafka
1.Spring整个Kafka2.小demo演示
·
Spring整合Kafka
一、引入依赖
<!--Spring整合Kafka的依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
二、配置kafka
#KafkaProperties
##配端口号
spring.kafka.bootstrap-servers=localhost:9092
##配消费者的组id
spring.kafka.consumer.group-id=community-consumer-group
##是否自动提交 消费者的偏移量
spring.kafka.consumer.enable-auto-commit=true
##自动提交的频率,也就是多久提交一次——3000ms-3s
spring.kafka.consumer.auto-commit-interval=3000
其中:
需进行改动(改不改无所谓)
→
三、测试代码–如何用kafka
先启动zookeeper和kafka
重点
:生产者发消息,使我们主动去调的;消费者接收消息,是自动的,只需要在方法上加一个@KafkaListener(topics = {“xxxx”}),就能监听xxxx主体的生产者发来的消息,并且接收到!
3.1 KafkaTests
@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {
@Autowired
private kafkaProducer kafkaProducer;
@Test
public void testKafka(){
kafkaProducer.sendMessage("test","你好!");
kafkaProducer.sendMessage("test","你收到信息了吗?");
try {
Thread.sleep(1000*10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//为了方便,直接在这写两个类,而不去去外面重新新建
@Component
class kafkaProducer{//生产者
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String topic,String context){
kafkaTemplate.send(topic,context);
}
}@Component
class kafkaConsumer{//消费者
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record){
System.out.println(record.value());
}
}
3.2 测试结果
更多推荐
已为社区贡献3条内容
所有评论(0)