Part 4 --ZooKeeper Zab Protocol


ZooKeeper<四> ZAB 协议

zab 协议全称 ZooKeeper Atomic Broadcast Protocol 。

它作为 zk 最核心的协议,贯穿了 zk 的始终。

我们知道 zk 从启动到挂掉,无非做这么几件事:

  • 选举 leader
  • 发现 server
  • 同步 事务
  • 广播 事务
  • 崩溃再选举

而 zab 则是这些事情的保障,它让所有的事情按照我们期待的方向行进。现在去看看 zk 究竟是如何去做这些事情的。

Election

前文已经介绍,zk 使用 FLE 进行选举,是现在 zk 唯一支持的选举算法。这里不再赘述。

其他阶段,这里分 Leader 和 Follower 两个角色进行解读。

Leader

1. Discovery

我们前面解析源码的时候,在 QuorumPeer 这个类里,说过它的 run 方法,里面有个 switch...case ,会对 serverState 的不同状态,做不同处理。

当时我们看了 LOOKING 的 case,因为这里是 election 的入口。

现在选举结束后,serverState 会切换成其他状态,同时 zabState 也同样会切换

我们再来看下这个run 方法(截取部分):

// 源码参考: org.apache.zookeeper.server.quorum.QuorumPeer#run
case LEADING:
    try {
        setLeader(makeLeader(logFactory));
        // 这个方法里有一个 while ,如果退了出来,
        // 说明该 server 的领导结束,会进行下一次的选举
        leader.lead();
        setLeader(null);
    }catch (Exception e) {
        LOG.warn("Unexpected exception", e);
    } finally {
        if (leader != null) {
            leader.shutdown("Forcing shutdown");
            setLeader(null);
        }
        // 这里会把 serverState 重新置为 LOOKING 状态
        // 从而在整个 while 里再次发起选举
        updateServerState();
    }
    break;
}

好,从上面能看到,被选为 leader 后,所有的奥秘,都在 leader.lead() 这个方法中了。现在去看看这个家伙,究竟背着我们都做了些什么!!!

我看了下这个方法, 从 576 行到 874 行,共计 874 - 576 = 298 行,我们平时这么写代码,估计会被打死,然后领导们说,你这个方法怎么能写这么多行呢!!! 啊哈哈哈 :joy:

整个阶段都需要对照:

org.apache.zookeeper.server.quorum.LearnerHandler#run

LearnerHandler ,会为每一个跟随者分配一个连接,负责和 server 整个生命周期间的通信。

同时注意 LeaderLearnerHandler 在几个关键节点的同步:

  • getEpochToPropose:handler 会获取超过半数的 server epoch,一次产生 leader 新的 epoch

  • waitForEpochAck : handler 会获取超过半数的 server ack ,以此为基础进行同步

  • waitForNewLeaderAck:等待 server 同步的结果。整个同步过程,都是 handler 负责处理的。

正是通过这几个方法,使得虽然是异步进行,但最终在关键节点上,都会等待各自完成各自的事情!!!

self.setZabState(QuorumPeer.ZabState.DISCOVERY);

可以看到,这个方法开始时,将 zabState 的状态设置为了 DISCOVERY ,也就是说,zk 正式从选举阶段进入到发现阶段:

self.setZabState(QuorumPeer.ZabState.DISCOVERY);
// 这里把自己的 epoch 和 zxid 放到 StatueSummary 中,方便之后的比较
leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
// 开启线程接受 followers 的连接
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
// 这个方法用于获取 epoch , +1 即为新 leader 的 epoch ,它的国号!--- [1]
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
// zk 在获取到所有连接中最大的 epoch 后,+1 , 然后低 32 位清零,即为新的 zxid
zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
// ...
// 等待超半数的 server 的 epoch ack, 确认新的 epoch 下发到半数以上的 server
// 这里的处理方式和上面获取最大 epoch 一样
// zk 认为,等到足够多的 epoch ack 后,选举才算结束
waitForEpochAck(self.getId(), leaderStateSummary);
self.setCurrentEpoch(epoch);
self.setLeaderAddressAndId(self.getQuorumAddress(), self.getId());
self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);

