Skip to content

Aris Koliopoulos

Implementing Raft using ZIO in Scala

Scala, Distributed Systems, ZIO, Raft8 min read

A sort of Raft Quorum with a client

This article presents an implementation of the Raft consensus protocol in Scala using ZIO. More specifically, it covers the architecture of the project, leader election, and log replication. It assumes some prior distributed systems knowledge, eg what RPC is. Full Code: https://github.com/ariskk/zio-raft

Raft

Raft is a consensus protocol. It is the kind of algorithm systems use when multiple servers need to agree on a single value. It is a coordination primitive often used in databases and other distributed systems.

A Raft cluster's lifecycle can be split into two distinct phases:

  • Leader Election: The cluster needs a leader to coordinate all reads and writes to guarantee correctness. If no leader is present, the system cannot make progress. Consequently, if no leader exists, Raft peers vote and try to elect one. The leader's term lasts for as long as the leader is alive and can reach a majority of peers.
  • Log Replication: When the leader is elected, a Raft cluster can accept commands. Those commands are replicated by the leader to a majority of peers before returning successfully to a client. Once a command is replicated to a majority, peers proceed to apply it to their state machines.

The whole protocol is based on two simple RPC calls; or 4 different types of messages:

  • VoteRequest: If no leader is present, candidates emerge and broadcast a VoteRequest.
  • VoteResponse: Peers respond to vote requests based on a set of rules.
  • AppendEntries: The leader uses this message to replicate new log entries to followers. It is also used as a heartbeat.
  • AppendEntriesResponse: Followers confirm replication of entries based on a set of rules.

The above process supports linearizable semantics. From Jepsen:

Linearizability is one of the strongest single-object consistency models, and implies that every operation appears to take place atomically, in some order, consistent with the real-time ordering of those operations: e.g., if operation A completes before operation B begins, then B should logically take effect after A.

This is a strong consistency model. Linearizable systems cannot be totally available in the face of network partitions. Raft is thus suitable for systems where consistency is critical; those usually provide coordination services to other systems. Some well known systems using it are etcd and Consul.

Digging more into the paper is definitely worth it.

This post covers the following aspects:

  1. Architecture
  2. Leader Election
  3. Log Replication

The implementation is using Scala and the fantastic concurrent programming library ZIO. The repo is hosted at https://github.com/ariskk/zio-raft. If you have feedback, please do reach out on Twitter; I am looking for inspiration regarding the next steps.

Without further ado..

1. Architecture

The architecture of the Raft Server implementation is summarised in the following diagram:

Raft Server

It contains the following components:

  • 1.1. Raft Consensus Module: This is the core module. This is where state is kept and the consensus algorithm is implemented. The Raft Consensus Module can be further broken down into sub-components:
    • 1.1.1 Storage: Durable storage needed by Raft. It consists of a Log containing state machine commands from clients, the current term, and the last known vote (more on the last two later). Two implementations are provided: a) In-memory for testing purposes b) RocksDB.
    • 1.1.2 Volatile State: Ephemeral state the module needs to function (eg the set of peers). Access to its fields must be thread-safe and is implemented using ZIO's Ref.
    • 1.1.3 Input/Output Message Queues: The implementation is fully asynchronous and decoupled from any flavour of RPC. To communicate with the outside world (ie its peers), it uses a number of message queues. Those are implemented using ZIO's asynchronous ZQueue.
    • 1.1.4 State Machine: This is where client commands are applied and it contains the state for which strong consistency is critical.
  • 1.2. Internode RPC: Manages network connections with peers in a Raft cluster. Sends and receives messages.
  • 1.3. API: Interfaces with clients. If the node is the leader, it accepts write commands and submits them to the consensus module for replication. It also executes read-only queries against the state machine. If the node is not the leader, it redirects.

RaftServer.scala brings all those together using zio-nio to implement a very simple network layer.

Let's now see how those can be used for the two fundamental Raft functions: Leader Election and Log Replication.

