1、JAVA API操作kafka

 

 修改Windows的Host文件:

目录:C:\Windows\System32\drivers\etc (win10)

内容:

192.168.40.150 kafka1
192.168.40.150 kafka2
192.168.40.150 kafka3

创建maven工程导入对应maven坐标

    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

1.1 生产者代码

package com.panghl.demo;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * @Author panghl
 * @Date 2021/9/16 22:07
 * @Description 消息生产者
 **/
public class ProducerDemo {
    private static final String TOPIC = "LGJY";

    public static void main(String[] args) throws InterruptedException {
        // 要构造一个消息生产者对象,关于kafka集群等相关配置,可以从Properties文件中加载也可以从一个Properties对象中加载
        // KafkaProducer按照固定的key取出对应的value
        Properties properties = new Properties();
        // 指定集群节点
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.40.150:9092,192.168.40.150:9093,192.168.40.150:9094");
        // 发送消息,网络传输,需要对key和value指定对应的序列化类
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);


//producer无需等待来自broker的确认而继续发送下一批消息。
//这种情况下数据传输效率最高,但是数据可靠性确是最低的。
//        properties.put(ProducerConfig.ACKS_CONFIG,"0");
//producer只要收到一个分区副本成功写入的通知就认为推送消息成功了。
//这里有一个地方需要注意,这个副本必须是leader副本。
//只有leader副本成功写入了,producer才会认为消息发送成功。
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
//ack=-1,简单来说就是,producer只有收到分区内所有副本的成功写入的通知才认为推送消息成功了。
//        properties.put(ProducerConfig.ACKS_CONFIG,"-1");

        // 创建消息生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // 发送一百条消息
        for (int i = 0; i < 100; i++) {
            // 设置消息的内容
            String msg = "hello," + i;
            // 构建一个消息对象: 主题(如果不存在,kafka会帮我们创建一个一个分区一个副本的主题),消息
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, msg);
            //发送
            producer.send(record);
            System.out.println("消息发送成功,msg=>" + msg);
            TimeUnit.SECONDS.sleep(1);
        }
        // 关闭消息生产者对象
        producer.close();

    }
}

1.2 消费者代码

package com.panghl.demo;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Collections;
import java.util.Properties;

/**
 * @Author panghl
 * @Date 2021/9/16 22:24
 * @Description 消费者
 **/
public class ConsumerDemo {
    private static final String TOPIC = "LGJY";


    public static void main(String[] args) {
        // 属性对象
        Properties properties = new Properties();
        // 指定集群节点
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.40.150:9092,192.168.40.150:9093,192.168.40.150:9094");
        //反序列化类
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // 取消自动提交 防止消息丢失
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");

        //指定分组的名称
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "LGJY-GROUP1");


        //消息消费者对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

        //订阅消息
        kafkaConsumer.subscribe(Collections.singletonList(TOPIC));

        while (true) {
            // 获取消息的方法是一个阻断式方法
            ConsumerRecords<String, String> records = kafkaConsumer.poll(500);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("主题:--》" + record.topic() + ",偏移量:--》" + record.offset() + ", msg:-->" + record.value());
                // 手动提交
                kafkaConsumer.commitSync();
            }
        }

    }
}

2. Apache kafka原理

2.1 分区副本机制

kafka有三层结构:kafka有多个主题,每个主题有多个分区,每个分区又有多条消息。        

        分区机制:主要解决了单台服务器存储容量有限和单台服务器并发数限制的问题, 一个分片的不同副本不能放到同一个broker上。

        当主题数据量非常大的时候,一个服务器存放不了,就将数据分成两个或者多个部分,存放在多台服务器上。每个服务器上的数据,叫做 一个分片

 

副本:副本备份机制解决了数据存储的高可用问题

当数据只保存一份的时候,有丢失的风险。为了更好的容错和容灾,将数据拷贝几份,保存到不同的机器上。

 多个follower副本通常存放在和leader副本不同的broker中。通过这样的机制实现了高可用,当某台机器挂掉后,其他follower副本也能迅 速”转正“,开始对外提供服务。

