1. Import the dependency in the pom file

       <!--kafka-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
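
The starter above does not declare a version, so it must be managed by a Spring Cloud BOM in dependencyManagement. A minimal sketch, assuming the Hoxton release train (pick the train that matches your Spring Boot version):

        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>Hoxton.SR12</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>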


2. Configure application.properties

spring.kafka.bootstrap-servers=127.0.0.1:9092
# any consumer group id will do here
spring.kafka.consumer.group-id=xxx-log-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.session.timeout.ms=60000
spring.kafka.consumer.properties.max.poll.interval.ms=120000
spring.kafka.consumer.max-poll-records=100
spring.kafka.listener.concurrency=3
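
Note that the spring.kafka.* keys above configure spring-kafka consumers; the spring-cloud-stream binder used by the listener in step 4 is configured separately. A minimal sketch of the binder and binding configuration, where the binding name oesWorkbenchMoveInput and the topic name are assumptions that must match the channel defined in step 4:

spring.cloud.stream.kafka.binder.brokers=127.0.0.1:9092
# the binding name must match the channel name declared with @Input
spring.cloud.stream.bindings.oesWorkbenchMoveInput.destination=xxx-move-topic
spring.cloud.stream.bindings.oesWorkbenchMoveInput.group=xxx-log-group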


3. Create the message producer

 Add the following code to a Controller:

    /**
     * Send a message to Kafka.
     *
     * @param msg the message payload
     * @return a confirmation string
     */
    @RequestMapping(value = "/test1")
    public String test(String msg) {

        Properties props = new Properties();
        // Kafka broker address
        props.put("bootstrap.servers", "localhost:9092");
        // the leader waits until all replicas have acknowledged the write
        props.put("acks", "all");
        // number of retries on transient send failures
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        // key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        // "topics" is the target topic name, defined elsewhere in the controller
        producer.send(new ProducerRecord<>(topics, "client1", msg));
        producer.close();
        return "Message sent to Kafka";
    }
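
Creating a new KafkaProducer on every request is fine for a demo but wasteful in practice. Since the dependency imported in step 1 is spring-cloud-stream, the send can also go through a bound output channel. Below is a minimal sketch using the framework's built-in Source interface; the controller name, the /test2 path, and the binding destination are assumptions, and the output binding must be mapped in application.properties (e.g. spring.cloud.stream.bindings.output.destination=xxx-move-topic):

    import javax.annotation.Resource;

    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    @EnableBinding(Source.class) // binds the framework's default "output" channel
    public class StreamSendController { // hypothetical controller name

        @Resource
        private Source source;

        @RequestMapping(value = "/test2")
        public String send(String msg) {
            // publish through the bound channel; the target topic comes from the binding config
            source.output().send(MessageBuilder.withPayload(msg).build());
            return "Message sent via spring-cloud-stream";
        }
    }

With this approach the binder manages a single shared producer for you, instead of one per request.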

4. Create the message listener

@Slf4j
@EnableBinding(Sink.class)
public class WorkbenchStreamListener {

    @Resource
    private FileService fileService;

    // listen on the input channel
    @StreamListener(KafkaConstants.OES_WORKBENCH_MOVE_INPUT)
    public void receiveData(MoveMessage moveMessage) {
        // handle the received message here
    }
}

// the input channel definition
public interface Sink {

    @Input(KafkaConstants.OES_WORKBENCH_MOVE_INPUT)
    SubscribableChannel oesWorkbenchMoveInput(); // input channels use SubscribableChannel
}
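
The KafkaConstants class referenced above is not shown in the original; a minimal sketch, where the channel name value is an assumption that must match the binding name configured in application.properties:

    public final class KafkaConstants {

        // channel name; must match the spring.cloud.stream.bindings.<name> key
        public static final String OES_WORKBENCH_MOVE_INPUT = "oesWorkbenchMoveInput";

        private KafkaConstants() {
        }
    }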

  With that in place, you can happily send and listen for messages.

That wraps up this article on using spring-cloud-stream to work with Kafka in Spring Boot. For more on the topic, search my earlier articles, and thank you for your continued support!
