1 相关概念

Zookeeper是Hadoop生态系统中分布式的服务管理框架,负责存储和管理集群中的公共数据如配置信息等,并且对节点进行注册和通知管理。它具有如下几个特点:

  • 集群由一个领导者(Leader),多个跟随者(Follower)组成
  • 集群中只要有半数以上节点存活,Zookeeper集群就能正常服务。所以Zookeeper适合安装奇数台服务器。
  • 全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致的。
  • 按照请求顺序执行更新,同一个Client的按发送顺序依次执行。
  • 更新原子性,一次数据更新要么成功,要么失败。
  • 实时性,在一定时间范围内,Client能读到最新数据

Zookeeper常应用于以下几个场景:

  1. 统一命名服务:在分布式环境下,经常需要对应用/服务进行统一命名,便于识别
  2. 集群状态管理:将节点信息写入ZooKeeper上的一个ZNode,实时监控节点状态变化
  3. 统一配置管理:将集群中应用的配置文件信息交由Zookeeper管理,一旦发生修改会通知各个客户端,从而保证所有结点的配置信息一致
  4. 节点上下线:服务器节点上下线时通知Zookeeper,客户端实时掌握服务器在线列表
  5. 软负载均衡:在Zookeeper中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求

1.1 选举机制

zookeeper在首次启动和运行中进行节点选举的机制不同。

首次启动

在集群启动之后zookeeper会进行Leader选举,主要策略是选举服务器ID较大的节点,得票数超过一半就会当选。

假设有五台节点依次启动:

  1. 服务器1启 动,发起一次选举。投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态保持LOOKING
  2. 服务器2启动发起一次选举。服务器1和2分别投自己一票并交换选票信息:此时服务器1发现服务器2的myid比自己目前投票推举的(服务器1) 大,更改选票为推举服务器2。此时服务器1票数0票,服务器2票数2票,没有半数以上结果,选举无法完成,服务器1,2状态保持LOOKING
  3. 服务器3启动,发起一次选举。同理,服务器1和2都会更改选票为服务器3。此次投票结果:服务器1为0票,服务器2为0票,服务器3为3票,超过半数,服务器3当选Leader。服务器1,2更改状态为FOLLOWING,服务器3更改状态为LEADING;
  4. 服务器4启动,发起一次选举。此时服务器1,2,3已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为3票,服务器4为 1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLLOWING;
  5. 服务器5启动,同4一样成为FOLLOWING。
运行中

如果某节点在运行中和Leader失去连接,此时有两种情况:当前节点失联或Leader失联。

如果当前节点失联,leader仍存在,虽然当前节点会试图去选举Leader,但与其他节点交换信息时会被告知当前服务器的Leader信息,然后重新和Leader机器建立连接,并进行状态同步即可。

如果是Leader失联,则会进行重新选举,重新选举时会依次比较Epoch、ZXID和SID,如果Epoch较大则当选为Leader,若相等则比较ZXID、最后比较SID。其中

  • Epoch是每个Leader任期的代号,代号越大代表其版本越新
  • ZXID:事务ID。用来标识一次服务器状态的变更,每次修改 ZooKeeper 状态都会按顺序产生一个事务 ID,越大代表记录了最新的数据更改
  • SID:服务器ID。用来唯一标识一台ZooKeeper集群中的机器,不能重复,和myid一致

1.2 数据写入

当客户端发起写数据请求时,一般有两种情况:请求到达Leader节点或Follower节点

  • 如果请求直接到达Leader节点,那么leader除了自己写入外还会向Follower发送写数据请求,当集群中一般节点完成写入后就会向客户端返回响应。之后再向其他Follower发送写数据请求,完成数据同步。
  • 如果请求先到达Follower,会将请求转发到Leader,然后由Leader向各节点发送写请求,半数完成后将响应传达给接收请求的那个Follower,由它向客户端返回完成的响应。
    在这里插入图片描述

2 使用

