kafka生产者发送消息成功回调
kafka生产者发送消息成功回调添加成功回调类生产者代码添加成功回调类添加成功回调类@Componentpublic class KafkaSendResultHandler implements ProducerListener {private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.cl
·
kafka生产者发送消息成功回调
添加成功回调类
@Component
public class KafkaSendResultHandler implements ProducerListener {
private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);
/**
* kafka发送成功回调
* @param producerRecord
* @param recordMetadata
*/
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
String key = producerRecord.key().toString();
String topic = producerRecord.topic();
log.info("key:{},topic:{}, 发送成功回调",key,topic);
}
@Override
public void onError(ProducerRecord producerRecord, Exception exception) {
String key = producerRecord.key().toString();
String topic = producerRecord.topic();
log.info("key:{},topic:{}, 发送异常回调",key,topic);
}
}
生产者代码添加成功回调类
@Component
public class SendKafka{
//添加上面的KafkaSendResultHandler类
@Autowired
private KafkaSendResultHandler producerListener;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendKafka(){
try {
//发送消息前配置回调
kafkaTemplate.setProducerListener(producerListener);
//发送消息 testtopic为topic主题,testkey为key键,测试值为传的内容
kafkaTemplate.send("testtopic","testkey","测试值").get(); //发送消息改为同步添加.get()
} catch (Exception e) {
e.printStackTrace();
log.error("{}",e);
}
}
}
其他步骤
更多推荐
已为社区贡献12条内容
所有评论(0)