Raft一致性协议的投票选举

摘要:
在上一篇文章中,作者谈到了Raft协议,这是一种更容易理解和实现的一致性协议:Raft一致性协议和ApacheRatis。在Raft协议中,引入了领导者、追随者和候选人三个角色来进行一致性控制中的投票过程。在开源项目ApacheRatis中,该协议有一个完整的版本实现。如果候选人获得总票数的一半以上,该候选人可以当选为领导人。此时,追随者会认为领袖服务异常,然后会发起新的领袖选举,并将其状态从追随者切换为候选人。

文章目录

前言

每当我们谈到对于分布式系统容错性的时候,我们其实真正想聊的是里面的状态一致性的控制。而当我们再想深入探讨一致性控制的时候,我们想到的可能是经典的Paxos协议。但是Paxos协议实现起来较为复杂,而且原理本身也不易理解。笔者在之前文章中聊过一个更易理解和实现的一致性协议Raft协议:聊聊Raft一致性协议以及Apache Ratis。在Raft协议中,引入了Leader、Follower以及Candidate三种角色来做一致性控制中的投票选举过程。本文笔者来聊聊里Raft协议中最为关键的投票选举过程以及它的一个实现版本。在开源项目Apache Ratis中,对此协议有完整的版本实现。

Raft协议的投票选举原理

在Raft协议的投票过程中,它是由1个Candidate候选者向其它Follower发送投票请求,让这些Follower投票选择Candidate。然后这些Follower将会返回给Candidate。如果这个Candidate收到超过半数以上的总票数的时候,那么此Candidate就可以当选为Leader身份了。

投票过程如下图所示:
在这里插入图片描述
投票请求结果返回
在这里插入图片描述
上述投票选举过程在现实情况中其实还有很多种边缘情况,比如同时有另外一个Candidate在投票选举时怎么办?当发起此轮投票选举时,发现已经有新的投票Leader选举已被选出来了,怎么处理呢。

下面我们来看看这些边缘情况Raft协议中是怎么处理的。

Raft协议的投票选举细节及代码实现

这里我们要从Leader选举的源头开始讲起。当系统启动好之后,初始选举后系统由1个Leader和若干个Follower角色组成。然后突然由于某个异常原因,Leader服务出现了异常,导致Follower角色检测到和Leader的上次RPC更新时间超过给定阈值时间时。此时Follower会认为Leader服务已出现异常,然后它将会发起一次新的Leader选举行为,同时将自身的状态从Follower切换为Candidate身份。随后请求其它Follower投票选择自己。

这里笔者结合Apache Ratis中对于Raft的实现来展开阐述,上述超时时间代码如下:

  public  void run() {
    long sleepDeviationThresholdMs = server.getSleepDeviationThresholdMs();
    while (monitorRunning && server.isFollower()) {
      ...
        synchronized (server) {
          // 如果当前Follower检测到上次RPC时间超过规定阈值,则开始将自身切为候选者身份
          if (outstandingOp.get() == 0 && lastRpcTime.elapsedTimeMs() >= electionTimeout) {
            LOG.info("{}:{} changes to CANDIDATE, lastRpcTime:{}, electionTimeout:{}ms",
                server.getId(), server.getGroupId(), lastRpcTime.elapsedTimeMs(), electionTimeout);
            // election timeout, should become a candidate
            server.changeToCandidate();
            break;
          }
        }
      ...
    }
  }

然后切换为Candidate的服务发起新的领导选举,

  synchronized void changeToCandidate() {
    Preconditions.assertTrue(isFollower());
    role.shutdownFollowerState();
    setRole(RaftPeerRole.CANDIDATE, "changeToCandidate");
    if (state.shouldNotifyExtendedNoLeader()) {
      stateMachine.notifyExtendedNoLeader(getRoleInfoProto());
    }
    // 然后此Candidate发起新的领导选举
    role.startLeaderElection(this);
  }

接着我们需将当前Leader选举的轮次更新,通俗地解释可理解为第几届选举了,这个值在Candidate每次发起选举时会递增。选举轮次编号信息是一个十分重要的信息,这可以避免那些消息落后的Candidate发起滞后的领导选举过程,而获取最新的Leader信息。

