Add action history to UI

This commit is contained in:
Smaug123
2022-11-06 14:24:19 +00:00
parent 36198d951d
commit fe0deaba36
9 changed files with 286 additions and 157 deletions

View File

@@ -1,5 +1,6 @@
namespace Raft
open System
open System.Collections.Generic
type Cluster<'a> =
@@ -26,6 +27,18 @@ type Cluster<'a> =
member this.ClusterSize : int = this.Servers.Length
member this.Leaders : Set<int<ServerId>> =
((Set.empty, 0<ServerId>), this.Servers)
||> Array.fold (fun (leaders, count) server ->
let leaders =
match server.State with
| ServerStatus.Leader _ -> Set.add count leaders
| _ -> leaders
leaders, count + 1<ServerId>
)
|> fst
type Network<'a> =
internal
{
@@ -92,7 +105,7 @@ type NetworkAction<'a> =
| InactivityTimeout of int<ServerId>
| NetworkMessage of int<ServerId> * int
| DropMessage of int<ServerId> * int
| ClientRequest of int<ServerId> * 'a * (ClientReply -> unit)
| ClientRequest of int<ServerId> * 'a
| Heartbeat of int<ServerId>
[<RequireQualifiedAccess>]
@@ -106,5 +119,122 @@ module NetworkAction =
| NetworkMessage (serverId, messageId) ->
network.InboundMessage serverId messageId |> cluster.SendMessage serverId
network.DropMessage serverId messageId
| ClientRequest (server, request, replyChannel) ->
Message.ClientRequest (request, replyChannel) |> cluster.SendMessage server
| ClientRequest (server, request) -> Message.ClientRequest request |> cluster.SendMessage server
let private getMessage (clusterSize : int) (s : string) : Result<int<ServerId> * int, string> =
match s.Split ',' with
| [| serverId ; messageId |] ->
let serverId = serverId.TrimEnd ()
let messageId = messageId.Trim ()
match Int32.TryParse serverId with
| true, serverId ->
match Int32.TryParse messageId with
| true, messageId ->
if serverId >= clusterSize || serverId < 0 then
sprintf "Server ID must be between 0 and %i inclusive, got %i" (clusterSize - 1) serverId
|> Error
else
Ok (serverId * 1<ServerId>, messageId)
| false, _ -> sprintf "Non-integer input '%s' for message ID." messageId |> Error
| false, _ -> sprintf "Non-integer input '%s' for server ID." serverId |> Error
| _ -> Error "Expected a single comma."
let private getTimeout (clusterSize : int) (serverId : string) : Result<int<ServerId>, string> =
match Int32.TryParse serverId with
| false, _ -> Error (sprintf "Expected an integer, got '%s'" serverId)
| true, serverId ->
if serverId >= clusterSize || serverId < 0 then
sprintf "Server ID must be between 0 and %i inclusive, got %i." (clusterSize - 1) serverId
|> Error
else
serverId * 1<ServerId> |> Ok
let private getHeartbeat (leaders : Set<int<ServerId>> option) (clusterSize : int) (serverId : string) =
match Int32.TryParse serverId with
| false, _ -> sprintf "Expected an integer server ID, got '%s'" serverId |> Error
| true, serverId ->
if serverId >= clusterSize || serverId < 0 then
sprintf "Server ID must be between 0 and %i inclusive, got %i." (clusterSize - 1) serverId
|> Error
else
let serverId = serverId * 1<ServerId>
match leaders with
| None -> Ok serverId
| Some leaders ->
if leaders |> Set.contains serverId then
Ok serverId
else
sprintf "Cannot heartbeat a non-leader (%i)." serverId |> Error
let private getClientData<'a>
(parse : string -> Result<'a, string>)
(clusterSize : int)
(s : string)
: Result<int<ServerId> * 'a, string>
=
match s.Split ',' |> List.ofArray with
| serverId :: (_ :: _ as rest) ->
let rest = String.concat "," rest |> fun s -> s.TrimStart ()
match Int32.TryParse serverId with
| true, serverId ->
if serverId >= clusterSize || serverId < 0 then
sprintf "Server ID must be between 0 and %i inclusive, got %i." (clusterSize - 1) serverId
|> Error
else
match parse rest with
| Ok b -> Ok (serverId * 1<ServerId>, b)
| Error e -> sprintf "Failed to parse client data: %s" e |> Error
| false, _ ->
sprintf "Server ID expected as first comma-separated component, got '%s'." serverId
|> Error
| _ -> sprintf "Expected a comma in client data string, got '%s'" s |> Error
/// Optionally also validates leaders against the input set of leaders.
let tryParse<'a>
(parse : string -> Result<'a, string>)
(leaders : Set<int<ServerId>> option)
(clusterSize : int)
(s : string)
: Result<NetworkAction<'a>, string>
=
if String.IsNullOrEmpty s then
Error "Can't parse an empty string"
else
match Char.ToUpper s.[0] with
| 'T' ->
match getTimeout clusterSize (s.[1..].TrimStart ()) with
| Ok t -> t |> InactivityTimeout |> Ok
| Error e -> Error e
| 'D' ->
match getMessage clusterSize (s.[1..].TrimStart ()) with
| Ok m -> m |> DropMessage |> Ok
| Error e -> Error e
| 'M' ->
match getMessage clusterSize (s.[1..].TrimStart ()) with
| Ok m -> m |> NetworkMessage |> Ok
| Error e -> Error e
| 'H' ->
match getHeartbeat leaders clusterSize (s.[1..].TrimStart ()) with
| Ok h -> Heartbeat h |> Ok
| Error e -> Error e
| 'S' ->
match getClientData parse clusterSize (s.[1..].TrimStart ()) with
| Ok (server, data) -> ClientRequest (server, data) |> Ok
| Error e -> Error e
| c -> Error (sprintf "unexpected start char '%c'" c)
let toString<'a> (action : NetworkAction<'a>) : string =
match action with
| NetworkAction.Heartbeat h -> sprintf "h %i" h
| NetworkAction.NetworkMessage (server, id) -> sprintf "m %i,%i" server id
| NetworkAction.DropMessage (server, id) -> sprintf "d %i,%i" server id
| NetworkAction.InactivityTimeout server -> sprintf "t %i" server
| NetworkAction.ClientRequest (server, data) -> sprintf "s %i,%O" server data

View File

@@ -186,13 +186,13 @@ type Reply =
type Message<'a> =
| Instruction of Instruction<'a>
| Reply of Reply
| ClientRequest of 'a * (ClientReply -> unit)
| ClientRequest of 'a
override this.ToString () =
match this with
| Instruction i -> i.ToString ()
| Reply r -> r.ToString ()
| ClientRequest (a, _) -> sprintf "Client requested insertion of: %O" a
| ClientRequest a -> sprintf "Client requested insertion of: %O" a
type private CandidateState =
{
@@ -635,15 +635,18 @@ type Server<'a>
|> messageChannel (i * 1<ServerId>)
| ServerAction.Receive (Message.Instruction m) -> processMessage m
| ServerAction.Receive (Message.Reply r) -> processReply r
| ServerAction.Receive (Message.ClientRequest (toAdd, replyChannel)) ->
| ServerAction.Receive (Message.ClientRequest toAdd) ->
match currentType with
| ServerSpecialisation.Leader leaderState ->
persistentState.AppendToLog toAdd persistentState.CurrentTerm
replyChannel ClientReply.Acknowledged
//replyChannel ClientReply.Acknowledged
emitHeartbeat leaderState
| ServerSpecialisation.Follower followerState ->
replyChannel (ClientReply.Redirect followerState.CurrentLeader)
| ServerSpecialisation.Candidate _ -> replyChannel ClientReply.Dropped
//replyChannel (ClientReply.Redirect followerState.CurrentLeader)
()
| ServerSpecialisation.Candidate _ ->
//replyChannel ClientReply.Dropped
()
| ServerAction.Sync replyChannel -> replyChannel.Reply ()
| ServerAction.StateReadout replyChannel ->
{
@@ -673,8 +676,8 @@ type Server<'a>
#endif
mailbox
member this.SendClientRequest (request : 'a) (reply : ClientReply -> unit) =
mailbox.Post (ServerAction.Receive (Message.ClientRequest (request, reply)))
member this.SendClientRequest (request : 'a) =
mailbox.Post (ServerAction.Receive (Message.ClientRequest request))
member this.TriggerInactivityTimeout () = mailbox.Post ServerAction.BeginElection
member this.TriggerHeartbeatTimeout () = mailbox.Post ServerAction.EmitHeartbeat