kafka简介

kafka是一个分布式消息发布和订阅系统,与zookeeper进行协调,多分区、多副本;

特性是高吞吐、可持久化、可水平扩展、支持流数据处理,具备三大功能:

  • 消息系统:业务系统经常用到,常见解耦,还有削峰、异步通信、缓冲等功能。
  • 存储系统
  • 流式处理平台
基本概念与原理
  • Producer:生产者,发送消息的一方。

  • Consumer:消费者,接收消息的一方。

  • Broker:代理,kafka节点,一个节点就是一个kafka server进程。

  • Topic:主题,消息以主题来进行归类。

  • Partition:分区。主题的所有消息分布在不同的区中,每个分区的消息一定是不同的,分区可以分布在不同的broker中。

  • Replica:副本机制,每个分区引入多副本,leader副本和follower副本,leader副本处理读写,follower副本负责同步leader副本的数据,出现故障时,follower副本中重新选举出新的leader副本,进行故障转移。

  • Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name,则属于默认的group)

    理解模型

在这里插入图片描述

分布式模型

在这里插入图片描述

Linux-kafka安装
前提:

​ kafka安装需要有jdk环境

官网下载:

​ https://kafka.apache.org/downloads

解压
tar -zxvf kafka_2.13-3.2.0.tgz
配置server.properties
cd /usr/local/kafka/kafka_2.13-3.2.0/config
vi server.properties
//放开listeners  主机IP
listeners=PLAINTEXT://127.0.0.1:9092 
//因为kafka是基于zookeeper的(新版本自带zk),要加上zk的地址
zookeeper.connect=本服务器的ip地址:2181
启动zk
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
启动kafka
./kafka-server-start.sh -daemon ../config/server.properties
Java-kafka发布订阅demo
添加maven依赖
        <!--kafka-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.0</version>
        </dependency>
生产者
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer {
    public static String topic = "test";//定义主题

    public static void main(String[] args) throws InterruptedException {
        Properties p = new Properties();
        //kafka地址,多个地址用逗号分割
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.181:9092");
        //序列化
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);

        try {
            while (true) {
                String msg = "Hello," + new Random().nextInt(100);
                ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);
                //kafkaProducer,send(record)可以通过返回的Future来判断是否已经发送到kafka,增强消息的可靠性
                //同时也可以使用send的第二个参数来回调,通过回调判断是否发送成功。
                Future<RecordMetadata> send = kafkaProducer.send(record);
                System.out.println("消息发送成功:" + msg);
                Thread.sleep(500);
            }
        } finally {
            kafkaProducer.close();
        }

    }
}

在这里插入图片描述

消费者
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Consumer {
    public static void main(String[] args) {
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.181:9092");
        //反序列化
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //消费组,kafka根据分组名称判断是不是同一组消费者,同一组消费者去消费一个主题的数据时,数据将在这一组消费者上面轮询
        p.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(p);
        // 订阅消息-可订阅多个主题
        kafkaConsumer.subscribe(Collections.singletonList(Producer.topic));

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format("topic:%s,offset:%d,消息:%s",
                        record.topic(), record.offset(), record.value()));
            }
        }
    }
}

在这里插入图片描述

出现问题

1.启动kafka报错:ERROR Error while creating ephemeral at /brokers/ids/1, node already exists

解决:原因关闭kafka异常关闭,启动时应先启动zookeeper,再启动kafka

2.生产者发消息报错: Can’t resolve address: VM_0_15_centos:9092

解决:

1.在 server.properties,添加配置:host.name = 192.168.111.130  # 本机ip地址
2.在kafka安装服务器,hosts中添加:192.168.0.191 VM_0_15_centos
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