From 3c40471d7efe936d8ce92cd5589ce95405aa7f32 Mon Sep 17 00:00:00 2001 From: Smaug123 Date: Thu, 27 Oct 2022 21:31:48 +0100 Subject: [PATCH] Implement committal --- Raft.Test/Raft.Test.fsproj | 3 +- Raft.Test/TestInMemoryPersistentState.fs | 10 +- Raft.Test/TestInMemoryServer.fs | 242 +++++++++++++++++++++ Raft.Test/TestServer.fs | 248 ++------------------- Raft/Domain.fs | 20 ++ Raft/InMemory.fs | 8 +- Raft/Measures.fs | 11 - Raft/PersistentState.fs | 14 +- Raft/Raft.fsproj | 2 +- Raft/Server.fs | 261 ++++++++++++++++------- RaftExplorer/Program.fs | 63 +++++- 11 files changed, 544 insertions(+), 338 deletions(-) create mode 100644 Raft.Test/TestInMemoryServer.fs create mode 100644 Raft/Domain.fs delete mode 100644 Raft/Measures.fs diff --git a/Raft.Test/Raft.Test.fsproj b/Raft.Test/Raft.Test.fsproj index 5e4cc8b..7b94afa 100644 --- a/Raft.Test/Raft.Test.fsproj +++ b/Raft.Test/Raft.Test.fsproj @@ -8,8 +8,9 @@ - + + diff --git a/Raft.Test/TestInMemoryPersistentState.fs b/Raft.Test/TestInMemoryPersistentState.fs index 47b6a53..7be6c49 100644 --- a/Raft.Test/TestInMemoryPersistentState.fs +++ b/Raft.Test/TestInMemoryPersistentState.fs @@ -47,10 +47,16 @@ module TestInMemoryPersistentState = | None -> (uut :> IPersistentState<_>).TruncateLog truncate uut.GetLog () = oldLog - | Some entry -> + | Some (itemStored, entry) -> (uut :> IPersistentState<_>).TruncateLog truncate - (uut :> IPersistentState<_>).GetLastLogEntry () = Some (truncate, entry) + (uut :> IPersistentState<_>).GetLastLogEntry () = Some ( + itemStored, + { + Index = truncate + Term = entry + } + ) && isPrefix (uut.GetLog ()) oldLog && (uut :> IPersistentState<_>).CurrentLogIndex = truncate diff --git a/Raft.Test/TestInMemoryServer.fs b/Raft.Test/TestInMemoryServer.fs new file mode 100644 index 0000000..46cb1d0 --- /dev/null +++ b/Raft.Test/TestInMemoryServer.fs @@ -0,0 +1,242 @@ +namespace Raft.Test + +open System.Threading +open Raft +open NUnit.Framework +open FsUnitTyped +open FsCheck + +[] +module 
TestInMemoryServer = + + [] + let ``Startup sequence, first fumbling steps`` () = + let cluster, network = InMemoryCluster.make 5 + + let logger, logs = TestLogger.make () + + // Candidate 1 asks server 0 to vote for it. + + { + CandidateTerm = 0 + CandidateId = 1 + ReplyChannel = fun message -> logger (sprintf "Received message for term %i" message.VoterTerm) + CandidateLastLogEntry = None + } + |> Instruction.RequestVote + |> Message.Instruction + |> cluster.SendMessageDirectly 0 + + logs () |> shouldEqual [ "Received message for term 0" ] + + // Candidate 1 asks to be elected again! This is fine, maybe the network is replaying requests + // and the network swallowed our reply, so we should reply in the same way. + { + CandidateTerm = 0 + CandidateId = 1 + ReplyChannel = fun message -> logger (sprintf "Received message for term %i" message.VoterTerm) + CandidateLastLogEntry = None + } + |> Instruction.RequestVote + |> Message.Instruction + |> cluster.SendMessageDirectly 0 + + logs () + |> shouldEqual [ "Received message for term 0" ; "Received message for term 0" ] + + // Candidate 2 asks to be elected. We won't vote for them, because we've already voted. + // This test expects the reply channel never to be invoked for the refused candidate. + let calls = ref 0 + + { + CandidateTerm = 0 + CandidateId = 2 + ReplyChannel = fun _ -> Interlocked.Increment calls |> ignore + CandidateLastLogEntry = None + } + |> Instruction.RequestVote + |> Message.Instruction + |> cluster.SendMessageDirectly 0 + + calls.Value |> shouldEqual 0 + + [] + let ``Startup sequence in prod, only one timeout takes place`` () = + let cluster, network = InMemoryCluster.make 5 + + cluster.Servers.[0].TriggerInactivityTimeout () + cluster.Servers.[0].Sync () + + // We sent a message to every other server; process them. 
+ for i in 1..4 do + let server = i * 1 + (network.AllInboundMessages server).Length |> shouldEqual 1 + let message = network.InboundMessage server 0 + network.DropMessage server 0 + cluster.SendMessageDirectly server message + + (network.AllInboundMessages 0).Length |> shouldEqual i + + for i in 1..4 do + network.InboundMessage 0 (i - 1) + |> cluster.SendMessageDirectly 0 + + network.DropMessage 0 (i - 1) + + // (the messages we've already processed) + (network.AllInboundMessages 0).Length |> shouldEqual 4 + (network.UndeliveredMessages 0).Length |> shouldEqual 0 + + cluster.Servers.[0].State |> shouldEqual (ServerStatus.Leader 1) + + for i in 1..4 do + cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower + + let popOne (queues : 'a list list) : ((int * 'a) * 'a list list) list = + queues + |> List.indexed + |> List.filter (fun (index, l) -> not (List.isEmpty l)) + |> List.collect (fun (firstPlaceWithInstruction, entries) -> + entries + |> List.indexed + |> List.map (fun (i, entry) -> (firstPlaceWithInstruction, entry), List.removeAt i entries) + |> List.map (fun (removed, rest) -> + let afterPop = + queues + |> List.removeAt firstPlaceWithInstruction + |> List.insertAt firstPlaceWithInstruction rest + + removed, afterPop + ) + ) + + let rec allOrderings (queues : 'a list list) : (int * 'a) list list = + let output = popOne queues + + match output with + | [] -> [ [] ] + | output -> + + output + |> List.collect (fun (extracted, remaining) -> + let sub = allOrderings remaining + sub |> List.map (fun s -> extracted :: s) + ) + + let factorial i = + let rec go acc i = + if i <= 0 then acc else go (acc * i) (i - 1) + + go 1 i + + [] + [] + [] + [] + [] + let ``Test factorial`` (n : int, result : int) = factorial n |> shouldEqual result + + [] + let ``Test allOrderings`` () = + let case = [ [ "a" ; "b" ] ; [ "c" ; "d" ; "e" ] ] + let output = case |> allOrderings + output |> shouldEqual (List.distinct output) + + output + |> List.length + |> shouldEqual 
(factorial (List.concat case |> List.length)) + + let allElements = Set.ofList (List.concat case) + + for output in output do + output |> List.map snd |> Set.ofList |> shouldEqual allElements + + let randomChoice<'a> (r : System.Random) (arr : 'a list) : 'a = arr.[r.Next (0, arr.Length)] + + [] + let ``Startup sequence in prod, two timeouts at once, random`` () = + let rand = System.Random () + let cluster, network = InMemoryCluster.make 5 + + cluster.Servers.[0].TriggerInactivityTimeout () + cluster.Servers.[0].Sync () + cluster.Servers.[1].TriggerInactivityTimeout () + cluster.Servers.[1].Sync () + + // Those two each sent a message to every other server. + (network.AllInboundMessages 0).Length |> shouldEqual 1 + (network.AllInboundMessages 1).Length |> shouldEqual 1 + + for i in 2..4 do + let server = i * 1 + (network.AllInboundMessages server).Length |> shouldEqual 2 + + while network.AllUndeliveredMessages () |> Seq.concat |> Seq.isEmpty |> not do + let allOrderings' = + network.AllUndeliveredMessages () |> List.map List.ofSeq |> allOrderings + + // Process the messages! + let ordering = randomChoice rand allOrderings' + + for serverConsuming, (messageId, message) in ordering do + let serverConsuming = serverConsuming * 1 + cluster.SendMessageDirectly serverConsuming message + network.DropMessage serverConsuming messageId + + match cluster.Servers.[0].State, cluster.Servers.[1].State with + | Leader _, Leader _ -> failwith "Unexpectedly had two leaders" + | Candidate _, Candidate _ -> failwith "Unexpectedly failed to elect a leader" + | Leader 1, Candidate 1 + | Candidate 1, Leader 1 -> () + | s1, s2 -> failwithf "Unexpected state: %O %O" s1 s2 + + for i in 2..4 do + cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower + + type History = History of (int * int) list + + let historyGen (clusterSize : int) = + gen { + let! pile = Gen.choose (0, clusterSize - 1) + let! 
entry = Arb.generate + return (pile * 1, abs entry) + } + |> Gen.listOf + |> Gen.map History + + let apply (History history) (cluster : Cluster<'a>) (network : Network<'a>) : unit = + for pile, entry in history do + let messages = network.AllInboundMessages pile + + if entry < messages.Length then + cluster.SendMessageDirectly pile messages.[entry] + + [] + let ``Startup sequence in prod, two timeouts at once, property-based: at most one leader is elected`` () = + let cluster, network = InMemoryCluster.make 5 + + cluster.Servers.[0].TriggerInactivityTimeout () + cluster.Servers.[0].Sync () + cluster.Servers.[1].TriggerInactivityTimeout () + cluster.Servers.[1].Sync () + + // Those two each sent a message to every other server. + (network.AllInboundMessages 0).Length |> shouldEqual 1 + (network.AllInboundMessages 1).Length |> shouldEqual 1 + + for i in 2..4 do + (network.AllInboundMessages (i * 1)).Length |> shouldEqual 2 + + let property (history : History) = + apply history cluster network + + match cluster.Servers.[0].State, cluster.Servers.[1].State with + | Leader _, Leader _ -> failwith "Unexpectedly elected two leaders" + | _, _ -> () + + for i in 2..4 do + cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower + + property + |> Prop.forAll (Arb.fromGen (historyGen 5)) + |> Check.QuickThrowOnFailure diff --git a/Raft.Test/TestServer.fs b/Raft.Test/TestServer.fs index e075621..a4ce723 100644 --- a/Raft.Test/TestServer.fs +++ b/Raft.Test/TestServer.fs @@ -1,242 +1,34 @@ namespace Raft.Test -open System.Threading -open Raft -open NUnit.Framework open FsUnitTyped -open FsCheck +open NUnit.Framework +open Raft [] module TestServer = [] - let ``Startup sequence, first fumbling steps`` () = - let cluster, network = InMemoryCluster.make 5 + let ``maxLogAQuorumHasCommitted tests`` () = + for length in 1..7 do + for i in 0..10 do + let i = i * 1 - let logger, logs = TestLogger.make () + i + |> Array.create length + |> ServerUtils.maxLogAQuorumHasCommitted + |> 
shouldEqual i - // Candidate 1 asks server 0 to vote for it. + ServerUtils.maxLogAQuorumHasCommitted [| 0 ; 0 ; 1 |] + |> shouldEqual 0 - { - CandidateTerm = 0 - CandidateId = 1 - ReplyChannel = fun message -> logger (sprintf "Received message for term %i" message.VoterTerm) - CandidateLastLogEntry = 0, 0 - } - |> Instruction.RequestVote - |> Message.Instruction - |> cluster.SendMessageDirectly 0 + ServerUtils.maxLogAQuorumHasCommitted [| 0 ; 1 ; 1 |] + |> shouldEqual 1 - logs () |> shouldEqual [ "Received message for term 0" ] + ServerUtils.maxLogAQuorumHasCommitted [| 2 ; 1 ; 1 |] + |> shouldEqual 1 - // Candidate 1 asks to be elected again! This is fine, maybe the network is replaying requests - // and the network swallowed our reply, so we should reply in the same way. - { - CandidateTerm = 0 - CandidateId = 1 - ReplyChannel = fun message -> logger (sprintf "Received message for term %i" message.VoterTerm) - CandidateLastLogEntry = 0, 0 - } - |> Instruction.RequestVote - |> Message.Instruction - |> cluster.SendMessageDirectly 0 + ServerUtils.maxLogAQuorumHasCommitted [| 2 ; 1 ; 2 |] + |> shouldEqual 2 - logs () - |> shouldEqual [ "Received message for term 0" ; "Received message for term 0" ] - - // Candidate 2 asks to be elected. We won't vote for them, because we've already voted. - // and the network swallowed our reply, so we should reply in the same way. - let calls = ref 0 - - { - CandidateTerm = 0 - CandidateId = 2 - ReplyChannel = fun _ -> Interlocked.Increment calls |> ignore - CandidateLastLogEntry = 0, 0 - } - |> Instruction.RequestVote - |> Message.Instruction - |> cluster.SendMessageDirectly 0 - - calls.Value |> shouldEqual 0 - - [] - let ``Startup sequence in prod, only one timeout takes place`` () = - let cluster, network = InMemoryCluster.make 5 - - cluster.Servers.[0].TriggerTimeout () - cluster.Servers.[0].Sync () - - // We sent a message to every other server; process them. 
- for i in 1..4 do - let server = i * 1 - (network.AllInboundMessages server).Length |> shouldEqual 1 - let message = network.InboundMessage server 0 - network.DropMessage server 0 - cluster.SendMessageDirectly server message - - (network.AllInboundMessages 0).Length |> shouldEqual i - - for i in 1..4 do - network.InboundMessage 0 (i - 1) - |> cluster.SendMessageDirectly 0 - - network.DropMessage 0 (i - 1) - - // (the messages we've already processed) - (network.AllInboundMessages 0).Length |> shouldEqual 4 - (network.UndeliveredMessages 0).Length |> shouldEqual 0 - - cluster.Servers.[0].State |> shouldEqual (ServerStatus.Leader 1) - - for i in 1..4 do - cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower - - let popOne (queues : 'a list list) : ((int * 'a) * 'a list list) list = - queues - |> List.indexed - |> List.filter (fun (index, l) -> not (List.isEmpty l)) - |> List.collect (fun (firstPlaceWithInstruction, entries) -> - entries - |> List.indexed - |> List.map (fun (i, entry) -> (firstPlaceWithInstruction, entry), List.removeAt i entries) - |> List.map (fun (removed, rest) -> - let afterPop = - queues - |> List.removeAt firstPlaceWithInstruction - |> List.insertAt firstPlaceWithInstruction rest - - removed, afterPop - ) - ) - - let rec allOrderings (queues : 'a list list) : (int * 'a) list list = - let output = popOne queues - - match output with - | [] -> [ [] ] - | output -> - - output - |> List.collect (fun (extracted, remaining) -> - let sub = allOrderings remaining - sub |> List.map (fun s -> extracted :: s) - ) - - let factorial i = - let rec go acc i = - if i <= 0 then acc else go (acc * i) (i - 1) - - go 1 i - - [] - [] - [] - [] - [] - let ``Test factorial`` (n : int, result : int) = factorial n |> shouldEqual result - - [] - let ``Test allOrderings`` () = - let case = [ [ "a" ; "b" ] ; [ "c" ; "d" ; "e" ] ] - let output = case |> allOrderings - output |> shouldEqual (List.distinct output) - - output - |> List.length - |> shouldEqual 
(factorial (List.concat case |> List.length)) - - let allElements = Set.ofList (List.concat case) - - for output in output do - output |> List.map snd |> Set.ofList |> shouldEqual allElements - - let randomChoice<'a> (r : System.Random) (arr : 'a list) : 'a = arr.[r.Next (0, arr.Length)] - - [] - let ``Startup sequence in prod, two timeouts at once, random`` () = - let rand = System.Random () - let cluster, network = InMemoryCluster.make 5 - - cluster.Servers.[0].TriggerTimeout () - cluster.Servers.[0].Sync () - cluster.Servers.[1].TriggerTimeout () - cluster.Servers.[1].Sync () - - // Those two each sent a message to every other server. - (network.AllInboundMessages 0).Length |> shouldEqual 1 - (network.AllInboundMessages 1).Length |> shouldEqual 1 - - for i in 2..4 do - let server = i * 1 - (network.AllInboundMessages server).Length |> shouldEqual 2 - - while network.AllUndeliveredMessages () |> Seq.concat |> Seq.isEmpty |> not do - let allOrderings' = - network.AllUndeliveredMessages () |> List.map List.ofSeq |> allOrderings - - // Process the messages! - let ordering = randomChoice rand allOrderings' - - for serverConsuming, (messageId, message) in ordering do - let serverConsuming = serverConsuming * 1 - cluster.SendMessageDirectly serverConsuming message - network.DropMessage serverConsuming messageId - - match cluster.Servers.[0].State, cluster.Servers.[1].State with - | Leader _, Leader _ -> failwith "Unexpectedly had two leaders" - | Candidate _, Candidate _ -> failwith "Unexpectedly failed to elect a leader" - | Leader 1, Candidate 1 - | Candidate 1, Leader 1 -> () - | s1, s2 -> failwithf "Unexpected state: %O %O" s1 s2 - - for i in 2..4 do - cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower - - type History = History of (int * int) list - - let historyGen (clusterSize : int) = - gen { - let! pile = Gen.choose (0, clusterSize - 1) - let! 
entry = Arb.generate - return (pile * 1, abs entry) - } - |> Gen.listOf - |> Gen.map History - - let apply (History history) (cluster : Cluster<'a>) (network : Network<'a>) : unit = - for pile, entry in history do - let messages = network.AllInboundMessages pile - - if entry < messages.Length then - cluster.SendMessageDirectly pile messages.[entry] - - [] - let ``Startup sequence in prod, two timeouts at once, property-based: at most one leader is elected`` () = - let cluster, network = InMemoryCluster.make 5 - - cluster.Servers.[0].TriggerTimeout () - cluster.Servers.[0].Sync () - cluster.Servers.[1].TriggerTimeout () - cluster.Servers.[1].Sync () - - // Those two each sent a message to every other server. - (network.AllInboundMessages 0).Length |> shouldEqual 1 - (network.AllInboundMessages 1).Length |> shouldEqual 1 - - for i in 2..4 do - (network.AllInboundMessages (i * 1)).Length |> shouldEqual 2 - - let property (history : History) = - apply history cluster network - - match cluster.Servers.[0].State, cluster.Servers.[1].State with - | Leader _, Leader _ -> failwith "Unexpectedly elected two leaders" - | _, _ -> () - - for i in 2..4 do - cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower - - property - |> Prop.forAll (Arb.fromGen (historyGen 5)) - |> Check.QuickThrowOnFailure + ServerUtils.maxLogAQuorumHasCommitted [| 1 ; 2 ; 3 ; 4 |] + |> shouldEqual 2 diff --git a/Raft/Domain.fs b/Raft/Domain.fs new file mode 100644 index 0000000..fa105e4 --- /dev/null +++ b/Raft/Domain.fs @@ -0,0 +1,20 @@ +namespace Raft + +/// LogIndex is indexed from 1. We use 0 to indicate "before any history has started". 
+[] +type LogIndex + +[] +type Term + +[] +type ServerId + +type LogEntry = + { + Index : int + Term : int + } + + override this.ToString () = + sprintf "Log entry %i at subjective term %i" this.Index this.Term diff --git a/Raft/InMemory.fs b/Raft/InMemory.fs index 59fb8c4..a2d1b40 100644 --- a/Raft/InMemory.fs +++ b/Raft/InMemory.fs @@ -11,8 +11,12 @@ type Cluster<'a> = member this.SendMessage (i : int) (m : Message<'a>) : unit = this.SendMessageDirectly i m - member this.Timeout (i : int) : unit = - this.Servers.[i / 1].TriggerTimeout () + member this.InactivityTimeout (i : int) : unit = + this.Servers.[i / 1].TriggerInactivityTimeout () + this.Servers.[i / 1].Sync () + + member this.HeartbeatTimeout (i : int) : unit = + this.Servers.[i / 1].TriggerHeartbeatTimeout () this.Servers.[i / 1].Sync () member this.State (i : int) : ServerStatus = this.Servers.[i / 1].State diff --git a/Raft/Measures.fs b/Raft/Measures.fs deleted file mode 100644 index 99626bd..0000000 --- a/Raft/Measures.fs +++ /dev/null @@ -1,11 +0,0 @@ -namespace Raft - -/// LogIndex is indexed from 1. We use 0 to indicate "before any history has started". 
-[] -type LogIndex - -[] -type Term - -[] -type ServerId diff --git a/Raft/PersistentState.fs b/Raft/PersistentState.fs index ac45882..afa57e8 100644 --- a/Raft/PersistentState.fs +++ b/Raft/PersistentState.fs @@ -14,7 +14,7 @@ type IPersistentState<'a> = abstract TruncateLog : int -> unit abstract GetLogEntry : int -> ('a * int) option abstract CurrentLogIndex : int - abstract GetLastLogEntry : unit -> (int * ('a * int)) option + abstract GetLastLogEntry : unit -> ('a * LogEntry) option abstract AdvanceToTerm : int -> unit abstract IncrementTerm : unit -> unit abstract Vote : int -> unit @@ -50,11 +50,19 @@ type InMemoryPersistentState<'a> () = let position = if position < 0 then 0 else position log.RemoveRange (position, log.Count - position) - member this.GetLastLogEntry () = + member this.GetLastLogEntry () : ('a * LogEntry) option = if log.Count = 0 then None else - Some (log.Count * 1, log.[log.Count - 1]) + let stored, term = log.[log.Count - 1] + + Some ( + stored, + { + Index = log.Count * 1 + Term = term + } + ) member this.GetLogEntry position = let position = position / 1 diff --git a/Raft/Raft.fsproj b/Raft/Raft.fsproj index 463eac6..0aa8d34 100644 --- a/Raft/Raft.fsproj +++ b/Raft/Raft.fsproj @@ -7,7 +7,7 @@ - + diff --git a/Raft/Server.fs b/Raft/Server.fs index 84ec0a9..0367195 100644 --- a/Raft/Server.fs +++ b/Raft/Server.fs @@ -23,7 +23,7 @@ type LeaderState = /// For each server, index of the next log entry to send to that server NextIndex : int array /// For each server, index of the highest log entry known to be replicated on that server - MatchIndex : int array + MatchIndex : int array } static member New (clusterSize : int) (currentIndex : int) : LeaderState = @@ -55,7 +55,7 @@ type RequestVoteMessage = { CandidateTerm : int CandidateId : int - CandidateLastLogEntry : int * int + CandidateLastLogEntry : LogEntry option ReplyChannel : RequestVoteReply -> unit } @@ -65,21 +65,25 @@ type RequestVoteMessage = /// I, a follower, acknowledge the 
leader's instruction to add an entry to my log. type AppendEntriesReply = { + /// Me, the follower who is replying + Follower : int + /// The term I, the follower, think it is FollowerTerm : int - /// Reply with failure if the follower thinks the leader is out of date: + /// Reply with None if the follower thinks the leader is out of date: /// that is, if the leader's believed term is smaller than the follower's, /// or if the leader's declared previous log entry doesn't exist on the follower. - Success : bool - } - -type LogEntry = - { - Index : int - Term : int + /// If instead we accepted the update, this is the current head of the follower's log + /// after accepting the update. + Success : int option } override this.ToString () = - sprintf "Log entry %i at subjective term %i" this.Index this.Term + let description = + match this.Success with + | Some index -> sprintf "successfully applied leader entry, log length %i" index + | None -> "did not apply leader entry" + + sprintf "Follower %i %s" this.Follower description /// I am the leader. Followers, update your state as follows. type AppendEntriesMessage<'a> = @@ -89,7 +93,7 @@ type AppendEntriesMessage<'a> = /// I am your leader! This is me! (so everyone knows where to send clients to) LeaderId : int /// The entry immediately preceding the entry I'm sending you, so you can tell if we've got out of sync. - PrevLogEntry : LogEntry + PrevLogEntry : LogEntry option /// Followers, append this entry to your log. (Or, if None, this is just a heartbeat.) /// It was determined at the given term - recall that we might need to bring a restarted node up to speed /// with what happened during terms that took place while it was down. 
@@ -127,10 +131,12 @@ type Instruction<'a> = type Reply = | RequestVoteReply of RequestVoteReply + | AppendEntriesReply of AppendEntriesReply override this.ToString () = match this with | RequestVoteReply v -> v.ToString () + | AppendEntriesReply r -> r.ToString () type Message<'a> = | Instruction of Instruction<'a> @@ -174,9 +180,33 @@ type ServerStatus = type private ServerAction<'a> = | BeginElection + | EmitHeartbeat | Receive of Message<'a> | Sync of AsyncReplyChannel +[] +module internal ServerUtils = + + /// Return the maximum log index which a quorum has committed. + /// Recall that 0 means "nothing committed". + let maxLogAQuorumHasCommitted (matchIndex : int[]) : int = + let numberWhoCommittedIndex = matchIndex |> Array.countBy id + + numberWhoCommittedIndex |> Array.sortInPlaceBy fst + + let rec go (numberCounted : int) (result : int) (i : int) = + if i < 0 then + result + else + let numberCounted = numberCounted + snd numberWhoCommittedIndex.[i] + + if numberCounted > matchIndex.Length / 2 then + fst numberWhoCommittedIndex.[i] + else + go numberCounted (fst numberWhoCommittedIndex.[i]) (i - 1) + + go 0 0 (numberWhoCommittedIndex.Length - 1) + type Server<'a> ( clusterSize : int, @@ -195,46 +225,46 @@ type Server<'a> // We're definitely out of date. Switch to follower mode. currentType <- ServerSpecialisation.Follower persistentState.AdvanceToTerm message.Term + // TODO - `DropStaleResponse` suggests we should do this + //elif message.Term < persistentState.CurrentTerm then + // // Drop the message, it's old. + // () + //else match message with | RequestVote message -> // This was guaranteed above. assert (message.CandidateTerm <= persistentState.CurrentTerm) - // The following clauses define either condition under which we accept that the candidate has more data + // The following clause defines the condition under which we accept that the candidate has more data // than we do, and so could be a more suitable leader than us. 
- // TODO collapse these clauses, it'll be much neater - - let messageSupersedesMe = - // Is the candidate advertising a later term than our last-persisted write was made at? - // (That would mean it's far in the future of us.) - match persistentState.GetLastLogEntry () with - | Some (_, (_, ourLastTerm)) -> snd message.CandidateLastLogEntry > ourLastTerm - | None -> - // We have persisted no history at all! + // (This is the `logOk` of the paper.) + let candidateSupersedesMe = + match persistentState.GetLastLogEntry (), message.CandidateLastLogEntry with + | Some (_, ourLastEntry), Some candidateLastLogEntry -> + // The candidate wins if: + // * it's from so far in the future that an election has happened which we haven't heard about; or + // * it's from the same term as us, but it's logged more than we have. + candidateLastLogEntry.Term > ourLastEntry.Term + || (candidateLastLogEntry.Term = ourLastEntry.Term + && candidateLastLogEntry.Index >= persistentState.CurrentLogIndex) + | Some _, None -> + // We've logged something, and they have not. We're ahead, and won't vote for them. + false + | None, _ -> + // We have persisted no history at all. + // They take precedence - either they've logged something (in which case they must be ahead of us), + // or they haven't logged anything (in which case we'll defer to them for beating us to the + // first election). true - let messageExtendsMe = - // Do we agree what the current term is, is the candidate advertising a more advanced log than us? - match persistentState.GetLastLogEntry () with - | Some (_, (_, ourLastTerm)) -> - snd message.CandidateLastLogEntry = ourLastTerm - && fst message.CandidateLastLogEntry >= persistentState.CurrentLogIndex - | None -> - // We've persisted no history; the candidate needs to also be at the start of history, - // or else we'd have already considered them in the `messageSupersedesMe` check. - snd message.CandidateLastLogEntry = 0 - - // (This is the `logOk` of the paper. 
It's true iff the candidate's declaration is ahead of us.) - let candidateIsAhead = messageSupersedesMe || messageExtendsMe - // But just because the candidate is ahead of us, doesn't mean we can vote for them. // We can only vote for one candidate per election. // (We can't rely on our own VotedFor property, because that may have been in a previous election.) let shouldVoteFor = - if message.CandidateTerm = persistentState.CurrentTerm && candidateIsAhead then + if message.CandidateTerm = persistentState.CurrentTerm && candidateSupersedesMe then // We agree on which election we're taking part in, and moreover we agree that the candidate is // suitable. match persistentState.VotedFor with @@ -278,7 +308,8 @@ type Server<'a> // Reject the request: the "leader" is actually outdated, it was only a leader in the past. { FollowerTerm = persistentState.CurrentTerm - Success = false + Success = None + Follower = me } |> message.ReplyChannel @@ -295,8 +326,9 @@ type Server<'a> } { - Success = true + Success = Some persistentState.CurrentLogIndex FollowerTerm = persistentState.CurrentTerm + Follower = me } |> message.ReplyChannel @@ -308,7 +340,10 @@ type Server<'a> | Some (toInsert, toInsertTerm) -> - let desiredLogInsertionPosition = message.PrevLogEntry.Index + 1 + let desiredLogInsertionPosition = + match message.PrevLogEntry with + | None -> 1 + | Some entry -> entry.Index + 1 match persistentState.GetLogEntry desiredLogInsertionPosition with | Some (_, existingTerm) when toInsertTerm = existingTerm -> @@ -322,8 +357,9 @@ type Server<'a> persistentState.AppendToLog toInsert toInsertTerm { - Success = true + Success = Some desiredLogInsertionPosition FollowerTerm = persistentState.CurrentTerm + Follower = me } |> message.ReplyChannel @@ -336,25 +372,27 @@ type Server<'a> persistentState.AppendToLog toInsert toInsertTerm { - Success = true + Success = Some desiredLogInsertionPosition FollowerTerm = persistentState.CurrentTerm + Follower = me } |> message.ReplyChannel let 
logIsConsistent (message : AppendEntriesMessage<'a>) : bool = - if message.PrevLogEntry.Index = 0 then + match message.PrevLogEntry with + | None -> // The leader advertises that they have no committed history, so certainly it's consistent with // us. true - else + | Some entry -> - match persistentState.GetLogEntry message.PrevLogEntry.Index with + match persistentState.GetLogEntry entry.Index with | None -> // The leader's advertised commit is ahead of our history. false | Some (_, ourTermForThisEntry) -> // The leader's advertised commit is in our history; do we agree with it? - ourTermForThisEntry = message.PrevLogEntry.Term + ourTermForThisEntry = entry.Term match currentType with | ServerSpecialisation.Leader _ -> @@ -373,7 +411,8 @@ type Server<'a> // Reject the request, it's inconsistent with our history. { FollowerTerm = persistentState.CurrentTerm - Success = false + Success = None + Follower = me } |> message.ReplyChannel @@ -388,6 +427,72 @@ type Server<'a> assert (logIsConsistent message) acceptRequest () + let divideByTwoRoundingUp (n : int) = + if n % 2 = 0 then n / 2 else (n / 2) + 1 + + let processReply (r : Reply) : unit = + match r with + | AppendEntriesReply appendEntriesReply -> + match currentType with + | ServerSpecialisation.Candidate _ + | ServerSpecialisation.Follower -> () + | ServerSpecialisation.Leader leaderState -> + + if appendEntriesReply.FollowerTerm = persistentState.CurrentTerm then + match appendEntriesReply.Success with + | Some matchIndex -> + leaderState.MatchIndex.[appendEntriesReply.Follower / 1] <- matchIndex + leaderState.NextIndex.[appendEntriesReply.Follower / 1] <- matchIndex + 1 + | None -> + leaderState.NextIndex.[appendEntriesReply.Follower / 1] <- + max + (leaderState.NextIndex.[appendEntriesReply.Follower / 1] - 1) + 1 + // Note that the decision to process this *here* means the algorithm doesn't work in clusters of + // only one node, because then there will never be any AppendEntries replies. 
+ let maxLogAQuorumHasCommitted = + ServerUtils.maxLogAQuorumHasCommitted leaderState.MatchIndex + + if maxLogAQuorumHasCommitted > 0 then + let ourLogTerm = + match persistentState.GetLogEntry maxLogAQuorumHasCommitted with + | None -> + failwithf + "Invariant violated. The leader has not logged an entry (%i) which it knows a quorum has logged." + maxLogAQuorumHasCommitted + | Some (_, term) -> term + + if ourLogTerm = persistentState.CurrentTerm then + volatileState <- + { volatileState with + CommitIndex = maxLogAQuorumHasCommitted + } + + | RequestVoteReply requestVoteReply -> + match currentType with + | ServerSpecialisation.Leader _ + | ServerSpecialisation.Follower -> + // We're not expecting any votes; drop them. + () + | ServerSpecialisation.Candidate state -> + + if requestVoteReply.VoterTerm = persistentState.CurrentTerm then + state.Votes.[requestVoteReply.Voter / 1] <- Some requestVoteReply.VoteGranted + + // Inefficient, but :shrug: + if + Array.sumBy + (function + | Some true -> 1 + | _ -> 0) + state.Votes > clusterSize / 2 + then + // Become the leader! + currentType <- + LeaderState.New clusterSize persistentState.CurrentLogIndex + |> ServerSpecialisation.Leader + + let mailbox = let rec loop (mailbox : MailboxProcessor<_>) = async { @@ -396,6 +501,26 @@ type Server<'a> //System.Console.WriteLine toPrint match m with + | ServerAction.EmitHeartbeat -> + match currentType with + | ServerSpecialisation.Candidate _ + | ServerSpecialisation.Follower -> () + | ServerSpecialisation.Leader _ -> + for i in 0 .. 
clusterSize - 1 do + if i * 1 <> me then + { + LeaderTerm = persistentState.CurrentTerm + LeaderId = me + PrevLogEntry = persistentState.GetLastLogEntry () |> Option.map snd + NewEntry = None + LeaderCommitIndex = volatileState.CommitIndex + ReplyChannel = + fun reply -> + messageChannel me (reply |> Reply.AppendEntriesReply |> Message.Reply) + } + |> Instruction.AppendEntries + |> Message.Instruction + |> messageChannel (i * 1) | ServerAction.BeginElection -> match currentType with | ServerSpecialisation.Leader _ -> () @@ -412,44 +537,15 @@ type Server<'a> { CandidateTerm = persistentState.CurrentTerm CandidateId = me - CandidateLastLogEntry = - match persistentState.GetLastLogEntry () with - | Some (index, (_, term)) -> (index, term) - | None -> - // TODO this is almost certainly not right - (0, 0) - ReplyChannel = - // TODO this is bypassing the network - stop it! - fun reply -> messageChannel me (RequestVoteReply reply |> Message.Reply) + CandidateLastLogEntry = persistentState.GetLastLogEntry () |> Option.map snd + ReplyChannel = fun reply -> messageChannel me (RequestVoteReply reply |> Message.Reply) } |> Instruction.RequestVote |> Message.Instruction |> messageChannel (i * 1) - | ServerAction.Receive (Instruction m) -> return processMessage m - | ServerAction.Sync reply -> reply.Reply () - | ServerAction.Receive (Reply (RequestVoteReply requestVoteReply)) -> - match currentType with - | ServerSpecialisation.Leader _ - | ServerSpecialisation.Follower -> - // We're not expecting any votes; drop them. - () - | ServerSpecialisation.Candidate state -> - - if requestVoteReply.VoterTerm = persistentState.CurrentTerm then - state.Votes.[requestVoteReply.Voter / 1] <- Some requestVoteReply.VoteGranted - - // Inefficient, but :shrug: - if - Array.sumBy - (function - | Some true -> 1 - | _ -> 0) - state.Votes > clusterSize / 2 - then - // Become the leader! 
- currentType <- - LeaderState.New clusterSize persistentState.CurrentLogIndex - |> ServerSpecialisation.Leader + | ServerAction.Receive (Instruction m) -> processMessage m + | ServerAction.Receive (Reply r) -> processReply r + | ServerAction.Sync replyChannel -> replyChannel.Reply () return! loop mailbox } @@ -458,7 +554,8 @@ type Server<'a> mailbox.Error.Add raise mailbox - member this.TriggerTimeout () = mailbox.Post ServerAction.BeginElection + member this.TriggerInactivityTimeout () = mailbox.Post ServerAction.BeginElection + member this.TriggerHeartbeatTimeout () = mailbox.Post ServerAction.EmitHeartbeat member this.Message (m : Message<'a>) = mailbox.Post (ServerAction.Receive m) diff --git a/RaftExplorer/Program.fs b/RaftExplorer/Program.fs index 790cd74..c5dea0d 100644 --- a/RaftExplorer/Program.fs +++ b/RaftExplorer/Program.fs @@ -57,12 +57,28 @@ module Program = printf "Unrecognised input. " None + let rec getHeartbeater (clusterSize : int) (serverId : string) = + // TODO: restrict this to the leaders only + match Int32.TryParse serverId with + | true, serverId -> + if serverId >= clusterSize || serverId < 0 then + printf "Server ID must be between 0 and %i inclusive. " (clusterSize - 1) + None + else + Some (serverId * 1) + | false, _ -> + printf "Unrecognised input. " + None + type UserAction = - | Timeout of int + | InactivityTimeout of int | NetworkMessage of int * int + | DropMessage of int * int + | Heartbeat of int let rec getAction (clusterSize : int) = - printf "Enter action. Trigger [t]imeout , or allow [m]essage : " + printf + "Enter action. Trigger [t]imeout , [h]eartbeat a leader , [d]rop message , or allow [m]essage : " let s = let rec go () = @@ -74,31 +90,62 @@ module Program = match s.[0] with | 'T' -> match getTimeout clusterSize s.[1..] with - | Some t -> t |> Timeout + | Some t -> t |> InactivityTimeout + | None -> getAction clusterSize + | 'D' -> + match getMessage clusterSize s.[1..] 
with + | Some m -> m |> DropMessage | None -> getAction clusterSize | 'M' -> match getMessage clusterSize s.[1..] with | Some m -> m |> NetworkMessage | None -> getAction clusterSize + | 'H' -> + match getHeartbeater clusterSize s.[1..] with + | Some h -> Heartbeat h + | None -> getAction clusterSize | _ -> printf "Unrecognised input. " getAction clusterSize + let processAction (cluster : Cluster<'a>) (network : Network<'a>) (action : UserAction) : unit = + match action with + | InactivityTimeout serverId -> cluster.InactivityTimeout serverId + | Heartbeat serverId -> cluster.HeartbeatTimeout serverId + | DropMessage (serverId, messageId) -> network.DropMessage serverId messageId + | NetworkMessage (serverId, messageId) -> + network.InboundMessage serverId messageId |> cluster.SendMessage serverId + network.DropMessage serverId messageId + + [] let main _argv = let clusterSize = 5 let cluster, network = InMemoryCluster.make clusterSize + let startupSequence = + [ + UserAction.InactivityTimeout 0 + UserAction.NetworkMessage (1, 0) + UserAction.NetworkMessage (2, 0) + UserAction.DropMessage (3, 0) + UserAction.DropMessage (4, 0) + UserAction.NetworkMessage (0, 0) + UserAction.NetworkMessage (0, 1) + ] + + for action in startupSequence do + processAction cluster network action + while true do printNetworkState network printClusterState cluster let action = getAction clusterSize + processAction cluster network action - match action with - | Timeout serverId -> cluster.Timeout serverId - | NetworkMessage (serverId, messageId) -> - network.InboundMessage serverId messageId |> cluster.SendMessage serverId - network.DropMessage serverId messageId + // TODO: log out the committed state so that we can see whether the leader is correctly + // processing heartbeat responses + // TODO: allow client queries! 0