kafka之@KafkaListener
@KafkaListener(id = "layer_test_consumer", topics = {"${kafka.consumer.topic.layerTestConfig}"},groupId = "${kafka.consumer.group-id.layerTestConfig}", containerFactory = "batchContainerFactory", erro
·
@KafkaListener(id = "layer_test_consumer", topics = {"${kafka.consumer.topic.layerTestConfig}"},
groupId = "${kafka.consumer.group-id.layerTestConfig}", containerFactory = "batchContainerFactory", errorHandler = "consumerAwareListenerErrorHandler")
public void LayerTestConfigListener(List<ConsumerRecord<String, String>> records, Acknowledgment ack, Consumer consumer) {
consumerHandler(records, ack, consumer);
}
public abstract void consumerHandler(List<ConsumerRecord<String, String>> records, Acknowledgment ack, Consumer consumer);
@Getter
@Slf4j
@Configuration
@EnableKafka
@EnableConfigurationProperties(KafkaCustomProperties.class)
public class KafkaConfig {
@Value("${kafka.topic.interactive-video}")
private String interactiveVideoTopic;
@Autowired
private KafkaProperties kafkaProperties;
@Autowired
private KafkaCustomProperties kafkaCustomProperties;
@PostConstruct
public void kafkaPostConstruct(){
log.info("kafka properties:{}", JSONObject.toJSONString(kafkaProperties));
log.info("custom kafka properties:{}", JSONObject.toJSONString(kafkaCustomProperties));
log.info("consumer properties:{}", kafkaProperties.buildConsumerProperties());
log.info("listener properties:{}", JSONObject.toJSONString(kafkaProperties.getListener()));
}
/**
* 自定义分层测试消费者工厂类
* @return
*/
public ConsumerFactory<String, String> batchConsumerFactory() {
Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
// consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, LayerTestConsumerInterceptor.class.getName());
return new DefaultKafkaConsumerFactory<>(consumerProperties);
}
/**
* 自定义分层测试监听工厂容器
* @return
*/
@Bean("batchContainerFactory")
public ConcurrentKafkaListenerContainerFactory listenerContainer() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(batchConsumerFactory());
// factory.setAckDiscarded(true);
// factory.setRecordFilterStrategy(defaultRecordFilterStrategy);
//设置为批量监听
factory.setBatchListener(true);
log.info("batchContainerFactory currency amount {}", JSONObject.toJSONString(factory));
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}
@Slf4j
@Component
public class LayerTestConsumerAwareListenerErrorHandler {
@Bean("consumerAwareListenerErrorHandler")
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
return (message, e, consumer) -> {
//错误处理,后期扩展可能会用到
log.error(ELKInfoUtil.getInfo(BaseUtil.getUniqueTraceId(), ELKChannel.TEACHER, ELKLevel.ERROR, message.getPayload().toString(), e));
return message.getPayload().toString();
};
}
}
/**
* 批处理统一方法
*
* @param records
* @param ack
* @param consumer
*/
@Override
public void consumerHandler(List<ConsumerRecord<String, String>> records, Acknowledgment ack, Consumer consumer) {
log.info("current deal thread id:" + Thread.currentThread().getId());
log.info("batch record size:" + records.size());
for (ConsumerRecord<String, String> record : records) {
Optional<String> kafkaMessage = Optional.ofNullable(record.value());
log.info("Received: " + record);
if (kafkaMessage.isPresent()) {
messagehandler(record, ack, consumer);
String message = record.value();
String topic = record.topic();
int partition = record.partition();
log.info("message={},topic={} ,partition={}", message, topic, partition);
}
}
}
/**
* 消息处理
* @param record
* @param ack
* @param consumer
*/
public void messagehandler(ConsumerRecord<String, String> record, Acknowledgment ack, Consumer consumer) {
TestConfigVO testConfigVO = null;
String kafkaKey = null;
try {
} catch (Exception e) {
} finally {
ack.acknowledge();
}
}
spring:
kafka:
# 指定kafkaserver的地址,集群配多个,中间,逗号隔开
bootstrap-servers: 10.15.5.40:9092,10.15.5.71:9092,10.15.3.182:9092
customer:
enable:
auto:
commit: true
key:
deserializer: org.apache.kafka.common.serialization.StringDeserializer
value:
deserializer: org.apache.kafka.common.serialization.StringDeserializer
#kafka加密配置
properties:
sasl.mechanism: PLAIN
security.protocol: SASL_PLAINTEXT
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123465";
producer:
#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
#可以设置的值为:all, -1, 0, 1
acks: 1
# 每次批量发送消息的数量,produce积累到一定数据,一次发送
batch-size: 1
# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
buffer-memory: 33554432
# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
retries: 0
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
sasl.mechanism: PLAIN
security.protocol: SASL_PLAINTEXT
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="rw16" password="XSwx24";
如果topic加密的话
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="123456";
}
启动添加:
-Djava.security.auth.login.config=./kafka.client_jaas.conf
更多推荐
已为社区贡献1条内容
所有评论(0)