kafka的副本都有哪些作用?

        在kafka中,实现副本的目的就是冗余备份,且仅仅是冗余备份,所有的读写请求都是由leader副本进行处理的follower副本仅有一 个功能,那就是从leader副本拉取消息,尽量让自己跟leader副本的内容一致。

说说follower副本为什么不对外提供服务?

        这个问题本质上是对性能和一致性的取舍。试想一下,如果follower副本也对外提供服务那会怎么样呢?首先,性能是肯定会有所提升 的。但同时,会出现一系列问题。类似数据库事务中的幻读,脏读

        比如你现在写入一条数据到kafka主题a,消费者b从主题a消费数据,却发现消费不到,因为消费者b去读取的那个分区副本中,最新消 息还没写入。而这个时候,另一个消费者c却可以消费到最新那条数据,因为它消费了leader副本。

        为了提高那么些性能而导致出现数据不一致问题,那显然是不值得的。

1.2 kafka保证数据不丢失机制

从Kafka的大体角度上可以分为数据生产者,Kafka集群,还有就是消费者,而要保证数据的不丢失也要从这三个角度去考虑。

1.2.1. 消息生产者

消息生产者保证数据不丢失:消息确认机制(ACK机制),参考值有三个:0,1,-1

//producer无需等待来自broker的确认而继续发送下一批消息。
//这种情况下数据传输效率最高,但是数据可靠性确是最低的。
        properties.put(ProducerConfig.ACKS_CONFIG,"0");
//producer只要收到一个分区副本成功写入的通知就认为推送消息成功了。
//这里有一个地方需要注意,这个副本必须是leader副本。
//只有leader副本成功写入了,producer才会认为消息发送成功。
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
//ack=-1,简单来说就是,producer只有收到分区内所有副本的成功写入的通知才认为推送消息成功了。
        properties.put(ProducerConfig.ACKS_CONFIG,"-1");

1.2.2 消息消费者

kafka消费消息的模型:

 即消费消息,设置好offset,类比一下:

 什么时候消费者丢失数据呢?

由于Kafka consumer默认是自动提交位移的(先更新位移,再消费消息),如果消费程序出现故障,没消费完毕,则丢失了消息,此 时,broker并不知道。

解决方案

enable.auto.commit=false 关闭自动提交位移

在消息被完整处理之后再手动提交位移

// 取消自动提交 防止消息丢失
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
// 手动提交
kafkaConsumer.commitSync();

1.3 消息存储及查询机制

kafka 使用日志文件的方式来保存生产者消息,每条消息都有一个 offset 值来表示它在分区中的偏移量。

Kafka 中存储的一般都是海量的消息数据,为了避免日志文件过大,一个分片 并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上 的一个目录,这个目录的命名规则是<topic_name>_<partition_id>。

1.3.1 消息存储机制

 

 

1.4 生产者消息分发策略

        kafka在数据生产的时候,有一个数据分发策略。默认的情况使用DefaultPartitioner.class类。 这个

类中就定义数据分发的策略。

public interface Partitioner extends Configurable, Closeable {
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes The serialized key to partition on( or null if no key)
* @param value The value to partition on or null
* @param valueBytes The serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster
cluster);
/**
* This is called when partitioner is closed.
*/
public void close();
}

默认实现类:org.apache.kafka.clients.producer.internals.DefaultPartitioner

1) 如果是用户指定了partition,生产就不会调用DefaultPartitioner.partition()方法

数据分发策略的时候,可以指定数据发往哪个partition。

当ProducerRecord 的构造参数中有partition的时候,就可以发送到对应partition上

/**
* Creates a record to be sent to a specified topic and partition
*
* @param topic The topic the record will be appended to
* @param partition The partition to which the record should be sent
* @param key The key that will be included in the record
* @param value The record contents
*/
public ProducerRecord(String topic, Integer partition, K key, V value) {
this(topic, partition, null, key, value, null);
}

2) DefaultPartitioner源码

如果指定key,是取决于key的hash值

如果不指定key,轮询分发