2. Leader Election

A Raft node can be in three different states:

  • Follower: Responds to VoteRequest and AppendEntries messages.
  • Candidate: Runs for leader. Sends out VoteRequest messages and collects responses.
  • Leader: Receives commands from clients and sends AppendEntries messages to all followers.

Those three states and their transitions are summarised in the following graph:

Leader Election

A Raft node starts in Follower state. If it doesn't hear from a leader for a period of time, it does the following:

  • Becomes a Candidate.
  • Increases the term.
  • Votes for itself.
  • Sends out a VoteRequest to all known peers.
  • Collects responses. If the majority of requests are granted, it becomes Leader and sends out empty AppendEntries (aka heartbeats) every 50 milliseconds. Else it becomes Follower and waits to hear from a leader for a period of time. This is repeated until a Leader emerges.

This process could lead to a deadlock if this period of time was constant. To resolve this, Raft employs a randomised timer:


  final case class ElectionTimeout(millis: Int) extends AnyVal
  object ElectionTimeout {
    private val base  = 150
    private val range = 150
    def newTimeout: ElectionTimeout =
      ElectionTimeout(base + Random.nextInt(range))
  }

When a Raft node starts, and every time it receives an AppendEntries message from the leader, it starts a randomised timer between 150 and 300 milliseconds**. If it receives a new AppendEntries message, the timer is reset to a new random value. If not, it kicks off a leader election.

** This is config and depends on network latencies and other factors. If Raft is used for cross datacenter replication this likely needs to be higher.

This follower logic can be encoded in Scala as following:


  private def processInboundEntries: ZIO[Clock, RaftException, Unit] =
    queues.appendEntriesQueue.take.disconnect // wait for AppendEntries
      .timeout(ElectionTimeout.newTimeout.millis.milliseconds) // timeout if none arrives after ElectionTimeout
      .flatMap { maybeEntries =>
        maybeEntries.fold[ZIO[Clock, RaftException, Unit]]( // if no entries arrived
          for {
            nodeState   <- state.nodeState
            currentTerm <- storage.getTerm
            currentVote <- storage.getVote
            hasLost     <- state.hasLost(currentTerm)
            _ <- // if there is no other known Candidate (ie currentVote is empty), run for Leader. Ditto if a previous election was lost but there is still no leader.
              if ((nodeState == NodeState.Follower && currentVote.isEmpty) || hasLost) runForLeader
              else processInboundEntries // in any other case, set a new timer and wait
          } yield ()
        )(es => processEntries(es) *> processInboundEntries) // if AppendEntries arrived, process them and go back to wait for more.
      }

If no AppendEntries is received and there is other known candidate, the node runs for leader.

Once the vote requests are sent, the Candidate nodes collect vote responses. Since more than one may be running at the same time, vote collection should exit if another node wins the majority or another candidate is running at a later term.


private def collectVotes(forTerm: Term): ZIO[Clock, RaftException, Unit] = {
  lazy val voteCollectionProgram = for {
    newVote <- queues.inboundVoteResponseQueue.take
    processVote <-
      if (newVote.term.isAfter(forTerm)) // If a later term exists, become Follower
        storage.storeTerm(newVote.term) *>
          state.becomeFollower
      else if (newVote.term == forTerm && newVote.granted)
        state.addVote(Vote(newVote.from, newVote.term)).flatMap { hasWon =>
          ZIO.when(hasWon)(storage.lastIndex.flatMap(state.initPeerIndices(_)))
        }
      else if (newVote.term == forTerm && !newVote.granted)
        state.addVoteRejection(Vote(newVote.from, newVote.term))
      else ZIO.unit
  } yield ()
  /***/
  voteCollectionProgram
    .repeatUntilM(_ => nodeState.map(_ != NodeState.Candidate))
}

If the node loses the election, it becomes a Follower and runs the processInboundEntries loop outlined above. If it gets elected, it starts sending heartbeats. Those are simply AppendEntries messages where the list of entries contains all uncommitted entries (more on this later). If all entries have been committed, then this list is empty.


