— Scala, Distributed Systems, ZIO, Raft — 8 min read
This article presents an implementation of the Raft consensus protocol in Scala using ZIO. More specifically, it covers the architecture of the project, leader election, and log replication. It assumes some prior distributed systems knowledge, e.g. what an RPC is. Full code: https://github.com/ariskk/zio-raft
Raft is a consensus protocol. It is the kind of algorithm systems use when multiple servers need to agree on a single value. It is a coordination primitive often used in databases and other distributed systems.
A Raft cluster's lifecycle can be split into two distinct phases: leader election and log replication.
The whole protocol is based on two simple RPC calls, or four different types of messages:

- VoteRequest: If no leader is present, candidates emerge and broadcast a VoteRequest.
- VoteResponse: Peers respond to vote requests based on a set of rules.
- AppendEntries: The leader uses this message to replicate new log entries to followers. It is also used as a heartbeat.
- AppendEntriesResponse: Followers confirm replication of entries based on a set of rules.

The above process supports linearizable semantics. From Jepsen:
Linearizability is one of the strongest single-object consistency models, and implies that every operation appears to take place atomically, in some order, consistent with the real-time ordering of those operations: e.g., if operation A completes before operation B begins, then B should logically take effect after A.
This is a strong consistency model. Linearizable systems cannot be totally available in the face of network partitions. Raft is thus suitable for systems where consistency is critical; those usually provide coordination services to other systems. Some well known systems using it are etcd and Consul.
Digging more into the paper is definitely worth it.
This post covers the following aspects:

- The architecture of the project
- Leader election
- Log replication
The implementation uses Scala and the fantastic concurrent programming library ZIO. The repo is hosted at https://github.com/ariskk/zio-raft. If you have feedback, please do reach out on Twitter; I am looking for inspiration regarding the next steps.
Without further ado...
The architecture of the Raft Server implementation is summarised in the following diagram:
It contains the following components:

- Node state, kept in memory using Ref.
- Queues for inbound and outbound messages, backed by ZQueue.
- RaftServer.scala, which brings all those together using zio-nio to implement a very simple network layer.
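As a rough, illustrative sketch of how those primitives fit together (the names here are simplified stand-ins; the real definitions live in the repo):

import zio.{Queue, Ref, UIO}

// Illustrative only: volatile state lives in a Ref so concurrent fibers
// can read and update it atomically.
final case class VolatileState(leader: Option[NodeId], commitIndex: Index)

final class ServerState(ref: Ref[VolatileState]) {
  def leader: UIO[Option[NodeId]]      = ref.get.map(_.leader)
  def setLeader(id: NodeId): UIO[Unit] = ref.update(_.copy(leader = Some(id)))
}

// Illustrative only: one queue per message type lets the protocol loops
// consume messages independently of the network layer.
final case class Queues(
  appendEntriesQueue: Queue[AppendEntries],
  inboundVoteResponseQueue: Queue[VoteResponse]
)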
Let's now see how those can be used for the two fundamental Raft functions: Leader Election and Log Replication.
A Raft node can be in three different states:

- Follower: Responds to VoteRequest and AppendEntries messages.
- Candidate: Runs for leader. Sends out VoteRequest messages and collects responses.
- Leader: Receives commands from clients and sends AppendEntries messages to all followers.

Those three states and their transitions are summarised in the following graph:
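In code, these three states are a natural fit for a small ADT. The snippets below reference NodeState.Follower and NodeState.Candidate, which suggests a shape along these lines:

sealed trait NodeState

object NodeState {
  case object Follower  extends NodeState
  case object Candidate extends NodeState
  case object Leader    extends NodeState
}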
A Raft node starts in Follower state. If it doesn't hear from a leader for a period of time, it does the following:

- It becomes a Candidate.
- It broadcasts a VoteRequest to all known peers.
- If it receives votes from a majority of the cluster, it becomes Leader and sends out empty AppendEntries messages (aka heartbeats) every 50 milliseconds.
- Else, it becomes Follower again and waits to hear from a leader for a period of time. This is repeated until a Leader emerges.

This process could lead to a livelock of endlessly repeated split votes if this period of time were constant. To resolve this, Raft employs a randomised timer:
import scala.util.Random

final case class ElectionTimeout(millis: Int) extends AnyVal

object ElectionTimeout {
  private val base  = 150
  private val range = 150

  // A fresh random timeout in [150, 300) milliseconds.
  def newTimeout: ElectionTimeout =
    ElectionTimeout(base + Random.nextInt(range))
}
When a Raft node starts, and every time it receives an AppendEntries message from the leader, it starts a randomised timer between 150 and 300 milliseconds.
If it receives a new AppendEntries
message, the timer is reset to a new random value. If not, it kicks off a leader election.
This follower logic can be encoded in Scala as follows:
private def processInboundEntries: ZIO[Clock, RaftException, Unit] =
queues.appendEntriesQueue.take.disconnect // wait for AppendEntries
.timeout(ElectionTimeout.newTimeout.millis.milliseconds) // timeout if none arrives after ElectionTimeout
.flatMap { maybeEntries =>
maybeEntries.fold[ZIO[Clock, RaftException, Unit]]( // if no entries arrived
for {
nodeState <- state.nodeState
currentTerm <- storage.getTerm
currentVote <- storage.getVote
hasLost <- state.hasLost(currentTerm)
_ <- // if there is no other known Candidate (ie currentVote is empty), run for Leader. Ditto if a previous election was lost but there is still no leader.
if ((nodeState == NodeState.Follower && currentVote.isEmpty) || hasLost) runForLeader
else processInboundEntries // in any other case, set a new timer and wait
} yield ()
)(es => processEntries(es) *> processInboundEntries) // if AppendEntries arrived, process them and go back to wait for more.
}
If no AppendEntries is received and there is no other known candidate, the node runs for leader.
Once the vote requests are sent, the Candidate collects vote responses. Since more than one candidate may be running at the same time, vote collection should exit if another node wins the majority or if another candidate is running at a later term.
private def collectVotes(forTerm: Term): ZIO[Clock, RaftException, Unit] = {
lazy val voteCollectionProgram = for {
newVote <- queues.inboundVoteResponseQueue.take
processVote <-
if (newVote.term.isAfter(forTerm)) // If a later term exists, become Follower
storage.storeTerm(newVote.term) *>
state.becomeFollower
else if (newVote.term == forTerm && newVote.granted)
state.addVote(Vote(newVote.from, newVote.term)).flatMap { hasWon =>
ZIO.when(hasWon)(storage.lastIndex.flatMap(state.initPeerIndices(_)))
}
else if (newVote.term == forTerm && !newVote.granted)
state.addVoteRejection(Vote(newVote.from, newVote.term))
else ZIO.unit
} yield ()
/***/
voteCollectionProgram
  .repeatUntilM(_ => state.nodeState.map(_ != NodeState.Candidate))
}
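The majority arithmetic inside state.addVote is not shown here; conceptually it boils down to this pure, illustrative version of what the repo tracks in a Ref:

// Illustrative: record a vote and report whether it completes a majority.
def recordVote(votes: Set[Vote], vote: Vote, clusterSize: Int): (Set[Vote], Boolean) = {
  val updated = votes + vote
  // Strictly more than half the cluster, e.g. 3 out of 5.
  (updated, updated.size > clusterSize / 2)
}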
If the node loses the election, it becomes a Follower
and runs the processInboundEntries
loop outlined above.
If it gets elected, it starts sending heartbeats. Those are simply AppendEntries
messages where the list of entries contains all uncommitted entries (more on this later).
If all entries have been committed, then this list is empty.
private def sendHeartbeats: ZIO[Clock, RaftException, Unit] = {
  lazy val sendHeartbeatEntries = sendAppendEntries
    .delay(Raft.LeaderHeartbeat)
    .repeatWhileM(_ => isLeader)

  // Process responses concurrently with sending heartbeats; fall back into
  // the follower loop once leadership is lost.
  (processAppendEntriesResponses &> sendHeartbeatEntries) *> processInboundEntries
}
LeaderHeartbeat is set at 50 milliseconds: frequent enough that the minimum election timeout of 150 milliseconds is never reached, even in periods with no client commands. processAppendEntriesResponses contains an additional check: the leader steps down if it receives an AppendEntriesResponse from a later term.
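That step-down check is not shown in this post; a minimal sketch of it, reusing the state and storage helpers from the snippets above, could look like this:

// Illustrative sketch: if a response carries a later term, persist it and
// step down to Follower.
private def stepDownIfStale(response: AppendEntriesResponse): ZIO[Any, RaftException, Unit] =
  ZIO.whenM(storage.getTerm.map(current => response.term.isAfter(current))) {
    storage.storeTerm(response.term) *> state.becomeFollower
  }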
To sum up the leader election:

- If a Follower hears from neither a leader nor a VoteRequest from a quicker node before its election timeout fires, it becomes a Candidate and sends VoteRequests to peers.
- If it collects votes from a majority for its term, it becomes the Leader and starts sending heartbeats.
- If it discovers a later term, or loses the election, it reverts to Follower and the process repeats.

At this stage, the cluster has elected a leader and is ready to accept commands from clients. Those commands need to be replicated to a majority. Let's see how this works.
Clients send commands that essentially represent StateMachine transitions. Raft stores those commands in a Log and replicates those log entries to all peers. Each log entry has an Index and a Term.

If the StateMachine is a Key-Value store, then a command could look like this:
case class WriteKey[V](key: Key, value: V) extends WriteCommand
The Command model depends on the application:
sealed trait Command extends Serializable
trait ReadCommand extends Command
trait WriteCommand extends Command
Those traits model commands to a generic state machine, which at the most basic level looks like this:
trait StateMachine[T] {
def write: PartialFunction[WriteCommand, IO[StateMachineException, Unit]]
def read: PartialFunction[ReadCommand, IO[StateMachineException, T]]
}
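To make this concrete, here is a toy in-memory key-value state machine backed by a ZIO Ref. ReadKey and the StateMachineException usage are illustrative assumptions, not part of the code shown above:

import zio.{IO, Ref, ZIO}

// Illustrative read command; the post only shows WriteKey.
final case class ReadKey(key: Key) extends ReadCommand

final class InMemoryKVStore[V](ref: Ref[Map[Key, V]]) extends StateMachine[V] {

  def write: PartialFunction[WriteCommand, IO[StateMachineException, Unit]] = {
    // The cast is needed because the pattern match erases WriteKey's type parameter.
    case WriteKey(key, value) => ref.update(_ + (key -> value.asInstanceOf[V]))
  }

  def read: PartialFunction[ReadCommand, IO[StateMachineException, V]] = {
    case ReadKey(key) =>
      ref.get.flatMap { map =>
        ZIO
          .fromOption(map.get(key))
          .orElseFail(new StateMachineException(s"key not found: $key")) // constructor assumed
      }
  }
}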
Only the leader can accept and replicate commands. If a client tries to submit to a follower, the follower will redirect the client to the most recent known leader. If the follower doesn't know who the leader is, an error is returned and the client tries a different random server. The entry point for clients to write commands looks like this:
def submitCommand(command: WriteCommand): ZIO[Clock, RaftException, CommandResponse] =
state.leader.flatMap { leader =>
leader match {
case Some(leaderId) if leaderId == nodeId => processCommand(command)
case Some(leaderId) if leaderId != nodeId => ZIO.succeed(Redirect(leaderId))
case None => ZIO.succeed(LeaderNotFoundResponse)
}
}
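From the client's side, this implies a small retry loop. Here is a sketch, where RaftClient and its send method are hypothetical stand-ins for the transport, and node selection is simplified to round-robin rather than random:

import zio.ZIO
import zio.clock.Clock
import zio.duration._

// Hypothetical transport interface.
trait RaftClient {
  def send(node: NodeId, command: WriteCommand): ZIO[Clock, RaftException, CommandResponse]
}

// Follow redirects; on LeaderNotFoundResponse, back off and try another node.
// Assumes `nodes` is non-empty.
def submit(
  client: RaftClient,
  nodes: List[NodeId],
  command: WriteCommand
): ZIO[Clock, RaftException, CommandResponse] = {
  def attempt(node: NodeId, rest: List[NodeId]): ZIO[Clock, RaftException, CommandResponse] =
    client.send(node, command).flatMap {
      case Redirect(leaderId)     => attempt(leaderId, rest)
      case LeaderNotFoundResponse => attempt(rest.headOption.getOrElse(node), rest.drop(1)).delay(100.milliseconds)
      case response               => ZIO.succeed(response)
    }
  attempt(nodes.head, nodes.tail)
}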
When the Leader receives a WriteCommand from a client, it executes the following steps:

- It fetches the current Term and the last Index from Storage.
- It creates a LogEntry containing the WriteCommand and the current Term.
- It appends the entry to its Log.
- It creates an AppendEntries message.
- It sends the AppendEntries message to all peers in parallel.
- It collects AppendEntriesResponses. Once a majority of peers has confirmed replication, it applies the command to its StateMachine. At this stage, the command is considered committed. Raft guarantees that committed entries are durable and will eventually be executed by all state machines.

The AppendEntries message looks like this:
final case class AppendEntries(
appendId: AppendEntries.Id,
from: NodeId,
to: NodeId,
term: Term,
prevLogIndex: Index,
prevLogTerm: Term,
leaderCommitIndex: Index,
entries: Seq[LogEntry]
) extends Message
from is always the leader and to one of the followers. The current leader term is included. The leader also includes prevLogIndex, which is the index of the entry immediately preceding the new entries. Similarly, prevLogTerm is the term of that preceding entry.
Raft AppendEntries messages have to be idempotent for the protocol to work correctly. By including the Index after which the new entries should be appended, Raft guarantees that followers have enough information to detect and discard duplicated AppendEntries messages.
Finally, leaderCommitIndex is the index of the highest log entry known to be committed (replicated to a majority of peers).
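The response type isn't shown in the post. Judging from how it is used, namely a sender, a term, and a success flag, its shape is presumably close to this:

final case class AppendEntriesResponse(
  appendId: AppendEntries.Id,
  from: NodeId,
  to: NodeId,
  term: Term,
  success: Boolean
) extends Message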
The receiving Follower first checks the Term of the incoming message. If the AppendEntries term is smaller than the follower's current term, the entries are rejected. This rejection is particularly important, as it is how the leader learns that a more recent term exists and that it needs to step down. If the term is greater than or equal, the follower performs the following check:
// Decides whether entries following previousIndex can be appended.
private def shouldAppend(previousIndex: Index, previousTerm: Term) =
  if (previousIndex.index == -1) ZIO.succeed(true) // empty log; always safe to append
  else
    for {
      size <- storage.logSize
      term <- storage.getEntry(previousIndex).map(_.map(_.term).getOrElse(Term.Invalid))
      success = (previousTerm == term) && previousIndex.index == size - 1
      // On mismatch, drop the conflicting suffix of the log.
      _ <- ZIO.when(!success)(storage.purgeFrom(previousIndex))
    } yield success
Raft guarantees that if two logs contain an entry with the same previousTerm at previousIndex, then the logs are identical in all preceding entries. This is known as the Log Matching Property. For example, if both logs hold an entry with term 3 at index 7, they are guaranteed to agree on every entry up to index 7. For a detailed explanation as to why this holds, please refer to section 5.3 of the Raft paper.

In practice, this checks whether the follower's log is identical to the leader's log minus the new entries. If it is, it is safe to append.
If it isn't, that means that at least the entry at previousIndex is an illegal entry from a previous leader. There is no use for that data, as that entry hasn't been committed and never will be. In this case, the follower purges all entries starting at previousIndex and responds to the leader with a failure.
At this stage, the leader knows that the follower's log is out of sync. To be able to fix situations like this, the leader keeps an auxiliary data structure where it stores the last known replicated Index for each peer. This is called the matchIndex. If appending after previousIndex fails due to previousTerm being different, the leader decreases the index by one and tries again, as sketched below. This process is repeated until the index at which the two logs are identical is found. The leader then replicates all entries from that index to the last.
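A simplified sketch of that leader-side backtracking; matchIndexForPeer, setMatchIndexForPeer, and Index.decrement are illustrative names for the per-peer bookkeeping described above:

// Illustrative: on a rejected AppendEntries, step the peer's index back by
// one, so the next attempt includes one more (earlier) entry.
private def onAppendRejected(peer: NodeId): ZIO[Any, RaftException, Unit] =
  for {
    index <- state.matchIndexForPeer(peer)                      // assumed accessor
    _     <- state.setMatchIndexForPeer(peer, index.decrement)  // assumed accessor
    // The next AppendEntries to this peer starts one entry earlier; this
    // repeats until shouldAppend succeeds on the follower.
  } yield ()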
When this process completes, the shouldAppend method outlined above returns true and the follower can append to its log.
private def appendLog(ae: AppendEntries) =
  shouldAppend(ae.prevLogIndex, ae.prevLogTerm).flatMap { shouldAppend =>
    ZIO.when(shouldAppend) {
      for {
        // Append the new entries right after prevLogIndex.
        _             <- storage.appendEntries(ae.prevLogIndex.increment, ae.entries.toList)
        currentCommit <- state.lastCommitIndex
        // If the leader has committed further, apply the newly committed
        // commands to the local state machine.
        _ <-
          ZIO.when(ae.leaderCommitIndex > currentCommit)(
            storage.logSize.flatMap { size =>
              val newCommitIndex = Index(math.min(size - 1, ae.leaderCommitIndex.index))
              state.updateCommitIndex(newCommitIndex) *>
                storage.getRange(currentCommit.increment, newCommitIndex).flatMap { entries =>
                  ZIO.collectAll(
                    entries.map(e => stateMachine.write(e.command) *> state.incrementLastApplied)
                  )
                }
            }
          )
      } yield ()
    }
  }
For every entry successfully replicated to a majority, the leader increases the leaderCommitIndex and applies the command to its state machine. At the next heartbeat (or client command, whichever comes first), the leader passes the updated leaderCommitIndex to the followers. At that point, the followers know that all commands up to leaderCommitIndex have been safely replicated to a majority, so they can safely apply all log commands up to leaderCommitIndex to their state machines.
To keep track of which log entries have been committed, all nodes keep a lastCommitIndex in their volatile state. Then, at every successful AppendEntries message, they apply all commands from lastCommitIndex to leaderCommitIndex to their state machines.
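As an aside, the usual way a leader derives leaderCommitIndex from the per-peer match indices is a median-style calculation: the highest index replicated on a majority. A self-contained sketch (not the repo's code; Raft additionally requires that the entry at that index belongs to the leader's current term, per section 5.4.2 of the paper):

// Given the last replicated index for every node (leader included), the
// value at position size / 2 of the descending sort is present on a
// strict majority of nodes.
def commitIndex(lastReplicated: List[Long]): Long = {
  val sortedDesc = lastReplicated.sorted(Ordering[Long].reverse)
  sortedDesc(lastReplicated.size / 2)
}

// e.g. for 5 nodes with indices List(9, 9, 7, 4, 4), index 7 is on a majority.
assert(commitIndex(List(9, 9, 7, 4, 4)) == 7)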
The full implementation can be found at https://github.com/ariskk/zio-raft. The implementation is not complete. Notable parts of Raft missing include snapshots, log compaction, and cluster membership changes.
The tests make an effort to introduce non-Byzantine failures and thus test protocol robustness under semi-realistic network conditions. That said, proper integration tests using Jepsen are needed before this can be used in the real world.
If you have any feedback or ideas on how to move this forward, please reach out.
Thanks for stopping by!