(Prototype) Raft Consensus Algorithm in Scala

Prototype implementation of the Raft consensus algorithm in Scala

Tested on macOS 10.15.2 with openjdk64-11.0.2 and sbt 1.3.3

Test line-coverage: 88.11% (12-30-2019)

Author: Maximilian Bundscherer

Let's get started

  • sbt and openjdk64-11.0.2 are required to build and run the project

  • Run with: sbt run (see What happens in normal run? below)

  • Test with: sbt test (or see ci-tests in GitHub-Actions-CI-Pipeline) (see What happens in test run? below)

  • Generate test-coverage-html-report with: sbt jacoco

Used dependencies

What is implemented?

  • RaftNode as Finite-state machine (FSM) with key-value storage

    • (Uninitialized): Not initialized
    • Follower (default behavior): Waits for heartbeats from the leader node, each carrying the hashCode of the leader's data. If the hashCode of the locally stored data differs from the leader's, the node synchronizes with the leader. If no heartbeat arrives within the configured randomized interval, the node switches to candidate behavior.
    • Candidate: Requests votes from all followers and votes for itself. If it wins the election within the configured interval, it becomes the leader; otherwise it becomes a follower again. Winning the election requires a majority of votes.
    • Leader: Sends continuous heartbeats with the hashCode of its stored data to all followers. The leader is the only node that is allowed to write data.
    • (Sleep): Used to simulate leader crashes (triggered by crashIntervalHeartbeats in normal run or by SimulateLeaderCrash in test run). In this behavior, the node does not respond to non-debug messages. After the configured downtime, the node switches to follower behavior.
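
The behaviors listed above can be read as a small transition table. The following is a minimal sketch in plain Scala (no akka), not the project's code: the names Behavior, Event and RaftFsmSketch are hypothetical, and the step from Uninitialized to Follower on initialization is an assumption based on "Follower (default behavior)".

```scala
// Hypothetical names: Behavior, Event and RaftFsmSketch are illustrative only.
sealed trait Behavior
object Behavior {
  case object Uninitialized extends Behavior
  case object Follower      extends Behavior
  case object Candidate     extends Behavior
  case object Leader        extends Behavior
  case object Sleep         extends Behavior
}

sealed trait Event
object Event {
  case object Initialized     extends Event // assumption: node was set up with its neighbours
  case object ElectionTimeout extends Event // no heartbeat (follower) or election not won in time (candidate)
  case object WonElection     extends Event // candidate collected a majority of votes
  case object SimulatedCrash  extends Event // crashIntervalHeartbeats reached or SimulateLeaderCrash received
  case object Awake           extends Event // configured sleep downtime elapsed
}

object RaftFsmSketch {
  import Behavior._
  import Event._

  /** Next behavior for a (behavior, event) pair; unlisted pairs keep the current behavior. */
  def next(current: Behavior, event: Event): Behavior = (current, event) match {
    case (Uninitialized, Initialized) => Follower
    case (Follower, ElectionTimeout)  => Candidate
    case (Candidate, WonElection)     => Leader
    case (Candidate, ElectionTimeout) => Follower
    case (Leader, SimulatedCrash)     => Sleep
    case (Sleep, Awake)               => Follower
    case (other, _)                   => other
  }
}
```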

Configuration

There are two configurations:

  • ./src/main/resources/application.conf used for normal run
  • ./src/test/resources/application.conf used for test run
akka {

    # Log Level (DEBUG, INFO, WARNING, ERROR)
    loglevel = "DEBUG"

}

raftPrototype {

    # Election Timer Min (Seconds)
    electionTimerIntervalMin = 2

    # Election Timer Max (Seconds)
    electionTimerIntervalMax = 3

    # Heartbeat Timer Interval (Seconds)
    heartbeatTimerInterval = 1

    # Raft Nodes (Amount)
    nodes = 5

    # Crash Interval (auto simulate crash after some heartbeats in LEADER behavior)
    crashIntervalHeartbeats = 10

    # Sleep downtime (Seconds) (after simulated crash in SLEEP behavior)
    sleepDowntime = 8

}
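
To illustrate how these values might be used, here is a hedged sketch in plain Scala. RaftConfigSketch is a hypothetical helper, not part of the project; the majority formula (more than half of all nodes) and the uniform draw between min and max are assumptions.

```scala
import scala.util.Random

// Hypothetical helper; field names mirror the keys of application.conf shown above.
final case class RaftConfigSketch(
    electionTimerIntervalMin: Int, // seconds
    electionTimerIntervalMax: Int, // seconds
    nodes: Int
) {

  /** Assumption: a majority means strictly more than half of all nodes. */
  def majority: Int = nodes / 2 + 1

  /** Assumption: timeout drawn uniformly from [min, max) seconds, returned in milliseconds. */
  def randomElectionTimeoutMillis(rnd: Random): Long = {
    val minMs = electionTimerIntervalMin * 1000L
    val maxMs = electionTimerIntervalMax * 1000L
    minMs + (rnd.nextDouble() * (maxMs - minMs)).toLong
  }
}
```

With the values above (nodes = 5, min = 2, max = 3), this sketch yields a majority of 3 and timeouts between 2000 ms and 3000 ms. Randomizing the timeout makes it less likely that several followers become candidates at the same moment.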

What happens in normal run?

All nodes start in follower behavior (some of them will change their behavior to candidate) and elect the first leader.

After a configured number of heartbeats, the leader simulates a crash and "sleeps" for the configured downtime. The next leader is then elected.

This happens again and again and again... until you stop the program or the earth is going to overheat. 😉

Data exchange (writing data through the leader to the followers) is covered by the test run (see below).

What happens in test run?

  1. Leader election (after init nodes)
  2. Write data through leader to followers (first write data to leader and replicate data to followers)
  3. Get back data from all nodes (all nodes should have same data)
  4. Simulate leader crash (triggered in test)
  5. New leader election (old leader is gone)
  6. Write data through leader to followers (first write data to leader and replicate data to followers)
  7. Get back data from all nodes (all nodes should have same data)

The integration test is well documented and self-explanatory:

  • ./src/test/scala/de/maxbundscherer/scala/raft/RaftServiceTest.scala

Exciting (scala) stuff

Concurrent programming in Scala is usually done with akka actors. Akka is an implementation of the actor model for Scala and Java. It is developed and maintained by Lightbend (formerly Typesafe).

The program and business logic is divided into separated actors. Each of these actors has its own state (own protected memory) and can only communicate with other actors by immutable messages.

(Image source)

The RaftNodeActor has the following state implemented:

/**
    * Internal (mutable) actor state
    * @param neighbours Vector with the other actors
    * @param electionTimer Cancellable for timer (used in FOLLOWER and CANDIDATE behavior)
    * @param heartbeatTimer Cancellable for timer (used in LEADER behavior)
    * @param alreadyVoted Boolean (has already voted in FOLLOWER behavior)
    * @param voteCounter Int (counter in CANDIDATE behavior)
    * @param majority Int (calculated majority - set up in init)
    * @param heartbeatCounter Int (auto simulate crash after some heartbeats in LEADER behavior)
    * @param data Map (String->String) (used in FOLLOWER and LEADER behavior)
    * @param lastHashCode Int (last hashcode from data) (used in FOLLOWER and LEADER behavior)
    */
  case class NodeState(
      var neighbours            : Vector[ActorRef]    = Vector.empty,
      var electionTimer         : Option[Cancellable] = None,
      var heartbeatTimer        : Option[Cancellable] = None,
      var alreadyVoted          : Boolean             = false,
      var voteCounter           : Int                 = 0,
      var majority              : Int                 = -1,
      var heartbeatCounter      : Int                 = 0,
      var data                  : Map[String, String] = Map.empty,
      var lastHashCode          : Int                 = -1,
  )
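
The roles of data and lastHashCode in the follower can be sketched as follows. This is a plain-Scala illustration under assumptions, not the project's implementation: FollowerSyncSketch and its method names are hypothetical, and synchronization is assumed to replace the follower's data with the leader's complete map.

```scala
// Hypothetical sketch: FollowerSyncSketch and its methods are not part of the project.
object FollowerSyncSketch {

  /** True if the local data's hashCode differs from the one announced in the leader's heartbeat. */
  def needsSync(localData: Map[String, String], leaderHashCode: Int): Boolean =
    localData.hashCode() != leaderHashCode

  /** Assumption: a sync replaces the follower's data with the leader's full data set. */
  def sync(leaderData: Map[String, String]): (Map[String, String], Int) =
    (leaderData, leaderData.hashCode())
}
```

Equal immutable maps always have equal hashCodes, regardless of insertion order, so identical data never triggers a sync. The reverse does not hold strictly: in the rare case of a hash collision, different data could go unnoticed, which is one reason this approach suits a prototype only.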

Akka Actors Example

package de.maxbundscherer.scala.raft.examples

import akka.actor.{Actor, ActorLogging}

class SimpleActor extends Actor with ActorLogging {
  
  override def receive: Receive = {

    case data: String =>
      
      // Reply to the sender (fire-and-forget)
      sender() ! s"$data-pong"

    case any: Any =>
      
      log.error(s"Got unhandled message '$any'")
      
  }
  
}

This example shows a very simple akka actor: it waits for string messages and replies with a new string. The ! operator sends a message without waiting for a reply (fire-and-forget); the ? operator (ask pattern) returns a Future with the reply instead.

Non-string messages are logged as errors.

Raft nodes as akka actors

In this project, raft nodes are implemented as an akka actor (RaftNodeActor) with finite-state machine (FSM) behavior (see description and image above).

Finite-state machine (FSM) in akka

You can define multiple behaviors in an akka actor - see example:

package de.maxbundscherer.scala.raft.examples

import akka.actor.{Actor, ActorLogging}

object SimpleFSMActor {
  
  //Initialize message/command
  case class Initialize(state: Int)
  
}

class SimpleFSMActor extends Actor with ActorLogging {

  import SimpleFSMActor._
  
  //Actor mutable state
  private var state = -1

  //Initialized behavior 
  def initialized: Receive = {

    case any: Any => log.info(s"Got message '$any'")
    
  }

  //Default behavior
  override def receive: Receive = {

    case Initialize(newState) =>

      state = newState
      context.become(initialized)

    case any: Any => log.error(s"Not initialized '$any'")

  }

}

Service-Layer

Classic akka actors are not type-safe. To "simulate" type safety, the service layer (RaftService) was implemented. The service layer is also used to spawn and initialize actors and to supervise the actor system - see examples:

  • Spawn akka actor:
actorSystem.actorOf(props = RaftNodeActor.props, name = "myRaftNode")
  • Ask (type-safe, non-blocking request):
def ping(): Future[Pong] = {
  // mapTo fails the Future with a ClassCastException if the reply is not a Pong
  (actorRef ? Ping()).mapTo[Pong]
}
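
How a typed reply behaves when the wrong message type comes back can be tried without akka. The sketch below is an assumption-laden stand-in: untypedReply plays the role of `actorRef ? Ping()`, and Pong and Unexpected are hypothetical message types. Only Future.mapTo from the Scala standard library is used.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

object TypedReplySketch {

  // Hypothetical reply types standing in for actor messages.
  final case class Pong(msg: String)
  final case class Unexpected(code: Int)

  /** Stand-in for `actorRef ? Ping()`: the ask pattern yields an untyped Future[Any]. */
  def untypedReply(reply: Any): Future[Any] = Future.successful(reply)

  /** mapTo checks the runtime class and fails the Future if the reply is not a Pong. */
  def typedPing(reply: Any): Future[Pong] = untypedReply(reply).mapTo[Pong]

  /** Blocks briefly for the result; for demonstration only. */
  def awaitTry[A](future: Future[A]): Try[A] = Try(Await.result(future, 1.second))
}
```

A matching reply completes the Future successfully, while any other reply produces a failed Future that callers can handle with recover or onComplete.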

Aggregates

The object (read-only-singleton) RaftAggregate includes all necessary classes and objects (actor messages) for RaftService, RaftNodeActor and RaftScheduler.

Trait Configuration

Scala traits are similar to Java's interfaces, but they can also contain implementations and fields. A class can extend only one (abstract) class, but it can mix in any number of traits. This gives Scala a controlled form of multiple inheritance.

In this project, the trait Configuration with its internal object (read-only singleton) Config is used to pass the user config to the program.

The user-config is defined in the file application.conf and is loaded by a config-factory (see project dependencies).
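
A trait with an internal Config object, mixed into a class together with a second trait, might look like the sketch below. All names (ConfigurationSketch, LoggingSketch, NodeSetupSketch) are hypothetical, and the values are hard-coded here for illustration, whereas the project loads them from application.conf.

```scala
// Hypothetical sketch: values are hard-coded; the project reads them from application.conf.
trait ConfigurationSketch {

  // Read-only singleton scoped to whoever mixes in the trait
  object Config {
    val nodes: Int                  = 5
    val heartbeatTimerInterval: Int = 1 // seconds
  }
}

// A second, unrelated trait that carries an implementation
trait LoggingSketch {
  def describe(msg: String): String = s"[sketch] $msg"
}

// A class can mix in several traits at once
class NodeSetupSketch extends ConfigurationSketch with LoggingSketch {
  def summary: String =
    describe(s"nodes=${Config.nodes}, heartbeat=${Config.heartbeatTimerInterval}s")
}
```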

Trait RaftScheduler

The trait RaftScheduler is used to control raft-nodes timers in RaftNodeActor with the following function-calls:

  • def stopElectionTimer(): Used to stop electionTimer. This timer informs about "heartbeat-timeout" (SchedulerTrigger.ElectionTimeout) in FOLLOWER behavior and about "election-timeout" (SchedulerTrigger.ElectionTimeout) in CANDIDATE behavior.
  • def restartElectionTimer(): Used to stop and start electionTimer.
  • def stopHeartbeatTimer(): Used to stop heartbeatTimer. This timer informs about "send-heartbeat to all followers" (SchedulerTrigger.Heartbeat) in LEADER behavior.
  • def restartHeartbeatTimer(): Used to stop and start heartbeatTimer.
  • def scheduleAwake(): Used to trigger awakening automatically after downtime in SLEEP behavior (SchedulerTrigger.Awake). Awakening means: The node changes to follower-behavior.

Timers are controlled by changeBehavior and followerBehavior in RaftNodeActor, which stop and start them depending on the node's behavior:

/**
   * Before change of behavior
   */
val newBehavior: Receive = toBehavior match {

  [...]
  
  case BehaviorEnum.FOLLOWER =>
    restartElectionTimer()
    stopHeartbeatTimer()
    followerBehavior

  case BehaviorEnum.CANDIDATE =>
    restartElectionTimer()
    stopHeartbeatTimer()
    candidateBehavior

  [...]

}

/**
   * After change of behavior
   */
toBehavior match {
      
  [...]

  case BehaviorEnum.SLEEP => scheduleAwake()
      
  [...]

}

/**
   * In followerBehavior
   */
case Heartbeat(lastHashCode) =>
      
      [...]

      restartElectionTimer()

Service Configurator Pattern

The program architecture is based on the Service Configurator Pattern.

The actor system & the services are started and configured in ...

  • ... object Main for normal run.
  • ... trait BaseServiceTest for test run.

Scala compared to Go

  • Both Scala and Go are strongly and statically typed and support type inference. Go interfaces are satisfied structurally; Scala is primarily nominally typed and offers structural types as an option.
  • Scala targets multicore architectures and combines functional and object-oriented programming. To improve code quality, you should avoid mixing both styles arbitrarily.
  • Go targets multicore architectures, too, and is positioned as an alternative to the programming language C.
  • Learning Scala is time-consuming and sometimes quite involved, because you need to be familiar with functional programming and with the large number of complex concepts built into the core language.
  • Learning Go is less time-consuming, because Go is built on simple and familiar concepts (for example a lightweight form of object-oriented programming).
  • Scala usually runs on the Java virtual machine (JVM) and can interact with Java libraries. Compiling Scala to native code is possible, but uncommon.
  • Go compiles to native code (not to byte-code) and can interact with C libraries.

Go concurrency

The language provides multiple possibilities:

  • Concurrent execution (goroutines)
  • Synchronization and messaging (channels: FIFO, optionally buffered, comparable to message passing between akka actors)
  • Multi-way concurrent control (select)
  • Low level blocking primitives (locks/sync)

Scala concurrency

In Scala, an ExecutionContext (the default is ExecutionContext.global) is responsible for executing computations. The default ExecutionContext is a global static thread pool based on Java's Fork/Join framework. You can tune it with system properties, for example:

  • scala.concurrent.context.minThreads
  • scala.concurrent.context.maxThreads

You can also use multiple ExecutionContexts in your application (or server-cluster).
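
As a minimal sketch of a second, dedicated ExecutionContext next to the global one: the object and method names below are hypothetical, and the pool size of two threads is an arbitrary choice for the example.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object CustomContextSketch {

  /** Runs a computation on a dedicated two-thread pool and returns its result. */
  def sumOnOwnPool(numbers: Seq[Int]): Int = {
    val pool = Executors.newFixedThreadPool(2)
    // Futures created in this scope run on the dedicated pool, not on ExecutionContext.global
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try Await.result(Future(numbers.sum), 5.seconds)
    finally pool.shutdown()
  }
}
```

A separate context like this is commonly used to keep blocking work away from the threads that serve the rest of the application.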

Concurrent programming in Scala is usually done with akka actors. See "Exciting (scala) stuff" above.

You can also use:

  • Scala Futures
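import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// getData() stands for any potentially slow computation
val future = Future {
  getData()
}

// Non-blocking callback, invoked once the future has completed
future.onComplete {
  case Success(data) => println(s"Got $data")
  case Failure(exception) => println(s"Got failure $exception")
}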
  • Threads and thread pools from Java (unusual)
// This is unusual. Prefer akka actors.

class ExampleProcessor extends Thread {
  override def run(): Unit = {
    while(true) {
      val examples = Examples.getExamples()
      examples.foreach{ example =>
        process(example)
      }
    }
  }
}

//Start new thread
val thread = new ExampleProcessor()
thread.start()

//Wait for finishing
thread.join()

My personal opinion:

  • Scala is more expressive and you need less code.
  • Go runs faster and very efficiently, but sometimes feels repetitive and very mechanical.
  • Scala is used for high-level cloud-applications (for example Apache Spark).
  • Go is used for low-level applications to make high-level-applications possible (for example Docker).
  • Comparing both languages is quite inconclusive because of their different fields of application.

Prospects

  • This implementation is a prototype and should not be used in production.
  • You can use akka cluster to run this implementation over a network on different machines. You have to modify the RaftService to spawn actors in the cluster.
  • Do not use the Java serializer in production. It is slow and not secure. Use Protobuf instead.