到这里, Discovery 阶段就结束了。我们总结下 Discovery 阶段, leader 主要做了什么

  • 获取最新的 epoch
    • 收到 FOLLOWERINFO 或者 OBSERVERINFO 类型的 qp (quorum packet)
    • 统计并获取最大的 epoch,设置新的 epoch
    • 回复 LEADERINFO 类型的 qp ,同时把新的 epoch 告诉当前的 server
  • 获取超过半数的 server 的连接(waitForEpochAck):
    • handler 等待 follower/observer ACKEPOCH 类型的回复,超过半数表明新的 epoch 设置成功

2. Synchronization

self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);    

这里开始, zab 协议进入到同步阶段,我们看看同步阶段, leader 都会做哪些事情:

/** 
与 [1] 一摸一样的处理方式, 获取超过半数的 ack,
我们去查看这个方法的调用,能看到在 learnerHandler 中被调用了,
而在调用之前,learnerHandler 做的事情就是和与之连接的 server 进行同步
*/
waitForNewLeaderAck(self.getId(), zk.getZxid());
// 这里会开启 leader 的事务处理责任链 --- [2]
startZkServer();
// 这里就是在 how-to-use 中介绍的配置,设为 yes(默认),即表明 leader 也会接受 client 连接
// 将 zkServer 放入到 ServerCnxFactory 中
if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
    self.setZooKeeperServer(zk);
}
// 这就到 BROADCAST 了???
// 是的,因为我有助手 learnerHandler
self.setZabState(QuorumPeer.ZabState.BROADCAST);

这里先介绍下同步时的几种类型(可以同时思考下,分别什么情况,会导致下面的类型处理):

  • DIFF:差异同步
  • TRUNC:回滚同步
  • SNAP:全量同步

这个时候,我们去看 LearnerHandlerwaitForEpochAckwaitForNewLeaderAck 中间部分的代码

syncFollower

org.apache.zookeeper.server.quorum.LearnerHandler#syncFollower

  1. leader lastProcessedZxid == peerLastZixd: 说明事务一致,无需同步,此时 leader 会发送一个空的 DIFF qp 给 follower

  2. peer lastProcessedZxid 在 [minCommittedLog, maxCommittedLog] 之间,这个区间是 zk 维护的一份 commit queue,用来快速与各 server 同步。在此之间说明 peer 的事务与 leader 之间存在差异,zk 会先发一个 DIFF qp 给 server ,然后将区间中比 peer zxid 大的事务,以 Proposal 的方式发送给 server,server 收到后,回复 ack

  3. peer lastProcessedZxid > maxCommittedLog 时,就需要进行 trunc ,同上,也是先发送 TRUNC qp,然后通过 Proposal 进行回滚同步

  4. peer lastProcessedZxid < minCommittedLog 时,zk 会先尝试 on-disk txnlog + committedLog 的方式进行同步,如果失败,则使用 SNAP 的方式进行同步。 SNAP 会从磁盘读取 snapshot.xxx 命名的文件,序列化后发送给 server

上述同步完成后,leader 会发送一个 NEWLEADER qp 给 server,告知同步结束,得到足够 server 的 ack 回复后(这里就是 leader 和 learnerHandler 的一个同步点,也就是方法 waitForNewLeaderAck),leader 将执行 startZKServer 方法,而此可的 learnerHandler 也会等待 zkServer 的启动。

startZkServer

org.apache.zookeeper.server.quorum.Leader#startZkServer

// 来源于基类 org.apache.zookeeper.server.ZooKeeperServer#startup
// 截取部分代码
public synchronized void startup() {
    // 这里就是上文提到的 leader 处理事务的责任链
    setupRequestProcessors();
    // 设置 zk 状态
    setState(State.RUNNING);
    // 通知所有等待在该对象上的线程
    // 这些线程即负责与各个 server 连接的 learnerHandler
    notifyAll();
}

zkServer 启动后, learnerHandler 就会发送一个 UPTODATE 的报文给各个 server ,告诉他们,我们准备好了,可以开门接客,啊呸,可以接受 client 的连接了。

3. Broadcast

while (true) {
    synchronized (this) {
        // 检查 quorumPeer 和 zkServer 的状态,不对劲就会跳出 while
        if (!this.isRunning()) {
            break;
        }
        // follower 未超过半数,跳出 while
        if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
            break;
        }
    }
    // 定时发送 ping 报文
    for (LearnerHandler f : getLearners()) {
        f.ping();
    }
}
// 如果从 while 跳出,这里的 msg 会有值,代码被我精简了
if (null != shutdownMessage) {
    shutdown(shutdownMessage);
}

