Java实现Kafka的生产消费数据
应用场景:用Java实现在kafka 的topic1中写数据,有其他程序对topic1中的数据进行消费,并且会把结果写进topic2中,我们需要做的就是往topic1中写数据,并且监测topic2,如果有数据写进topic2就获取此数据import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.
·
应用场景:用Java实现在kafka 的topic1中写数据,有其他程序对topic1中的数据进行消费,并且会把结果写进topic2中,我们需要做的就是往topic1中写数据,并且监测topic2,如果有数据写进topic2就获取此数据
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SuccessCallback;
@Component
public class CrwalProduce {
private Logger logger = LoggerFactory.getLogger(CrwalProduce.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String json) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, json);
future.addCallback(new SuccessCallback() {
@Override
public void onSuccess(Object result) {
logger.info("生产者已发送数据:" + topic);
}
}, new FailureCallback() {
@Override
public void onFailure(Throwable ex) {
logger.error(ex.toString());
}
});
}
}
调用此send方法往topic1中写数据
crwalProduce.send(kafkaTopicPrefix.TASK_RESULT_ + task.getEngineId(), task.getParam());
下面就是监听topic2的类
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* @Author gaomy
* @Date 2021/11/5 15:45
* @Description
* @Version 1.0
*/
@Component
public class KafkaListener {
@org.springframework.kafka.annotation.KafkaListener(topics = {"task_result_3189f717470a4b2c85dc334e3a5b3a16"})
public void listen(String record) {
Optional<?> kafkaMessage = Optional.ofNullable(record);
if (kafkaMessage.isPresent()) {
System.out.println("***********************************************");
System.out.println("消息接收成功");
Object message = kafkaMessage.get();
System.out.println("record =" + record);
System.out.println("message =" + message);
}
}
}
更多推荐
已为社区贡献5条内容
所有评论(0)