Kafka批量消费&逐条消费
(以下依次给出:消费者配置参数、监听容器工厂、以及批量消费的监听方法。)
消费者配置参数
private Map<String, Object> defaultGoodsConsumerConfig() {
Map<String, Object> props = Maps.newHashMap();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:port");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer
");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer
");
props.put("listener.type", "batch");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "modify-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, “SASL_PLAINTEXT”);
props.put(SaslConfigs.SASL_MECHANISM, defaultKafkaProperties.getSaslMechanism());
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="###" password="###";
");
return props;
}
/**
 * Listener-container factory for the goods-change consumer.
 *
 * <p>Runs 4 concurrent consumer threads and delivers records to the listener
 * in batches (up to {@code max.poll.records} per invocation). Registered under
 * the name {@code defaultListenerContainerFactory} so listeners can reference
 * it via {@code containerFactory}.
 *
 * @return configured batch-mode {@link KafkaListenerContainerFactory}
 */
@Bean(name = "defaultListenerContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> defaultListenerContainerFactory() {
    // FIX: use parameterized types instead of raw types to avoid unchecked warnings.
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(defaultGoodsConsumerConfig()));
    factory.setConcurrency(4);
    // Batch mode: the listener method receives a List of records per poll.
    factory.setBatchListener(true);
    factory.getContainerProperties().setPollTimeout(3000);
    // FIX: do not JSON-serialize the factory bean (fastjson can recurse into Spring
    // internals and fail or produce huge output); log a simple marker instead.
    log.info("KafkaDefaultConsumer defaultListenerContainerFactory created, concurrency=4, batch=true");
    return factory;
}
// Core batch-consumption configuration (excerpt of the factory method above):
ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();   // FIX: diamond instead of raw type
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(defaultGoodsConsumerConfig()));
factory.setConcurrency(4);
// 批量消费,如果不设置默认是单条消费
// (Batch consumption — without this the default is one record per listener call.)
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(3000);
消费者监听消息
/**
 * Batch listener for goods-change messages.
 *
 * <p>Each invocation receives up to {@code max.poll.records} records. Records
 * are processed independently: a failure on one record is logged and the loop
 * continues, so a single bad message cannot block the rest of the batch.
 *
 * @param records the batch of raw records delivered by the container
 */
@KafkaListener(id="sync-modify-goods", topics = "${kafka.sync.goods.topic}", concurrency = "4", containerFactory = "defaultListenerContainerFactory")
public void updateListener(List<ConsumerRecord<String, String>> records){
    for (ConsumerRecord<String, String> msg : records) {
        GoodsChangeMsg changeMsg;
        try {
            changeMsg = JSONObject.parseObject(msg.value(), GoodsChangeMsg.class);
        } catch (Exception parseException) {
            // FIX: previously parse and handle failures shared one catch labelled
            // "解析失败" (parse failure) — now only genuine parse errors are logged here.
            log.error("解析失败{}", msg, parseException);
            continue;
        }
        // FIX: parseObject returns null for null/empty payloads; skip instead of
        // passing null into the processor.
        if (changeMsg == null) {
            log.warn("消息体为空,跳过{}", msg);
            continue;
        }
        try {
            syncGoodsProcessor.handle(changeMsg);
        } catch (Exception handleException) {
            log.error("处理失败{}", msg, handleException);
        }
    }
}
批量消费时,监听方法的参数也可以声明为 String[] message 等形式,不一定是 List<ConsumerRecord<String, String>>。
注意一个常见现象:如果工厂没有开启批量消费(即仍是逐条消费),而监听方法却声明为 List 参数,Kafka 会把单条消息的字符串按逗号分割后填入该 List。碰到这种情况不要慌,先检查 setBatchListener(true) 等批量消费配置是否生效。
更多推荐
已为社区贡献1条内容
所有评论(0)