应用场景:用Java实现在kafka 的topic1中写数据,有其他程序对topic1中的数据进行消费,并且会把结果写进topic2中,我们需要做的就是往topic1中写数据,并且监测topic2,如果有数据写进topic2就获取此数据


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SuccessCallback;

@Component
public class CrwalProduce {
	private Logger logger = LoggerFactory.getLogger(CrwalProduce.class);

	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;

	public void send(String topic, String json) {
		ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, json);
		future.addCallback(new SuccessCallback() {
			@Override
			public void onSuccess(Object result) {
				logger.info("生产者已发送数据:" + topic);
			}
		}, new FailureCallback() {
			@Override
			public void onFailure(Throwable ex) {
				logger.error(ex.toString());
			}
		});
	}
}

调用此send方法往topic1中写数据

crwalProduce.send(kafkaTopicPrefix.TASK_RESULT_ + task.getEngineId(), task.getParam());

下面就是监听topic2的类

import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * @Author gaomy
 * @Date 2021/11/5 15:45
 * @Description
 * @Version 1.0
 */

@Component
public class KafkaListener {

    @org.springframework.kafka.annotation.KafkaListener(topics = {"task_result_3189f717470a4b2c85dc334e3a5b3a16"})
    public void listen(String record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record);
        if (kafkaMessage.isPresent()) {
            System.out.println("***********************************************");
            System.out.println("消息接收成功");
            Object message = kafkaMessage.get();
            System.out.println("record =" + record);
            System.out.println("message =" + message);
        }
    }

}

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