这是 leader.lead() 方法去做的,可是我们已经知道,leader 和 server 之间的工作,大部分都由 learnerHandler 在处理。所以这个时候,我们还得去看看 learnerHandler 在搞些什么事情

while (true) {
    qp = new QuorumPacket();
    ia.readRecord(qp, "packet");
    switch (qp.getType()) {
    case Leader.ACK:
        // ...
    case Leader.PING:
        // ...
        break;
    case Leader.REVALIDATE:
        // ...
        break;
    case Leader.REQUEST:
        // ...
        break;
    default:
        LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
        break;
    }
}

可以看到, learnerHandler 通过 switch…case… 来处理来自 server 的各个类型请求,一旦出了问题,就会抛出异常,最后在 finally 代码块中,shutdown

Follower

对于 Follower 而言,同样要从 QuorumPeer 的 run 方法的 switch…case… 代码块入手,这里就不再贴代码了,就简单梳理下大致的流程,以及各个阶段和 leader 的通信。

1. Discovery

  • Follower (下文以 F 代替),通过 org.apache.zookeeper.server.quorum.Learner#connectToLeader 与新 leader 建立 TCP 连接。

  • F 发送 FOLLOWERINFO 类型 qp 给 leader(如果是 observer ,发送的类型为 OBSERVERINFO),被 learnerHandler 接收。

    前文提到 leader 在执行 lead 方法的时候,会先去创建 LearnerCnxAcceptor –> LearnerCnxAcceptorHandler –> LearnerHandler , 这一些列代码执行后,就在等待 F 连接并发送 FOLLOWERINFO 报文。这是他们通信的第一个报文,如果不是,这个连接就会被 leader 舍弃掉

  • 等待 leader 的 LEADERINFO qp, 该报文含有 leader 的最新 epoch ,所以 F 设置新的 epoch,同时发送 ACKEPOCH 类型 qp 给 leader

discovery 阶段结束

2. Synchronization

leader 通过 learnerHandler 发送的同步标志(DIFF/TRUNC/SNAP) ,进行不同的处理和回复,在 org.apache.zookeeper.server.quorum.Learner#syncWithLeader 中,我们可以看到不同的处理方式。

同步结束后, leader 会发送一个 NEWLEADER qp 给 F,F 回 ACK, leader 收到超半数 ack 后,发送 UPTODATE qp,F 收到该类型的 qp 后,会跳出 while 循环:

case Leader.UPTODATE:
    break outerLoop;

后续代码会回复 ACK,同时启动 zkServer:

ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
writePacket(ack, true);
sock.setSoTimeout(self.tickTime * self.syncLimit);
self.setSyncMode(QuorumPeer.SyncMode.NONE);
zk.startup();

而 F 的 zkServer ,同样会开启一条事务处理的责任链:

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
    ((FollowerRequestProcessor) firstProcessor).start();
    syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
    syncProcessor.start();
}

通过代码可以看到,F 的责任链如下:

  • FollowerRequestProcessor –> CommitProcessor –> FinalRequestProcessor

  • SyncRequestProcessor –> SendAckRequestProcessor

通过这些责任链,将一条事务的处理变得分工明确。

3. Broadcast

syncWithLeader 执行完毕,也就是同步完成后, F 就进入到广播阶段。

如果该 F 配置了 observerMasterPort ,这里还会启动一个 ObserverMaster 线程,用来与 observer 事务同步

if (self.getObserverMasterPort() > 0) {
    om = new ObserverMaster(self, fzk, self.getObserverMasterPort());
    om.start();
} 
while (this.isRunning()) {
    readPacket(qp);
    processPacket(qp);
}

这里同样,通过 while ,保持与 leader 之间的联系

一条写事务

我们已经知道,ServerCnxnFactory (我们没有配置 netty ,这里指 NIOServerCnxnFactory),负责整个生命周期与 client 的连接。先看下 NIOServerCnxnFactory 的类图:

NIOServerCnxnFactory

