From 72ed658bf6da3c56405299df8a488f17abd32ec6 Mon Sep 17 00:00:00 2001 From: Smaug123 Date: Wed, 26 Oct 2022 19:51:07 +0100 Subject: [PATCH] Add a basic interactive client --- Raft.Test/TestServer.fs | 152 ++++++++++++++++++++++++++++++- Raft.sln | 6 ++ Raft/Server.fs | 60 +++++++++++- RaftExplorer/Program.fs | 89 ++++++++++++++++++ RaftExplorer/RaftExplorer.fsproj | 16 ++++ 5 files changed, 319 insertions(+), 4 deletions(-) create mode 100644 RaftExplorer/Program.fs create mode 100644 RaftExplorer/RaftExplorer.fsproj diff --git a/Raft.Test/TestServer.fs b/Raft.Test/TestServer.fs index d40ee98..4e6526d 100644 --- a/Raft.Test/TestServer.fs +++ b/Raft.Test/TestServer.fs @@ -4,6 +4,7 @@ open System.Threading open Raft open NUnit.Framework open FsUnitTyped +open FsCheck [] module TestServer = @@ -60,7 +61,7 @@ module TestServer = calls.Value |> shouldEqual 0 [] - let ``Startup sequence in prod`` () = + let ``Startup sequence in prod, only one timeout takes place`` () = let cluster, network = InMemoryCluster.make 5 cluster.Servers.[0].TriggerTimeout () @@ -85,3 +86,152 @@ module TestServer = for i in 1..4 do cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower + + let popOne (queues : 'a list list) : ((int * 'a) * 'a list list) list = + queues + |> List.indexed + |> List.filter (fun (index, l) -> not (List.isEmpty l)) + |> List.collect (fun (firstPlaceWithInstruction, entries) -> + entries + |> List.indexed + |> List.map (fun (i, entry) -> (firstPlaceWithInstruction, entry), List.removeAt i entries) + |> List.map (fun (removed, rest) -> + let afterPop = + queues + |> List.removeAt firstPlaceWithInstruction + |> List.insertAt firstPlaceWithInstruction rest + + removed, afterPop + ) + ) + + let rec allOrderings (queues : 'a list list) : (int * 'a) list list = + let output = popOne queues + + match output with + | [] -> [ [] ] + | output -> + + output + |> List.collect (fun (extracted, remaining) -> + let sub = allOrderings remaining + sub |> List.map (fun s -> extracted :: s) + ) + + let factorial i = + let rec go acc i = + if i <= 0 then acc else go (acc * i) (i - 1) + + go 1 i + + [] + [] + [] + [] + [] + let ``Test factorial`` (n : int, result : int) = factorial n |> shouldEqual result + + [] + let ``Test allOrderings`` () = + let case = [ [ "a" ; "b" ] ; [ "c" ; "d" ; "e" ] ] + let output = case |> allOrderings + output |> shouldEqual (List.distinct output) + + output + |> List.length + |> shouldEqual (factorial (List.concat case |> List.length)) + + let allElements = Set.ofList (List.concat case) + + for output in output do + output |> List.map snd |> Set.ofList |> shouldEqual allElements + + let randomChoice<'a> (r : System.Random) (arr : 'a list) : 'a = arr.[r.Next (0, arr.Length)] + + [] + let ``Startup sequence in prod, two timeouts at once, random`` () = + let rand = System.Random () + let cluster, network = InMemoryCluster.make 5 + + cluster.Servers.[0].TriggerTimeout () + cluster.Servers.[0].Sync () + cluster.Servers.[1].TriggerTimeout () + cluster.Servers.[1].Sync () + + // Those two each sent a message to every other server. + network.InboundMessages.[0].Count |> shouldEqual 1 + network.InboundMessages.[1].Count |> shouldEqual 1 + + for i in 2..4 do + network.InboundMessages.[i].Count |> shouldEqual 2 + + while network.InboundMessages |> Seq.concat |> Seq.isEmpty |> not do + let allOrderings' = + network.InboundMessages |> List.ofArray |> List.map List.ofSeq |> allOrderings + + network.InboundMessages |> Array.iter (fun arr -> arr.Clear ()) + // Process the messages! + let ordering = randomChoice rand allOrderings' + + for serverConsuming, message in ordering do + cluster.SendMessageDirectly (serverConsuming * 1) message + + (cluster.Servers.[0].State = Leader && cluster.Servers.[1].State = Leader) + |> shouldEqual false + + (cluster.Servers.[0].State = Candidate && cluster.Servers.[1].State = Candidate) + |> shouldEqual false + + ((cluster.Servers.[0].State = Leader && cluster.Servers.[1].State = Candidate) + || (cluster.Servers.[1].State = Leader && cluster.Servers.[0].State = Candidate)) + |> shouldEqual true + + for i in 2..4 do + cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower + + type History = History of (int * int) list + + let historyGen (clusterSize : int) = + gen { + let! pile = Gen.choose (0, clusterSize - 1) + let! entry = Arb.generate + return (pile * 1, abs entry) + } + |> Gen.listOf + |> Gen.map History + + let apply (History history) (cluster : Cluster<'a>) (network : Network<'a>) : unit = + for pile, entry in history do + let messages = network.InboundMessages.[pile / 1] + + if entry < messages.Count then + cluster.SendMessageDirectly pile messages.[entry] + + [] + let ``Startup sequence in prod, two timeouts at once, property-based: at most one leader is elected`` () = + let cluster, network = InMemoryCluster.make 5 + + cluster.Servers.[0].TriggerTimeout () + cluster.Servers.[0].Sync () + cluster.Servers.[1].TriggerTimeout () + cluster.Servers.[1].Sync () + + // Those two each sent a message to every other server. + network.InboundMessages.[0].Count |> shouldEqual 1 + network.InboundMessages.[1].Count |> shouldEqual 1 + + for i in 2..4 do + network.InboundMessages.[i].Count |> shouldEqual 2 + + let property (history : History) = + apply history cluster network + + (cluster.Servers.[0].State = Leader && cluster.Servers.[1].State = Leader) + |> shouldEqual false + + for i in 2..4 do + cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower + + property + |> Prop.forAll (Arb.fromGen (historyGen 5)) + |> Check.QuickThrowOnFailure diff --git a/Raft.sln b/Raft.sln index ee02464..8e1fb55 100644 --- a/Raft.sln +++ b/Raft.sln @@ -4,6 +4,8 @@ Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Raft", "Raft\Raft.fsproj", EndProject Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Raft.Test", "Raft.Test\Raft.Test.fsproj", "{2807D236-2D29-45A8-AC06-1662F9629B64}" EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "RaftExplorer", "RaftExplorer\RaftExplorer.fsproj", "{204FDA9A-F3B7-46CC-97F1-A39B55AA7A7A}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -18,5 +20,9 @@ Global {2807D236-2D29-45A8-AC06-1662F9629B64}.Debug|Any CPU.Build.0 = Debug|Any CPU {2807D236-2D29-45A8-AC06-1662F9629B64}.Release|Any CPU.ActiveCfg = Release|Any CPU {2807D236-2D29-45A8-AC06-1662F9629B64}.Release|Any CPU.Build.0 = Release|Any CPU + {204FDA9A-F3B7-46CC-97F1-A39B55AA7A7A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {204FDA9A-F3B7-46CC-97F1-A39B55AA7A7A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {204FDA9A-F3B7-46CC-97F1-A39B55AA7A7A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {204FDA9A-F3B7-46CC-97F1-A39B55AA7A7A}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection EndGlobal diff --git a/Raft/Server.fs b/Raft/Server.fs index ef13f25..71a4eea 100644 --- a/Raft/Server.fs +++ b/Raft/Server.fs @@ -42,8 +42,14 @@ type RequestVoteReply = /// Whether I am happy for you to become leader. (For example, if my term is greater than yours, then you're /// out of date and I won't vote for you.) VoteGranted : bool + /// The candidate I'm voting for or against. + Candidate : int } + override this.ToString () = + let decision = if this.VoteGranted then "in favour of" else "against" + sprintf "Server %i voting %s %i in term %i" this.Voter decision this.Candidate this.VoterTerm + /// I am starting an election. Everyone, please vote. type RequestVoteMessage = { @@ -53,6 +59,9 @@ type RequestVoteMessage = ReplyChannel : RequestVoteReply -> unit } + override this.ToString () = + sprintf "Vote request: %i in term %i" this.CandidateId this.CandidateTerm + /// I, a follower, acknowledge the leader's instruction to add an entry to my log. type AppendEntriesReply = { @@ -69,6 +78,9 @@ type LogEntry = Term : int } + override this.ToString () = + sprintf "Log entry %i at subjective term %i" this.Index this.Term + /// I am the leader. Followers, update your state as follows. type AppendEntriesMessage<'a> = { @@ -86,21 +98,49 @@ type AppendEntriesMessage<'a> = ReplyChannel : AppendEntriesReply -> unit } + override this.ToString () = + let description = + match this.NewEntry with + | None -> "Heartbeat" + | Some (entry, term) -> sprintf "Append %+A (term %i)" entry term + + sprintf + "%s (leader %i at term %i whose commit index is %i)" + description + this.LeaderId + this.LeaderTerm + this.LeaderCommitIndex + type Instruction<'a> = | AppendEntries of AppendEntriesMessage<'a> | RequestVote of RequestVoteMessage + override this.ToString () = + match this with + | RequestVote v -> v.ToString () + | AppendEntries a -> a.ToString () + member this.Term = match this with | AppendEntries m -> m.LeaderTerm | RequestVote m -> m.CandidateTerm -type Reply = RequestVoteReply of RequestVoteReply +type Reply = + | RequestVoteReply of RequestVoteReply + + override this.ToString () = + match this with + | RequestVoteReply v -> v.ToString () type Message<'a> = | Instruction of Instruction<'a> | Reply of Reply + override this.ToString () = + match this with + | Instruction i -> i.ToString () + | Reply r -> r.ToString () + type private CandidateState = { /// For each voter, the vote I received from them. @@ -220,6 +260,7 @@ type Server<'a> Voter = me VoterTerm = persistentState.CurrentTerm VoteGranted = true + Candidate = message.CandidateId } |> message.ReplyChannel @@ -344,8 +385,6 @@ type Server<'a> let mailbox = let rec loop (mailbox : MailboxProcessor<_>) = async { - // TODO this should really wait for explicit Sync calls - // if we're running in memory, for determinism. let! m = mailbox.Receive () //let toPrint = sprintf "Processing message in server %i: %+A" me m //System.Console.WriteLine toPrint @@ -360,6 +399,7 @@ type Server<'a> // Start the election! currentType <- ServerSpecialisation.Candidate (CandidateState.New clusterSize me) persistentState.IncrementTerm () + persistentState.Vote me for i in 0 .. clusterSize - 1 do if i * 1 <> me then @@ -431,6 +471,12 @@ type Cluster<'a> = SendMessageDirectly : int -> Message<'a> -> unit } + member this.SendMessage (i : int) (m : Message<'a>) : unit = this.SendMessageDirectly i m + + member this.Timeout (i : int) : unit = + this.Servers.[i / 1].TriggerTimeout () + this.Servers.[i / 1].Sync () + type Network<'a> = internal { @@ -439,6 +485,14 @@ type Network<'a> = InboundMessages : ResizeArray>[] } + member this.AllInboundMessages (i : int) : Message<'a> list = + this.InboundMessages.[i / 1] |> List.ofSeq + + member this.InboundMessage (i : int) (id : int) : Message<'a> = + this.InboundMessages.[i / 1].[id] + + member this.Size = this.InboundMessages.Length + [] module InMemoryCluster = diff --git a/RaftExplorer/Program.fs b/RaftExplorer/Program.fs new file mode 100644 index 0000000..4983186 --- /dev/null +++ b/RaftExplorer/Program.fs @@ -0,0 +1,89 @@ +namespace Raft.Explorer + +open System +open Raft + +module Program = + + let printNetworkState<'a> (network : Network<'a>) : unit = + let mutable wroteAnything = false + + for i in 0 .. network.Size - 1 do + for count, message in Seq.indexed (network.AllInboundMessages (i * 1)) do + printfn "Server %i, message %i: %O" i count message + wroteAnything <- true + + if not wroteAnything then + printfn "" + + let rec getMessage (clusterSize : int) = + printf "Enter : " + let s = Console.ReadLine () + + match s.Split ',' with + | [| serverId ; messageId |] -> + match Int32.TryParse serverId with + | true, serverId -> + match Int32.TryParse messageId with + | true, messageId -> + if serverId >= clusterSize || serverId < 0 then + printf "Server ID must be between 0 and %i inclusive. " (clusterSize - 1) + getMessage clusterSize + else + serverId * 1, messageId + | false, _ -> + printf "Non-integer input '%s' for message ID. " messageId + getMessage clusterSize + | false, _ -> + printf "Non-integer input '%s' for server ID. " serverId + getMessage clusterSize + | _ -> + printfn "Invalid input." + getMessage clusterSize + + let rec getTimeout (clusterSize : int) = + printf "Enter server ID: " + let serverId = Console.ReadLine () + + match Int32.TryParse serverId with + | true, serverId -> + if serverId >= clusterSize || serverId < 0 then + printf "Server ID must be between 0 and %i inclusive. " (clusterSize - 1) + getTimeout clusterSize + else + serverId * 1 + | false, _ -> + printf "Unrecognised input. " + getTimeout clusterSize + + type UserAction = + | Timeout of int + | NetworkMessage of int * int + + let rec getAction (clusterSize : int) = + printf "Enter action. Trigger [t]imeout, or allow [m]essage: " + let s = Console.ReadLine().ToUpperInvariant () + + match s with + | "T" -> getTimeout clusterSize |> Timeout + | "M" -> getMessage clusterSize |> NetworkMessage + | _ -> + printf "Unrecognised input. " + getAction clusterSize + + [] + let main _argv = + let clusterSize = 5 + let cluster, network = InMemoryCluster.make clusterSize + + while true do + printNetworkState network + + let action = getAction clusterSize + + match action with + | Timeout serverId -> cluster.Timeout serverId + | NetworkMessage (serverId, messageId) -> + network.InboundMessage serverId messageId |> cluster.SendMessage serverId + + 0 diff --git a/RaftExplorer/RaftExplorer.fsproj b/RaftExplorer/RaftExplorer.fsproj new file mode 100644 index 0000000..6639e46 --- /dev/null +++ b/RaftExplorer/RaftExplorer.fsproj @@ -0,0 +1,16 @@ + + + + Exe + net6.0 + + + + + + + + + + +