private def sendHeartbeats: ZIO[Clock, RaftException, Unit] = {
  lazy val sendHeartbeatEntries = sendAppendEntries
    .delay(Raft.LeaderHeartbeat)
    .repeatWhileM(_ => isLeader)
  (processAppendEntriesResponses &> sendHeartbeatEntries) *> processInboundEntries
}

LeaderHeartbeat is set at 50 milliseconds; frequent enough so that the minimum election timeout of 150 millis is never reached even in periods with no client commands. processAppendEntriesResponses contains an additional check where the leader steps down if it receives an AppendEntriesResponse from a later term.

To sum up the leader election:

  • Raft peers start and then set an election timer randomly between 150ms and 300ms.
  • If this is not reset by a heartbeat, runs out and there is no VoteRequest from a quicker node, the node becomes a Candidate and sends VoteRequests to peers.
  • Peers respond to candidates and candidates tally up the votes.
  • If a candidate wins the majority, it becomes the leader and sends out heartbeats. If no candidate wins, the process is repeated. The randomised timer guarantees that a node will eventually be quicker to send out vote requests and win the majority.

At this stage, the cluster has elected a leader and is ready to accept commands from clients. Those commands need to be replicated to a majority. Let's see how this works.

3. Log Replication

Clients send commands that essentially represent StateMachine transitions. Raft stores those commands in a Log and replicates those log entries to all peers. Each log entry has an Index and a Term.

If the StateMachine is a Key-Value store, then a command could look like this:

case class WriteKey[V](key: Key, value: V) extends WriteCommand

The Command model depends on the application.


sealed trait Command extends Serializable
trait ReadCommand    extends Command
trait WriteCommand   extends Command

Those traits model commands to a generic state machine which looks at the very basic level like this:


trait StateMachine[T] {
  def write: PartialFunction[WriteCommand, IO[StateMachineException, Unit]]
  def read: PartialFunction[ReadCommand, IO[StateMachineException, T]]
}

Only the leader can accept and replicate commands. If a client tries to submit to a follower, the follower will redirect the client to the most recent known leader. If the follower doesn't know who the leader is, an error is returned and the client tries a different random server. The entry point for clients to write commands looks like this:


def submitCommand(command: WriteCommand): ZIO[Clock, RaftException, CommandResponse] =
  state.leader.flatMap { leader =>
    leader match {
      case Some(leaderId) if leaderId == nodeId => processCommand(command)
      case Some(leaderId) if leaderId != nodeId => ZIO.succeed(Redirect(leaderId))
      case None                                 => ZIO.succeed(LeaderNotFoundResponse)
    }
  }

When the Leader receives a WriteCommand from a client, it executes the followings steps:

  1. Reads the current Term and the last Index from Storage.
  2. Creates a new LogEntry containing the WriteCommand and the current Term.
  3. Appends the new entry to its local Log.
  4. Creates an AppendEntries message.
  5. Sends an AppendEntries message to all peers in parallel.
  6. Waits until a majority of peers respond with a successful AppendEntriesResponse.
  7. Applies the command to its local StateMachine. At this stage, the command is considered committed. Raft guarantees that committed entries are durable and will eventually be executed by all state machines.
  8. Responds back to the client.

The AppendEntries message looks like this:


final case class AppendEntries(
  appendId: AppendEntries.Id, 
  from: NodeId,
  to: NodeId,
  term: Term,
  prevLogIndex: Index,
  prevLogTerm: Term,
  leaderCommitIndex: Index,
  entries: Seq[LogEntry]
) extends Message

from is always the leader and to one of the followers. The current leader term is included. The leader also includes prevLogIndex, which is the index of the entry immediately preceding the new entries. Equivalently, prevLogTerm is the term of the previous entry.