继承自 ServerCnxnFactory ,内部主要有:

  • IOWorkRequest:负责处理 IO 的读写( m 个,可配置,默认 核数*2 )
  • ConnectionExpirerThread:负责清理与客户端的 异常/失效/过期 连接 (1 个线程)
  • AcceptThread:负责接收来自 client 的连接并分配给 selectorThread (1 个线程)
  • SelectorThread:负责与 client 的连接处理( n 个,可配置,或者取 核数/2 的平方根)

当客户端请求过来后,由 selectorThread 交由 IOWorkRequest(事实上,IOWorkRequest 线程池处理) 的 doWork 处理,doWork 处理后,交由 NIOServerCnxn 的 doIO 处理,该类真正负责与 client 的通信。若是接收 client 的信息,则交由该类的 readPayload 处理,它简单处理后,再给 readRequest(or readConnectRequest) 方法:

private void readRequest() throws IOException {
    zkServer.processPacket(this, incomingBuffer);
}

这个方法很简单,就i是把这个 request 交给 zkServer 处理,最终会添加到:

LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();

而这个属性,会由该类,也就是 RequestThrottler 线程的 run 方法处理,通过 take() 方法,阻塞,直到有消息过来。

最终会交由 ZooKeeperServer 来处理:

// org.apache.zookeeper.server.ZooKeeperServer#submitRequestNow
public void submitRequestNow(Request si) {
    try {
        // 这里可以看到,这个消息交给了 Follower 的责任链处理
        firstProcessor.processRequest(si);
    } catch (MissingSessionException e) {
        requestFinished(si);
    } catch (RequestProcessorException e) {
        requestFinished(si);
    }
}

到了这里,我们基本能理清,从 client 发出一个消息,到最终这个消息被处理的流程

zk

注释

[1]

cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());

我们来说下这三行。

前两行用于开启线程,接受 followers 的连接。接受连接是干嘛的?

我们知道, 新的皇帝会有新的年号,zk 中新的 leader 也会更新自己的 epoch 。

假设集群中有一个与世隔绝的 C follower ,一直没有获取到 leader 的 epoch 。而之后这个 leader 挂掉了,恰巧这个 C 当选为新的 leader ,它要产生自己的 epoch ,怎么办呢?直接在自己的 epoch 上加一?

那肯定不行!你不能确定这个 epoch 有没有被其他 leader 用过!

zk 通过上面三行,还获取 ensemble 中最新的 epoch,也就是当前 ensemble 中最大的 epoch ,之后加一后用作新的 epoch 。

这里整体的处理,在 zk 中,这种处理方式很常见。

如果是我写,肯定会先去等待所有的连接,超过半数后,统计所有的 epoch ,然后和自己的 epoch 一起比较,获取最大的 epoch,加一,成为新 leader 的 epoch 。

嗯,十分凡夫俗子的想法。

我们看看这帮一个方法写了几百行的家伙是怎么做的。

  1. 先通过 LearnerCnxAcceptor 这个类,去开启 learnerHandler ,这个是用来处理其他 server 的连接的,然后在这个 handler 里获取最新的 epoch,嗯哼,epoch 可是还没产生哦!

    long zxid = qp.getZxid();
    long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
    long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
  2. Leader 主线程里,也通过 getEpochToPropose 获取最新的 epoch

  3. 所以关键就在这个 getEpochToPropose 这里了。

    // org.apache.zookeeper.server.quorum.Leader#getEpochToPropose
    public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
        // 加锁,这是个 set 集合,用来存储所有连接的 server myid
        synchronized (connectingFollowers) {
            // 标记位,初始为 true
            if (!waitingForNewEpoch) {
                return epoch;
            }
            // 这里处理新的 epoch ,只要你的 epoch 比我大,你就能取代我
            if (lastAcceptedEpoch >= epoch) {
                epoch = lastAcceptedEpoch + 1;
            }
            // 校验 server 是否是合法的投票团成员
            // 这里的顺序表明,你可以不是投票方的成员,只要你传过来的 epoch 比我大就好
            // 为什么是这样呢? 我理解为根据 zab 协议,这里最高的 epoch 必然从合法投票团成员中产生
            // 所以这里的处理方式是你大你有理
            if (isParticipant(sid)) {
                connectingFollowers.add(sid);
            }
            QuorumVerifier verifier = self.getQuorumVerifier();
            // 获得了超过半数的连接,拿到最新的 epoch,设置 flag 为 false
            // 同时通知所有等待的线程
            if (connectingFollowers.contains(self.getId()) && verifier.containsQuorum(connectingFollowers)) {
                waitingForNewEpoch = false;
                self.setAcceptedEpoch(epoch);
                connectingFollowers.notifyAll();
            } else {
                long start = Time.currentElapsedTime();
                if (sid == self.getId()) {
                    timeStartWaitForEpoch = start;
                }
                long cur = start;
                long end = start + self.getInitLimit() * self.getTickTime();
                // 这里通过 while + wait 实现
                // 即所有访问这个方法的线程,在没有获得半数连接的时候,最终都会在这里去等待
                while (waitingForNewEpoch && cur < end && !quitWaitForEpoch) {
                    connectingFollowers.wait(end - cur);
                    cur = Time.currentElapsedTime();
                }
                if (waitingForNewEpoch) {
                    throw new InterruptedException("Timeout while waiting for epoch from quorum");
                }
            }
            return epoch;
        }
    }

