https://mp.weixin.qq.com/s/zgLcBWuVzFsKkNngKW85Zw
通常,Raft是作为引入到某些服务中的对象实现的。由于我们不在这里开发服务,而是仅研究Raft本身,因此我创建了一个简单的 Server 类型,该类型包裹 ConsensusModule 类型以尽可能地隔离代码中更感兴趣的部分
共识模块(CM)实现了Raft算法的核心,位于 raft.go 文件中。它完全从与集群中其他副本的网络和连接的细节中抽象出来。ConsensusModule中与网络相关的唯一字段是:
// CM中的服务器ID
id int
// 集群中节点的ID列表
peerIds []int
// 包含CM的服务器,处理节点间的RPC通信
server *Server
在实现中,每个Raft副本将群集中的其他副本称为”端点“。集群中的每个端点都有唯一的数字ID,以及所有端点的ID列表。server 字段是指向包含 Server 的指针(在server.go中实现),使 ConsensusModule 可以将消息发送给端点。稍后我们将看到它是如何实现的。
这样设计的目标是将所有网络细节都排除掉,把重点放在Raft算法上。通常,要将Raft论文映射到此实现上,只需要 ConsensusModule 类型及其方法。服务器代码是一个相当简单的Go网络框架,有一些小的复杂之处来支持严格的测试。在本系列文章中,我们不会花时间,但是如果有不清楚的地方,可以留言提问。
2
Raft服务器状态
从总体上讲,Raft CM是一个具有3种状态的状态机:
这可能有点迷惑,因为上一部分花费了大量时间来解释Raft如何帮助实现状态机。通常情况下,状态一词在这里是重载的。Raft是用于实现任意复制状态机的算法,但它内部也有一个小型状态机。后面,该状态意味着从上下文中可以清楚地确定,不明确的地方我们也会指出该状态。
在一个典型的稳定状态场景中,群集中的一台服务器是领导者,而其他所有服务器都是跟随者。尽管我们不希望出问题,但Raft的目标就是容错,因此我们将花费大部分时间来讨论非典型场景,失败情况,某些服务器崩溃,其他服务器断开连接等。
正如前面所说,Raft使用了一个强大的领导模型。领导者响应客户端的请求,将新条目添加到日志中,并将其复制到跟随者。万一领导者失败或停止响应,每个跟随者都随时准备接管领导权。这是图中从“跟随者”到“候选者”的“响应超时,开始选举”的过渡。
3
任期
就像常规选举一样,在Raft中也有任期。任期是某个服务器担任领导者的时间段。新的选举触发一个新的任期,并且Raft算法可确保给定的任期只有一个领导者。
但是,这个类别有点牵强,因为Raft选举与真实选举有很大差别。在Raft中,选举更加合作;候选者的目标不是赢得选举,而是在任何一个特定的任期内有合适的候选者赢得选举。我们稍后将详细讨论“合适”的含义。
4
选举计时器
Raft算法的关键组成部分是选举计时器。 这是每个跟随者连续运行的计时器,每次收到当前领导者的消息就会重新启动它。领导者发送周期性的心跳,因此当这些心跳停止到达时,跟随者会认为领导者已经崩溃或断开连接,并开始选举(切换到候选状态)。
问:不是所有的跟随者都会同时成为候选人?
选举计时器是随机的,这是Raft简单化的关键之一。Raft使用此随机方法来降低多个关注者同时进行选举的机会。但是,即使他们确实同时成为候选人,在任何给定的任期中也只有一个当选为领导者。在极少数情况下,如果投票分裂,以致没有候选人能赢得选举,将进行新的选举(有新任期)。从理论上讲,永久地重新进行选举是可行的,但在每一轮选举中发生这种情况的可能性都大大降低。
问:如果跟随者与集群断开连接(分区)怎么办?它不会因为没有听到领导者的声音而开始选举吗?
答:这是网络分区的隐患,因为跟随者无法区分谁被分区。它确实将开始选举。但是,如果是这个跟随者被断开,那么这次选举将无济于事-因为它无法与其他端点联系,所以不会获得任何投票。它可能会继续保持候选状态(每隔一段时间重新启动一次新选举),直到重新连接到集群。稍后我们将更详细地研究这种情况。
5
对等RPC
Raft有两种RPC在端点之间互相发送。有关这些RPC的详细参数和规则,参见图2。简要讨论它们的目标:
RequestVotes(RV):仅在候选状态下使用;候选人使用它来请求选举中的端点投票。答复中包含是否批准投票的指示。
AppendEntries(AE):仅在领导者状态下使用;领导者使用此RPC将日志条目复制到跟随者,也发送心跳。即使没有新的日志条目要复制,该RPC也会定期发送给每个跟随者。
从以上内容可以推断出跟随者没有发送任何RPC。这是对的;跟随者不会向其他端点发起RPC,但是他们在后台运行选举计时器。如果在没有当前领导者的通信的情况下经过了此计时器,则跟随者将成为候选者并开始发送RV。
6
实现选举计时器
现在开始深入研究代码。以下代码文件会在文章末尾给出。关于 ConsensusModule 结构的字段的完整列表,可以在代码文件中查看。
我们的 CM 通过在 goroutine 中运行以下功能来实现选举计时器:
func (cm *ConsensusModule) runElectionTimer() {
timeoutDuration := cm.electionTimeout()
cm.mu.Lock()
termStarted := cm.currentTerm
cm.mu.Unlock()
cm.dlog(“election timer started (%v), term=%d”, timeoutDuration, termStarted)
// This loops until either:
// - we discover the election timer is no longer needed, or
// - the election timer expires and this CM becomes a candidate
// In a follower, this typically keeps running in the background for the
// duration of the CM’s lifetime.
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for {
<-ticker.C
cm.mu.Lock()
if cm.state != Candidate && cm.state != Follower {
cm.dlog("in election timer state=%s, bailing out", cm.state)
cm.mu.Unlock()
return
}
if termStarted != cm.currentTerm {
cm.dlog("in election timer term changed from %d to %d, bailing out", termStarted, cm.currentTerm)
cm.mu.Unlock()
return
}
// Start an election if we haven't heard from a leader or haven't voted for
// someone for the duration of the timeout.
if elapsed := time.Since(cm.electionResetEvent); elapsed >= timeoutDuration {
cm.startElection()
cm.mu.Unlock()
return
}
cm.mu.Unlock() } } 首先通过调用 cm.electionTimeout 选择一个伪随机的选举超时时间。正如论文中的建议,我们在这里使用的范围是150到300毫秒。和 ConsensusModule 的大多数方法一样,runElectionTimer 在访问字段时会锁定结构。这是必须要做的,因为实现尝试尽可能地保持同步,这也是Go的优势之一。这意味着顺序代码是...顺序执行的,并且不会拆分为多个事件处理程序。但是,RPC仍然同时发生,因此我们必须保护共享数据结构。我们很快就会讲到RPC处理程序。
这个方法的主循环运行一个10毫秒的代码。有更有效的方法来等待事件,但是这种习惯用法代码最为简单。每次循环迭代都在10毫秒之后进行。从理论上讲,这可以使整个选举超时,但随后响应速度会变慢,并且在日志中进行调试/跟踪会更加困难。我们检查状态是否仍然如预期且任期未更改。如果有任何关闭,我们终止选举计时器。
如果自上次“选举重置事件”以来已经过去了足够的时间,则此端点开始选举并成为候选人。这是什么选举重置事件?可以终止选举的任何因素-例如,收到有效的心跳,或给另一个候选人投票。
7
成为候选人
上面可以看到,一旦经过足够的时间而没有跟随者收到领导者或其他候选人的消息,它将开始选举。在查看代码之前,我们考虑一下进行选举所需的事情:
将状态切换为候选项,并增加条件项,因为这是算法为每次选举指定的条件。
将RV RPC发送给所有端点,要求他们在这次选举中为我们投票。
等待对这些RPC的答复,然后计数是否获得足够的选票成为领导者。
在Go中,所有这些逻辑都可以在一个函数中实现:
func (cm *ConsensusModule) startElection() {
cm.state = Candidate
cm.currentTerm += 1
savedCurrentTerm := cm.currentTerm
cm.electionResetEvent = time.Now()
cm.votedFor = cm.id
cm.dlog(“becomes Candidate (currentTerm=%d); log=%v”, savedCurrentTerm, cm.log)
var votesReceived int32 = 1
// Send RequestVote RPCs to all other servers concurrently.
for _, peerId := range cm.peerIds {
go func(peerId int) {
args := RequestVoteArgs{
Term: savedCurrentTerm,
CandidateId: cm.id,
}
var reply RequestVoteReply
cm.dlog("sending RequestVote to %d: %+v", peerId, args)
if err := cm.server.Call(peerId, "ConsensusModule.RequestVote", args, &reply); err == nil {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.dlog("received RequestVoteReply %+v", reply)
if cm.state != Candidate {
cm.dlog("while waiting for reply, state = %v", cm.state)
return
}
if reply.Term > savedCurrentTerm {
cm.dlog("term out of date in RequestVoteReply")
cm.becomeFollower(reply.Term)
return
} else if reply.Term == savedCurrentTerm {
if reply.VoteGranted {
votes := int(atomic.AddInt32(&votesReceived, 1))
if votes*2 > len(cm.peerIds)+1 {
// Won the election!
cm.dlog("wins election with %d votes", votes)
cm.startLeader()
return
}
}
}
}
}(peerId) }
// Run another election timer, in case this election is not successful.
go cm.runElectionTimer()
}
候选人首先为自己投票-将 voiceReceived 初始化为1并设置 cm.votedFor =cm.id。
然后,它与所有其他端点并行发出RPC。每个RPC都在自己的goroutine中完成,因为我们的RPC调用是同步的-它们会阻塞直到收到响应为止,这可能需要一段时间。
rpc实现:
cm.server.Call(peer, “ConsensusModule.RequestVote”, args, &reply)
我们使用 ConsensusModule.server 字段中包含的Server指针发出远程调用,使用 ConsensusModule.RequestVotes 作为远程方法名称。最终调用第一个参数中给出的端点的RequestVote方法。
如果RPC成功,已经过了一段时间,因此我们必须检查状态,看看有哪些选项。如果我们的状态不再是候选人,就退出。什么时候会发生这种情况?例如,我们可能赢得了选举,因为其他RPC调用中有足够的选票。或者收到其他RPC调用中的一个具有更高的任期,所以我们切换成跟随者。重要的是,在网络不稳定的情况下,RPC可能需要很长时间才能到达-当我们收到答复时,其余代码可能会继续进行,因此在这种情况下妥协放弃非常重要。
如果响应返回时我们仍然是候选人,我们将检查响应的任期,并将其与发送请求时的原始任期进行比较。如果答复的任期较高,我们将恢复为跟随者状态。例如,如果其他候选人在我们收集选票时赢得了选举,就会发生这种情况。
如果该任期与我们发出的任期相同,请检查是否已投票。我们使用一个原子投票变量来安全地从多个goroutine中收集投票。如果此服务器拥有多数表决权(包括它自己授予的表决权),它将成为领导者。
请注意,startElection方法不会阻塞。它更新一些状态,启动一堆goroutines并返回。因此,它还应该在goroutine中启动一个新的选举计数器-在最后一行进行。这样可以确保如果这次选举没有任何用处,则超时后将开始新的选举。这也解释了runElectionTimer中的状态检查:如果此选举确实将端点转变为领导者,则并发运行的runElecionTimer将在观察它不希望进入的状态时才返回。
8
成为领导者
当投票记录显示该端点获胜时,我们已经在startElection中看到了startLeader调用。
func (cm *ConsensusModule) startLeader() {
cm.state = Leader
cm.dlog(“becomes Leader; term=%d, log=%v”, cm.currentTerm, cm.log)
go func() {
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
// Send periodic heartbeats, as long as still leader.
for {
cm.leaderSendHeartbeats()
<-ticker.C
cm.mu.Lock()
if cm.state != Leader {
cm.mu.Unlock()
return
}
cm.mu.Unlock()
} }() } 这实际上是一个相当简单的方法:所有操作都是运行心跳计时器-一个goroutine,只要此CM仍然是领导者,它将每50毫秒调用一次 leaderSendHeartbeats。
func (cm *ConsensusModule) leaderSendHeartbeats() {
cm.mu.Lock()
savedCurrentTerm := cm.currentTerm
cm.mu.Unlock()
for _, peerId := range cm.peerIds {
args := AppendEntriesArgs{
Term: savedCurrentTerm,
LeaderId: cm.id,
}
go func(peerId int) {
cm.dlog(“sending AppendEntries to %v: ni=%d, args=%+v”, peerId, 0, args)
var reply AppendEntriesReply
if err := cm.server.Call(peerId, “ConsensusModule.AppendEntries”, args, &reply); err == nil {
cm.mu.Lock()
defer cm.mu.Unlock()
if reply.Term > savedCurrentTerm {
cm.dlog(“term out of date in heartbeat reply”)
cm.becomeFollower(reply.Term)
return
}
}
}(peerId)
}
}
有点类似于startElection,从某种意义上说,它为每个对等点启动了一个goroutine以发送RPC。这次RPC是没有日志内容的AppendEntries(AE),在Raft中起着心跳的作用。
与处理RV响应类似,如果RPC返回的任期高于我们的任期,则此端点切换为跟随者。
func (cm *ConsensusModule) becomeFollower(term int) {
cm.dlog(“becomes Follower with term=%d; log=%v”, term, cm.log)
cm.state = Follower
cm.currentTerm = term
cm.votedFor = -1
cm.electionResetEvent = time.Now()
go cm.runElectionTimer()
}
它将CM的状态设置为跟随者,并重置其条件和其他重要状态字段。启动一个新的选举计时器。
9
答复RPC
目前为止,我们已经实现了活动部分-初始化RPC、计时器和状态转换。在我们看到服务器方法之前,演示还不完整。 从RequestVote开始:
func (cm *ConsensusModule) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.state == Dead {
return nil
}
cm.dlog(“RequestVote: %+v [currentTerm=%d, votedFor=%d]”, args, cm.currentTerm, cm.votedFor)
if args.Term > cm.currentTerm {
cm.dlog(“… term out of date in RequestVote”)
cm.becomeFollower(args.Term)
}
if cm.currentTerm == args.Term &&
(cm.votedFor == -1 || cm.votedFor == args.CandidateId) {
reply.VoteGranted = true
cm.votedFor = args.CandidateId
cm.electionResetEvent = time.Now()
} else {
reply.VoteGranted = false
}
reply.Term = cm.currentTerm
cm.dlog(“… RequestVote reply: %+v”, reply)
return nil
}
注意检查是否为“死亡”状态。我们稍后再讨论。
检查该任期是否过时并成为跟随者。如果已经是跟随者,则状态不会更改,但其他状态字段将重置。
否则,如果调用者的任期与该任期一致,而我们还没有投票给其他候选人,将进行投票。不会对较旧的RPC投票。
func (cm *ConsensusModule) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.state == Dead {
return nil
}
cm.dlog(“AppendEntries: %+v”, args)
if args.Term > cm.currentTerm {
cm.dlog(“… term out of date in AppendEntries”)
cm.becomeFollower(args.Term)
}
reply.Success = false
if args.Term == cm.currentTerm {
if cm.state != Follower {
cm.becomeFollower(args.Term)
}
cm.electionResetEvent = time.Now()
reply.Success = true
}
reply.Term = cm.currentTerm
cm.dlog(“AppendEntries reply: %+v”, *reply)
return nil
}
该逻辑也与图2的选择部分保持一致。需要了解的一个复杂情况是:
if cm.state != Follower {
cm.becomeFollower(args.Term)
}
问:如果此端点是领导者怎么办?为什么它成为另一个领导者的跟随者?
答:Raft在任何给定的任期内都保证只有一个领导者存在。如果仔细地遵循RequestVote的逻辑以及发送RV的startElection中的代码,将看到在集群中不能使用相同的任期存在两个领导者。对于发现其他端点赢得选举的候选人而言,这一条件很重要。
10
状态和goroutine
回顾一下CM可能处于的所有状态,并在其中运行不同的goroutine:
跟随者:将CM初始化为跟随者,并且在对beginFollower的每次调用中,一个新的goroutine开始运行runElectionTimer。注意,在短时间内一次可以运行多个。假设跟随者在较高的任期内从领导者那里获得了RV;将触发另一个beginFollower调用,该调用将启动新的计时器goroutine。但是,旧的一旦发现任期发生变化,将不做任何事情直接退出。
候选人:也同时具有选举goroutine的计时器,除此之外,还有许多goroutines发送RPC。具有与跟随者相同的保护措施,可以在新的运行程序停止运行时停止“旧的”选举程序。请记住,RPC goroutine可能需要很长时间才能完成,因此,如果他们注意到RPC调用返回时它们已过时,必须安静地退出,这一点至关重要。
领导者:领导者没有选举goroutine,但有心跳goroutine每50毫秒执行一次。
代码中还有一个附加状态-死亡状态。是为了有序地关闭CM。调用Stop会将状态设置为Dead,所有goroutine会在观察到该状态后立即退出。
使所有这些goroutine运行可能会令人担忧-如果其中一些仍在后台运行怎么办?或更糟糕的是,它们反复泄漏,其数量无边无际地增长?这就是泄漏检查的目的,并且一些测试启用了泄漏检查。
11
服务器失控和任期增加
总结以上部分,我们研究一个可能发生的特殊场景以及Raft如何应对。这个例子非常有趣并且很有启发性。在这里,我试图将其呈现为一个故事,但是您可能希望使用一张纸来跟踪不同服务器的状态。 如果您不能遵循该示例-请给我发送电子邮件-我们将很乐意对其进行修复以使其更加清晰。
设想一个有三台服务器的群集:A,B和C。假设A是领导者,起始项为1,并且该群集运行正常。A每隔50毫秒向B和C发送一次心跳AE RPC,并在几毫秒内获得快速响应;每个这样的AE都会重置B和C的 eletementResetEvent,因此它们仍然是的跟随者。
在某个时间点,由于网络路由器出现故障,服务器B从A和C中分区了。A仍每50毫秒向其发送一次AE,但是这些AE要么立即出错,要么在底层RPC引擎超时出错。A对此无能为力,但这没什么大不了的。我们还没有讨论日志复制,但是由于三台服务器中的两台还处于活动状态,因此群集具有提交客户端命令的数量。
B呢?假设当断开连接时,其选举超时设置为200毫秒。断开连接大约200毫秒后,B的runElectionTimer goroutine意识到没有收到来自领导者的选举超时消息。B无法区分谁存在,所以它将成为候选人并开始新的选举。
因此,B的任期将变为2(而A和C的项仍为1)。B会将RV RPC发送给A和C,要求他们投票。当然,这些RPC在B的网络中丢失了。B的startElection在开始时就启动了另一个runElectionTimer goroutine,该goroutine等待250毫秒(超时范围在150-300毫秒之间是随机的),查看是否由于上次选举而发生了重要的事情。B没做任何事情,因为它仍然是完全隔离的。因此,runElectionTimer开始另一个新的选举,将期限增加到3。
很长时间,B的路由器需要花费几秒钟的时间来重置并恢复在线状态。同时,B偶尔会重新选举一次,其任期已到8。
此时,网络分区恢复,并且B重新连接到A和C。
然后,AE RPC从A到达。回想一下A一直每50 ms发送一次,尽管B暂时没有回复。
B的 AppendEntries 被执行,并以term = 8发送回一个答复。
A在 LeaderSendHeartbeats 中获得了此答复,检查了答复的任期,并发现其高于其本身。它将自己的任期更新为8,并成为跟随者。集群暂时失去领导者。
现在可能会发生多种情况。B是候选者,但它可能在网络恢复之前已经发送了RV。C是跟随者,但在其自身的选举超时时间内,它将成为候选者,因为它不再从A接收定期的AE。A成为跟随者,并且还将在其选举超时时间内成为候选者。
因此,这三台服务器中的任何一台都可以赢得下一次选举。这仅是因为我们实际上未在此处复制任何日志。正如我们将在下一部分中看到,在实际情况下,A和C可能会在B不在时添加一些新的客户端命令,因此它们的日志将是最新的。因此,B不能成为新的领导者-将会发生新的选举,由A或C赢得;我们将在下一部分中再次讨论该场景。
假设自从B断开连接以来未添加任何新命令,则由于重新连接而导致更换领导者也是完全可以的。
这看起来效率很低。因为领导者的更换并不是真正必要的,因为在整个场景中A都非常健康。但是,在个别情况下以不降低效率为代价来使不变量保持简单是Raft做出的设计选择之一。在最常见的情况下(没有任何中断),效率才是关键,因为99.9%的时间集群都是在正常状态。
12
下一步
本系列的下一部分中将描述一个更完整的Raft实现,包括实际处理客户端命令并在整个集群中复制。敬请关注!
Raft参考:https://raft.github.io/raft.pdf
代码参考:https://github.com/eliben/raft/tree/master/part1
说明:有小伙伴留言说原文是外网写的,我们直接拿来发了,是否是盗用。实际上小编也是经过原作者Eli同意了的。发此系列文章的目的也很简单,仅仅是为了能让更多的小伙伴看到优质的文章,可以学习提升自己