想连接的话需要满足下面的条件
1、关闭虚拟机的防火墙,或者开放kafka和zookeeper的指定端口
centos7关闭防火墙
2、给虚拟机设定静态的IP,当然如果不嫌麻烦就可以不设
CentOS7用NAT模式设置静态IP
3、把kafka文件夹下的配置文件config/server.properties里面的listenters写上虚拟机的IP,不要默认的localhost。zookeeper.connect也要按IP:端口的样式写

下面所有的操作先说明环境

在虚拟机里的kafka创建了一个topicmy-replicated-topic的单分区三个备份节点的broker

客户端依赖

<dependency>
	    <groupId>org.apache.kafka</groupId>
	    <artifactId>kafka-clients</artifactId>
	    <version>2.3.0</version>
	</dependency>

jdk是1.8
虚拟机里的kafka是2.12-2.30
这种版本搭配不冲突

1、工具类

主要用于和虚拟机中的卡kafka的配置

public class ProUntil {
			public static Properties getProperties() {				
				Properties props = new Properties();
				 props.put("bootstrap.servers", "192.168.137.130:9092"); 
				 props.put("acks", "all");       //判定是否成功发送,“all”会阻塞,性能低但可靠
				 props.put("retries", 0);        //失败重试次数
				 props.put("batch.size", 16384);//缓存区的大小,每个"活跃"的分区有一个缓存区
				 //消息延迟时间,单位毫秒,小于这个时间的消息组成批一次请求就发送过去,
				 //在高负载情况下时间间隔太近的也会组成批发送
				 //不再高负载下可以时间大点,以时间换有效的请求
				 props.put("linger.ms", 1);
				 props.put("buffer.memory", 33554432);//缓存的总量,大于这个数就会阻塞
				 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");//key和value转成字节
				 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
				 return props;
			}
}

2、生产者

(1)非阻塞

有返回值但是不能调用get(),否则成阻塞了

  Producer<String, String> producer = new KafkaProducer<>(ProUntil.getProperties());
    for(int i = 0; i < 100; i++) {
    	//异步,添加消息到缓存区,等待到一定程度把这些消息一起发送到集群
    	producer.send(new ProducerRecord<String, String>("my-replicated-topic",  Integer.toString(i)));			  
    } 
   producer.close();  //关闭生产者,不关闭为造成还没发送过去的消息发生泄漏

成功,不是一个一个来的,是一起来的,等了一小会瞬间到99

public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)

异步发送一条消息到topic,当发送已确认调用callback,这没写,就默认为空。
send() 里的 ProducerRecord() 有很多构造函数,需要自己去看
看源码的属性就大概理解了

public class ProducerRecord<K, V> {

    private final String topic; //主题
    private final Integer partition; //分区
    private final Headers headers;
    private final K key; //partition不存在就hash(key)选分区,key和partition都没有随机
    private final V value;  //值
    private final Long timestamp;//没有就按生产者的时间
    //在broker端,如果配置了时间戳采用createtime方式,则使用producer传给Broker的record中的timestramp时间,如果指定为logappendtime,则在broker写入到Log文件时会重写该时间。
    }
2、有返回值的阻塞

返回值接收

Future<RecordMetadata> frm = producer.send(new ProducerRecord<String, String>());
RecordMetadata rm = frm.get();
//接收了返回值他还是非阻塞的,Future调用了get()方法是就成阻塞的了,
//直到相关请求完成返回RecordMetadata 或者抛出异常
//可以自己试试,当把第二行注释起来很快,放开就会速度慢下来

翻译方式是百度翻译请见谅

返回类型方法解释
longchecksum()记录的校验和(The checksum (CRC32) of the record.)
longoffset()主题/分区中记录的偏移量(The offset of the record in the topic/partition.)
intpartition()记录发送到哪个分区(The partition the record was sent to)
intserializedKeySize()序列化后key的有多少字节(The size of the serialized, uncompressed key in bytes.)
intserializedValueSize()序列化后value的有多少字节(The size of the serialized, uncompressed value in bytes.)
longtimestamp()主题/分区中记录的时间戳(The timestamp of the record in the topic/partition.)
Stringtopic()主题名(The topic the record was appended to)