我在上面的代码上加了一点注释,下面再仔细说下。

我们知道,java 里 waitnotifyAllObject 里的两个方法。对于长期写业务代码的我而言,几乎没怎么用过。今天也是看到这里后,才知道原来还可以这么玩。

java.lang.Object#wait(long, int) 的文档上,说:

This method causes the current thread (referred to here as T) to place itself in the wait set for this object and then to relinquish any and all synchronization claims on this object. Note that only the locks on this object are relinquished; any other objects on which the current thread may be synchronized remain locked while the thread waits.

意思是说,这个方法,会让当前的线程放到锁对象的等待列表中,对于上面的代码,就是所有访问的线程都被放到了 connectingFollowers 这个对象的等待列表里。

但是这个方法加了同步锁,一个线程过来了,其他线程还怎么访问呢?

wait 的 doc 说,同时会放弃且只放弃所有该对象上的同步声明(synchronization claims),也就是之前获得的这个对象(这里指 connectingFollowers) 上的锁,此时,其他线程可以再次竞争。

直到有线程 notify 或者 notifyAll 或者 interrupted 或者超时。

所以这里,我就能理解了。

所有 server 的连接,包括自己,都会在这里去等待超过半数的连接,达到标准后,会被唤醒,然后各干各的事情。

[2]

我们逐步点击这个方法,会发现它有调用两个方法:

setupRequestProcessors();
startRequestThrottler();

第一个方法就是用来开启责任链,负责 leader 期间所有事物的处理。

第二个方法相当于一个阀门,用来约束请求数量。

我们把重点放到第一个方法上,看看它做了什么:

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
    commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
    commitProcessor.start();
    ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
   // 这里注意, initialize 方法里,会初始化另外一条 syncRequestProcessor
    proposalProcessor.initialize();
    prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
    prepRequestProcessor.start();
    firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);

    setupContainerManager();
}

这里能很清楚的看到整个 leader 处理请求的链条:

LeaderRequestProcessor –> PrepRequestProcessor –> ProposalRequestProcessor –> CommitProcessor –> ToBeAppliedRequestProcessor –> FinalRequestProcessor

leader 同时还有一条责任链,负责记录事务到磁盘:

SyncRequestProcessor –> AckRequestProcessor

至于每个 processer 负责什么事情,大家可以去代码里瞧一瞧看一看,相信会有收获,这里暂时不再赘述。


文章作者: peifeng
版权声明: 本博客所有文章除特別声明外,均采用 CC BY-NC-ND 4.0 许可协议。转载请注明来源 peifeng !
 上一篇
西游亦西行 -- 藏区游记 西游亦西行 -- 藏区游记
不知不觉,进藏已三日有余。 记得飞机降落在贡嘎机场,一直当心的高反没有发生后,是雀跃的。只是随后的一系列不适,让我记忆犹新。 当晚到达酒店后,没有洗澡(进藏后不要轻易洗澡),简单洗漱后直接入睡。然而伴随而来的是一两个小时就醒来一次的简短睡眠
2020-04-09
下一篇 
Part 3 -- ZooKeeper FastLeaderElection Part 3 -- ZooKeeper FastLeaderElection
我们都知道 ZooKeeper 要么在选举,要么在选举的路上。那么 ZooKeeper 的选举,究竟是怎么选举的?用的什么选举算法?本文将带着这些疑问一探究竟。
2020-04-04
  目录