Java向kafka发送数据(demo)
测试环境:Win10安装kafka步骤。代码摘要:package com.klay.utils; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; public class SendDataToKafka { ……(完整代码见下文)
·
测试环境:Win10安装kafka步骤
package com.klay.utils;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
 * Minimal demo that publishes string records to a Kafka topic through a
 * {@link KafkaProducer} connected to a broker at {@code 127.0.0.1:9092}.
 */
public class SendDataToKafka {

    /**
     * Sends one sample record to the {@code test_topic} topic.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        SendDataToKafka sendDataToKafka = new SendDataToKafka();
        sendDataToKafka.send("test_topic", "", "this is a test data too");
    }

    /**
     * Publishes {@code data} to {@code topic} using a freshly created producer,
     * which is always closed before this method returns.
     *
     * <p>The value returned by {@code producer.send} is discarded, so delivery
     * failures reported asynchronously by the client are not surfaced here.
     *
     * @param topic destination topic name
     * @param key   record key; when {@code null} or empty, the 1-based index of
     *              the record within this call is used as the key instead
     * @param data  record value
     */
    public void send(String topic, String key, String data) {
        KafkaProducer<String, String> producer =
                new KafkaProducer<String, String>(buildProducerConfig());
        // Remember an interrupt seen while pausing so it can be re-asserted
        // once the producer has been closed.
        boolean interrupted = false;
        try {
            // The bounds (1 inclusive, 2 exclusive) yield exactly one record per call.
            for (int i = 1; i < 2; i++) {
                try {
                    // Short pause before each record.
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
                String recordKey = (key == null || key.isEmpty()) ? String.valueOf(i) : key;
                producer.send(new ProducerRecord<String, String>(topic, recordKey, data));
            }
        } finally {
            try {
                // Release the producer even if send() threw.
                producer.close();
            } finally {
                if (interrupted) {
                    // Restore the interrupt status for the caller.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /** Builds the producer settings used by {@link #send(String, String, String)}. */
    private static Properties buildProducerConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
更多推荐
已为社区贡献2条内容
所有评论(0)