spring boot 使用spring-cloud-stream简单操作kafka使用详解
这篇文章主要介绍了在spring boot 中使用spring-cloud-stream简单操作kafka使用详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
·
1.pom文件导入依赖
<!--kafka-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
2.application.properties文件配置
spring.kafka.bootstrap-servers=127.0.0.1:9092
#这里的分组可以随便添加一个
spring.kafka.consumer.group-id=xxx-log-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.session.timeout=60000
spring.kafka.consumer.maxpoll.interval=120000
spring.kafka.consumer.maxpoll.records=100
spring.kafka.consumer.concurrency=3
3.创建消息发送者
我们可以在Controller中添加以下代码:
/**
* 发送消息
*
* @param msg
* @return
*/
@RequestMapping(value = "/test1")
public String test(String msg) {
Properties props = new Properties();
//kakfa 服务
props.put("bootstrap.servers", "localhost:9092");
//leader 需要等待所有备份都成功写入日志
props.put("acks", "all");
//重试次数
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
//key的序列化方式
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//value的序列化方式
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
//String kafkaJson = "{\"appName\":\"综合管理\",\"appEnName\":null,\"funName\":\"个人中心-我的资源\",\"funEnName\":null,\"opType\":1,\"operateResult\":1,\"errorCode\":null,\"opName\":\"查询\",\"opTime\":\"2022-07-18 19:04:42\",\"operateTime\":\"20220818190422\",\"desp\":\"tests/wewrwerwe\",\"env\":\"os\\u003dWindows 10,\",\"userId\":\"st0001\",\"ip\":\"10.0.10.102\",\"idCard\":\"1111111111111111\",\"createdTime\":\"2022-07-18 19:04:11\",\"userName\":\"系统管理员\",\"groupId\":\"10000\",\"groupName\":\"xxxxx\",\"organizationId\":\"3200000000\",\"userType\":\"\"}";
//JSONObject jsonObject = JSON.parseObject(msg);
producer.send(new ProducerRecord<>(topics, "client1", msg));
producer.close();
return "已向kafka发送消息";
}
4.创建消息监听者
@Slf4j
@EnableBinding(Sink.class)
public class WorkbenchStreamListener {
@Resource
private FileService fileService;
@StreamListener(KafkaConstants.xxx_input) // 监听接受通道
public void receiveData(MoveMessage moveMessage) {
}
}
// 接受通道
public interface Sink {
@Input(KafkaConstants.OES_WORKBENCH_MOVE_INPUT)
SubscribableChannel oesWorkbenchMoveInput(); // 接受通道用SubscribableChannel
}
接下来就可以愉快的发送监听消息了
到此这篇关于spring boot 使用spring-cloud-stream简单操作kafka使用详解的文章就介绍到这了,更多相关spring boot 使用spring-cloud-stream简单操作kafka内容请搜索我之前的文章,希望大家以后多多支持!
更多推荐
已为社区贡献6条内容
所有评论(0)