一、kafka基本概念

     kafka将一个topic分为多个Partition,Partition在物理上由多个segment数据文件组成,每个segment数据文件都大小相等,按照顺序读写。每个Partition上的数据都均衡的分布在不同的broker上,partition的个数不能超过broker节点的个数。

      一个Partition上的消息是时间有序的,多个Partition之间的顺序无法保证

      kafka中很重要的特性,只需要一次消息,可以支持任意多的应用读取这个消息,consumer通过pull方式消费消息,kafka不删除已消费的消息,kafka中的数据的删除和其是否消息没有关系,只跟kafka broker上的两个配置有关系

  •  log.retention.hours=48 ===>数据最多保存48小时
  •  log.retention.byte=1073741824 ===>数据量最大1g

二、编写生产者客户端

2.1  引入pom


  <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>2.4.1</version>
   </dependency>

2.2  编写生产者客户端代码 


    public static void main(String[] args) {
        Properties prop = new Properties();

        prop.put("bootstrap.servers","192.168.221.131:9092");
        prop.put("key.serializer", 
        "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", 
        "org.apache.kafka.common.serialization.StringSerializer");

        prop.put("acks","all");
        prop.put("retries",0);
        prop.put("batch.size",16384);
        prop.put("linger.ms",1);
        prop.put("buffer.memory",33554432);
        String topic ="hello";
        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
        producer.send(new ProducerRecord<String,String>(topic,Integer.toString(2),"hello 
        kafka3"));
        producer.close();

    }

2.3 ack 消息确认机制:有三个值:0、1、all

  • 如果acks=0:表示需要Leader节点回复收到消息,这样生产者才会发送下一条数据
  • 如果acks=1:只要Partition Leader接收到消息而且写入本地磁盘了,就认为成功了,不管他其他的Follower有没有同步过去这条消息了。
  • 如果acks=all:表示需要所有Leader+副本节点回复收到消息(acks=-1),这样生产者才会发送下一条数 据

 2.4  retries

如果当前请求失败,则生产者可以自动重新连接,但是要是设置retries=0参数,则意味着请示失败不会重新连接,这样可以避免重复发送的可能

2.5   key.serializer 、value.serializer

数据在网络中传输需要进行序列化

2.6   send()

方法中有三个参数,第一个是指定发送的主题,第二个是设置消息的key,第三个是消息value

三、编写消费者客户端

  Properties prop = new Properties();

        prop.put("bootstrap.servers","192.168.221.131:9092");
        prop.put("key.deserializer", 
        "org.apache.kafka.common.serialization.StringDeserializer");
        
        prop.put("value.deserializer",
       "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("group.id","con-1");
        prop.put("auto.offset.reset","latest");
        //自动提交偏移量
        prop.put("auto.commit.intervals.ms","true");
        //自动提交时间
        prop.put("auto.commit.interval.ms","1000")
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        ArrayList<String> topics = new ArrayList<>();
        //可以订阅多个消息
        topics.add("hello");
        consumer.subscribe(topics);
        while(true){
            ConsumerRecords<String,String> poll = consumer.poll(Duration.ofSeconds(20));
            for(ConsumerRecord<String,String> consumerRecord :poll){
                System.out.println(consumerRecord);
            }
        }
    }

3.1  prop.put("group.id","con-1");

指定消费者组id,在同一时刻消费组只有一个线程可以去消费一个分区的数据,不同的消费组可以消费同一个分区的消息

 

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