private void askForVotes() throws InterruptedException, IOException {
    final ServerState state = server.getState();
    while (shouldRun()) {
      // one round of requestVotes
      final long electionTerm;
      final RaftConfiguration conf;
      synchronized (server) {
    	// (1).初始化当前选举轮次编号,比当前的轮次递增1
        electionTerm = state.initElection();
        conf = state.getRaftConf();
        state.persistMetadata();
      }
      ...

      final ResultAndTerm r;
      // (2).获取除自身外其他Follow服务的Service信息
      final Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
      if (others.isEmpty()) {
        r = new ResultAndTerm(Result.PASSED, electionTerm);
      } else {
        final Executor voteExecutor = new Executor(this, others.size());
        try {
          // (3).发起投票过程
          final int submitted = submitRequests(electionTerm, lastEntry, others, voteExecutor);
          // (4).等待投票结果返回
          r = waitForResults(electionTerm, submitted, conf, voteExecutor);
        } finally {
          voteExecutor.shutdown();
        }
      }

下面我们来进入submitRequests的实际子过程,看看Follower在接收到投票请求时,是如何处理的。

  private RequestVoteReplyProto requestVote(
      RaftPeerId candidateId, RaftGroupId candidateGroupId,
      long candidateTerm, TermIndex candidateLastEntry) throws IOException {
    ...
    synchronized (this) {
      final FollowerState fs = role.getFollowerState().orElse(null);
      // (1)Follower判断发起的领导轮次编号是否落后于当前的轮次
      if (shouldWithholdVotes(candidateTerm)) {
        LOG.info("{}-{}: Withhold vote from candidate {} with term {}. State: leader={}, term={}, lastRpcElapsed={}",
            getMemberId(), role, candidateId, candidateTerm, state.getLeaderId(), state.getCurrentTerm(),
            fs != null? fs.getLastRpcTime().elapsedTimeMs() + "ms": null);
      } else if (state.recognizeCandidate(candidateId, candidateTerm)) {
    	// (2)轮次编号,CandidateId有效,当前Follower投票给请求方的Candidate
        final boolean termUpdated = changeToFollower(candidateTerm, true, "recognizeCandidate:" + candidateId);
        // see Section 5.4.1 Election restriction
        if (state.isLogUpToDate(candidateLastEntry) && fs != null) {
          fs.updateLastRpcTime(FollowerState.UpdateType.REQUEST_VOTE);
          // (3)记下当前Follower投票的CandidateId,用来表明此Follower的投票归属
          // 避免发生二次投票
          state.grantVote(candidateId);
          voteGranted = true;
        }
        if (termUpdated || voteGranted) {
          state.persistMetadata(); // sync metafile
        }
      }
      ...
    }
    return reply;
  }

这里我们着重再来看state.recognizeCandidate操作,它在Follower作出投票选举前做了哪些关键的验证操作呢。

  /**
   * 判断当前的轮次编号,给定的Candidate身份是否是可接受的
   */
  boolean recognizeCandidate(RaftPeerId candidateId, long candidateTerm) {
    if (!getRaftConf().containsInConf(candidateId)) {
      return false;
    }
    final long current = currentTerm.get();
    // 如果当前投票选举编号轮次是新的领导选举轮次,则是可接受的
    if (candidateTerm > current) {
      return true;
    } else if (candidateTerm == current) {
      // 如果不是,意为此选举轮次中,有多个Candidate发起了领导选择
      // 此时判断当前Follower是否已经投出过票:
      //   1)没有投出过票,Candidate可接受
      //   2) 投出过票,但是投出的票不是给定Candidate,则不接受
      return votedFor == null || votedFor.equals(candidateId);
    }
    return false;
  }

从上面处理中,我们可以注意到这里面是会存在多个Candidate同时发生新轮次的领导选举过程中,这个时候就还得进行投票归属信息的判断,避免Follower投出进行二次投票过程。当然在这个过程中,我们也要保证投票相关变量的更新务必是原子更新的。

随后Follower将投票结果返回给发起请求的Candidate,然后Candidate进行了以下的处理过程。

  private ResultAndTerm waitForResults(final long electionTerm, final int submitted,
      RaftConfiguration conf, Executor voteExecutor) throws InterruptedException {
    final Timestamp timeout = Timestamp.currentTime().addTimeMs(server.getRandomTimeoutMs());
    final Map<RaftPeerId, RequestVoteReplyProto> responses = new HashMap<>();
    final List<Exception> exceptions = new ArrayList<>();
    int waitForNum = submitted;
    Collection<RaftPeerId> votedPeers = new ArrayList<>();
    while (waitForNum > 0 && shouldRun(electionTerm)) {
     ...

      try {
    	// (1)从投票线程池中拿出投票结果
        final Future<RequestVoteReplyProto> future = voteExecutor.poll(waitTime);
        if (future == null) {
          continue; // poll timeout, continue to return Result.TIMEOUT
        }

        final RequestVoteReplyProto r = future.get();
        final RaftPeerId replierId = RaftPeerId.valueOf(r.getServerReply().getReplyId());
        final RequestVoteReplyProto previous = responses.putIfAbsent(replierId, r);
        if (previous != null) {
          LOG.warn("{} received duplicated replies from {}, the 2nd reply is ignored: 1st = {}, 2nd = {}",
              server.getId(), replierId, ServerProtoUtils.toString(previous), ServerProtoUtils.toString(r));
          continue;
        }
        if (r.getShouldShutdown()) {
          return logAndReturn(Result.SHUTDOWN, responses, exceptions, -1);
        }
        // (2)如果发现当前Follower的领导选举已经是新的轮次的话,则返回新的轮次信息
        if (r.getTerm() > electionTerm) {
          return logAndReturn(Result.DISCOVERED_A_NEW_TERM, responses,
              exceptions, r.getTerm());
        }
        // (3)接受到成功的返回结果,加入到投票结果列表中
        if (r.getServerReply().getSuccess()) {
          votedPeers.add(replierId);
          // 如果投票总数超过半数以上时,则表明当前领导选举通过
          if (conf.hasMajority(votedPeers, server.getId())) {
            return logAndReturn(Result.PASSED, responses, exceptions, -1);
          }
        }
      } catch(ExecutionException e) {
        LogUtils.infoOrTrace(LOG, () -> this + " got exception when requesting votes", e);
        exceptions.add(e);
      }
      waitForNum--;
    }
    // (4)规定时间内没有获取到足够多的票数,则当前领导选举竞选失败
    return logAndReturn(Result.REJECTED, responses, exceptions, -1);
  }

Candidate对于上述最终结果的处理过程如下:

  private void askForVotes() throws InterruptedException, IOException {
    final ServerState state = server.getState();
    while (shouldRun()) {
      ...
        try {
          // (3).发起投票过程
          final int submitted = submitRequests(electionTerm, lastEntry, others, voteExecutor);
          // (4).等待投票结果返回
          r = waitForResults(electionTerm, submitted, conf, voteExecutor);
        } finally {
          voteExecutor.shutdown();
        }
      }

      synchronized (server) {
        if (!shouldRun(electionTerm)) {
          return; // term already passed or this should not run anymore.
        }
 
        // (5)对选举结果的处理
        switch (r.result) {
          case PASSED:
        	// 选举通过,则切换当前身份为Leader
            server.changeToLeader();
            return;
          case SHUTDOWN:
            LOG.info("{} received shutdown response when requesting votes.", this);
            server.getProxy().close();
            return;
          // 如果选举失败或发现更新一轮的选举轮次(说明别的Candidate发起的领导选举已经成功)
          // 则进行相应信息更新
          case REJECTED:
          case DISCOVERED_A_NEW_TERM:
            final long term = Math.max(r.term, state.getCurrentTerm());
            server.changeToFollowerAndPersistMetadata(term, Result.DISCOVERED_A_NEW_TERM);
            return;
          case TIMEOUT:
            // should start another election
        }
      }
    }
  }

以上就是Apache Ratis内部基于Raft协议的投票过程的代码实现过程。在这里,为了避免可能存在多个Candidate几乎同时发生投票,导致结果完全一致从而需要进行下一轮次的选举。这里会进行随机时间间隔的设置,来错开投票的发起时间。

在Apache Ratis中,也有此细节的实现:

  public  void run() {
    long sleepDeviationThresholdMs = server.getSleepDeviationThresholdMs();
    while (monitorRunning && server.isFollower()) {
    // 随机时间的设置,避免完全同时投票选举过程发生
    final long electionTimeout = server.getRandomTimeoutMs();
      try {
        if (!JavaUtils.sleep(electionTimeout, sleepDeviationThresholdMs)) {
          continue;
        }
        ...
        synchronized (server) {
          if (outstandingOp.get() == 0 && lastRpcTime.elapsedTimeMs() >= electionTimeout) {
            // ...
            server.changeToCandidate();
            break;
          }
        }
      ...
    }
  }

这个corner case是可能存在的,比如A, B, C, D四个服务,A和B要竞选Leader身份,当按照下述选举过程时,就会出现平票结果:

1)每个Candidate会向非自身服务发起投票选举,但是如若自身也收到投票选举时,默认只选自己,不会投向其它Candidate
2)Candidate A向B, C,D发起投票选举,B也是Candidate,它只会投给自己,同时C投票给A。接着D投给了B。这样就出现了平票2对2的情况了。

