Zookeeper启动流程浅析
zookeeper服务端启动流程
文章目录
前言
ZooKeeper 是一个开源的分布式协调服务,
提示:以下是本篇文章正文内容,下面案例可供参考
一、启动
官网下载Zookeeper解压,执行zkServer.cmd
命令就可以启动ZK服务,zkCli.cmd
启动执行客户端命令
二、服务端启动流程
1.zkServer.cmd脚本源码
@echo off
setlocal
call "%~dp0zkEnv.cmd"
set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain
set ZOO_LOG_FILE=zookeeper-%USERNAME%-server-%COMPUTERNAME%.log
echo on
call %JAVA% "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.root.logger=%ZOO_LOG4J_PROP%" "-Dzookeeper.log.file=%ZOO_LOG_FILE%" "-XX:+HeapDumpOnOutOfMemoryError" "-XX:OnOutOfMemoryError=cmd /c taskkill /pid %%%%p /t /f" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %*
endlocal
可以看到在服务端启动过程中执行了QuorumPeerMain
2.QuorumPeerMain
QuorumPeerMain main方法主要做了三个事件
1.创建QuorumPeerConfig配置文件类 ,并调用parse方法解析配置文件赋值
2. 启动DatadirCleanupManager 定时任务执行PurgeTask清理事务和快照文件
3. runFromConfig
集群方式启动服务器 (一般都是集群启动,单击模式就是走ZooKeeperServerMain这个方法比较来说少了Master选举过程)
3.runFromConfig集群启动
这个方法里面主要做了这几件事
- 创建客户端请求处理工厂类 ServerCnxnFactory,默认是
NIOServerCnxnFactory
可以配置成NettyServerCnxnFactory
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
- 设置投票节点信息,也就是给
QuorumPeer
这个类的属性赋值,基本都是根据zoo.cfg配置文件拿到的,这里有几个属性介绍下
quorumPeers | Map对象参与投票的所有节点信息解析配置文件中的server属性得到的,server.1=127.0.0.1:2888:3888 |
electionType | 选举类型默认是3 |
myid | myid文件的内容 |
QuorumVerifier | master节点选举规则默认是QuorumMaj 获得半数节点投票即为主节点 |
- 加载数据DB信息 在
QuorumPeer
start方法里面 - 启动监听客户端请求(实现类已NettyServerCnxnFactory为例),就是监听配置文件里面的
clientPort
端口,可以接收到zkCli.cmd
发送的请求同样是在QuorumPeer
方法里面
public void start() {
LOG.info("binding to port " + localAddress);
parentChannel = bootstrap.bind(localAddress);
}
执行了bind操作就可以开始监听端口接收请求了,NettyServerCnxn
receiveMessage
方法处理接收到的消息,如果对应ZooKeeperServer为null的话是直接抛出异常,不会进行后续处理的
那么,这个ZooKeeperServer什么时候赋值呢,答案是集群模式下master选举结束后,设置各参与投票节点的类型时设置
所以集群模式下未选举出master节点虽然可以接收客户端请求,但是不会实际处理的,等到选举结束后才能处理请求,这时候集群中已经有maste节点了
- startLeaderElection 开始选举 ,这个流程比较复杂后面详细讲下
- 选举结束设置各个参与投票选举的节点状态
QuorumPeer
本身也是个线程类,start方法启动了本身线程执行run方法
startLeaderElection 定义了选举的策略,run方法里面的等待选举的结果,最后給参与选举个节点设置实际类型
4.QuorumPeer run方法
public void run() {
setName("QuorumPeer" + "[myid=" + getId() + "]" +
cnxnFactory.getLocalAddress());
LOG.debug("Starting quorum peer");
try {
jmxQuorumBean = new QuorumBean(this);
MBeanRegistry.getInstance().register(jmxQuorumBean, null);
for(QuorumServer s: getView().values()){
/**
* JMX相关
*/
ZKMBeanInfo p;
if (getId() == s.id) {
p = jmxLocalPeerBean = new LocalPeerBean(this);
try {
MBeanRegistry.getInstance().register(p, jmxQuorumBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxLocalPeerBean = null;
}
} else {
p = new RemotePeerBean(s);
try {
MBeanRegistry.getInstance().register(p, jmxQuorumBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
}
}
}
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxQuorumBean = null;
}
/**
* 这里是真正的选举流程
*/
try {
/*
* Main loop
*/
while (running) {
/**
* FastLeaderElection 选举结束后对这个字段赋值
*/
switch (getPeerState()) {
case LOOKING:
LOG.info("LOOKING");
if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");
// Create read-only server but don't start it immediately
final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
logFactory, this,
new ZooKeeperServer.BasicDataTreeBuilder(),
this.zkDb);
// Instead of starting roZk immediately, wait some grace
// period before we decide we're partitioned.
//
// Thread is used here because otherwise it would require
// changes in each of election strategy classes which is
// unnecessary code coupling.
Thread roZkMgr = new Thread() {
public void run() {
try {
// lower-bound grace period to 2 secs
sleep(Math.max(2000, tickTime));
if (ServerState.LOOKING.equals(getPeerState())) {
/**
* 里面逻辑设置节点的zookServer类型
*/
roZk.startup();
}
} catch (InterruptedException e) {
LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
} catch (Exception e) {
LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
}
}
};
try {
roZkMgr.start();
setBCVote(null);
/**
* 默认FastLeaderElection
* lookForLeader 流程未结束前 处于while循环中
* 选主结束后 serverState状态已经修改
* makeLEStrategy().lookForLeader() 返回一个VOTE对象 记录了节点的状态
* LOOKING, FOLLOWING, LEADING, OBSERVING;
*/
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
setPeerState(ServerState.LOOKING);
} finally {
// If the thread is in the the grace period, interrupt
// to come out of waiting.
roZkMgr.interrupt();
roZk.shutdown();
}
} else {
try {
setBCVote(null);
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
//这些break都是退出switch语句
break;
/**
* 选举结束后继续循环 根据节点类型设置不同节点类型
*/
case OBSERVING:
try {
LOG.info("OBSERVING");
setObserver(makeObserver(logFactory));
/**
* while循环接收请求 异常走到finally分支 重新选举
*/
observer.observeLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e );
} finally {
observer.shutdown();
setObserver(null);
setPeerState(ServerState.LOOKING);
}
break;
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
/**
* while循环处理请求 异常时重新选举
*/
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);
setPeerState(ServerState.LOOKING);
}
break;
case LEADING:
LOG.info("LEADING");
try {
/**
* Leader创建时 构造ServerSocket
*/
setLeader(makeLeader(logFactory));
/**
* 选举出leader 绑定端口 开启服务
*/
leader.lead();
/**
* Leader挂掉后重新选举
*/
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
/**
* 重新选举
*/
setPeerState(ServerState.LOOKING);
}
break;
}
}
} finally {
LOG.warn("QuorumPeer main thread exited");
try {
MBeanRegistry.getInstance().unregisterAll();
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
jmxQuorumBean = null;
jmxLocalPeerBean = null;
}
}
前面6-34行是JMX相关不是很了解
while循环是这个run方法的主要逻辑,根据节点状态ServerState
做不同的处理,默认是ServerState.LOOKING
未配置节点为只读属性的情况下,代码先走到106行 setCurrentVote(makeLEStrategy().lookForLeader());
lookForLeader这个方法里面其实也有个while循环(后面介绍),也就是选举未结束之前在这里等待,选举结束后break退出Switch条件继续循环判断节点状态(选举结束后节点状态ServerState
已经改变了) ,就会走到OBSERVING FOLLOWING LEADING 条件分支
选举流程
1.startLeaderElection
synchronized public void startLeaderElection() {
try {
} catch(IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
for (QuorumServer p : getView().values()) {
if (p.id == myid) {
myQuorumAddr = p.addr;
break;
}
}
if (myQuorumAddr == null) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
/**
* 默认是3
*/
if (electionType == 0) {
try {
udpSocket = new DatagramSocket(myQuorumAddr.getPort());
responder = new ResponderThread();
/**
* 开启ResponderThread线程
*/
responder.start();
} catch (SocketException e) {
throw new RuntimeException(e);
}
}
this.electionAlg = createElectionAlgorithm(electionType);
}
第8行getView拿到的是zoo.cfg配置文件中的server配置
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890
QuorumServer
有两个InetSocketAddress对象 addr electionAddr分别对应server配置项的第一 (集群节点间内部通信) 第二个(投票选举端口)端口,第22-27创建了一个DatagramSocket来接收其它节点发生的UDP数据包(非选举端口 这里不清楚具体目的是啥)
第32行创建选举策略
2 createElectionAlgorithm方法
这个方法里面涉及到QuorumCnxManager
FastLeaderElection
两个类,当然这两个类里面还有内部线程类主要是用来发送接收选举投票消息的
protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 0:
le = new LeaderElection(this);
break;
case 1:
le = new AuthFastLeaderElection(this);
break;
case 2:
le = new AuthFastLeaderElection(this, true);
break;
case 3:
/**
* 创建一个QuorumCnxManager
* 维护有 SendWorker RecvWorker 线程对象
*/
qcm = createCnxnManager();
/**
* 内部类 继承ZooKeeperThread
*/
QuorumCnxManager.Listener listener = qcm.listener;
/**
* listener 在创建QuorumCnxManager有初始化 是一个继承了ZooKeeperThread的可执行对象
*/
if(listener != null){
/**
* 开启SendWorker RecvWorker 发送 接收数据
*/
listener.start();
/**
* qcm QuorumCnxManager qcm 就持有 SendWorker RecvWorker
*/
le = new FastLeaderElection(this, qcm);
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}
前面说到QuorumPeer run方法调用到选举策略(这里是FastLeaderElection)的lookForLeader方法来等待Master选举结束,这里的选举算法关系到QuorumCnxManager
里面的线程 还有 FastLeaderElection
的 WorkerSende WorkerReceiver内部线程类
第20行创建QuorumCnxManager的同时创建了一个Listener 内部线程类,所以第32行是必然满足的,我们来看看这个listen线程启动后做了什么
// QuorumCnxManager 内部类Listener
public void run() {
/**
* 重试次数
*/
int numRetries = 0;
InetSocketAddress addr;
while((!shutdown) && (numRetries < 3)){
try {
ss = new ServerSocket();
ss.setReuseAddress(true);
/**
* 默认false
*/
if (listenOnAllIPs) {
int port = view.get(QuorumCnxManager.this.mySid)
.electionAddr.getPort();
addr = new InetSocketAddress(port);
} else {
addr = view.get(QuorumCnxManager.this.mySid)
.electionAddr;
}
LOG.info("My election bind port: " + addr.toString());
setName(view.get(QuorumCnxManager.this.mySid)
.electionAddr.toString());
ss.bind(addr);
while (!shutdown) {
/**
* 选举端口
*/
Socket client = ss.accept();
setSockOpts(client);
LOG.info("Received connection request "
+ client.getRemoteSocketAddress());
// Receive and handle the connection request
// asynchronously if the quorum sasl authentication is
// enabled. This is required because sasl server
// authentication process may take few seconds to finish,
// this may delay next peer connection requests.
if (quorumSaslAuthEnabled) {
receiveConnectionAsync(client);
} else {
receiveConnection(client);
}
numRetries = 0;
}
} catch (IOException e) {
LOG.error("Exception while listening", e);
numRetries++;
try {
ss.close();
Thread.sleep(1000);
} catch (IOException ie) {
LOG.error("Error closing server socket", ie);
} catch (InterruptedException ie) {
LOG.error("Interrupted while sleeping. " +
"Ignoring exception", ie);
}
}
}
LOG.info("Leaving listener");
if (!shutdown) {
LOG.error("As I'm leaving the listener thread, "
+ "I won't be able to participate in leader "
+ "election any longer: "
+ view.get(QuorumCnxManager.this.mySid).electionAddr);
}
}
第26行这里会监听选举端口,接收其它参与投票的节点发过来的投票信息,第44 行receiveConnection
处理连接信息,最终调用到handleConnection
方法,这里针对接收到的消息的sid 做不同的处理
if (sid < this.mySid) {
/*
* This replica might still believe that the connection to sid is
* up, so we have to shut down the workers before trying to open a
* new connection.
*/
SendWorker sw = senderWorkerMap.get(sid);
if (sw != null) {
sw.finish();
}
/*
* Now we start a new connection
*/
LOG.debug("Create new connection to server: " + sid);
closeSocket(sock);
connectOne(sid);
// Otherwise start worker threads to receive data.
} else {
/**
* RecvWorker 发送者 接收者相互持有对方的引用
* 启动 SendWorker RecvWorker
*/
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null)
vsw.finish();
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
sw.start();
rw.start();
return;
}
这里的else分支代码启动了RecvWorker
SendWorker
两个线程,这个两线程的功能分别是接收Sockek 另一端的消息,以及发送消息,具体消息内容是什么呢,我们就要看看 FastLeaderElection
的 WorkerSende
WorkerReceiver
了
另外 对于sid < this.mySid
的情况,会调用到startConnection
方法,最终也是启动了RecvWorker
SendWorker
两个线程
3.FastLeaderElection
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
this.stop = false;
this.manager = manager;
/**
* 构建 Messenger 内部类对象
* Messenger 持有 WorkerSender WorkerReceiver 线程对象
*
* WorkerSender WorkerReceiver 维护有 QuorumCnxManager
* 现在执行是调用 QuorumCnxManager 方法拉取队列数据处理
* 队列里面的数据是由 QuorumCnxManager 线程内部类 SenderWorker RecvWorker 生成处理
* 在前一步的listener 处理
*/
starter(self, manager);
}
// FastLeaderElection 内部类
Messenger(QuorumCnxManager manager) {
this.ws = new WorkerSender(manager);
Thread t = new Thread(this.ws,
"WorkerSender[myid=" + self.getId() + "]");
t.setDaemon(true);
t.start();
this.wr = new WorkerReceiver(manager);
t = new Thread(this.wr,
"WorkerReceiver[myid=" + self.getId() + "]");
t.setDaemon(true);
t.start();
}
FastLeaderElection 选举策略创建的同时创建了WorkerSender WorkerReceiver 两个内部线程类(名字有点像 要说有什么联系的话那就是 内部有一个QuorumCnxManager 属性,FastLeaderElection 的两个发送,接收消息线程具体是通过QuorumCnxManager 的内部线程类来实现的 )
public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(
self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
if (self.start_fle == 0) {
self.start_fle = Time.currentElapsedTime();
}
try {
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this){
logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
sendNotifications();
/*
* Loop in which we exchange notifications until we find a leader
*/
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
if(n == null){
if(manager.haveDelivered()){
sendNotifications();
} else {
manager.connectAll();
}
/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
else if(validVoter(n.sid) && validVoter(n.leader)) {
/*
* Only proceed if the vote comes from a replica in the
* voting view for a replica in the voting view.
*/
switch (n.state) {
case LOOKING:
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock.get(),
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
LOG.debug("Notification from observer: " + n.sid);
break;
case FOLLOWING:
case LEADING:
/*
* Consider all notifications from the same epoch
* together.
*/
if(n.electionEpoch == logicalclock.get()){
recvset.put(n.sid, new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch));
if(ooePredicate(recvset, outofelection, n)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
/*
* Before joining an established ensemble, verify
* a majority is following the same leader.
*/
outofelection.put(n.sid, new Vote(n.version,
n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch,
n.state));
if(ooePredicate(outofelection, outofelection, n)) {
synchronized(this){
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
n.state, n.sid);
break;
}
} else {
if (!validVoter(n.leader)) {
LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
}
if (!validVoter(n.sid)) {
LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
}
}
}
return null;
} finally {
try {
if(self.jmxLeaderElectionBean != null){
MBeanRegistry.getInstance().unregister(
self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}",
manager.getConnectionThreadCount());
}
}
第2-9行是JMX相关
第24行updateProposal
设置投票信息
第30行 sendNotifications
将发送到其它节点的投票信息添加到待发送队列sendqueue
, WorkerSender线程识别到sendqueue有投票信息,就会将投票信息发送到其它节点
集群中有ServerA ServerB ServerC 三个节点, ServerA 启动向Server发送投票信息,情况是这样的
- ServerA 启动WorkerSender拿到发送到ServerB的投票信息
// FastLeaderElection WorkerSender
public void run() {
while (!stop) {
try {
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
if(m == null) continue;
process(m);
} catch (InterruptedException e) {
break;
}
}
LOG.info("WorkerSender is down");
}
// FastLeaderElection WorkerSender 投票信息写入ByteBuffer 调用 QuorumCnxManager toSend 发送消息
void process(ToSend m) {
ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
m.leader,
m.zxid,
m.electionEpoch,
m.peerEpoch);
manager.toSend(m.sid, requestBuffer);
}
// QuorumCnxManager
public void toSend(Long sid, ByteBuffer b) {
/*
* If sending message to myself, then simply enqueue it (loopback).
*/
if (this.mySid == sid) {
b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid));
/*
* Otherwise send to the corresponding thread to send.
*/
} else {
/*
* Start a new connection if doesn't have one already.
*/
ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
ArrayBlockingQueue<ByteBuffer> bqExisting = queueSendMap.putIfAbsent(sid, bq);
if (bqExisting != null) {
addToSendQueue(bqExisting, b);
} else {
addToSendQueue(bq, b);
}
connectOne(sid);
}
}
这里的connectOne 方法建立与其它节点的Socket连接,并 向其它节点(ServerB)写入当前节点的Sid,同时启动了SendWorker RecvWorker线程,上面的toSend方法将投票信息写入到了queueSendMap,SendWorker 启动后拿到这个投票信息发送到其它节点(这里发送的是投票信息)
// SendWorker run方法
public void run() {
threadCnt.incrementAndGet();
try {
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
if (bq == null || isSendQueueEmpty(bq)) {
/**
* 队列中没有发送到 sid服务器的数据 从最近发送的数据里面拿
*/
ByteBuffer b = lastMessageSent.get(sid);
if (b != null) {
LOG.debug("Attempting to send lastMessage to sid=" + sid);
/**
* 写出数据
*/
send(b);
}
}
} catch (IOException e) {
LOG.error("Failed to send last message. Shutting down thread.", e);
this.finish();
}
try {
while (running && !shutdown && sock != null) {
ByteBuffer b = null;
try {
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
.get(sid);
if (bq != null) {
b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
} else {
LOG.error("No queue of incoming messages for " +
"server " + sid);
break;
}
if(b != null){
lastMessageSent.put(sid, b);
send(b);
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for message on queue",
e);
}
}
} catch (Exception e) {
LOG.warn("Exception when using channel: for id " + sid
+ " my id = " + QuorumCnxManager.this.mySid
+ " error = " + e);
}
this.finish();
LOG.warn("Send worker leaving thread");
}
将消息写入其它节点后,其它节点就可以读取消息并写入recvQueue队列,这个是在QuorumCnxManager RecvWorker run方法中(线程已经启动了),
public void run() {
threadCnt.incrementAndGet();
try {
while (running && !shutdown && sock != null) {
/**
* Reads the first int to determine the length of the
* message
*/
int length = din.readInt();
if (length <= 0 || length > PACKETMAXSIZE) {
throw new IOException(
"Received packet with invalid packet: "
+ length);
}
/**
* Allocates a new ByteBuffer to receive the message
*/
byte[] msgArray = new byte[length];
din.readFully(msgArray, 0, length);
ByteBuffer message = ByteBuffer.wrap(msgArray);
addToRecvQueue(new Message(message.duplicate(), sid));
}
} catch (Exception e) {
LOG.warn("Connection broken for id " + sid + ", my id = "
+ QuorumCnxManager.this.mySid + ", error = " , e);
} finally {
LOG.warn("Interrupting SendWorker");
sw.finish();
if (sock != null) {
closeSocket(sock);
}
}
}
这里技术选举过程中节点直接的通信流程了,假设 A B 都投票给C ,那么C就是Master节点了,我们接下来再看下投票的算法
前面lookForLeader while循环部分就是处理投票的逻辑
lookForLeader方法第42行,获取其它节点发来的投票信息,这个在WorkerReceiver线程启动后会去QuorumCnxManager
recvQueue队列中拿到投票邮箱,如果为空的话 第49-63行就会重新向其它节点发送投票消息
最后只有有接收到其它节点的投票就进入到64行的 else if逻辑,这里面的条件是校验下投票的sid 跟投给的节点sid的有效性
这里我们先讨论个问题 如果A发送选票给C 告诉C要投票给B节点,这里怎么认为A发过来的投票信息是有效的呢,因为B是觉得要投票给自己的
投票信息里面有三个字段 peerEpoch zxid sid ,规则是这样的,当某一个节点接收到的投票信息peerEpoch大于自己 ,peerEpoch相同时比较zxid (投票zxid大于自己) ,当zxid 也相同时比较sid (投票sid大于自己),则说明发过来的投票信息有效,当前节点丢掉自己的选票信息,把接收到的选票信息发送到其它节点
明确了这个规则,我们再看看代码,主要是LOOKING 条件分支
totalOrderPredicate 是判断接收到的选票是否有效的方法,QuorumVerifier默认是QuorumMaj走不到if条件里面,后面的判断就是按照newEpoch zxid sid 来比较
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
if(self.getQuorumVerifier().getWeight(newId) == 0){
return false;
}
/*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
第104行 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
存储接收到的选票信息(集群中其它节点都会投来选票)
第106行 if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch)))
,termPredicate有两个参数,第一个是当前节点获得的所有选票,第二个节点是当前要投给master节点的选票
termPredicate 判断逻辑就是看是否超过半数节点
protected boolean termPredicate(
HashMap<Long, Vote> votes,
Vote vote) {
HashSet<Long> set = new HashSet<Long>();
/*
* First make the views consistent. Sometimes peers will have
* different zxids for a server depending on timing.
*/
for (Map.Entry<Long,Vote> entry : votes.entrySet()) {
if (vote.equals(entry.getValue())){
set.add(entry.getKey());
}
}
return self.getQuorumVerifier().containsQuorum(set);
}
进入termPredicate说明master节点已经选出来,来看看方法里面的处理
while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
leaveInstance(endVote);
return endVote;
}
有个while循环继续去拿投票信息,可能还有投票信息没过来,但已经不影响结果了,
当n= null时 说明投票已经结束了,这时候设置QuorumPeer 的ServerState状态,前面的QuorumPeer 流程就可以继续往下走了
更多推荐
所有评论(0)