From fe0deaba3685dd58ede5ba9df241db8845bb147b Mon Sep 17 00:00:00 2001 From: Smaug123 Date: Sun, 6 Nov 2022 14:24:19 +0000 Subject: [PATCH] Add action history to UI --- Raft.Test/TestInMemoryServer.fs | 6 +- Raft/InMemory.fs | 136 +++++++++++++++++++++++++++++++- Raft/Server.fs | 19 +++-- RaftExplorer/Program.fs | 112 +++----------------------- RaftFable/public/index.html | 5 ++ RaftFable/src/App.fs | 108 +++++++++++++++++-------- RaftFable/src/RaftFable.fsproj | 1 + RaftFable/src/Result.fs | 30 +++++++ RaftFable/src/Ui.fs | 26 +++++- 9 files changed, 286 insertions(+), 157 deletions(-) create mode 100644 RaftFable/src/Result.fs diff --git a/Raft.Test/TestInMemoryServer.fs b/Raft.Test/TestInMemoryServer.fs index 21f16c4..c0e32f4 100644 --- a/Raft.Test/TestInMemoryServer.fs +++ b/Raft.Test/TestInMemoryServer.fs @@ -262,8 +262,6 @@ module TestInMemoryServer = let clusterSize = 5 let cluster, network = InMemoryCluster.make clusterSize - let mutable replyChannel = None - let startupSequence = [ NetworkAction.InactivityTimeout 1 @@ -289,7 +287,7 @@ module TestInMemoryServer = NetworkAction.NetworkMessage (1, 7) // Submit data to leader. This has the effect of heartbeating the other // nodes, with a heartbeat that contains the new data. - NetworkAction.ClientRequest (1, byte 3, (fun s -> replyChannel <- Some s)) + NetworkAction.ClientRequest (1, byte 3) // Deliver the data messages. NetworkAction.NetworkMessage (0, 2) @@ -301,8 +299,6 @@ module TestInMemoryServer = for action in startupSequence do NetworkAction.perform cluster network action - replyChannel |> Option.get |> shouldEqual ClientReply.Acknowledged - // The servers have all accepted the data. network.UndeliveredMessages 1 |> List.map (fun (_index, message) -> diff --git a/Raft/InMemory.fs b/Raft/InMemory.fs index d6fde56..6974a1b 100644 --- a/Raft/InMemory.fs +++ b/Raft/InMemory.fs @@ -1,5 +1,6 @@ namespace Raft +open System open System.Collections.Generic type Cluster<'a> = @@ -26,6 +27,18 @@ type Cluster<'a> = member this.ClusterSize : int = this.Servers.Length + member this.Leaders : Set> = + ((Set.empty, 0), this.Servers) + ||> Array.fold (fun (leaders, count) server -> + let leaders = + match server.State with + | ServerStatus.Leader _ -> Set.add count leaders + | _ -> leaders + + leaders, count + 1 + ) + |> fst + type Network<'a> = internal { @@ -92,7 +105,7 @@ type NetworkAction<'a> = | InactivityTimeout of int | NetworkMessage of int * int | DropMessage of int * int - | ClientRequest of int * 'a * (ClientReply -> unit) + | ClientRequest of int * 'a | Heartbeat of int [] @@ -106,5 +119,122 @@ module NetworkAction = | NetworkMessage (serverId, messageId) -> network.InboundMessage serverId messageId |> cluster.SendMessage serverId network.DropMessage serverId messageId - | ClientRequest (server, request, replyChannel) -> - Message.ClientRequest (request, replyChannel) |> cluster.SendMessage server + | ClientRequest (server, request) -> Message.ClientRequest request |> cluster.SendMessage server + + let private getMessage (clusterSize : int) (s : string) : Result * int, string> = + match s.Split ',' with + | [| serverId ; messageId |] -> + let serverId = serverId.TrimEnd () + let messageId = messageId.Trim () + + match Int32.TryParse serverId with + | true, serverId -> + match Int32.TryParse messageId with + | true, messageId -> + if serverId >= clusterSize || serverId < 0 then + sprintf "Server ID must be between 0 and %i inclusive, got %i" (clusterSize - 1) serverId + |> Error + else + Ok (serverId * 1, messageId) + | false, _ -> sprintf "Non-integer input '%s' for message ID." messageId |> Error + | false, _ -> sprintf "Non-integer input '%s' for server ID." serverId |> Error + | _ -> Error "Expected a single comma." + + let private getTimeout (clusterSize : int) (serverId : string) : Result, string> = + match Int32.TryParse serverId with + | false, _ -> Error (sprintf "Expected an integer, got '%s'" serverId) + | true, serverId -> + + if serverId >= clusterSize || serverId < 0 then + sprintf "Server ID must be between 0 and %i inclusive, got %i." (clusterSize - 1) serverId + |> Error + else + serverId * 1 |> Ok + + let private getHeartbeat (leaders : Set> option) (clusterSize : int) (serverId : string) = + match Int32.TryParse serverId with + | false, _ -> sprintf "Expected an integer server ID, got '%s'" serverId |> Error + | true, serverId -> + + if serverId >= clusterSize || serverId < 0 then + sprintf "Server ID must be between 0 and %i inclusive, got %i." (clusterSize - 1) serverId + |> Error + else + + let serverId = serverId * 1 + + match leaders with + | None -> Ok serverId + | Some leaders -> + + if leaders |> Set.contains serverId then + Ok serverId + else + sprintf "Cannot heartbeat a non-leader (%i)." serverId |> Error + + let private getClientData<'a> + (parse : string -> Result<'a, string>) + (clusterSize : int) + (s : string) + : Result * 'a, string> + = + match s.Split ',' |> List.ofArray with + | serverId :: (_ :: _ as rest) -> + let rest = String.concat "," rest |> fun s -> s.TrimStart () + + match Int32.TryParse serverId with + | true, serverId -> + if serverId >= clusterSize || serverId < 0 then + sprintf "Server ID must be between 0 and %i inclusive, got %i." (clusterSize - 1) serverId + |> Error + else + + match parse rest with + | Ok b -> Ok (serverId * 1, b) + | Error e -> sprintf "Failed to parse client data: %s" e |> Error + | false, _ -> + sprintf "Server ID expected as first comma-separated component, got '%s'." serverId + |> Error + | _ -> sprintf "Expected a comma in client data string, got '%s'" s |> Error + + /// Optionally also validates leaders against the input set of leaders. + let tryParse<'a> + (parse : string -> Result<'a, string>) + (leaders : Set> option) + (clusterSize : int) + (s : string) + : Result, string> + = + if String.IsNullOrEmpty s then + Error "Can't parse an empty string" + else + match Char.ToUpper s.[0] with + | 'T' -> + match getTimeout clusterSize (s.[1..].TrimStart ()) with + | Ok t -> t |> InactivityTimeout |> Ok + | Error e -> Error e + | 'D' -> + match getMessage clusterSize (s.[1..].TrimStart ()) with + | Ok m -> m |> DropMessage |> Ok + | Error e -> Error e + | 'M' -> + match getMessage clusterSize (s.[1..].TrimStart ()) with + | Ok m -> m |> NetworkMessage |> Ok + | Error e -> Error e + | 'H' -> + match getHeartbeat leaders clusterSize (s.[1..].TrimStart ()) with + | Ok h -> Heartbeat h |> Ok + | Error e -> Error e + | 'S' -> + match getClientData parse clusterSize (s.[1..].TrimStart ()) with + | Ok (server, data) -> ClientRequest (server, data) |> Ok + | Error e -> Error e + | c -> Error (sprintf "unexpected start char '%c'" c) + + let toString<'a> (action : NetworkAction<'a>) : string = + match action with + | NetworkAction.Heartbeat h -> sprintf "h %i" h + | NetworkAction.NetworkMessage (server, id) -> sprintf "m %i,%i" server id + | NetworkAction.DropMessage (server, id) -> sprintf "d %i,%i" server id + | NetworkAction.InactivityTimeout server -> sprintf "t %i" server + | NetworkAction.ClientRequest (server, data) -> sprintf "s %i,%O" server data diff --git a/Raft/Server.fs b/Raft/Server.fs index 3d92d40..226da42 100644 --- a/Raft/Server.fs +++ b/Raft/Server.fs @@ -186,13 +186,13 @@ type Reply = type Message<'a> = | Instruction of Instruction<'a> | Reply of Reply - | ClientRequest of 'a * (ClientReply -> unit) + | ClientRequest of 'a override this.ToString () = match this with | Instruction i -> i.ToString () | Reply r -> r.ToString () - | ClientRequest (a, _) -> sprintf "Client requested insertion of: %O" a + | ClientRequest a -> sprintf "Client requested insertion of: %O" a type private CandidateState = { @@ -635,15 +635,18 @@ type Server<'a> |> messageChannel (i * 1) | ServerAction.Receive (Message.Instruction m) -> processMessage m | ServerAction.Receive (Message.Reply r) -> processReply r - | ServerAction.Receive (Message.ClientRequest (toAdd, replyChannel)) -> + | ServerAction.Receive (Message.ClientRequest toAdd) -> match currentType with | ServerSpecialisation.Leader leaderState -> persistentState.AppendToLog toAdd persistentState.CurrentTerm - replyChannel ClientReply.Acknowledged + //replyChannel ClientReply.Acknowledged emitHeartbeat leaderState | ServerSpecialisation.Follower followerState -> - replyChannel (ClientReply.Redirect followerState.CurrentLeader) - | ServerSpecialisation.Candidate _ -> replyChannel ClientReply.Dropped + //replyChannel (ClientReply.Redirect followerState.CurrentLeader) + () + | ServerSpecialisation.Candidate _ -> + //replyChannel ClientReply.Dropped + () | ServerAction.Sync replyChannel -> replyChannel.Reply () | ServerAction.StateReadout replyChannel -> { @@ -673,8 +676,8 @@ type Server<'a> #endif mailbox - member this.SendClientRequest (request : 'a) (reply : ClientReply -> unit) = - mailbox.Post (ServerAction.Receive (Message.ClientRequest (request, reply))) + member this.SendClientRequest (request : 'a) = + mailbox.Post (ServerAction.Receive (Message.ClientRequest request)) member this.TriggerInactivityTimeout () = mailbox.Post ServerAction.BeginElection member this.TriggerHeartbeatTimeout () = mailbox.Post ServerAction.EmitHeartbeat diff --git a/RaftExplorer/Program.fs b/RaftExplorer/Program.fs index 10cdbca..cab4600 100644 --- a/RaftExplorer/Program.fs +++ b/RaftExplorer/Program.fs @@ -20,86 +20,6 @@ module Program = for i in 0 .. cluster.ClusterSize - 1 do printfn "Server %i: %O" i (cluster.Status (i * 1)) - let getMessage (clusterSize : int) (s : string) : (int * int) option = - match s.Split ',' with - | [| serverId ; messageId |] -> - let serverId = serverId.Trim () - let messageId = messageId.Trim () - - match Int32.TryParse serverId with - | true, serverId -> - match Int32.TryParse messageId with - | true, messageId -> - if serverId >= clusterSize || serverId < 0 then - printf "Server ID must be between 0 and %i inclusive. " (clusterSize - 1) - None - else - Some (serverId * 1, messageId) - | false, _ -> - printf "Non-integer input '%s' for message ID. " messageId - None - | false, _ -> - printf "Non-integer input '%s' for server ID. " serverId - None - | _ -> - printfn "Invalid input." - None - - let rec getTimeout (clusterSize : int) (serverId : string) = - match Int32.TryParse serverId with - | true, serverId -> - if serverId >= clusterSize || serverId < 0 then - printf "Server ID must be between 0 and %i inclusive. " (clusterSize - 1) - None - else - Some (serverId * 1) - | false, _ -> - printf "Unrecognised input. " - None - - let rec getHeartbeat (leaders : Set>) (clusterSize : int) (serverId : string) = - match Int32.TryParse serverId with - | true, serverId -> - if serverId >= clusterSize || serverId < 0 then - printf "Server ID must be between 0 and %i inclusive. " (clusterSize - 1) - None - else - let serverId = serverId * 1 - - if leaders |> Set.contains serverId then - Some serverId - else - printf "Cannot heartbeat a non-leader. " - None - | false, _ -> - printf "Unrecognised input. " - None - - let rec getClientData (clusterSize : int) (s : string) = - let s = s.Trim () - - match s.Split ',' |> List.ofArray with - | serverId :: rest -> - match Int32.TryParse serverId with - | true, serverId -> - if serverId >= clusterSize || serverId < 0 then - printf "Server ID must be between 0 and %i inclusive. " (clusterSize - 1) - None - else - let rest = String.concat "," rest |> fun s -> s.Trim () - - match Byte.TryParse rest with - | true, b -> Some (serverId * 1, b) - | false, _ -> - printfn "Client data must be a byte, e.g. 255, 0, or 43." - None - | false, _ -> - printfn "Server ID expected as first comma-separated component." - None - | _ -> - printfn "Expected server ID and byte, e.g. '3,76'" - None - let rec getAction (leaders : Set>) (clusterSize : int) : NetworkAction = printf "Enter action. Trigger [t]imeout , [h]eartbeat a leader , [d]rop message , [s]ubmit data , or allow [m]essage : " @@ -111,29 +31,15 @@ module Program = go () - match s.[0] with - | 'T' -> - match getTimeout clusterSize s.[1..] with - | Some t -> t |> InactivityTimeout - | None -> getAction leaders clusterSize - | 'D' -> - match getMessage clusterSize s.[1..] with - | Some m -> m |> DropMessage - | None -> getAction leaders clusterSize - | 'M' -> - match getMessage clusterSize s.[1..] with - | Some m -> m |> NetworkMessage - | None -> getAction leaders clusterSize - | 'H' -> - match getHeartbeat leaders clusterSize s.[1..] with - | Some h -> Heartbeat h - | None -> getAction leaders clusterSize - | 'S' -> - match getClientData clusterSize s.[1..] with - | Some (server, data) -> ClientRequest (server, data, printfn "%O") - | None -> getAction leaders clusterSize - | _ -> - printf "Unrecognised input. " + let parseByte (s : string) = + match Byte.TryParse s with + | true, b -> Ok b + | false, _ -> Error (sprintf "expected a byte, got '%s'" s) + + match NetworkAction.tryParse parseByte (Some leaders) clusterSize s with + | Ok action -> action + | Error e -> + printfn "%s" e getAction leaders clusterSize let electLeader = diff --git a/RaftFable/public/index.html b/RaftFable/public/index.html index 9b157fa..675b4cc 100644 --- a/RaftFable/public/index.html +++ b/RaftFable/public/index.html @@ -44,6 +44,11 @@
+

Action history

+
+ +
+ diff --git a/RaftFable/src/App.fs b/RaftFable/src/App.fs index 834db15..82ca84e 100644 --- a/RaftFable/src/App.fs +++ b/RaftFable/src/App.fs @@ -11,35 +11,62 @@ module App = let ui = Ui.initialise document - let rec fullyRerender<'a> (cluster : Cluster<'a>) (network : Network<'a>) : Promise = - let prefs = Ui.getUserPrefs ui + let rec fullyRerender<'a> + (parse : string -> Result<'a, string>) + (userPrefs : UserPreferences<'a> ref) + (cluster : Cluster<'a>) + (network : Network<'a>) + : Promise + = + userPrefs.Value <- Ui.getUserPrefs<'a> parse cluster.ClusterSize ui Ui.freezeState cluster network |> Async.StartAsPromise |> fun p -> p.``then`` (fun clusterState -> - Ui.render - (perform cluster network) + Ui.render<'a> + (perform<'a> parse userPrefs cluster network) document ui { - UserPreferences = prefs + UserPreferences = userPrefs.Value ClusterState = clusterState } ) - and perform (cluster : Cluster<'a>) (network : Network<'a>) (action : NetworkAction<'a>) : Promise = + and perform<'a> + (parse : string -> Result<'a, string>) + (userPrefs : UserPreferences<'a> ref) + (cluster : Cluster<'a>) + (network : Network<'a>) + (action : NetworkAction<'a>) + : Promise + = NetworkAction.perform cluster network action - fullyRerender cluster network - let cluster, network = InMemoryCluster.make clusterSize + userPrefs.Value <- + { userPrefs.Value with + ActionHistory = action :: userPrefs.Value.ActionHistory + } + + fullyRerender parse userPrefs cluster network + + let parseByte (s : string) = + match System.Byte.TryParse s with + | false, _ -> Error (sprintf "Expected byte, got '%s'" s) + | true, v -> Ok v + + let userPrefs : UserPreferences ref = + ref (Ui.getUserPrefs parseByte clusterSize ui) + + let mutable cluster, network = InMemoryCluster.make clusterSize let leaderStateButton = document.querySelector ".leader-select-button" :?> Browser.Types.HTMLButtonElement - leaderStateButton.onclick <- fun _ -> fullyRerender cluster network + leaderStateButton.onclick <- fun _ -> fullyRerender parseByte userPrefs cluster network - let startupSequence = + let startupActions : NetworkAction list = [ NetworkAction.InactivityTimeout 0 NetworkAction.InactivityTimeout 1 @@ -70,54 +97,65 @@ module App = NetworkAction.NetworkMessage (0, 2) NetworkAction.NetworkMessage (1, 6) ] - |> List.truncate 0 - |> fun s -> (fullyRerender cluster network, s) + + ui.ActionHistory.textContent <- startupActions |> Seq.map NetworkAction.toString |> String.concat "\n" + + let reloadActions () = + let newCluster, newNetwork = InMemoryCluster.make clusterSize + cluster <- newCluster + network <- newNetwork + + userPrefs.Value <- Ui.getUserPrefs parseByte clusterSize ui + + startupActions + |> fun s -> (fullyRerender parseByte userPrefs cluster network, s) ||> List.fold (fun (inPromise : Promise) action -> promise { let! _ = inPromise - return! perform cluster network action + return! perform parseByte userPrefs cluster network action } ) + let reloadActionsButton = + document.querySelector ".reload-actions" :?> Browser.Types.HTMLButtonElement + + reloadActionsButton.onclick <- fun _evt -> reloadActions () + + reloadActions () |> ignore + let timeoutButton = document.querySelector ".timeout-button" :?> Browser.Types.HTMLButtonElement timeoutButton.onclick <- fun _event -> - startupSequence.``then`` (fun () -> - ui.TimeoutField.valueAsNumber - |> int - |> fun i -> i * 1 - |> InactivityTimeout - |> perform cluster network - ) + ui.TimeoutField.valueAsNumber + |> int + |> fun i -> i * 1 + |> InactivityTimeout + |> perform parseByte userPrefs cluster network let heartbeatButton = document.querySelector ".heartbeat-button" :?> Browser.Types.HTMLButtonElement heartbeatButton.onclick <- fun _event -> - startupSequence.``then`` (fun () -> - ui.HeartbeatField.valueAsNumber - |> int - |> fun i -> i * 1 - |> Heartbeat - |> perform cluster network - ) + ui.HeartbeatField.valueAsNumber + |> int + |> fun i -> i * 1 + |> Heartbeat + |> perform parseByte userPrefs cluster network let clientDataSubmitButton = document.querySelector ".client-data-submit" :?> Browser.Types.HTMLButtonElement clientDataSubmitButton.onclick <- fun _event -> - startupSequence.``then`` (fun () -> - let server = - ui.ClientDataServerField.valueAsNumber |> int |> (fun i -> i * 1) + let server = + ui.ClientDataServerField.valueAsNumber |> int |> (fun i -> i * 1) - let data = ui.ClientDataField.valueAsNumber |> byte + let data = ui.ClientDataField.valueAsNumber |> byte - NetworkAction.ClientRequest (server, data, printfn "%O") - |> perform cluster network - ) + NetworkAction.ClientRequest (server, data) + |> perform parseByte userPrefs cluster network - ui.ShowConsumedMessages.onchange <- fun _event -> fullyRerender cluster network + ui.ShowConsumedMessages.onchange <- fun _event -> fullyRerender parseByte userPrefs cluster network diff --git a/RaftFable/src/RaftFable.fsproj b/RaftFable/src/RaftFable.fsproj index 458f608..3c6e4be 100644 --- a/RaftFable/src/RaftFable.fsproj +++ b/RaftFable/src/RaftFable.fsproj @@ -6,6 +6,7 @@ + diff --git a/RaftFable/src/Result.fs b/RaftFable/src/Result.fs new file mode 100644 index 0000000..d7d54e4 --- /dev/null +++ b/RaftFable/src/Result.fs @@ -0,0 +1,30 @@ +namespace RaftFable + +open System.Collections.Generic + +[] +module Result = + + let allOkOrError<'a, 'err> + (results : Result<'a, 'err> seq) + : Result<'a IReadOnlyList, 'a IReadOnlyList * 'err IReadOnlyList> + = + let okResults = ResizeArray () + let errResults = ResizeArray () + + for r in results do + match r with + | Error e -> errResults.Add e + | Ok o -> okResults.Add o + + let okResults = okResults :> IReadOnlyList<_> + + if errResults.Count = 0 then + Ok okResults + else + Error (okResults, errResults :> IReadOnlyList<_>) + + let get<'a, 'err> (r : Result<'a, 'err>) : 'a = + match r with + | Ok o -> o + | Error e -> failwithf "Tried to unwrap an error (%+A)" e diff --git a/RaftFable/src/Ui.fs b/RaftFable/src/Ui.fs index 4db165c..7902a69 100644 --- a/RaftFable/src/Ui.fs +++ b/RaftFable/src/Ui.fs @@ -11,16 +11,17 @@ type ClusterState<'a> = UndeliveredMessages : (int * Message<'a>) list array } -type UserPreferences = +type UserPreferences<'a> = { LeaderUnderConsideration : int ShowConsumedMessages : bool + ActionHistory : NetworkAction<'a> list } type UiBackingState<'a> = { ClusterState : ClusterState<'a> - UserPreferences : UserPreferences + UserPreferences : UserPreferences<'a> } type UiElements = @@ -36,6 +37,7 @@ type UiElements = HeartbeatField : Browser.Types.HTMLInputElement SelectedLeaderId : Browser.Types.HTMLInputElement ShowConsumedMessages : Browser.Types.HTMLInputElement + ActionHistory : Browser.Types.HTMLTextAreaElement } type RequiresPopulation = @@ -76,6 +78,9 @@ module Ui = let showConsumed = document.querySelector ".show-consumed" :?> Browser.Types.HTMLInputElement + let actionHistory = + document.querySelector ".action-history" :?> Browser.Types.HTMLTextAreaElement + { Document = document ServerStatusTable = serverStatuses @@ -88,6 +93,7 @@ module Ui = HeartbeatField = heartbeatField SelectedLeaderId = selectedLeaderId ShowConsumedMessages = showConsumed + ActionHistory = actionHistory } let reset (clusterSize : int) (ui : UiElements) : RequiresPopulation = @@ -319,8 +325,22 @@ module Ui = } } - let getUserPrefs (ui : UiElements) : UserPreferences = + let getUserPrefs<'a> + (parse : string -> Result<'a, string>) + (clusterSize : int) + (ui : UiElements) + : UserPreferences<'a> + = { LeaderUnderConsideration = ui.SelectedLeaderId.valueAsNumber |> int |> (fun i -> i * 1) ShowConsumedMessages = ui.ShowConsumedMessages.``checked`` + ActionHistory = + // TODO write these back out again, and give a button to Load + ui.ActionHistory.textContent.Split "\n" + |> Seq.filter (not << System.String.IsNullOrEmpty) + |> Seq.map (NetworkAction.tryParse<'a> parse None clusterSize) + |> Result.allOkOrError + // TODO handle this + |> Result.get + |> List.ofSeq }