当然上述情况主要多加1个服务,就不会出现平票的情况了,

最后附上Raft投票选举过程图,大家可以对照上述的子过程实现,进行对比,学习。总体来讲,投票实现过程还是比较易于理解的。

在这里插入图片描述

引用

[1].https://raft.github.io/raft.pdf
[2].http://ratis.incubator.apache.org/
[3].https://raft.github.io/
[4].https://blog.csdn.net/Androidlushangderen/article/details/86763412

免责声明:文章转载自《Raft一致性协议的投票选举》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇聊聊分布式存储系统的Decommission和Maintenance模式基于状态机方法构建高容错性服务下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

etc 安装及使用

键值存储仓库,用于配置共享和服务发现。 A highly-available key value store for shared configuration and service discovery. 基本介绍 服务发现 要解决服务发现的问题,需要有下面三大支柱,缺一不可。 一个强一致性、高可用的服务存储目录。基于Raft算法的etcd天生就是这样一个...

分布式一致性协议Raft原理与实例

分布式一致性协议Raft原理与实例 1.Raft协议 1.1 Raft简介 Raft是由Stanford提出的一种更易理解的一致性算法,意在取代目前广为使用的Paxos算法。目前,在各种主流语言中都有了一些开源实现,比如本文中将使用的基于JGroups的Raft协议实现。关于Raft的原理,强烈推荐动画版Raft讲解。 1.2 Raft原...