最后的topic解释一下:
1、普通的topic大家都会
2、如果topic使用的是CreateTime,则使用用户提供的时间戳或发送的时间(如果用户没有指定指定消息的时间戳)
3、如果topic使用的是LogAppendTime,则追加消息时,时间戳是broker的本地时间。

(3)接收回调函数的非阻塞

上面介绍了阻塞和非阻塞,但是有个很大的问题?
如果你想提高性能也就是非阻塞,但是你想知道传过去了吗,也就是成功了吗
如果你想通过判断 Future<RecordMetadata> 是否为 null ,那你肯定不对,因为不管成不成功,Future<RecordMetadata> 都不为null
那就看下面的代码吧

public static void main(String[] args) {		
		Producer<String, String> producer = new KafkaProducer<>(ProUntil.getProperties());
		for(int i = 0; i < 100; i++) {
		 ProducerRecord<String, String> record = new ProducerRecord<String, String>("my-replicated-topic", Integer.toString(i));
		  producer.send(record, (recordMetadata, e) -> {   //这样其实就是new了一个新的构造函数,可以代替Callback 
              if (e != null) {						//如果成功不为null
                  System.err.println("my-replicated-topic" + "--消息发送失败");
              }else {
                  System.err.println("my-replicated-topic" + "--消息发送成功");
              }
          });
		  producer.flush();//刷新缓存空间,把信息发出去
		}
		 producer.close();
		 
	}

保证发送到同一个分区的回调函数按一定的顺序执行,不同分区的不一定

推荐:如果你需要执行阻塞或计算昂贵(消耗)的回调,建议在callback主体中使用自己的Executor来并行处理

3、消费者

首先版本:我这个2.3.0版本的kafka的客户端只有这个poll(Duration)方法了,而poll(long)已经被删除了
最后:说一下 group.id 这个是对消费者而言的,跟生产者没关系,
通俗点解释:对于某个主题里的所有消费者,根据group.id分为很多组,组里可以有多个消费者,也可以为1个,当然不指定group.id的消费者可以理解为每一个都为单独的一个组,即使他们没有group.id
这里是单个分区,而且只是一个Leader和两个备份,可以看成一个消费者,因为最后读写的都是Leader,所以备份节点和Leader可以看成一个消费者,就算多个分区也没事,kafka会默认给消费者平均分配分区

(1)工具类
public class ConUntil {
	public static Properties getProperties() {
		Properties props = new Properties();
		props.put("bootstrap.servers", "192.168.137.130:9092");
		props.put("group.id", "test");   //消费者组 ,不能不写,这个不一定和9092的消费者的group.id对应,随便写的
		props.put("enable.auto.commit", "true"); //自动提交偏移量为true,不自动提交则为false
		props.put("auto.commit.interval.ms", "1000");//自动提交的频率,一秒一次
		props.put("session.timeout.ms", "30000");//停止心跳时间超过30s,就认为有故障
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		return props;	
		}
}
(2)自动提交偏移量
 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(ConUntil.getProperties());
		     consumer.subscribe(Arrays.asList("my-replicated-topic"));//订阅主题,可以是多个,用逗号隔开
		     while (true) {
		         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
		         //poll(Duration),获取元数据发送过去到回来的时间,如果长于这个时间就认为服务器故障,
		         for (ConsumerRecord<String, String> record : records)
		             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
		     }

在这里插入图片描述
offset是偏移量,value为消息

(2)手动提交偏移量

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(ConUntil.getProperties());
consumer.subscribe(Arrays.asList("my-replicated-topic")); //主题
	try {
		while(true) {
			ConsumerRecords<String, String> records = consumer.poll(Duration.ofHours(5));//这个可以足够大,因为是手动提交
			for (TopicPartition partition : records.partitions()) { //迭代分区
				  List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
				  for (ConsumerRecord<String, String> record : partitionRecords) {
					   System.out.println(record.offset() + ": " + record.value());//输出偏移量和
				}
				long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); //当前偏移量
				consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); //指向下一个的偏移量
			 }
		}
	 } finally {
	consumer.close();
  }
			

注意:已提交的offset应始终是你的程序将读取的下一条消息的offset。因此,调用commitSync(offsets)时,你应该加1个到最后处理的消息的offset。

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