@KafkaListener(id = "layer_test_consumer", topics = {"${kafka.consumer.topic.layerTestConfig}"},
            groupId = "${kafka.consumer.group-id.layerTestConfig}", containerFactory = "batchContainerFactory", errorHandler = "consumerAwareListenerErrorHandler")
    public void LayerTestConfigListener(List<ConsumerRecord<String, String>> records, Acknowledgment ack, Consumer consumer) {
        consumerHandler(records, ack, consumer);
    }

    public abstract void consumerHandler(List<ConsumerRecord<String, String>> records, Acknowledgment ack, Consumer consumer);
@Getter
@Slf4j
@Configuration
@EnableKafka
@EnableConfigurationProperties(KafkaCustomProperties.class)
public class KafkaConfig {

    @Value("${kafka.topic.interactive-video}")
    private String interactiveVideoTopic;

    @Autowired
    private KafkaProperties kafkaProperties;
    @Autowired
    private KafkaCustomProperties kafkaCustomProperties;

    @PostConstruct
    public void kafkaPostConstruct(){
        log.info("kafka properties:{}", JSONObject.toJSONString(kafkaProperties));
        log.info("custom kafka properties:{}", JSONObject.toJSONString(kafkaCustomProperties));
        log.info("consumer properties:{}", kafkaProperties.buildConsumerProperties());
        log.info("listener properties:{}", JSONObject.toJSONString(kafkaProperties.getListener()));
    }


    /**
     * 自定义分层测试消费者工厂类
     * @return
     */
    public ConsumerFactory<String, String> batchConsumerFactory() {
        Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
//        consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, LayerTestConsumerInterceptor.class.getName());
        return new DefaultKafkaConsumerFactory<>(consumerProperties);
    }


    /**
     * 自定义分层测试监听工厂容器
     * @return
     */
    @Bean("batchContainerFactory")
    public ConcurrentKafkaListenerContainerFactory listenerContainer() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(batchConsumerFactory());
//        factory.setAckDiscarded(true);
//        factory.setRecordFilterStrategy(defaultRecordFilterStrategy);
        //设置为批量监听
        factory.setBatchListener(true);
        log.info("batchContainerFactory currency amount {}", JSONObject.toJSONString(factory));
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

}
@Slf4j
@Component
public class LayerTestConsumerAwareListenerErrorHandler {
    @Bean("consumerAwareListenerErrorHandler")
    public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
        return (message, e, consumer) -> {
            //错误处理,后期扩展可能会用到
            log.error(ELKInfoUtil.getInfo(BaseUtil.getUniqueTraceId(), ELKChannel.TEACHER, ELKLevel.ERROR, message.getPayload().toString(), e));
            return message.getPayload().toString();
        };
    }
}
 /**
     * 批处理统一方法
     *
     * @param records
     * @param ack
     * @param consumer
     */
    @Override
    public void consumerHandler(List<ConsumerRecord<String, String>> records, Acknowledgment ack, Consumer consumer) {
        log.info("current deal thread id:" + Thread.currentThread().getId());
        log.info("batch record size:" + records.size());

        for (ConsumerRecord<String, String> record : records) {
            Optional<String> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                messagehandler(record, ack, consumer);
                String message = record.value();
                String topic = record.topic();
                int partition = record.partition();
                log.info("message={},topic={} ,partition={}", message, topic, partition);
            }
        }
    }

    /**
     * 消息处理
     * @param record
     * @param ack
     * @param consumer
     */
    public void messagehandler(ConsumerRecord<String, String> record, Acknowledgment ack, Consumer consumer) {
        TestConfigVO testConfigVO = null;
        String kafkaKey = null;
        try {
           
        } catch (Exception e) {
          
        } finally {
            ack.acknowledge();
        }
    }
spring:
  kafka:
    # 指定kafkaserver的地址,集群配多个,中间,逗号隔开
    bootstrap-servers: 10.15.5.40:9092,10.15.5.71:9092,10.15.3.182:9092
    customer:
      enable:
        auto:
          commit: true
      key:
        deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value:
        deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #kafka加密配置
    properties:
      sasl.mechanism: PLAIN
      security.protocol: SASL_PLAINTEXT
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123465";
    producer:
      #procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
      #acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
      #acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
      #acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
      #可以设置的值为:all, -1, 0, 1
      acks: 1
      # 每次批量发送消息的数量,produce积累到一定数据,一次发送
      batch-size: 1
      # produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
      buffer-memory: 33554432
      # 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
      # 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
      retries: 0
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="rw16" password="XSwx24";


如果topic加密的话

KafkaClient {
 org.apache.kafka.common.security.plain.PlainLoginModule required
 username="admin"
 password="123456";
}

启动添加:

-Djava.security.auth.login.config=./kafka.client_jaas.conf

 

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