From 622280fb012774224cff530e6824396ec9a1dca2 Mon Sep 17 00:00:00 2001 From: Smaug123 Date: Tue, 25 Oct 2022 21:29:09 +0100 Subject: [PATCH] Move to mailboxes --- Raft.Test/TestServer.fs | 10 +-- Raft/Server.fs | 180 ++++++++++++++++++++++------------------ 2 files changed, 104 insertions(+), 86 deletions(-) diff --git a/Raft.Test/TestServer.fs b/Raft.Test/TestServer.fs index 7e27ae3..89333dc 100644 --- a/Raft.Test/TestServer.fs +++ b/Raft.Test/TestServer.fs @@ -10,7 +10,7 @@ module TestServer = [] let foo () = - let cluster = InMemoryCluster.make 5 + let cluster = InMemoryCluster.make true 5 let logger, logs = let logs = ResizeArray () @@ -21,8 +21,6 @@ module TestServer = logLine, freezeLogs - let sendMessage = cluster.Servers.[0].OutboundMessageChannel - // Candidate 1 asks server 0 to vote for it. { @@ -32,7 +30,7 @@ module TestServer = CandidateLastLogEntry = 0, 0 } |> Message.RequestVote - |> sendMessage 0 + |> cluster.SendMessage 0 logs () |> shouldEqual [ "Received message for term 0" ] @@ -45,7 +43,7 @@ module TestServer = CandidateLastLogEntry = 0, 0 } |> Message.RequestVote - |> sendMessage 0 + |> cluster.SendMessage 0 logs () |> shouldEqual [ "Received message for term 0" ; "Received message for term 0" ] @@ -61,6 +59,6 @@ module TestServer = CandidateLastLogEntry = 0, 0 } |> Message.RequestVote - |> sendMessage 0 + |> cluster.SendMessage 0 calls.Value |> shouldEqual 0 diff --git a/Raft/Server.fs b/Raft/Server.fs index 5a647fa..0f9c8c1 100644 --- a/Raft/Server.fs +++ b/Raft/Server.fs @@ -12,6 +12,12 @@ type VolatileState = LastApplied : int } + static member New : VolatileState = + { + CommitIndex = 0 + LastApplied = 0 + } + type LeaderState = { /// For each server, index of the next log entry to send to that server @@ -86,55 +92,33 @@ type ServerSpecialisation = | Follower | Candidate -type Server<'a> = - { - mutable VolatileState : VolatileState - PersistentState : IPersistentState<'a> - mutable Type : ServerSpecialisation - Timeout : unit -> unit - OutboundMessageChannel : int -> Message<'a> -> unit - } +type ServerAction<'a> = + | BeginElection + | Receive of Message<'a> + | Sync of AsyncReplyChannel -[] -module Server = +type Server<'a> + ( + clusterSize : int, + persistentState : IPersistentState<'a>, + messageChannel : int -> Message<'a> -> unit + ) + as this= + let mutable volatileState = VolatileState.New + let mutable currentType = Follower - let inline private getLogEntry<'a> (index : int) (arr : 'a array) : 'a option = - arr |> Array.tryItem ((index - 1) / 1) - - let inline private truncateLog<'a> (finalIndex : int) (arr : 'a array) : 'a array = - arr |> Array.truncate (finalIndex / 1) - - let inline private replaceLog<'a> (index : int) (elt : 'a) (arr : 'a array) : 'a array = - let toRet = Array.copy arr - toRet.[(index - 1) / 1] <- elt - toRet - - let create<'a> (messageChannel : int -> Message<'a> -> unit) : Server<'a> = - { - OutboundMessageChannel = messageChannel - Type = Follower - VolatileState = - { - CommitIndex = 0 - LastApplied = 0 - } - PersistentState = InMemoryPersistentState () - Timeout = fun () -> () - } - - /// Returns the new state of the same server. - let processMessage<'a> (message : Message<'a>) (s : Server<'a>) : unit = + let processMessage (message : Message<'a>) : unit = // First, see if this message comes from a future term. // (This is `UpdateTerm` from the TLA+.) - if message.Term > s.PersistentState.CurrentTerm then + if message.Term > persistentState.CurrentTerm then // We're definitely out of date. Switch to follower mode. - s.Type <- Follower - s.PersistentState.AdvanceToTerm message.Term + currentType <- Follower + persistentState.AdvanceToTerm message.Term match message with | RequestVote message -> // This was guaranteed above. - assert (message.CandidateTerm <= s.PersistentState.CurrentTerm) + assert (message.CandidateTerm <= persistentState.CurrentTerm) // The following clauses define either condition under which we accept that the candidate has more data // than we do, and so could be a more suitable leader than us. @@ -144,7 +128,7 @@ module Server = let messageSupersedesMe = // Is the candidate advertising a later term than our last-persisted write was made at? // (That would mean it's far in the future of us.) - match s.PersistentState.GetLastLogEntry () with + match persistentState.GetLastLogEntry () with | Some (_, ourLastTerm) -> snd message.CandidateLastLogEntry > ourLastTerm | None -> // We have persisted no history at all! @@ -152,10 +136,10 @@ module Server = let messageExtendsMe = // Do we agree what the current term is, is the candidate advertising a more advanced log than us? - match s.PersistentState.GetLastLogEntry () with + match persistentState.GetLastLogEntry () with | Some (_, ourLastTerm) -> snd message.CandidateLastLogEntry = ourLastTerm - && fst message.CandidateLastLogEntry >= s.PersistentState.CurrentLogIndex + && fst message.CandidateLastLogEntry >= persistentState.CurrentLogIndex | None -> // We've persisted no history; the candidate needs to also be at the start of history, // or else we'd have already considered them in the `messageSupersedesMe` check. @@ -169,10 +153,10 @@ module Server = // (We can't rely on our own VotedFor property, because that may have been in a previous election.) let shouldVoteFor = - if message.CandidateTerm = s.PersistentState.CurrentTerm && candidateIsAhead then + if message.CandidateTerm = persistentState.CurrentTerm && candidateIsAhead then // We agree on which election we're taking part in, and moreover we agree that the candidate is // suitable. - match s.PersistentState.VotedFor with + match persistentState.VotedFor with | None -> // We haven't voted in this election before. true @@ -194,24 +178,23 @@ module Server = // getting a chance to persist. // (Better for us to wrongly think we've voted than to wrongly think we've yet to vote. In the worst // case we just end up not participating in an election.) - // TODO when this is made mutable etc: call Persist here - s.PersistentState.Vote message.CandidateId + persistentState.Vote message.CandidateId { - VoterTerm = s.PersistentState.CurrentTerm + VoterTerm = persistentState.CurrentTerm VoteGranted = true } |> message.ReplyChannel | AppendEntries message -> // This was guaranteed above. - assert (message.LeaderTerm <= s.PersistentState.CurrentTerm) + assert (message.LeaderTerm <= persistentState.CurrentTerm) - if message.LeaderTerm < s.PersistentState.CurrentTerm then + if message.LeaderTerm < persistentState.CurrentTerm then // Reject the request: the "leader" is actually outdated, it was only a leader in the past. { - FollowerTerm = s.PersistentState.CurrentTerm + FollowerTerm = persistentState.CurrentTerm Success = false } |> message.ReplyChannel @@ -219,45 +202,45 @@ module Server = else // This was guaranteed immediately above: we agree that the message is being sent by the current leader. - assert (message.LeaderTerm = s.PersistentState.CurrentTerm) + assert (message.LeaderTerm = persistentState.CurrentTerm) - let heartbeat (message : AppendEntriesMessage<'a>) (s : Server<'a>) : unit = + let heartbeat (message : AppendEntriesMessage<'a>) : unit = // Just a heartbeat; no change to our log is required. - s.VolatileState <- - { s.VolatileState with + volatileState <- + { volatileState with CommitIndex = message.LeaderCommitIndex } { Success = true - FollowerTerm = s.PersistentState.CurrentTerm + FollowerTerm = persistentState.CurrentTerm } |> message.ReplyChannel - let acceptRequest (s : Server<'a>) : unit = - assert (s.Type = Follower) + let acceptRequest () : unit = + assert (currentType = Follower) match message.NewEntry with - | None -> heartbeat message s + | None -> heartbeat message | Some (toInsert, toInsertTerm) -> let desiredLogInsertionPosition = message.PrevLogEntry.Index + 1 - match s.PersistentState.GetLogEntry desiredLogInsertionPosition with + match persistentState.GetLogEntry desiredLogInsertionPosition with | Some (_, existingTerm) when toInsertTerm = existingTerm -> // This is already persisted. Moreover, the value that we persisted came from the term we're // currently processing, so in particular came from the same leader and hence won't conflict // with what we received from that leader just now. - heartbeat message s - | Some (_, existingTerm) -> + heartbeat message + | Some _ -> // The leader's message conflicts with what we persisted. Defer to the leader. - s.PersistentState.TruncateLog (desiredLogInsertionPosition - 1) - s.PersistentState.AppendToLog toInsert toInsertTerm + persistentState.TruncateLog (desiredLogInsertionPosition - 1) + persistentState.AppendToLog toInsert toInsertTerm { Success = true - FollowerTerm = s.PersistentState.CurrentTerm + FollowerTerm = persistentState.CurrentTerm } |> message.ReplyChannel @@ -265,24 +248,24 @@ module Server = // The leader knows what we've committed, so it won't try and give us anything further than // the element immediately past our persisted log. // TODO - why can't this be -1? - assert (desiredLogInsertionPosition = 1 + s.PersistentState.CurrentLogIndex) + assert (desiredLogInsertionPosition = 1 + persistentState.CurrentLogIndex) // The leader's message is after our log. Append. - s.PersistentState.AppendToLog toInsert toInsertTerm + persistentState.AppendToLog toInsert toInsertTerm { Success = true - FollowerTerm = s.PersistentState.CurrentTerm + FollowerTerm = persistentState.CurrentTerm } |> message.ReplyChannel - let logIsConsistent (message : AppendEntriesMessage<'a>) (s : Server<'a>) : bool = + let logIsConsistent (message : AppendEntriesMessage<'a>) : bool = if message.PrevLogEntry.Index = 0 then // The leader advertises that they have no committed history, so certainly it's consistent with // us. true else - match s.PersistentState.GetLogEntry message.PrevLogEntry.Index with + match persistentState.GetLogEntry message.PrevLogEntry.Index with | None -> // The leader's advertised commit is ahead of our history. false @@ -290,7 +273,7 @@ module Server = // The leader's advertised commit is in our history; do we agree with it? ourTermForThisEntry = message.PrevLogEntry.Term - match s.Type with + match currentType with | Leader _ -> [ "Violation of invariant." @@ -303,47 +286,84 @@ module Server = |> failwithf "%s" | Follower -> - if not (logIsConsistent message s) then + if not (logIsConsistent message) then // Reject the request, it's inconsistent with our history. { - FollowerTerm = s.PersistentState.CurrentTerm + FollowerTerm = persistentState.CurrentTerm Success = false } |> message.ReplyChannel else - acceptRequest s + acceptRequest () | Candidate -> // We've already verified that the message was sent from a leader in the current term, so we have // lost the election. - s.Type <- Follower + currentType <- Follower // TODO: why does this assertion hold? - assert (logIsConsistent message s) - acceptRequest s + assert (logIsConsistent message) + acceptRequest () + let mailbox = + let rec loop (mailbox : MailboxProcessor<_>) = + async { + let! m = mailbox.Receive () + + match m with + | ServerAction.BeginElection -> return failwith "not yet implemented" + | ServerAction.Receive m -> return processMessage m + | ServerAction.Sync reply -> reply.Reply () + + return! loop mailbox + } + + loop |> MailboxProcessor.Start + + member this.TriggerTimeout () = mailbox.Post ServerAction.BeginElection + + member this.Message (m : Message<'a>) = mailbox.Post (ServerAction.Receive m) + + member this.Sync () = mailbox.PostAndReply ServerAction.Sync + +// { +// ClusterSize : int +// mutable VolatileState : VolatileState +// PersistentState : IPersistentState<'a> +// mutable Type : ServerSpecialisation +// TriggerTimeout : unit -> unit +// Mailbox : MailboxProcessor> +// } type Cluster<'a> = internal { Servers : Server<'a> array + SendMessage : int -> Message<'a> -> unit } [] module InMemoryCluster = [] - let make<'a> (count : int) : Cluster<'a> = + let make<'a> (immediateFlush : bool) (count : int) : Cluster<'a> = let servers = Array.zeroCreate> count let locker = obj () let messageChannel (serverId : int) (message : Message<'a>) : unit = - lock locker (fun () -> Server.processMessage message servers.[serverId / 1]) + lock locker (fun () -> servers.[serverId / 1].Message message) for s in 0 .. servers.Length - 1 do - servers.[s] <- Server.create messageChannel + servers.[s] <- Server (count, InMemoryPersistentState (), messageChannel) { Servers = servers + SendMessage = + if immediateFlush then + fun i m -> + servers.[i / 1].Message m + servers.[i / 1].Sync () + else + fun i -> servers.[i / 1].Message }