2.1 安装配置

从官网https://zookeeper.apache.org/下载安装包,解压到/opt/module目录下后修改文件夹名称,

tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
cd /opt/module
mv apache-zookeeper-3.5.7-bin/   zookeeper

在目录下新建zkData文件夹用于存放zookeeper的数据,在其中新建文件myid。这个文件是用于保存标识节点的唯一编号,zookeeper通过读取该文件中的编号来区分该节点是哪个,这里输入其节点编号2

cd zookeeper
mkdir zkData
vim myid
2

将配置文件zookeeper/conf/zoo_sample.cfg 重命名为 zoo.cfg,在其中修改数据存储路径配置,并在文件末尾添加集群节点信息

dataDir=/opt/module/zookeeper/zkData
...
#######################cluster##########################
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888

其他配置参数的含义为

  • tickTime = 2000:通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
  • initLimit = 10:Leader和Follower初始连接时的最多心跳数
  • syncLimit = 5:Leader和Follower同步时最多心跳次数,超过5次心跳无响应会从列表删掉Follower
  • dataDir:保存Zookeeper中数据的目录
  • clientPort = 2181:客户端连接端口

在节点信息server.2=hadoop102:2888:3888中,

  1. 数字2代表节点的ID值,和zkData/myid文件中的节点编号对应,Zookeeper启动时会把配置文件和myid中的编号进行比较从而判断集群节点的启动情况。
  2. hadoop102代表节点的主机地址;
  3. 2888服务器 Follower 与集群中的 Leader 服务器交换信息的端口;
  4. 3888是集群Leader重新选举的通信端口,如果集群中的 Leader 服务器挂了,通过该端口来重新进行选举

将配置好的Zookeeper分发到其他节点。

之后需要在其他节点修改zkData/myid文件中的主机编号,这里将hadoop103的myid修改为3,hadoop104修改为4

2.2 启动

启动各节点的zookeeper服务端,通过jps可以看到zookeeper后台运行的进程

[tory@hadoop102 zookeeper]$ bin/zkServer.sh start
[tory@hadoop102 zookeeper]$ jps
3540 QuorumPeerMain

有时候在启动zookeeper的时候,显示错误如下,系统检测到zookeeper是在运行状态,但是jps查看却没有相关进程。这个可能是由于异常关机造成zkData目录下残留文件zookeeper_server.pid,将其删除后从新启动即可

Starting zookeeper … already running as process 1805

查看zookeeper的运行状态,可以看到hadoop103节点运行模式是leader

[tory@hadoop103 zookeeper]# bin/zkServer.sh status
JMX enabled by default
Using config: /opt/module/zookeeper/bin/../conf/zoo.cfg
Mode: leader

如果逐个登录各节点并启动zookeeper操作会过于繁琐,可以将启动、停止等命令写到一个脚本,通过SSH登录各个节点完成启动操作。如下所示编写脚本zk.sh,针对start、stop、status命令,通过for循环遍历各主机节点hadoop102、103、104,利用ssh登录并调用zkServer.sh执行操作

#!/bin/bash
case $1 in
"start"){
	# 遍历集群主机列表
	for i in hadoop102 hadoop103 hadoop104
	do
 		echo ---------- zookeeper $i 启动 ------------
 		# 调用zookeeper命令
 		ssh $i "/opt/module/zookeeper/bin/zkServer.sh start"
	done
};;
"stop"){
	for i in hadoop102 hadoop103 hadoop104
	do
		echo ---------- zookeeper $i 停止 ------------ 
		ssh $i "/opt/module/zookeeper/bin/zkServer.sh stop"
	done
};;
"status"){
	for i in hadoop102 hadoop103 hadoop104
	do
		echo ---------- zookeeper $i 状态 ------------ 
		ssh $i "/opt/module/zookeeper/bin/zkServer.sh status"
	done
};;
esac

将该脚本增加执行权限,然后放到path路径所在的目录下就可以全局访问