public int partition (String topic, Object key,byte[] keyBytes, Object value,byte[] valueBytes, Cluster
        cluster){
//获取该topic的分区列表
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
//获得分区的个数
            int numPartitions = partitions.size();
//如果key值为null
            if (keyBytes == null) {//如果没有指定key,那么就是轮询
//维护一个key为topic的ConcurrentHashMap,并通过CAS操作的方式对value值执行递增+1操作
                int nextValue = nextValue(topic);
//获取该topic的可用分区列表
                List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
                if (availablePartitions.size() > 0) {//如果可用分区大于0
//执行求余操作,保证消息落在可用分区上
                    int part = Utils.toPositive(nextValue) % availablePartitions.size();
                    return availablePartitions.get(part).partition();
                } else {
// 没有可用分区的话,就给出一个不可用分区
                    return Utils.toPositive(nextValue) % numPartitions;
                }
            } else {//不过指定了key,key肯定就不为null
// 通过计算key的hash,确定消息分区
                return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            }
        }

1.5 消费者负载均衡机制

同一个分区中的数据,只能被一个消费者组中的一个消费者所消费。例如 P0分区中的数据不能被Consumer Group A中C1与C2同时消费。

消费组:一个消费组中可以包含多个消费者,properties.put(ConsumerConfig.GROUP_ID_CONFIG,"groupName");如果该消费组有四个 消费者,主题有四个分区,那么每人一个。多个消费组可以重复消费消息。

  •  如果有3个Partition, p0/p1/p2,同一个消费组有3个消费者,c0/c1/c2,则为一一对应关系;
  • 如果有3个Partition, p0/p1/p2,同一个消费组有2个消费者,c0/c1,则其中一个消费者消费2个分区的数据,另一个消费者消费一个 分区的数据;
  • 如果有2个Partition, p0/p1,同一个消费组有3个消费者,c0/c1/c3,则其中有一个消费者空闲,另外2个消费者消费分别各自消费一个 分区的数据;

2、 kakfa配置文件说明

server.properties

1、broker.id=0:

        kafka集群是由多个节点组成的,每个节点称为一个broker,中文翻译是代理。每个broker都有一个不同的brokerId,由broker.id指定, 是一个不小于0的整数,各brokerId必须不同,但不必连续。如果我们想扩展kafka集群,只需引入新节点,分配一个不同的broker.id即 可。

        启动kafka集群时,每一个broker都会实例化并启动一个kafkaController,并将该broker的brokerId注册到zooKeeper的相应节点中。集 群各broker会根据选举机制选出其中一个broker作为leader,即leader kafkaController。leader kafkaController负责主题的创建与删除、 分区和副本的管理等。当leader kafkaController宕机后,其他broker会再次选举出新的leader kafkaController。

2、log.dir = /export/data/kafka/

broker持久化消息到哪里,数据目录

3、log.retention.hours = 168

log文件最小存活时间,默认是168h,即7天。相同作用的还有log.retention.minutes、log.retention.ms。retention是保存的意思。

数据存储的最大时间超过这个时间会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据。 log.retention.bytes和log.retention.hours任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖。

4、log.retention.check.interval.ms

多长时间检查一次是否有log文件要删除。默认是300000ms,即5分钟。

5、log.retention.bytes

限制单个分区的log文件的最大值,超过这个值,将删除旧的log,以满足log文件不超过这个值。默认是-1,即不限制。

6、log.roll.hours

多少时间会生成一个新的log segment,默认是168h,即7天。相同作用的还有log.roll.ms、segment.ms。

7、log.segment.bytes

log segment多大之后会生成一个新的log segment,默认是1073741824,即1G。

8、log.flush.interval.messages

指定broker每收到几个消息就把消息从内存刷到硬盘(刷盘)。默认是9223372036854775807 好大。

kafka官方不建议使用这个配置,建议使用副本机制和操作系统的后台刷新功能,因为这更高效。这个配置可以根据不同的topic设置不同的 值,即在创建topic的时候设置值。

补充说明:

在Linux操作系统中,当我们把数据写入到文件系统之后,数据其实在操作系统的page cache里面,并没有刷到磁盘上去。如果此时操作系统
挂了,其实数据就丢了。
    1、kafka是多副本的,当你配置了同步复制之后。多个副本的数据都在page cache里面,出现多个副本同时挂掉的概率比1个副本挂掉,概率
