From 72be3ebec0119cfdcd68494bc94695b0f8825ae3 Mon Sep 17 00:00:00 2001 From: Smaug123 Date: Tue, 25 Oct 2022 22:41:22 +0100 Subject: [PATCH] Test that we elect a leader on startup --- Raft.Test/Logger.fs | 13 +++ Raft.Test/Raft.Test.fsproj | 1 + Raft.Test/TestInMemoryPersistentState.fs | 2 +- Raft.Test/TestServer.fs | 27 +++-- Raft/PersistentState.fs | 20 +++- Raft/Server.fs | 122 +++++++++++++++++++---- 6 files changed, 153 insertions(+), 32 deletions(-) create mode 100644 Raft.Test/Logger.fs diff --git a/Raft.Test/Logger.fs b/Raft.Test/Logger.fs new file mode 100644 index 0000000..b17898e --- /dev/null +++ b/Raft.Test/Logger.fs @@ -0,0 +1,13 @@ +namespace Raft.Test + +[] +module TestLogger = + + let make () : (string -> unit) * (unit -> string list) = + let logs = ResizeArray () + let logLine (s : string) = lock logs (fun () -> logs.Add s) + + let freezeLogs () = + lock logs (fun () -> logs |> Seq.toList) + + logLine, freezeLogs diff --git a/Raft.Test/Raft.Test.fsproj b/Raft.Test/Raft.Test.fsproj index 332624a..5e4cc8b 100644 --- a/Raft.Test/Raft.Test.fsproj +++ b/Raft.Test/Raft.Test.fsproj @@ -7,6 +7,7 @@ + diff --git a/Raft.Test/TestInMemoryPersistentState.fs b/Raft.Test/TestInMemoryPersistentState.fs index 0347db6..47b6a53 100644 --- a/Raft.Test/TestInMemoryPersistentState.fs +++ b/Raft.Test/TestInMemoryPersistentState.fs @@ -50,7 +50,7 @@ module TestInMemoryPersistentState = | Some entry -> (uut :> IPersistentState<_>).TruncateLog truncate - (uut :> IPersistentState<_>).GetLastLogEntry () = Some entry + (uut :> IPersistentState<_>).GetLastLogEntry () = Some (truncate, entry) && isPrefix (uut.GetLog ()) oldLog && (uut :> IPersistentState<_>).CurrentLogIndex = truncate diff --git a/Raft.Test/TestServer.fs b/Raft.Test/TestServer.fs index 89333dc..7c88d10 100644 --- a/Raft.Test/TestServer.fs +++ b/Raft.Test/TestServer.fs @@ -9,17 +9,10 @@ open FsUnitTyped module TestServer = [] - let foo () = + let ``Startup sequence, first fumbling steps`` () = let cluster = InMemoryCluster.make true 5 - let logger, logs = - let logs = ResizeArray () - let logLine (s : string) = lock logs (fun () -> logs.Add s) - - let freezeLogs () = - lock logs (fun () -> logs |> Seq.toList) - - logLine, freezeLogs + let logger, logs = TestLogger.make () // Candidate 1 asks server 0 to vote for it. @@ -62,3 +55,19 @@ module TestServer = |> cluster.SendMessage 0 calls.Value |> shouldEqual 0 + + [] + let ``Startup sequence in prod`` () = + let cluster = InMemoryCluster.make false 5 + + cluster.Servers.[0].TriggerTimeout () + cluster.Servers.[0].Sync () + + // We sent a message to every other server; process them. + for i in 1..4 do + cluster.Servers.[i].Sync () + + cluster.Servers.[0].State |> shouldEqual ServerStatus.Leader + + for i in 1..4 do + cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower diff --git a/Raft/PersistentState.fs b/Raft/PersistentState.fs index 51b24c7..ac45882 100644 --- a/Raft/PersistentState.fs +++ b/Raft/PersistentState.fs @@ -1,5 +1,7 @@ namespace Raft +open System.Threading + type IPersistentState<'a> = abstract CurrentTerm : int /// If I know about an election in my CurrentTerm, who did I vote for during that election? @@ -12,26 +14,31 @@ type IPersistentState<'a> = abstract TruncateLog : int -> unit abstract GetLogEntry : int -> ('a * int) option abstract CurrentLogIndex : int - abstract GetLastLogEntry : unit -> ('a * int) option + abstract GetLastLogEntry : unit -> (int * ('a * int)) option abstract AdvanceToTerm : int -> unit + abstract IncrementTerm : unit -> unit abstract Vote : int -> unit /// Server state which must survive a server crash. [] type InMemoryPersistentState<'a> () = - let mutable currentTerm = 0 + let mutable currentTerm = 0 let mutable votedFor : int option = None let log = ResizeArray<'a * int> () member this.GetLog () = log |> List.ofSeq interface IPersistentState<'a> with - member this.CurrentTerm = currentTerm + member this.CurrentTerm = currentTerm * 1 + + member this.IncrementTerm () = + Interlocked.Increment ¤tTerm |> ignore + member this.VotedFor = votedFor member this.Vote id = votedFor <- Some id member this.AdvanceToTerm term = - currentTerm <- term + currentTerm <- term / 1 votedFor <- None member this.AppendToLog entry term = log.Add (entry, term) @@ -44,7 +51,10 @@ type InMemoryPersistentState<'a> () = log.RemoveRange (position, log.Count - position) member this.GetLastLogEntry () = - if log.Count = 0 then None else Some log.[log.Count - 1] + if log.Count = 0 then + None + else + Some (log.Count * 1, log.[log.Count - 1]) member this.GetLogEntry position = let position = position / 1 diff --git a/Raft/Server.fs b/Raft/Server.fs index 0f9c8c1..b1a4d74 100644 --- a/Raft/Server.fs +++ b/Raft/Server.fs @@ -26,9 +26,17 @@ type LeaderState = MatchIndex : int array } + static member New (clusterSize : int) (currentIndex : int) : LeaderState = + { + NextIndex = Array.create clusterSize (currentIndex + 1) + MatchIndex = Array.zeroCreate clusterSize + } + /// You asked me to vote for you to become leader. Here is my response. type RequestVoteReply = { + /// Me! + Voter : int /// The term I think it is. VoterTerm : int /// Whether I am happy for you to become leader. (For example, if my term is greater than yours, then you're @@ -87,32 +95,54 @@ type Message<'a> = | AppendEntries m -> m.LeaderTerm | RequestVote m -> m.CandidateTerm -type ServerSpecialisation = +type private CandidateState = + { + /// For each voter, the vote I received from them. + Votes : bool option array + } + + static member New (count : int) (self : int) = + let votes = Array.zeroCreate<_> count + votes.[self / 1] <- Some true + + { + Votes = votes + } + +[] +type private ServerSpecialisation = | Leader of LeaderState | Follower + | Candidate of CandidateState + +type ServerStatus = + | Leader + | Follower | Candidate -type ServerAction<'a> = +type private ServerAction<'a> = | BeginElection | Receive of Message<'a> + | ReceiveReply of RequestVoteReply | Sync of AsyncReplyChannel type Server<'a> ( clusterSize : int, + me : int, persistentState : IPersistentState<'a>, messageChannel : int -> Message<'a> -> unit ) as this= let mutable volatileState = VolatileState.New - let mutable currentType = Follower + let mutable currentType = ServerSpecialisation.Follower let processMessage (message : Message<'a>) : unit = // First, see if this message comes from a future term. // (This is `UpdateTerm` from the TLA+.) if message.Term > persistentState.CurrentTerm then // We're definitely out of date. Switch to follower mode. - currentType <- Follower + currentType <- ServerSpecialisation.Follower persistentState.AdvanceToTerm message.Term match message with @@ -129,7 +159,7 @@ type Server<'a> // Is the candidate advertising a later term than our last-persisted write was made at? // (That would mean it's far in the future of us.) match persistentState.GetLastLogEntry () with - | Some (_, ourLastTerm) -> snd message.CandidateLastLogEntry > ourLastTerm + | Some (_, (_, ourLastTerm)) -> snd message.CandidateLastLogEntry > ourLastTerm | None -> // We have persisted no history at all! true @@ -137,7 +167,7 @@ type Server<'a> let messageExtendsMe = // Do we agree what the current term is, is the candidate advertising a more advanced log than us? match persistentState.GetLastLogEntry () with - | Some (_, ourLastTerm) -> + | Some (_, (_, ourLastTerm)) -> snd message.CandidateLastLogEntry = ourLastTerm && fst message.CandidateLastLogEntry >= persistentState.CurrentLogIndex | None -> @@ -182,6 +212,7 @@ type Server<'a> persistentState.Vote message.CandidateId { + Voter = me VoterTerm = persistentState.CurrentTerm VoteGranted = true } @@ -218,7 +249,7 @@ type Server<'a> |> message.ReplyChannel let acceptRequest () : unit = - assert (currentType = Follower) + assert (currentType = ServerSpecialisation.Follower) match message.NewEntry with | None -> heartbeat message @@ -274,7 +305,7 @@ type Server<'a> ourTermForThisEntry = message.PrevLogEntry.Term match currentType with - | Leader _ -> + | ServerSpecialisation.Leader _ -> [ "Violation of invariant." "This is a logic error that cannot happen unless there is a bug in this Raft implementation." @@ -285,7 +316,7 @@ type Server<'a> |> String.concat "\n" |> failwithf "%s" - | Follower -> + | ServerSpecialisation.Follower -> if not (logIsConsistent message) then // Reject the request, it's inconsistent with our history. { @@ -297,10 +328,10 @@ type Server<'a> else acceptRequest () - | Candidate -> + | ServerSpecialisation.Candidate _ -> // We've already verified that the message was sent from a leader in the current term, so we have // lost the election. - currentType <- Follower + currentType <- ServerSpecialisation.Follower // TODO: why does this assertion hold? assert (logIsConsistent message) acceptRequest () @@ -308,17 +339,70 @@ type Server<'a> let mailbox = let rec loop (mailbox : MailboxProcessor<_>) = async { + // TODO this should really wait for explicit Sync calls + // if we're running in memory, for determinism. let! m = mailbox.Receive () + let toPrint = sprintf "Processing message in server %i: %+A" me m + System.Console.WriteLine toPrint match m with - | ServerAction.BeginElection -> return failwith "not yet implemented" + | ServerAction.BeginElection -> + match currentType with + | ServerSpecialisation.Leader _ -> () + | ServerSpecialisation.Candidate _ + | ServerSpecialisation.Follower -> + + // Start the election! + currentType <- ServerSpecialisation.Candidate (CandidateState.New clusterSize me) + persistentState.IncrementTerm () + + for i in 0 .. clusterSize - 1 do + if i * 1 <> me then + { + CandidateTerm = persistentState.CurrentTerm + CandidateId = me + CandidateLastLogEntry = + match persistentState.GetLastLogEntry () with + | Some (index, (_, term)) -> (index, term) + | None -> + // TODO this is almost certainly not right + (0, 0) + ReplyChannel = fun reply -> mailbox.Post (ReceiveReply reply) + } + |> Message.RequestVote + |> messageChannel (i * 1) | ServerAction.Receive m -> return processMessage m | ServerAction.Sync reply -> reply.Reply () + | ServerAction.ReceiveReply requestVoteReply -> + match currentType with + | ServerSpecialisation.Leader _ + | ServerSpecialisation.Follower -> + // We're not expecting any votes; drop them. + () + | ServerSpecialisation.Candidate state -> + + if requestVoteReply.VoterTerm = persistentState.CurrentTerm then + state.Votes.[requestVoteReply.Voter / 1] <- Some requestVoteReply.VoteGranted + + // Inefficient, but :shrug: + if + Array.sumBy + (function + | Some true -> 1 + | _ -> 0) + state.Votes > clusterSize / 2 + then + // Become the leader! + currentType <- + LeaderState.New clusterSize persistentState.CurrentLogIndex + |> ServerSpecialisation.Leader return! loop mailbox } - loop |> MailboxProcessor.Start + let mailbox = loop |> MailboxProcessor.Start + mailbox.Error.Add raise + mailbox member this.TriggerTimeout () = mailbox.Post ServerAction.BeginElection @@ -326,6 +410,12 @@ type Server<'a> member this.Sync () = mailbox.PostAndReply ServerAction.Sync + member this.State = + match currentType with + | ServerSpecialisation.Leader _ -> ServerStatus.Leader + | ServerSpecialisation.Candidate _ -> ServerStatus.Candidate + | ServerSpecialisation.Follower -> ServerStatus.Follower + // { // ClusterSize : int // mutable VolatileState : VolatileState @@ -349,13 +439,11 @@ module InMemoryCluster = let make<'a> (immediateFlush : bool) (count : int) : Cluster<'a> = let servers = Array.zeroCreate> count - let locker = obj () - let messageChannel (serverId : int) (message : Message<'a>) : unit = - lock locker (fun () -> servers.[serverId / 1].Message message) + servers.[serverId / 1].Message message for s in 0 .. servers.Length - 1 do - servers.[s] <- Server (count, InMemoryPersistentState (), messageChannel) + servers.[s] <- Server (count, s * 1, InMemoryPersistentState (), messageChannel) { Servers = servers