diff --git a/Raft.Test/Raft.Test.fsproj b/Raft.Test/Raft.Test.fsproj index 9d7ad7d..332624a 100644 --- a/Raft.Test/Raft.Test.fsproj +++ b/Raft.Test/Raft.Test.fsproj @@ -8,6 +8,7 @@ + diff --git a/Raft.Test/TestInMemoryPersistentState.fs b/Raft.Test/TestInMemoryPersistentState.fs new file mode 100644 index 0000000..0347db6 --- /dev/null +++ b/Raft.Test/TestInMemoryPersistentState.fs @@ -0,0 +1,75 @@ +namespace Raft.Test + +open NUnit.Framework +open Raft +open FsUnitTyped +open FsCheck + +[] +module TestInMemoryPersistentState = + + [] + let ``Properties of empty`` () = + let s = InMemoryPersistentState () :> IPersistentState<_> + + s.CurrentLogIndex |> shouldEqual 0 + + for i in -2 .. 10 do + s.GetLogEntry (i * 1) |> shouldEqual None + + s.CurrentTerm |> shouldEqual 0 + s.VotedFor |> shouldEqual None + + s.GetLastLogEntry () |> shouldEqual None + + let ofList<'a> (xs : ('a * int) list) : InMemoryPersistentState<'a> = + let s = InMemoryPersistentState<'a> () + + for x, term in xs do + (s :> IPersistentState<_>).AppendToLog x term + + s + + let isPrefix (prefix : 'a list) (l : 'a list) : bool = + l + |> List.truncate prefix.Length + |> List.zip prefix + |> List.forall (fun (x, y) -> x = y) + + [] + let ``Nonzero truncation followed by Get succeeds`` () = + let property (truncate : int) (xs : (int * int) list) : bool = + let truncate = abs truncate + 1 + let uut = ofList xs + let oldLog = uut.GetLog () + + match (uut :> IPersistentState<_>).GetLogEntry truncate with + | None -> + (uut :> IPersistentState<_>).TruncateLog truncate + uut.GetLog () = oldLog + | Some entry -> + (uut :> IPersistentState<_>).TruncateLog truncate + + (uut :> IPersistentState<_>).GetLastLogEntry () = Some entry + && isPrefix (uut.GetLog ()) oldLog + && (uut :> IPersistentState<_>).CurrentLogIndex = truncate + + Check.QuickThrowOnFailure property + + [] + let ``Zero truncation results in empty log`` () = + let property (truncate : int) (xs : (int * int) list) : bool = + let truncate = -abs truncate + let uut = ofList xs + + // It's not meaningful to take the 0th element + (uut :> IPersistentState<_>).GetLogEntry truncate |> shouldEqual None + + (uut :> IPersistentState<_>).TruncateLog truncate + + uut.GetLog () |> shouldEqual [] + + let uut = uut :> IPersistentState<_> + uut.GetLastLogEntry () = None && uut.CurrentLogIndex = 0 + + Check.QuickThrowOnFailure property diff --git a/Raft.Test/TestServer.fs b/Raft.Test/TestServer.fs index 568a411..7e27ae3 100644 --- a/Raft.Test/TestServer.fs +++ b/Raft.Test/TestServer.fs @@ -1,5 +1,6 @@ namespace Raft.Test +open System.Threading open Raft open NUnit.Framework open FsUnitTyped @@ -22,7 +23,7 @@ module TestServer = let sendMessage = cluster.Servers.[0].OutboundMessageChannel - // Candidate 1 asks to be elected. + // Candidate 1 asks server 0 to vote for it. { CandidateTerm = 0 @@ -34,3 +35,32 @@ module TestServer = |> sendMessage 0 logs () |> shouldEqual [ "Received message for term 0" ] + + // Candidate 1 asks to be elected again! This is fine, maybe the network is replaying requests + // and the network swallowed our reply, so we should reply in the same way. + { + CandidateTerm = 0 + CandidateId = 1 + ReplyChannel = fun message -> logger (sprintf "Received message for term %i" message.VoterTerm) + CandidateLastLogEntry = 0, 0 + } + |> Message.RequestVote + |> sendMessage 0 + + logs () + |> shouldEqual [ "Received message for term 0" ; "Received message for term 0" ] + + // Candidate 2 asks to be elected. We won't vote for them, because we've already voted. + // and the network swallowed our reply, so we should reply in the same way. + let calls = ref 0 + + { + CandidateTerm = 0 + CandidateId = 2 + ReplyChannel = fun _ -> Interlocked.Increment calls |> ignore + CandidateLastLogEntry = 0, 0 + } + |> Message.RequestVote + |> sendMessage 0 + + calls.Value |> shouldEqual 0 diff --git a/Raft/Measures.fs b/Raft/Measures.fs new file mode 100644 index 0000000..99626bd --- /dev/null +++ b/Raft/Measures.fs @@ -0,0 +1,11 @@ +namespace Raft + +/// LogIndex is indexed from 1. We use 0 to indicate "before any history has started". +[] +type LogIndex + +[] +type Term + +[] +type ServerId diff --git a/Raft/PersistentState.fs b/Raft/PersistentState.fs new file mode 100644 index 0000000..51b24c7 --- /dev/null +++ b/Raft/PersistentState.fs @@ -0,0 +1,56 @@ +namespace Raft + +type IPersistentState<'a> = + abstract CurrentTerm : int + /// If I know about an election in my CurrentTerm, who did I vote for during that election? + abstract VotedFor : int option + abstract AppendToLog : 'a -> int -> unit + + /// Truncate away the most recent entries of the log. + /// If `GetLogEntry x` would succeed, and then we call `TruncateLog x`, + /// then `GetLogEntry x` will still succeed (but `GetLogEntry (x + 1)` will not). + abstract TruncateLog : int -> unit + abstract GetLogEntry : int -> ('a * int) option + abstract CurrentLogIndex : int + abstract GetLastLogEntry : unit -> ('a * int) option + abstract AdvanceToTerm : int -> unit + abstract Vote : int -> unit + +/// Server state which must survive a server crash. +[] +type InMemoryPersistentState<'a> () = + let mutable currentTerm = 0 + let mutable votedFor : int option = None + let log = ResizeArray<'a * int> () + + member this.GetLog () = log |> List.ofSeq + + interface IPersistentState<'a> with + member this.CurrentTerm = currentTerm + member this.VotedFor = votedFor + member this.Vote id = votedFor <- Some id + + member this.AdvanceToTerm term = + currentTerm <- term + votedFor <- None + + member this.AppendToLog entry term = log.Add (entry, term) + + member this.TruncateLog position = + let position = position / 1 + + if position < log.Count then + let position = if position < 0 then 0 else position + log.RemoveRange (position, log.Count - position) + + member this.GetLastLogEntry () = + if log.Count = 0 then None else Some log.[log.Count - 1] + + member this.GetLogEntry position = + let position = position / 1 + + if log.Count < position then None + elif position <= 0 then None + else Some log.[position - 1] + + member this.CurrentLogIndex = log.Count * 1 diff --git a/Raft/Raft.fsproj b/Raft/Raft.fsproj index d5cbdb4..340766b 100644 --- a/Raft/Raft.fsproj +++ b/Raft/Raft.fsproj @@ -6,6 +6,8 @@ + + diff --git a/Raft/Server.fs b/Raft/Server.fs index b906e3a..5a647fa 100644 --- a/Raft/Server.fs +++ b/Raft/Server.fs @@ -1,15 +1,5 @@ namespace Raft -/// LogIndex is indexed from 1. We use 0 to indicate "before any history has started". -[] -type LogIndex - -[] -type Term - -[] -type ServerId - /// Server state which need not survive a server crash. type VolatileState = { @@ -22,15 +12,6 @@ type VolatileState = LastApplied : int } -/// Server state which must survive a server crash. -type PersistentState<'a> = - { - CurrentTerm : int - Log : ('a * int) array - /// If there's an election ongoing, who did I vote for? - VotedFor : int option - } - type LeaderState = { /// For each server, index of the next log entry to send to that server @@ -107,9 +88,9 @@ type ServerSpecialisation = type Server<'a> = { - VolatileState : VolatileState - PersistentState : PersistentState<'a> - Type : ServerSpecialisation + mutable VolatileState : VolatileState + PersistentState : IPersistentState<'a> + mutable Type : ServerSpecialisation Timeout : unit -> unit OutboundMessageChannel : int -> Message<'a> -> unit } @@ -137,33 +118,18 @@ module Server = CommitIndex = 0 LastApplied = 0 } - PersistentState = - { - CurrentTerm = 0 - Log = [||] - VotedFor = None - } + PersistentState = InMemoryPersistentState () Timeout = fun () -> () } /// Returns the new state of the same server. - let processMessage<'a> (message : Message<'a>) (s : Server<'a>) : Server<'a> = + let processMessage<'a> (message : Message<'a>) (s : Server<'a>) : unit = // First, see if this message comes from a future term. // (This is `UpdateTerm` from the TLA+.) - let s = - if message.Term > s.PersistentState.CurrentTerm then - // We're definitely out of date. Switch to follower mode. - { s with - Type = Follower - PersistentState = - { s.PersistentState with - VotedFor = None - CurrentTerm = message.Term - } - } - // TODO when persistence is modelled: persist this - else - s + if message.Term > s.PersistentState.CurrentTerm then + // We're definitely out of date. Switch to follower mode. + s.Type <- Follower + s.PersistentState.AdvanceToTerm message.Term match message with | RequestVote message -> @@ -173,10 +139,12 @@ module Server = // The following clauses define either condition under which we accept that the candidate has more data // than we do, and so could be a more suitable leader than us. + // TODO collapse these clauses, it'll be much neater + let messageSupersedesMe = // Is the candidate advertising a later term than our last-persisted write was made at? // (That would mean it's far in the future of us.) - match Array.tryLast s.PersistentState.Log with + match s.PersistentState.GetLastLogEntry () with | Some (_, ourLastTerm) -> snd message.CandidateLastLogEntry > ourLastTerm | None -> // We have persisted no history at all! @@ -184,11 +152,10 @@ module Server = let messageExtendsMe = // Do we agree what the current term is, is the candidate advertising a more advanced log than us? - match Array.tryLast s.PersistentState.Log with + match s.PersistentState.GetLastLogEntry () with | Some (_, ourLastTerm) -> snd message.CandidateLastLogEntry = ourLastTerm - && fst message.CandidateLastLogEntry - >= (Array.length s.PersistentState.Log) * 1 + && fst message.CandidateLastLogEntry >= s.PersistentState.CurrentLogIndex | None -> // We've persisted no history; the candidate needs to also be at the start of history, // or else we'd have already considered them in the `messageSupersedesMe` check. @@ -229,13 +196,7 @@ module Server = // case we just end up not participating in an election.) // TODO when this is made mutable etc: call Persist here - let toReturn = - { s with - PersistentState = - { s.PersistentState with - VotedFor = Some message.CandidateId - } - } + s.PersistentState.Vote message.CandidateId { VoterTerm = s.PersistentState.CurrentTerm @@ -243,10 +204,6 @@ module Server = } |> message.ReplyChannel - toReturn - else - s - | AppendEntries message -> // This was guaranteed above. assert (message.LeaderTerm <= s.PersistentState.CurrentTerm) @@ -259,20 +216,16 @@ module Server = } |> message.ReplyChannel - s else // This was guaranteed immediately above: we agree that the message is being sent by the current leader. assert (message.LeaderTerm = s.PersistentState.CurrentTerm) - let heartbeat (message : AppendEntriesMessage<'a>) (s : Server<'a>) = + let heartbeat (message : AppendEntriesMessage<'a>) (s : Server<'a>) : unit = // Just a heartbeat; no change to our log is required. - let toReturn = - { s with - VolatileState = - { s.VolatileState with - CommitIndex = message.LeaderCommitIndex - } + s.VolatileState <- + { s.VolatileState with + CommitIndex = message.LeaderCommitIndex } { @@ -281,9 +234,7 @@ module Server = } |> message.ReplyChannel - toReturn - - let acceptRequest (s : Server<'a>) : Server<'a> = + let acceptRequest (s : Server<'a>) : unit = assert (s.Type = Follower) match message.NewEntry with @@ -293,7 +244,7 @@ module Server = let desiredLogInsertionPosition = message.PrevLogEntry.Index + 1 - match getLogEntry desiredLogInsertionPosition s.PersistentState.Log with + match s.PersistentState.GetLogEntry desiredLogInsertionPosition with | Some (_, existingTerm) when toInsertTerm = existingTerm -> // This is already persisted. Moreover, the value that we persisted came from the term we're // currently processing, so in particular came from the same leader and hence won't conflict @@ -301,16 +252,8 @@ module Server = heartbeat message s | Some (_, existingTerm) -> // The leader's message conflicts with what we persisted. Defer to the leader. - // TODO when persistence is modelled: model it here - let toReturn = - { s with - PersistentState = - { s.PersistentState with - Log = - truncateLog desiredLogInsertionPosition s.PersistentState.Log - |> replaceLog desiredLogInsertionPosition (toInsert, toInsertTerm) - } - } + s.PersistentState.TruncateLog (desiredLogInsertionPosition - 1) + s.PersistentState.AppendToLog toInsert toInsertTerm { Success = true @@ -318,21 +261,13 @@ module Server = } |> message.ReplyChannel - toReturn | None -> // The leader knows what we've committed, so it won't try and give us anything further than // the element immediately past our persisted log. // TODO - why can't this be -1? - assert (desiredLogInsertionPosition = (1 + s.PersistentState.Log.Length) * 1) + assert (desiredLogInsertionPosition = 1 + s.PersistentState.CurrentLogIndex) // The leader's message is after our log. Append. - // TODO when persistence is modelled: persist it here. - let toReturn = - { s with - PersistentState = - { s.PersistentState with - Log = Array.append s.PersistentState.Log [| (toInsert, toInsertTerm) |] - } - } + s.PersistentState.AppendToLog toInsert toInsertTerm { Success = true @@ -340,8 +275,6 @@ module Server = } |> message.ReplyChannel - toReturn - let logIsConsistent (message : AppendEntriesMessage<'a>) (s : Server<'a>) : bool = if message.PrevLogEntry.Index = 0 then // The leader advertises that they have no committed history, so certainly it's consistent with @@ -349,7 +282,7 @@ module Server = true else - match getLogEntry message.PrevLogEntry.Index s.PersistentState.Log with + match s.PersistentState.GetLogEntry message.PrevLogEntry.Index with | None -> // The leader's advertised commit is ahead of our history. false @@ -378,18 +311,13 @@ module Server = } |> message.ReplyChannel - s - else acceptRequest s | Candidate -> // We've already verified that the message was sent from a leader in the current term, so we have // lost the election. - let s = - { s with - Type = Follower - } + s.Type <- Follower // TODO: why does this assertion hold? assert (logIsConsistent message s) acceptRequest s @@ -411,12 +339,7 @@ module InMemoryCluster = let locker = obj () let messageChannel (serverId : int) (message : Message<'a>) : unit = - lock - locker - (fun () -> - let newServer = Server.processMessage message servers.[serverId / 1] - servers.[serverId / 1] <- newServer - ) + lock locker (fun () -> Server.processMessage message servers.[serverId / 1]) for s in 0 .. servers.Length - 1 do servers.[s] <- Server.create messageChannel