就小很多了
    2、操作系统有后台线程,定期刷盘。如果应用程序每写入1次数据,都调用一次fsync,那性能损耗就很大,所以一般都会在性能和可靠性之间
进行权衡。因为对应一个应用来说,虽然应用挂了,只要操作系统不挂,数据就不会丢。

9、log.flush.interval.ms

指定broker每隔多少毫秒就把消息从内存刷到硬盘。默认值同log.flush.interval.messages一样, 9223372036854775807。 同log.flush.interval.messages一样,kafka官方不建议使用这个配置。

10、delete.topic.enable=true

是否允许从物理上删除topic

3. kafka监控与运维

3.1 kafka-eagle概述

        在生产环境下,在Kafka集群中,消息数据变化是我们关注的问题,当业务前提不复杂时,我们可以使用Kafka 命令提供带有Zookeeper 客户端工具的工具,可以轻松完成我们的工作。随着业务的复杂性,增加Group和 Topic,那么我们使用Kafka提供命令工具,已经感到无能 为力,那么Kafka监控系统目前尤为重要,我们需要观察 消费者应用的细节。

        为了简化开发者和服务工程师维护Kafka集群的工作有一个监控管理工具,叫做 Kafka-eagle。这个管理工具可以很容易地发现分布在集 群中的哪些topic分布不均匀,或者是分区在整个集群分布不均匀的的情况。它支持管理多个集群、选择副本、副本重新分配以及创建 Topic。同时,这个管理工具也是一个非常好的可以快速浏览这个集群的工具,

3.2 搭建安装 kafka-eagle

环境要求:需要安装jdk,启动zk以及kafka的服务

# 启动Zookeeper
zkServer.sh start


#启动Kafka
nohup ./kafka-server-start.sh /export/servers/kafka/config/server.properties 2>&1 &

修改windows host文件

192.168.40.171 docker-node1
192.168.40.172 docker-node2
192.168.40.173 docker-node3

搭建步骤:

1) 下载kafka-eagle的源码包

kafka-eagle官网:

        http://download.kafka-eagle.org/

我们可以从官网上面直接下载最新的安装包即可kafka-eagle-bin-1.3.2.tar.gz这个版本即可

代码托管地址:

        https://github.com/smartloli/kafka-eagle/releases

2) 上传安装包并解压:

这里我们选择将kafak-eagle安装在第三台

如果要解压的是zip格式,需要先安装命令支持。

yum install unzip

unzip xxxx.zip

#将安装包上传至 docker-node02服务器的/opt路径下, 然后解压
cd /opt
unzip kafka-eagle.zip
cd cd kafka-eagle-web/target/
tar -zxf kafka-eagle-web-2.0.1-bin.tar.gz -C /opt/

3) 准备数据库:

kafka-eagle需要使用一个数据库来保存一些元数据信息,我们这里直接使用msyql数据库来保存即可,在node01服务器执行以下命 令创建一个mysql数据库即可

SQLite、MySQL

--进入mysql客户端:

create database eagle;

4) 修改kafka-eagle配置文件

 

cd /export/servers/kafka-eagle-bin-1.3.2/kafka-eagle-web-1.3.2/conf
vim system-config.properties
#内容如下:
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=zk主机名:2181,zk主机名:2181,zk主机名:2181
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://192.168.40.1:3306/eagle
kafka.eagle.username=root
kafka.eagle.password=123456

默认情况下MySQL只允许本机连接到MYSQL实例中,所以如果要远程访问,必须开放权限: update user set host = '%' where user ='root'; //修改权限

flush privileges; //刷新配置

5) 配置环境变量

kafka-eagle必须配置环境变量,node02服务器执行以下命令来进行配置环境变量

vi /etc/profile
#内容如下:
export KE_HOME=/opt/kafka-eagle-web-2.0.1/
export PATH=:$KE_HOME/bin:$PATH
#让修改立即生效,执行
source /etc/profile

6) 启动kakfa-eagle

cd /opt/kafka-eagle-web-2.0.1/bin

chmod u+x ke.sh
./ke.sh start

7) 访问主界面:

http://docker-node3:8048/ke/account/signin?/ke/

用户名:admin 密码:123456

 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