chmod u+x zk.sh

通过脚本就可以对集群所有节点进行操作,如下所示查看集群节点状态

[tory@hadoop102 bin]$ zk.sh status
---------- zookeeper hadoop102 状态 ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
---------- zookeeper hadoop103 状态 ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
---------- zookeeper hadoop104 状态 ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower

2.3 命令行客户端

在命令行通过客户端连接zookeeper,通过-server指定连接的服务主机端口

[tory@hadoop102 zookeeper]$ bin/zkCli.sh -server hadoop102:2181
创建节点

通过create命令可以创建节点和目录。

[zk: hadoop102:2181(CONNECTED) 2] create /China
Created /China
[zk: hadoop102:2181(CONNECTED) 3] create /China/Beijing "ChaoYang"
Created /China/Beijing

创建的节点默认是永久的,通过参数-e可以指定节点的类型为临时节点(Ephemeral),临时节点在客户端断开连接后就会消失。
通过参数-s可以为节点名称后添加编号,从而避免节点重复

[zk: hadoop102:2181(CONNECTED) 7] create /China/Beijing
Node already exists: /China/Beijing
[zk: hadoop102:2181(CONNECTED) 8] create -s /China/Beijing
Created /China/Beijing0000000001
查看节点信息

通过ls可以查看目录下的所有子节点,-s可以显示节点的详细信息,如下所示查看根节点

[zk: hadoop102:2181(CONNECTED) 5] ls -s /
[zookeeper]cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1

其中各参数的含义为

  1. czxid:创建节点的事务 zxid
  2. ctime:znode 被创建的毫秒数(从 1970 年开始)
  3. mzxid:znode 最后更新的事务 zxid
  4. mtime:znode 最后修改的毫秒数(从 1970 年开始)
  5. pZxid:znode 最后更新的子节点 zxid
  6. cversion:znode 子节点变化号,znode 子节点修改次数
  7. dataversion:znode 数据变化号
  8. aclVersion:znode 访问控制列表的变化号
  9. ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0
  10. dataLength:znode 的数据长度
  11. numChildren:znode 子节点数量

通过set可以修改节点中的内容,get查看节点中的内容,stat可以查看节点详细信息

[zk: hadoop102:2181(CONNECTED) 0] set /China/Beijing "HaiDian" 
[zk: hadoop102:2181(CONNECTED) 1] get /China/Beijing
HaiDian

通过delete可以删除单个节点,deleteall删除节点及其子节点

[zk: hadoop102:2181(CONNECTED) 2] delete /China/Beijing
[zk: hadoop102:2181(CONNECTED) 3] deleteall /China
监听器

客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、节点删除、子目录节点增加删除)时,ZooKeeper 会通知客户端。监听机制保证 ZooKeeper 保存的任何的数据的任何改变都能快速的响应到监听了该节点的应用程序。

如下所示在 hadoop104 主机上注册监听/China/Beijing节点数据变化

[zk: localhost:2181(CONNECTED) 26] get -w /China/Beijing

之后在 hadoop103 主机上修改节点的数据
[zk: localhost:2181(CONNECTED) 1] set /China/Beijing “FengTai”
观察 hadoop104 主机收到数据变化的监听

WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged 
path:/China/Beijing

注意:在hadoop103再次修改/China/Beijing的值,hadoop104上不会再收到监听。因为注册一次,只能监听一次。想再次监听,需要再次注册。

2.4 Java API

在Java开发中需要连接和访问zookeeper时可以使用官方API包,首先在maven中引入对应版本的依赖

<dependency>
	<groupId>org.apache.zookeeper</groupId>
	<artifactId>zookeeper</artifactId>
	<version>3.5.7</version>
</dependency>

首先创建客户端连接对象zkClient,在其中指定服务端地址connectString和超时时间,并且可以通过Watcher监听事件,只要节点发生变化就会触发监听器,从而执行回调函数process

