Encapsulating a KafkaUtils utility class

The class below wraps a static KafkaProducer<String, String>. The connect* methods build the producer from a cluster config file, a local config file, or a caller-supplied Properties object; sendMsgKafka publishes a JSON string asynchronously with a callback, and sendMsgKafka2 publishes it synchronously.
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class KafkaUtils {

    public static Producer<String, String> producer;
    public static Properties prop;

    // With arguments: read the producer config file from the cluster (defaultFS + path)
    public static void connectkafka(String defaultFS, String path) throws Exception {
        System.out.println("start");
        prop = PropertyFileReader.readPropertyFile(defaultFS, path);
        producer = new KafkaProducer<String, String>(buildProducerProperties(prop));
    }

    // Shared producer configuration; all three connect methods set the same options
    private static Properties buildProducerProperties(Properties prop) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", prop.getProperty("kafka.bootstrap.servers"));
        properties.put("acks", prop.getProperty("kafka.acks"));
        properties.put("retries", prop.getProperty("kafka.retries"));
        properties.put("batch.size", prop.getProperty("kafka.batch.size"));
        properties.put("linger.ms", prop.getProperty("kafka.linger.ms"));
        properties.put("max.request.size", prop.getProperty("kafka.max.request.size"));
        properties.put("compression.type", prop.getProperty("kafka.compression.type"));
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("buffer.memory", "133554432"); // total producer buffer memory in bytes (~127 MB)
        return properties;
    }
    // Without arguments: read the local config file
    public static void connectkafkaLocal() throws Exception {
        System.out.println("start");
        prop = PropertyFileReader.readPropertyFileLocal();
        producer = new KafkaProducer<String, String>(buildProducerProperties(prop));
    }
    // Caller supplies an already-loaded Properties object
    public static void connectkafkaLocal(Properties prop) throws Exception {
        System.out.println("start");
        producer = new KafkaProducer<String, String>(buildProducerProperties(prop));
    }
    // Asynchronous send: the callback fires once the broker acknowledges (or rejects) the record.
    // Note: on failure metadata is null, so the offset can only be printed on success.
    public static void sendMsgKafka(String topicName, String str_json) {
        producer.send(new ProducerRecord<String, String>(topicName, str_json), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.println("producer done, offset: " + metadata.offset());
                }
            }
        });
        System.out.println("end");
    }
    // Synchronous send: block on the returned Future until the broker responds
    public static void sendMsgKafka2(String topicName, String str_json) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "", str_json); // topic, key, value
        try {
            Future<RecordMetadata> future = producer.send(record);
            future.get(); // drop this line if you do not care whether the send succeeded
        } catch (Exception e) {
            // Connection errors and "no leader" errors can be resolved by the producer's own retries;
            // errors such as an oversized message are never retried and surface here as exceptions.
            e.printStackTrace();
        }
        System.out.println("end");
    }
}
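
For context, here is a minimal usage sketch (not part of the original post). The class name KafkaUtilsDemo, the broker address, the topic name, and all property values are illustrative assumptions; the kafka.* keys mirror exactly what buildProducerProperties reads:

import java.util.Properties;

public class KafkaUtilsDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical configuration; adjust the values to your cluster.
        Properties prop = new Properties();
        prop.setProperty("kafka.bootstrap.servers", "localhost:9092"); // assumed broker address
        prop.setProperty("kafka.acks", "all");
        prop.setProperty("kafka.retries", "3");
        prop.setProperty("kafka.batch.size", "16384");
        prop.setProperty("kafka.linger.ms", "5");
        prop.setProperty("kafka.max.request.size", "1048576");
        prop.setProperty("kafka.compression.type", "snappy");

        KafkaUtils.connectkafkaLocal(prop);                   // overload that takes ready-made Properties
        KafkaUtils.sendMsgKafka("test-topic", "{\"id\":1}");  // asynchronous, result reported via callback
        KafkaUtils.sendMsgKafka2("test-topic", "{\"id\":2}"); // synchronous, blocks until acknowledged
        KafkaUtils.producer.close();                          // flush buffered records and release resources
    }
}

The two send methods trade throughput for certainty: sendMsgKafka returns immediately and reports success or failure through the callback, while sendMsgKafka2 blocks on future.get() so the caller knows each record's fate before continuing.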