Kafka 从0.10版本开始支持流处理,我们可以使用 Kafka Streams 来开发实时应用程序。本章介绍 Spring Boot 集成 Kafka Streams 进行流式计算

Spring Boot 集成 Kafka 的基本配置和用法在“Spring Boot 集成 Kafka”有介绍,这里不再详述。

依赖

使用 Kafka Streams 流处理,在集成 Spring Kafka 的基础下,还需要引入:

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-streams</artifactId>
</dependency>

配置

  • 在 application.yml 配置
spring:
  kafka:
    streams:
      application-id: test-kafka-stream # 默认取springboot应用名
      bootstrap-servers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092} # 会覆盖 spring.kafka.bootstrap-servers 配置
#      auto-startup: true
      properties:
        default:
          key:
            serde: org.apache.kafka.common.serialization.Serdes$StringSerde # 序列化key
          value:
            serde: org.springframework.kafka.support.serializer.JsonSerde # 序列化value
          timestamp:
            extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor
        spring:
          json:
            trusted:
              packages: com.engrz.lab.* # 允许json反序列化的包

流处理相关配置:spring.kafka.streams.*

更多配置参考:Spring Boot Integration Properties

  • 在 Java 代码中配置(与 application.yml 配置二选一)
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
	Map<String, Object> props = new HashMap<>();
	props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
	props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
	props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
	props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
	props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
	return new KafkaStreamsConfiguration(props);
}

值使用 JsonSerde 序列化,需要配置信任包,否则 Spring 会报出:If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

使用

  • 创建流

使用 @EnableKafkaStreams 注解装配

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfiguration {
        @Bean
    public KStream<String, Object> kStream(StreamsBuilder streamsBuilder) {
        KStream<String, Object> stream = streamsBuilder.stream("streamTopic");
        stream.map((k, v) -> new KeyValue<>(k, v)).to("myTopic", Produced.with(Serdes.String(), new JsonSerde<>()));
        return stream;
    }
}

可以指定多个topic,把接收的内容存到myTopic中

计算

本章只讲 Spring Boot 集成,关于 Kafka Streams 流计算会放在 Kafka 专题介绍。以下给出一个应用场景示例:

  • 定义一个订单model类
/**
 * @author Engr-Z
 * @since 2021/1/29
 */
@Data
public class OrderModel implements Serializable {

    /**
     * 用户id
     */
    private Integer userId;

    /**
     * 订单号
     */
    private String orderNo;

    /**
     * 订单时间
     */
    private LocalDateTime orderTime;

    /**
     * 订单金额
     */
    private BigDecimal orderAmt;

    /**
     * 订单状态
     */
    private String orderStatus;

}
  • 找出交易小于1元的订单,发送到 orderTopic
@Bean
public KStream<String, OrderModel> kStream(StreamsBuilder streamsBuilder) {
    KStream<String, OrderModel> stream = streamsBuilder.stream("streamTopic");
    stream.map((k, v) -> new KeyValue<>(k, v)).to("tableTopic", Produced.with(Serdes.String(), new JsonSerde<>()));
    stream.filter((k, v) -> {
        BigDecimal orderAmt = v.getOrderAmt();
        return orderAmt.compareTo(new BigDecimal(1)) < 0;
    }).to("orderTopic", Produced.with(Serdes.String(), new JsonSerde<>()));
}

通过实时计算,我们可以解决很多业务问题。如:实时数仓,实时风控等。


除非注明,否则均为"攻城狮·正"原创文章,转载请注明出处。
本文链接:https://engr-z.com/169.html

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