# zookeeper服务端地址
private static String connectString ="hadoop102:2181,hadoop103:2181,hadoop104:2181";
# 连接超时时间
private static int sessionTimeout = 2000;
private ZooKeeper zkClient = null;
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
	// 收到事件通知后的回调函数(用户的业务逻辑)
	@Override
	public void process(WatchedEvent watchedEvent) {
		System.out.println(watchedEvent.getType() + "--" + watchedEvent.getPath());
	}
});

通过zkClient的create()方法可以创建节点,exist()可以返回节点是否存在,getChildren()可以获取指定目录下的子节点,其第二个参数可以传入一个监听器,如果是true则默认为创建zkClient时的监听器

// 参数 1:要创建的节点的路径; 参数 2:节点数据 ; 参数 3:节点权限 ;参数 4:节点的类型
String nodeCreated = zkClient.create("/atguigu", "shuaige".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

// 判断节点是否存在
Stat stat = zkClient.exists("/China", false);
System.out.println(stat == null ? "not exist" : "exist");

// 获取子节点
List<String> children = zkClient.getChildren("/", true);
for (String child : children) {
	System.out.println(child);
}
服务器上下线监测

通过zookeeper可以模拟实现对集群节点上下线进行监测,当服务器节点上线时,作为临时节点注册到zookeeper中的/servers目录下,如/servers/hadoop102,而节点下线时临时节点就会从目录中删除。这样客户端就可以通过监听zookeeper中节点信息从而随时掌握存活的服务器列表。

如下所示为服务器端向zookeeper进行注册的代码

import java.io.IOException;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;

public class DistributeServer {
    private static String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private static int sessionTimeout = 2000;
    private ZooKeeper zk = null;
    private String parentNode = "/servers";

    public static void main(String[] args) throws Exception {
        DistributeServer server = new DistributeServer();
        // 1 获取 zk 连接
        server.getConnect();
        // 2 利用 zk 连接注册服务器信息
        server.registerServer(args[0]);
        // 3 启动业务功能
        server.business(args[0]);
    }

    // 创建到 zk 的客户端连接
    public void getConnect() throws IOException {
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
            }
        });
    }

    // 注册服务器
    public void registerServer(String hostname) throws Exception {
        String create = zk.create(parentNode + "/" +hostname,
                hostname.getBytes(), Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(hostname + " is online " + create);
    }

    // 业务功能
    public void business(String hostname) throws Exception {
        System.out.println(hostname + " is working ...");
        Thread.sleep(Long.MAX_VALUE);
    }
}

如下所示为客户端监听服务器节点变化的实现

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class DistributeClient {
    private static String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private static int sessionTimeout = 2000;
    private ZooKeeper zk = null;
    private String parentNode = "/servers";

    public static void main(String[] args) throws Exception {
        DistributeClient client = new DistributeClient();
        // 1 获取 zk 连接
        client.getConnect();
        // 2 获取 servers 的子节点信息,从中获取服务器信息列表
        client.getServerList();
        // 3 业务进程启动
        client.business();
    }

    // 创建到 zk 的客户端连接
    public void getConnect() throws IOException {
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    getServerList();    // 再次启动监听
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }

    // 获取服务器列表信息
    public void getServerList() throws Exception {
        // 1 获取服务器子节点信息,并且对服务器节点的变化进行监听
        List<String> children = zk.getChildren(parentNode, true);
        // 2 存储服务器信息列表
        ArrayList<String> servers = new ArrayList<>();
        // 3 遍历所有节点,获取节点中的主机名称信息
        for (String child : children) {
            byte[] data = zk.getData(parentNode + "/" + child, false, null);
            servers.add(new String(data));
        }
        // 4 打印服务器列表信息
        System.out.println(servers);
    }

    // 业务功能
    public void business() throws Exception {
        System.out.println("client is working ...");
        Thread.sleep(Long.MAX_VALUE);
    }
}
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