环境请自行搭建,本文仅适用于SpringCloud框架整合Kafka实现消息收发。

application.yml新增Kafka配置信息

spring:
  #==============================================================
  #spring-cloud-stream-Kafka配置 开始
  #==============================================================
  cloud:
    stream:
      default-binder: kafka #Default binder
      bindings:
        #缺省的输入、输出通道(配置自己定义的通道与哪个中间件交互)
        es_default_input:
          destination: es_default_topic
          binder: kafka
          group: es_default_group
          consumer:
            concurrency: 2 #入站消费者的并发性
        es_default_output:
          destination: es_default_topic
          binder: kafka
          content-type: text/plain
        #告警的输入、输出通道(多主题、分组测试用,实际开发中根据业务需求定义)
        es_alarm_input:
          destination: es_alarm_topic
          binder: kafka
          group: es_alarm_group
        es_alarm_output:
          destination: es_alarm_topic
          binder: kafka
          content-type: text/plain
        #kafka配置
      kafka:
        binder:
          autoCreateTopics: true   # 自动创建topics
          autoAddPartitions: true 
          replicationFactor: 1
          brokers: cm02:9092,cm03:9092,cm04:9092  #Kafka的服务端列表,cm02为集群节点,在本地host配置,这里替换成你的IP即可
          zkNodes: cm01:2181,cm02:2181,cm03:2181  #Kafka服务端连接的ZooKeeper节点列表
          requiredAcks: 1
  #==============================================================
  #spring-cloud-stream-Kafka配置 结束
  #==============================================================

pom.xml引入依赖包。

在pom.xml的dependenies节点中进行新增,具体如下:

<!--kafka-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

自定义input和output通道

@Input注解标识一个输入通道
@Output注解标识一个输出通道
通道名称作为参数,如果未提供参数,默认使用方法名称作为通道名称。

package com.github.market.admin.common.kafka;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * kafka自定义信息通道
 * @author sfy
 * @date 2020/9/22 10:00 上午
 */
public interface EsChannel {
    /**
     * 缺省发送消息通道名称
     */
    String ES_DEFAULT_OUTPUT = "es_default_output";

    /**
     * 缺省接收消息通道名称
     */
    String ES_DEFAULT_INPUT = "es_default_input";

    /**
     * 告警发送消息通道名称
     */
    String ES_ALARM_OUTPUT = "es_alarm_output";

    /**
     * 告警接收消息通道名称
     */
    String ES_ALARM_INPUT = "es_alarm_input";

    /**
     * 缺省发送消息通道
     * @return channel 返回缺省信息发送通道
     */
    @Output(ES_DEFAULT_OUTPUT)
    MessageChannel sendEsDefaultMessage();

    /**
     * 告警发送消息通道
     * @return channel 返回告警信息发送通道
     */
    @Output(ES_ALARM_OUTPUT)
    MessageChannel sendEsAlarmMessage();

    /**
     * 缺省接收消息通道
     * @return channel 返回缺省信息接收通道
     */
    @Input(ES_DEFAULT_INPUT)
    MessageChannel recieveEsDefaultMessage();

    /**
     * 告警接收消息通道
     * @return channel 返回告警信息接收通道
     */
    @Input(ES_ALARM_INPUT)
    MessageChannel recieveEsAlarmMessage();
}


定义消息发送器-生产者

package com.github.market.admin.common.kafka.sender;

import com.github.market.admin.common.kafka.EsChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * kafka消息发送器
 *      注入先前定义的通道EsChannel,sendToDefaultChannel、sendToAlarmChannel分别
 *      为我们自定义的两个发送方法,可将消息发送到不同的通道中,每个通道对应一个kafka的主题。
 * @author sfy
 * @date 2020/9/22 10:45 上午
 */
@Slf4j
@Component
public class EsKafkaMessageSender {
    @Autowired
    private EsChannel channel;

    /**
     * 消息发送到默认通道:缺省通道对应缺省主题
     * @param message
     */
    public void sendToDefaultChannel(String message){
        channel.sendEsDefaultMessage().send(MessageBuilder.withPayload(message).build());
    }

    /**
     * 消息发送到告警通道:告警通道对应告警主题
     * @param message
     */
    public void sendToAlarmChannel(String message){
        channel.sendEsAlarmMessage().send(MessageBuilder.withPayload(message).build());
    }
}


定义消息订阅器-消费者

package com.github.market.admin.listener.kafka;

import com.github.market.admin.common.kafka.EsChannel;
import com.xiaoleilu.hutool.date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;

/**
 * kafka从不同的通道实现消息的订阅。
 * @author sfy
 * @date 2020/9/22 10:52 上午
 */
@Slf4j
@EnableBinding(value = EsChannel.class)
public class EsKafkaMessageReceiveListener {

    /**
     * 从缺省通道接收消息
     * @param message
     */
    @StreamListener(EsChannel.ES_DEFAULT_INPUT)
    public void receive(Message<String> message){
        log.info("{}订阅告警消息:通道 = es_default_input,data = {}", DateUtil.now(), message);
    }

    /**
     * 从告警通道接收消息
     * @param message
     */
    @StreamListener(EsChannel.ES_ALARM_INPUT)
    public void receiveAlarm(Message<String> message){
        System.out.println("订阅告警消息:" + message);
        log.info("{}订阅告警消息:通道 = es_alarm_input,data = {}", DateUtil.now(), message);
    }
}


新增EnableBinding

@EnableBinding注解,这个注解指定刚才我们定义消息通道的接口名称。

@EnableAsync
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
@ComponentScan(basePackages = {"com.github.market.admin", "com.github.collection.common.bean"})
@EnableBinding(EsChannel.class)  //加这个就好
public class MarketAdminServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(MarketAdminServiceApplication.class, args);
    }
}

定义Controller调用

package com.github.market.admin.controller;

import com.github.collection.common.util.Response;
import com.github.market.admin.common.kafka.sender.EsKafkaMessageSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 *
 * @author sfy
 * @date 2020/9/21 10:51 上午
 */
@Slf4j
@RestController
@RequestMapping("/demo")
public class DemoController {
    @Autowired
    private EsKafkaMessageSender sender

    /**
     * kafka发送消息
     * @author sfy
     * @date 2020/9/22 3:50 下午
     */
    @PostMapping("/testKafkaMessageSend")
    public void testKafkaMessageSend(String message) {
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
    }

    /**
     * kafka发送消息
     * @author sfy
     * @date 2020/9/22 3:50 下午
     */
    @PostMapping("/testKafkaMessageSend2")
    public void testKafkaMessageSend2(String message) {
        sender.sendToAlarmChannel(message);
    }

}

这里是分割线···

在这里插入图片描述
传送链

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