import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.Callback;


/**
 * Static helper that owns one shared Kafka {@link Producer} with String keys and values.
 *
 * <p>Call one of the {@code connectkafka*} methods once to create the producer, then use
 * {@link #sendMsgKafka(String, String)} (asynchronous) or
 * {@link #sendMsgKafka2(String, String)} (blocks until the broker answers) to publish.
 *
 * <p>Producer settings are read from application properties named {@code kafka.<setting>},
 * e.g. {@code kafka.bootstrap.servers}, {@code kafka.acks}. Only
 * {@code kafka.bootstrap.servers} is mandatory; any other setting that is absent is left
 * out so the Kafka client default applies.
 *
 * <p>NOTE(review): calling a connect method again replaces {@link #producer} without
 * closing the previous instance; callers that reconnect should close the old one themselves.
 */
public class KafkaUtils {
    /** Shared producer; {@code null} until one of the connect methods has completed. */
    public static Producer<String, String> producer;

    /**
     * Application properties last loaded by {@link #connectkafka(String, String)} or
     * {@link #connectkafkaLocal()}. Not touched by {@link #connectkafkaLocal(Properties)}.
     */
    public static Properties prop;

    /** Prefix used for producer settings inside the application properties. */
    private static final String SOURCE_PREFIX = "kafka.";

    /** Serializer class used for both record keys and record values. */
    private static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";

    /**
     * Fixed value for {@code buffer.memory}.
     * NOTE(review): 133554432 is not a power of two (128 MiB is 134217728) - confirm intended.
     */
    private static final String BUFFER_MEMORY_BYTES = "133554432";

    /** Producer settings copied from the application properties only when present. */
    private static final String[] OPTIONAL_SETTINGS = {
        "acks",
        "retries",
        "batch.size",
        "linger.ms",
        "max.request.size",
        "compression.type",
    };

    /**
     * Loads the properties file through {@code PropertyFileReader.readPropertyFile}
     * (the variant with arguments reads the cluster configuration file) and creates the
     * shared producer from it.
     *
     * @param defaultFS passed through unchanged to {@code PropertyFileReader.readPropertyFile}
     * @param path      passed through unchanged to {@code PropertyFileReader.readPropertyFile}
     * @throws IllegalArgumentException if {@code kafka.bootstrap.servers} is missing
     * @throws Exception                whatever the property reader or producer constructor throws
     */
    public static void connectkafka(String defaultFS, String path) throws Exception {
        System.out.println("start");
        prop = PropertyFileReader.readPropertyFile(defaultFS, path);
        producer = new KafkaProducer<String, String>(buildProducerConfig(prop));
    }

    /**
     * Loads the properties through {@code PropertyFileReader.readPropertyFileLocal}
     * (the variant without arguments reads the local configuration file) and creates the
     * shared producer from it.
     *
     * @throws IllegalArgumentException if {@code kafka.bootstrap.servers} is missing
     * @throws Exception                whatever the property reader or producer constructor throws
     */
    public static void connectkafkaLocal() throws Exception {
        System.out.println("start");
        prop = PropertyFileReader.readPropertyFileLocal();
        producer = new KafkaProducer<String, String>(buildProducerConfig(prop));
    }

    /**
     * Creates the shared producer from properties supplied by the caller.
     *
     * <p>The parameter shadows the static {@link #prop} field, which is left unchanged.
     *
     * @param prop application properties holding the {@code kafka.*} settings; must not be null
     * @throws IllegalArgumentException if {@code prop} is null or lacks {@code kafka.bootstrap.servers}
     * @throws Exception                whatever the producer constructor throws
     */
    public static void connectkafkaLocal(Properties prop) throws Exception {
        System.out.println("start");
        producer = new KafkaProducer<String, String>(buildProducerConfig(prop));
    }

    /**
     * Sends a key-less record asynchronously and reports the outcome from the producer callback.
     *
     * @param topicName destination topic
     * @param str_json  record value
     * @throws IllegalStateException if no connect method has been called yet
     */
    public static void sendMsgKafka(final String topicName, String str_json) {
        Producer<String, String> active = requireProducer();

        active.send(new ProducerRecord<String, String>(topicName, str_json), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // On failure the metadata carries no usable offset (and may be null on
                    // older clients), so it must not be dereferenced here.
                    System.out.println("producer failed for topic " + topicName);
                    exception.printStackTrace();
                } else {
                    System.out.println("producer done" + metadata.offset());
                }
            }
        });

        System.out.println("end");
    }

    /**
     * Sends a record with an empty-string key and waits for the broker's answer.
     * Failures are printed, not rethrown.
     *
     * @param topicName destination topic
     * @param str_json  record value
     * @throws IllegalStateException if no connect method has been called yet
     */
    public static void sendMsgKafka2(String topicName, String str_json) {
        Producer<String, String> active = requireProducer();

        // Topic, key, value.
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>(topicName, "", str_json);
        try {
            Future<RecordMetadata> pending = active.send(record);
            // Blocking here is only needed when the caller cares whether the send succeeded.
            pending.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            // Connection and "no leader" errors can be resolved by the producer's own retries;
            // errors such as an oversized message are not retried and surface here directly.
            e.printStackTrace();
        }

        System.out.println("end");
    }

    /** Translates {@code kafka.*} application properties into a Kafka producer configuration. */
    private static Properties buildProducerConfig(Properties source) {
        if (source == null) {
            throw new IllegalArgumentException("Kafka configuration properties must not be null");
        }
        String servers = source.getProperty(SOURCE_PREFIX + "bootstrap.servers");
        if (servers == null || servers.trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "Missing required property: " + SOURCE_PREFIX + "bootstrap.servers");
        }

        Properties config = new Properties();
        config.put("bootstrap.servers", servers);
        for (String setting : OPTIONAL_SETTINGS) {
            String value = source.getProperty(SOURCE_PREFIX + setting);
            // Properties rejects null values; skipping lets the Kafka default apply instead.
            if (value != null) {
                config.put(setting, value);
            }
        }
        config.put("key.serializer", STRING_SERIALIZER);
        config.put("value.serializer", STRING_SERIALIZER);
        config.put("buffer.memory", BUFFER_MEMORY_BYTES);
        return config;
    }

    /** Returns the shared producer, failing with a clear message when none was created. */
    private static Producer<String, String> requireProducer() {
        Producer<String, String> active = producer;
        if (active == null) {
            throw new IllegalStateException(
                    "Kafka producer is not initialised; call connectkafka(...) or connectkafkaLocal(...) first");
        }
        return active;
    }
}

 

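// Logo
//
// (Web-page footer captured together with the source.) Provides developers with services such as learning and growth, sharing and exchange, ecosystem practice, and resources and tools, helping developers grow quickly.
//
// More recommendations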