分布式系统的Raft算法

好东西~~ 英文动画演示Raft 过去, Paxos一直是分布式协议的标准,但是Paxos难于理解,更难以实现,Google的分布式锁系统Chubby作为Paxos实现曾经遭遇到很多坑。   来自Stanford的新的分布式协议研究称为Raft,它是一个为真实世界应用建立的协议,主要注重协议的落地性和可理解性。   在了解Raft之前,我们先了解Consen...

架构之微服务(etcd)

1. ETCD是什么 ETCD是用于共享配置和服务发现的分布式,一致性的KV存储系统。该项目目前最新稳定版本为2.3.0. 具体信息请参考[项目首页]和[Github]。ETCD是CoreOS公司发起的一个开源项目,授权协议为Apache。 提供配置共享和服务发现的系统比较多,其中最为大家熟知的是[Zookeeper](后文简称ZK),而ETCD可以算得上...

Zookeeper快速领导者选举原理

  Zookeeper快速领导者选举原理 本文略长,更适合在电脑端观看,可以收藏或直接关注微信公众号:1点25 人类选举的基本原理 正常情况下,选举是一定要投票的。 我们应该都经历过投票,在投票时我们可能会将票投给和我们关系比较好的人,如果你和几个候选人都比较熟,这种情况下你会将选票投给你认为能力比较强的人,如果你...

Hulu大规模容器调度系统Capos

Hulu是美国领先的互联网专业视频服务平台,目前在美国拥有超过2000万付费用户。Hulu总部位于美国洛杉矶,北京办公室是仅次于总部的第二大研发中心,也是从Hulu成立伊始就具有重要战略地位的分支办公室,独立负责播放器开发,搜索和推荐,广告精准投放,大规模用户数据处理,视频内容基因分析,人脸识别,视频编解码等核心项目。 在视频领域我们有大量的视频转...