Java 向 Kafka 推送消息
本文给出一个使用 KafkaProducer 向 Kafka 推送 JSON 消息的 Java 示例,完整代码(含 import 语句)如下。
·
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import com.alibaba.fastjson.JSONObject;
import java.util.Properties;
/**
 * Minimal example that publishes a single JSON message to a Kafka topic.
 *
 * <p>The broker address, client id and topic name are hard-coded for
 * demonstration purposes.
 */
public class Producer {

    /**
     * Builds a producer configuration, serializes a small JSON object and
     * sends it as one record with an {@code Integer} key and a
     * {@code String} value.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Topic the message is published to.
        String topic = "test";

        Properties properties = new Properties();
        // Address of the Kafka broker used for the initial connection.
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer");
        // Serializers must match the producer's <Integer, String> type parameters.
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // JSON payload to publish.
        JSONObject requestMap = new JSONObject();
        requestMap.put("reqseno", "123456789");

        // try-with-resources guarantees close() runs even if send() throws,
        // so the producer's resources are always released.
        try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(properties)) {
            // send() is asynchronous: delivery failures surface only through the
            // returned Future or the callback, so report them instead of
            // dropping them silently.
            producer.send(
                    new ProducerRecord<>(topic, 3, requestMap.toString()),
                    (metadata, exception) -> {
                        if (exception != null) {
                            System.err.println(
                                    "Failed to send message to topic " + topic + ": " + exception);
                        }
                    });
        }
    }
}
如果本文对您有所帮助,请花一点时间帮忙点个赞。
更多推荐
所有评论(0)