Java 生产和消费 Kafka 消息
通过 Java 客户端生产和消费 Kafka 消息
·
maven依赖
<!-- Apache Kafka Java client library (org.apache.kafka:kafka-clients 2.3.0); it supplies the org.apache.kafka.clients producer and consumer classes imported by the example code. -->
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.0</version> </dependency>
生产者代码
package com.kafka.demo;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

/**
 * Demo producer that periodically sends a fixed text message to a Kafka topic.
 *
 * <p>Keys are serialized as strings and values as raw bytes, so the producer is typed
 * {@code <String, byte[]>} to match the configured serializers.
 */
public class KafkaProducerTest implements Runnable {

    /** A progress line is printed every time this many messages have been sent. */
    private static final int PROGRESS_INTERVAL = 100;

    /** The producer stops after sending this many messages. */
    private static final int MAX_MESSAGES = 1000;

    private final KafkaProducer<String, byte[]> producer;
    private final String topic;

    /**
     * Creates a producer bound to the given topic.
     *
     * @param topicName name of the topic every message is sent to
     */
    public KafkaProducerTest(String topicName) {
        Properties props = new Properties();
        // Placeholder address: replace with the real broker host(s) before running.
        props.put("bootstrap.servers", "xxx.xxx.xx.xx:9092");
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", ByteArraySerializer.class);
        this.producer = new KafkaProducer<>(props);
        this.topic = topicName;
    }

    /**
     * Sends messages in a loop, pausing 1-10 seconds between sends, until
     * {@link #MAX_MESSAGES} messages have been sent or an error occurs. The producer
     * is always closed on exit.
     */
    @Override
    public void run() {
        int messageNo = 1;
        try {
            for (; ; ) {
                String messageStr = "hello,world";
                System.out.println("-----------------------" + messageStr);
                // Explicit charset so the bytes on the wire do not depend on the platform default.
                producer.send(new ProducerRecord<>(topic, messageStr.getBytes(StandardCharsets.UTF_8)));

                // Report progress every PROGRESS_INTERVAL messages (and keep producing).
                if (messageNo % PROGRESS_INTERVAL == 0) {
                    System.out.println("--------------------------------------------:" + messageNo + "条");
                    System.out.println("发送的信息:" + messageStr);
                }

                // Stop once the configured number of messages has been sent.
                if (messageNo >= MAX_MESSAGES) {
                    System.out.println("成功发送了" + messageNo + "条");
                    break;
                }

                // Random pause of 1-10 seconds between messages.
                int num = (int) (Math.random() * 10) + 1;
                System.out.println(num);
                Thread.sleep(num * 1000L);
                messageNo++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }

    /** Starts the demo producer on its own thread, sending to the "kafka-send" topic. */
    public static void main(String[] args) {
        KafkaProducerTest test = new KafkaProducerTest("kafka-send");
        Thread thread = new Thread(test);
        thread.start();
    }
}
消费者代码
package com.kafka.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * Demo consumer that subscribes to a Kafka topic and prints every record value it receives.
 *
 * <p>Offsets are committed by the client's auto-commit ({@code enable.auto.commit=true}),
 * so the polling loop does not commit manually.
 */
public class KafkaConsumerTest implements Runnable {

    /** Consumer group id; any value works for this demo. */
    private static final String GROUPID = "c_test";

    /** Maximum time a single poll blocks waiting for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(1000);

    private final KafkaConsumer<String, String> consumer;
    private final String topic;

    /**
     * Creates a consumer and subscribes it to the given topic.
     *
     * @param topicName name of the topic to consume from
     */
    public KafkaConsumerTest(String topicName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("security.protocol", "PLAINTEXT");
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // Where to start consuming: "latest" reads only new messages, "earliest" reads
        // from the beginning, "none" throws an exception; the default is "latest".
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(props);
        this.topic = topicName;
        this.consumer.subscribe(Arrays.asList(topic));
    }

    /**
     * Polls the topic forever and prints each record value prefixed with a running counter.
     * The consumer is closed if the loop terminates because of an exception.
     */
    @Override
    public void run() {
        int messageNo = 1;
        System.out.println("---------开始消费---------");
        try {
            for (; ; ) {
                // poll blocks for up to POLL_TIMEOUT when no data is available, so no
                // extra sleep is needed between empty polls.
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT);
                for (ConsumerRecord<String, String> record : records) {
                    String value = record.value();
                    System.out.println(messageNo + "-----------------------------------------------------" + value);
                    messageNo++;
                }
                // No manual commit here: auto-commit is enabled, and a per-record
                // commitAsync() would commit the whole batch's offsets after only the
                // first record had been handled.
            }
        } finally {
            consumer.close();
        }
    }

    /** Starts the demo consumer on its own thread, reading from the "kafka-send" topic. */
    public static void main(String[] args) {
        KafkaConsumerTest test1 = new KafkaConsumerTest("kafka-send");
        Thread thread1 = new Thread(test1);
        thread1.start();
    }
}
更多推荐
已为社区贡献4条内容
所有评论(0)