Raft AppendEntries have to be idempotent for the protocol to work correctly. By including the Index after which the new entries should be appended, Raft guarantees that followers have enough information to detect and discard duplicated AppendEntries messages. Finally, leaderCommitIndex is the index of the highest log entry known to be committed (replicated to a majority of peers).

The receiving Follower first checks the Term of the incoming message. If the AppendEntries term is smaller than the current term in the follower the entries will be rejected. This rejection is particularly important as this is how the leader knows a more recent term exists and thus it needs to step down.

If the term is larger or equal, then the follower will perform the following check:


private def shouldAppend(previousIndex: Index, previousTerm: Term) =
  if (previousIndex.index == -1) ZIO.succeed(true)
  else
    for {
      size <- storage.logSize
      term <- storage.getEntry(previousIndex).map(_.map(_.term).getOrElse(Term.Invalid))
      success = (previousTerm == term) && previousIndex.index == size - 1
      _ <- ZIO.when(!success)(storage.purgeFrom(previousIndex))
    } yield success

Raft guarantees that if two logs have the same previousTerm in previousIndex, then the logs are identical in all preceding entries. This is known as the Log Matching Property. For a detailed explanation as to why this holds, please refer to section 5.3 of the Raft paper.

This practically checks if the follower log is identical to the leader log minus the new entries. If it is, it is safe to append. If it isn't though, that means that at least the entry at previousIndex is an illegal entry from a previous leader. There is no use for that data as that entry hasn't been committed and it never will. In this case, the follower purges all entries starting at previousIndex and responds to the leader with a failure.

At this stage, the leader knows that the follower's log is out of sync. To be able to fix situations like this, the leader keeps an auxiliary data structure where it stores the last known replicated Index for each peer. This is called the matchIndex. If appending after previousIndex fails due to previousTerm being different, then the leader decreases the index by one and tries again. This process is repeated until the index where the two logs are identical is found. Then the leader replicates all entries starting from this index and until the last. When this process is completed, the shouldAppend method outlined above will return true and the follower can append to its log.


private def appendLog(ae: AppendEntries) =
  shouldAppend(ae.prevLogIndex, ae.prevLogTerm).flatMap { shouldAppend =>
    ZIO.when(shouldAppend) {
      for {
        _             <- storage.appendEntries(ae.prevLogIndex.increment, ae.entries.toList)
        currentCommit <- state.lastCommitIndex
        _ <-
          ZIO.when(ae.leaderCommitIndex > currentCommit)(
            storage.logSize.flatMap { size =>
              val newCommitIndex = Index(math.min(size - 1, ae.leaderCommitIndex.index))
              state.updateCommitIndex(newCommitIndex) *>
                storage.getRange(currentCommit.increment, newCommitIndex).flatMap { entries =>
                  ZIO.collectAll(
                    entries.map(e => stateMachine.write(e.command) *> state.incrementLastApplied)
                  )
                }
            }
          )
      } yield ()
    }
  }

For every successfully replicated entry, the leader increases the leaderCommitIndex and applies the command to its state machine. At the next heartbeat (or client command, whichever comes first), the leader will pass the updated leaderCommitIndex to the followers. At this point, the followers know that all commands up to leaderCommitIndex have been safely replicated to a majority. Thus, they can safely apply all log commands up to leaderCommitIndex to their state machines. To keep track of which log entries have been committed, all nodes keep a lastCommitIndex in their volatile state. Then at every successful AppendEntries message, they apply all commands from lastCommitIndex to leaderCommitIndex to their state machines.


The full implementation can be found at https://github.com/ariskk/zio-raft. The implementation is not complete. Notable parts of Raft missing include snapshots, log compaction, and cluster membership changes.

The tests make an effort to introduce non-Byzantine failures and thus test protocol robustness under semi-realistic network conditions. That said, proper integration tests using Jepsen are needed before this can be used in the real world.

If you have any feedback or ideas on how to move this forward, please reach out.

Thanks for stopping by!


More

Apache Kafka: 8 things to check before going live

© 2022 by Aris Koliopoulos. All rights reserved.
Theme by LekoArts