diff --git a/Raft.Test/TestInMemoryServer.fs b/Raft.Test/TestInMemoryServer.fs index c5e6795..29869be 100644 --- a/Raft.Test/TestInMemoryServer.fs +++ b/Raft.Test/TestInMemoryServer.fs @@ -240,3 +240,80 @@ module TestInMemoryServer = property |> Prop.forAll (Arb.fromGen (historyGen 5)) |> Check.QuickThrowOnFailure + + [] + let ``Heartbeat is rejected if an update hasn't propagated`` () = + let clusterSize = 5 + let cluster, network = InMemoryCluster.make clusterSize + + let startupSequence = + [ + NetworkAction.InactivityTimeout 1 + // Two servers vote for server 1... + NetworkAction.NetworkMessage (2, 0) + NetworkAction.NetworkMessage (3, 0) + // Server 1 processes incoming votes, and achieves majority, electing itself leader! + NetworkAction.NetworkMessage (1, 0) + NetworkAction.NetworkMessage (1, 1) + // and the other votes are processed and discarded + NetworkAction.NetworkMessage (0, 0) + NetworkAction.NetworkMessage (4, 0) + NetworkAction.NetworkMessage (1, 2) + NetworkAction.NetworkMessage (1, 3) + // Get the followers' heartbeat processing out of the way + NetworkAction.NetworkMessage (0, 1) + NetworkAction.NetworkMessage (2, 1) + NetworkAction.NetworkMessage (3, 1) + NetworkAction.NetworkMessage (4, 1) + NetworkAction.NetworkMessage (1, 4) + NetworkAction.NetworkMessage (1, 5) + NetworkAction.NetworkMessage (1, 6) + NetworkAction.NetworkMessage (1, 7) + // Submit data to leader, then heartbeat leader, and process heartbeat on another node. + // This should come through as "follower did not apply leader entry". + // (This is correct: the network has effectively dropped all the leader's AppendEntries messages, + // and the protocol has correctly allowed the followers to recognise that their logs are inconsistent + // with the leader's.) + NetworkAction.ClientRequest (1, byte 3, printfn "processed: %O") + NetworkAction.Heartbeat 1 + + // Deliver the heartbeat messages. + NetworkAction.NetworkMessage (0, 2) + NetworkAction.NetworkMessage (2, 2) + NetworkAction.NetworkMessage (3, 2) + NetworkAction.NetworkMessage (4, 2) + ] + + for action in startupSequence do + NetworkAction.perform cluster network action + + // The servers have all rejected the heartbeat. + network.UndeliveredMessages 1 + |> List.map (fun (_index, message) -> + match message with + | Message.Reply (Reply.AppendEntriesReply reply) -> reply + | _ -> failwithf "Unexpected reply: %+A" message + ) + |> shouldEqual + [ + { + Success = None + Follower = 0 + FollowerTerm = 1 + } + { + Success = None + Follower = 2 + FollowerTerm = 1 + } + { + Success = None + Follower = 3 + FollowerTerm = 1 + } + { + Success = None + Follower = 4 + FollowerTerm = 1 + } + ] diff --git a/Raft/Raft.fsproj b/Raft/Raft.fsproj index 0aa8d34..2fd7788 100644 --- a/Raft/Raft.fsproj +++ b/Raft/Raft.fsproj @@ -1,7 +1,7 @@  - net6.0 + netstandard2.0 true diff --git a/Raft/Server.fs b/Raft/Server.fs index 029cd99..5e6afba 100644 --- a/Raft/Server.fs +++ b/Raft/Server.fs @@ -83,7 +83,7 @@ type AppendEntriesReply = | Some index -> sprintf "successfully applied leader entry, log length %i" index | None -> "did not apply leader entry" - sprintf "Follower %i %s" this.FollowerTerm description + sprintf "Follower %i (at term %i) %s" this.Follower this.FollowerTerm description /// I am the leader. Followers, update your state as follows. type AppendEntriesMessage<'a> = @@ -153,6 +153,7 @@ type Reply = | RequestVoteReply v -> v.ToString () | AppendEntriesReply r -> r.ToString () +[] type Message<'a> = | Instruction of Instruction<'a> | Reply of Reply @@ -475,12 +476,14 @@ type Server<'a> | ServerSpecialisation.Candidate _ | ServerSpecialisation.Follower _ -> () | ServerSpecialisation.Leader _ -> + let lastLogEntry = persistentState.GetLastLogEntry () |> Option.map snd + for i in 0 .. clusterSize - 1 do if i * 1 <> me then { LeaderTerm = persistentState.CurrentTerm LeaderId = me - PrevLogEntry = persistentState.GetLastLogEntry () |> Option.map snd + PrevLogEntry = lastLogEntry NewEntry = None LeaderCommitIndex = volatileState.CommitIndex ReplyChannel = @@ -585,8 +588,8 @@ type Server<'a> |> Instruction.RequestVote |> Message.Instruction |> messageChannel (i * 1) - | ServerAction.Receive (Instruction m) -> processMessage m - | ServerAction.Receive (Reply r) -> processReply r + | ServerAction.Receive (Message.Instruction m) -> processMessage m + | ServerAction.Receive (Message.Reply r) -> processReply r | ServerAction.Receive (Message.ClientRequest (toAdd, replyChannel)) -> match currentType with | ServerSpecialisation.Leader _ -> diff --git a/RaftExplorer/Program.fs b/RaftExplorer/Program.fs index 5d4d4a2..7b50bc9 100644 --- a/RaftExplorer/Program.fs +++ b/RaftExplorer/Program.fs @@ -57,22 +57,52 @@ module Program = printf "Unrecognised input. " None - let rec getHeartbeater (clusterSize : int) (serverId : string) = - // TODO: restrict this to the leaders only + let rec getHeartbeat (leaders : Set>) (clusterSize : int) (serverId : string) = match Int32.TryParse serverId with | true, serverId -> if serverId >= clusterSize || serverId < 0 then printf "Server ID must be between 0 and %i inclusive. " (clusterSize - 1) None else - Some (serverId * 1) + let serverId = serverId * 1 + + if leaders |> Set.contains serverId then + Some serverId + else + printf "Cannot heartbeat a non-leader. " + None | false, _ -> printf "Unrecognised input. " None - let rec getAction (clusterSize : int) = + let rec getClientData (clusterSize : int) (s : string) = + let s = s.Trim () + + match s.Split ',' |> List.ofArray with + | serverId :: rest -> + match Int32.TryParse serverId with + | true, serverId -> + if serverId >= clusterSize || serverId < 0 then + printf "Server ID must be between 0 and %i inclusive. " (clusterSize - 1) + None + else + let rest = String.concat "," rest |> fun s -> s.Trim () + + match Byte.TryParse rest with + | true, b -> Some (serverId * 1, b) + | false, _ -> + printfn "Client data must be a byte, e.g. 255, 0, or 43." + None + | false, _ -> + printfn "Server ID expected as first comma-separated component." + None + | _ -> + printfn "Expected server ID and byte, e.g. '3,76'" + None + + let rec getAction (leaders : Set>) (clusterSize : int) : NetworkAction = printf - "Enter action. Trigger [t]imeout , [h]eartbeat a leader , [d]rop message , or allow [m]essage : " + "Enter action. Trigger [t]imeout , [h]eartbeat a leader , [d]rop message , [s]ubmit data , or allow [m]essage : " let s = let rec go () = @@ -85,22 +115,26 @@ module Program = | 'T' -> match getTimeout clusterSize s.[1..] with | Some t -> t |> InactivityTimeout - | None -> getAction clusterSize + | None -> getAction leaders clusterSize | 'D' -> match getMessage clusterSize s.[1..] with | Some m -> m |> DropMessage - | None -> getAction clusterSize + | None -> getAction leaders clusterSize | 'M' -> match getMessage clusterSize s.[1..] with | Some m -> m |> NetworkMessage - | None -> getAction clusterSize + | None -> getAction leaders clusterSize | 'H' -> - match getHeartbeater clusterSize s.[1..] with + match getHeartbeat leaders clusterSize s.[1..] with | Some h -> Heartbeat h - | None -> getAction clusterSize + | None -> getAction leaders clusterSize + | 'S' -> + match getClientData clusterSize s.[1..] with + | Some (server, data) -> ClientRequest (server, data, printfn "%O") + | None -> getAction leaders clusterSize | _ -> printf "Unrecognised input. " - getAction clusterSize + getAction leaders clusterSize let electLeader = [ @@ -117,7 +151,7 @@ module Program = [] let main _argv = let clusterSize = 5 - let cluster, network = InMemoryCluster.make clusterSize + let cluster, network = InMemoryCluster.make clusterSize let startupSequence = [ @@ -150,6 +184,9 @@ module Program = NetworkAction.NetworkMessage (0, 2) NetworkAction.NetworkMessage (1, 6) ] + |> ignore + + [] for action in startupSequence do NetworkAction.perform cluster network action @@ -158,11 +195,24 @@ module Program = printNetworkState network printClusterState cluster - let action = getAction clusterSize + let leaders = + Seq.init + clusterSize + (fun i -> + let i = i * 1 + i, cluster.State i + ) + |> Seq.choose (fun (i, status) -> + match status with + | Leader _ -> Some i + | _ -> None + ) + |> Set.ofSeq + + let action = getAction leaders clusterSize NetworkAction.perform cluster network action // TODO: log out the committed state so that we can see whether the leader is correctly // processing heartbeat responses - // TODO: allow client queries! 0 diff --git a/RaftFable/src/App.fs b/RaftFable/src/App.fs index 4e433f5..5c2968f 100644 --- a/RaftFable/src/App.fs +++ b/RaftFable/src/App.fs @@ -23,8 +23,8 @@ do serverHeading.textContent <- "Server" statusHeading.textContent <- "Status" - row.appendChild (serverHeading) |> ignore - row.appendChild (statusHeading) |> ignore + row.appendChild serverHeading |> ignore + row.appendChild statusHeading |> ignore serverStatuses.appendChild row |> ignore let serverStatusNodes = @@ -33,8 +33,8 @@ let serverStatusNodes = let child = document.createElement "td" :?> Browser.Types.HTMLTableCellElement let statusCell = document.createElement "td" :?> Browser.Types.HTMLTableCellElement child.textContent <- sprintf "%i" i - node.appendChild (child) |> ignore - node.appendChild (statusCell) |> ignore + node.appendChild child |> ignore + node.appendChild statusCell |> ignore serverStatuses.appendChild node |> ignore statusCell |> List.init clusterSize @@ -51,7 +51,7 @@ let resetButtonArea () = for i in 0 .. clusterSize - 1 do let heading = document.createElement "th" :?> Browser.Types.HTMLTableCellElement heading.innerText <- sprintf "Server %i" i - headerRow.appendChild (heading) |> ignore + headerRow.appendChild heading |> ignore messageQueueArea.appendChild headerRow |> ignore @@ -67,7 +67,7 @@ let printClusterState<'a> (cluster : Cluster<'a>) : unit = for i in 0 .. cluster.ClusterSize - 1 do serverStatusNodes.[i].textContent <- cluster.State (i * 1) |> string -let cluster, network = InMemoryCluster.make clusterSize +let cluster, network = InMemoryCluster.make clusterSize let performWithoutPrintingNetworkState action = NetworkAction.perform cluster network action @@ -156,42 +156,8 @@ let rec getHeartbeater (clusterSize : int) (serverId : string) = printf "Unrecognised input. " None -(* -let rec getAction (clusterSize : int) = - printf - "Enter action. Trigger [t]imeout , [h]eartbeat a leader , [d]rop message , or allow [m]essage : " - - let s = - let rec go () = - let s = Console.ReadLine().ToUpperInvariant () - if String.IsNullOrEmpty s then go () else s - - go () - - match s.[0] with - | 'T' -> - match getTimeout clusterSize s.[1..] with - | Some t -> t |> InactivityTimeout - | None -> getAction clusterSize - | 'D' -> - match getMessage clusterSize s.[1..] with - | Some m -> m |> DropMessage - | None -> getAction clusterSize - | 'M' -> - match getMessage clusterSize s.[1..] with - | Some m -> m |> NetworkMessage - | None -> getAction clusterSize - | 'H' -> - match getHeartbeater clusterSize s.[1..] with - | Some h -> Heartbeat h - | None -> getAction clusterSize - | _ -> - printf "Unrecognised input. " - getAction clusterSize -*) - let startupText = - document.querySelector (".startup-text") :?> Browser.Types.HTMLParagraphElement + document.querySelector ".startup-text" :?> Browser.Types.HTMLParagraphElement startupText.textContent <- "Starting up..." @@ -231,16 +197,16 @@ let startupSequence = |> fun p -> p.``then`` (fun () -> startupText.textContent <- "Started! Press buttons.") let timeoutButton = - document.querySelector (".timeout-button") :?> Browser.Types.HTMLButtonElement + document.querySelector ".timeout-button" :?> Browser.Types.HTMLButtonElement let timeoutField = - document.querySelector (".timeout-text") :?> Browser.Types.HTMLInputElement + document.querySelector ".timeout-text" :?> Browser.Types.HTMLInputElement timeoutField.max <- string (clusterSize - 1) timeoutField.min <- "0" timeoutButton.onclick <- - fun evt -> + fun _event -> startupSequence.``then`` (fun () -> timeoutField.valueAsNumber |> int @@ -252,16 +218,16 @@ timeoutButton.onclick <- ) let heartbeatButton = - document.querySelector (".heartbeat-button") :?> Browser.Types.HTMLButtonElement + document.querySelector ".heartbeat-button" :?> Browser.Types.HTMLButtonElement let heartbeatField = - document.querySelector (".heartbeat-text") :?> Browser.Types.HTMLInputElement + document.querySelector ".heartbeat-text" :?> Browser.Types.HTMLInputElement heartbeatField.max <- string (clusterSize - 1) heartbeatField.min <- "0" heartbeatButton.onclick <- - fun evt -> + fun _event -> startupSequence.``then`` (fun () -> heartbeatField.valueAsNumber |> int @@ -271,16 +237,3 @@ heartbeatButton.onclick <- printNetworkState network ) - - -//let electLeader = -// [ -// NetworkAction.InactivityTimeout 0 -// NetworkAction.NetworkMessage (1, 0) -// NetworkAction.NetworkMessage (2, 0) -// NetworkAction.DropMessage (3, 0) -// NetworkAction.DropMessage (4, 0) -// NetworkAction.NetworkMessage (0, 0) -// NetworkAction.NetworkMessage (0, 1) -// // At this point, server 0 is leader in an uncontested election. -// ] diff --git a/flake.nix b/flake.nix index 85d4716..ef8fa6c 100644 --- a/flake.nix +++ b/flake.nix @@ -17,19 +17,17 @@ flake-utils, ... }: - flake-utils.lib.eachDefaultSystem ( - system: let - pkgs = nixpkgs.legacyPackages.${system}; - in { - devShell = pkgs.mkShell { - buildInputs = - [alejandra.defaultPackage.${system} pkgs.nodejs-14_x pkgs.dotnet-sdk_6] - ++ ( - if pkgs.stdenv.isDarwin - then [pkgs.darwin.apple_sdk.frameworks.CoreServices] - else [] - ); - }; - } - ); + flake-utils.lib.eachSystem [flake-utils.lib.system.aarch64-darwin] (system: let + pkgs = nixpkgs.legacyPackages.${system}; + in { + devShells.default = pkgs.mkShell { + buildInputs = + [alejandra.defaultPackage.${system} pkgs.nodejs-14_x pkgs.dotnet-sdk_6] + ++ ( + if pkgs.stdenv.isDarwin + then [pkgs.darwin.apple_sdk.frameworks.CoreServices] + else [] + ); + }; + }); }