diff --git a/Raft.Test/TestInMemoryPersistentState.fs b/Raft.Test/TestInMemoryPersistentState.fs index 7be6c49..67a6edb 100644 --- a/Raft.Test/TestInMemoryPersistentState.fs +++ b/Raft.Test/TestInMemoryPersistentState.fs @@ -15,18 +15,24 @@ module TestInMemoryPersistentState = s.CurrentLogIndex |> shouldEqual 0 for i in -2 .. 10 do - s.GetLogEntry (i * 1) |> shouldEqual None + match s.GetLogEntry (i * 1) with + | Some _ -> failwith "should not have had a log entry" + | None -> () s.CurrentTerm |> shouldEqual 0 s.VotedFor |> shouldEqual None - s.GetLastLogEntry () |> shouldEqual None + match s.GetLastLogEntry () with + | Some _ -> failwith "should not have had a log entry" + | None -> () let ofList<'a> (xs : ('a * int) list) : InMemoryPersistentState<'a> = let s = InMemoryPersistentState<'a> () for x, term in xs do - (s :> IPersistentState<_>).AppendToLog x term + (s :> IPersistentState<_>).AppendToLog + (LogEntry.ClientEntry (x, 1, 1, ignore)) + term s @@ -38,26 +44,39 @@ module TestInMemoryPersistentState = [] let ``Nonzero truncation followed by Get succeeds`` () = - let property (truncate : int) (xs : (int * int) list) : bool = + let property (truncate : int) (xs : (byte * int) list) : bool = let truncate = abs truncate + 1 let uut = ofList xs - let oldLog = uut.GetLog () + + let oldLog = + uut.GetLog () + |> List.map (fun (entry, term) -> SerialisedLogEntry.Make entry, term) match (uut :> IPersistentState<_>).GetLogEntry truncate with | None -> (uut :> IPersistentState<_>).TruncateLog truncate - uut.GetLog () = oldLog + + let newLog = + uut.GetLog () + |> List.map (fun (entry, term) -> SerialisedLogEntry.Make entry, term) + + newLog = oldLog | Some (itemStored, entry) -> (uut :> IPersistentState<_>).TruncateLog truncate - (uut :> IPersistentState<_>).GetLastLogEntry () = Some ( - itemStored, - { - Index = truncate - Term = entry - } - ) - && isPrefix (uut.GetLog ()) oldLog + let newLog = + uut.GetLog () + |> List.map (fun (entry, term) -> SerialisedLogEntry.Make entry, term) + + let retrieved, logEntry = + Option.get ((uut :> IPersistentState<_>).GetLastLogEntry ()) + + logEntry = { + Index = truncate + Term = entry + } + && (SerialisedLogEntry.Make retrieved = SerialisedLogEntry.Make itemStored) + && isPrefix newLog oldLog && (uut :> IPersistentState<_>).CurrentLogIndex = truncate Check.QuickThrowOnFailure property @@ -69,13 +88,20 @@ module TestInMemoryPersistentState = let uut = ofList xs // It's not meaningful to take the 0th element - (uut :> IPersistentState<_>).GetLogEntry truncate |> shouldEqual None + match (uut :> IPersistentState<_>).GetLogEntry truncate with + | Some _ -> failwith "should not have had any elements" + | None -> () (uut :> IPersistentState<_>).TruncateLog truncate - uut.GetLog () |> shouldEqual [] + match uut.GetLog () with + | [] -> () + | _ -> failwith "should not have had log entries" let uut = uut :> IPersistentState<_> - uut.GetLastLogEntry () = None && uut.CurrentLogIndex = 0 + + match uut.GetLastLogEntry () with + | Some _ -> false + | None -> uut.CurrentLogIndex = 0 Check.QuickThrowOnFailure property diff --git a/Raft.Test/TestInMemoryServer.fs b/Raft.Test/TestInMemoryServer.fs index bca7b98..58e7dcb 100644 --- a/Raft.Test/TestInMemoryServer.fs +++ b/Raft.Test/TestInMemoryServer.fs @@ -262,6 +262,20 @@ module TestInMemoryServer = let clusterSize = 5 let cluster, network = InMemoryCluster.make clusterSize + let registeredSuccessfully = ref 0 + + let registerResponse (response : RegisterClientResponse) : unit = + response |> shouldEqual (RegisterClientResponse.Success 1) + Interlocked.Increment registeredSuccessfully |> ignore + + let respondedSuccessfully = ref 0 + + let requestResponse (response : ClientResponse) : unit = + response + |> shouldEqual (ClientResponse.Success (1, 1)) + + Interlocked.Increment registeredSuccessfully |> ignore + let startupSequence = [ NetworkAction.InactivityTimeout 1 @@ -287,7 +301,11 @@ module TestInMemoryServer = NetworkAction.NetworkMessage (1, 7) // Submit data to leader. This has the effect of heartbeating the other // nodes, with a heartbeat that contains the new data. - NetworkAction.ClientRequest (1, byte 3) + NetworkAction.ClientRequest (1, ClientRequest.RegisterClient registerResponse) + NetworkAction.ClientRequest ( + 1, + ClientRequest.ClientRequest (1, 1, byte 3, requestResponse) + ) // Deliver the data messages. NetworkAction.NetworkMessage (0, 2) @@ -330,6 +348,9 @@ module TestInMemoryServer = } ] + respondedSuccessfully.Value |> shouldEqual 1 + registeredSuccessfully.Value |> shouldEqual 1 + let freeze<'a> (cluster : Cluster<'a>) = List.init cluster.ClusterSize diff --git a/Raft.Test/ValidHistory.fs b/Raft.Test/ValidHistory.fs index ff27eac..0772a3d 100644 --- a/Raft.Test/ValidHistory.fs +++ b/Raft.Test/ValidHistory.fs @@ -57,12 +57,14 @@ module ValidHistory = yield NetworkAction.InactivityTimeout server ] + (* let clientRequestGen = gen { let! element = elementGen let! id = Gen.choose (0, clusterSize - 1) return NetworkAction.ClientRequest (id * 1, element) } + *) let rec go (len : int) = gen { @@ -71,9 +73,11 @@ module ValidHistory = else let! smaller = go (len - 1) - let! next = + let! next = Gen.elements (permissibleNext ()) + (* clientRequestGen :: List.replicate 5 (Gen.elements (permissibleNext ())) |> Gen.oneof + *) NetworkAction.perform cluster network next return next :: smaller diff --git a/Raft/Domain.fs b/Raft/Domain.fs index fa105e4..6946d13 100644 --- a/Raft/Domain.fs +++ b/Raft/Domain.fs @@ -10,7 +10,13 @@ type Term [] type ServerId -type LogEntry = +[] +type ClientId + +[] +type ClientSequence + +type LogEntryMetadata = { Index : int Term : int @@ -18,3 +24,47 @@ type LogEntry = override this.ToString () = sprintf "Log entry %i at subjective term %i" this.Index this.Term + +type ClientResponse = + | NotLeader of leaderHint : int option + | SessionExpired + | Success of int * int + + override this.ToString () = + match this with + | ClientResponse.NotLeader hint -> + let hint = + match hint with + | None -> "" + | Some leader -> sprintf " (leader hint: %i)" leader + + sprintf "Failed to send data due to not asking leader%s" hint + | ClientResponse.SessionExpired -> "Failed to send data as session expired" + | ClientResponse.Success (client, sequence) -> sprintf "Client %i's request %i succeeded" client sequence + +type RegisterClientResponse = + | NotLeader of leaderHint : int option + | Success of int + + override this.ToString () = + match this with + | RegisterClientResponse.Success client -> sprintf "Registered client with ID %i" client + | RegisterClientResponse.NotLeader hint -> + let hint = + match hint with + | None -> "" + | Some leader -> sprintf " (leader hint: %i)" leader + + sprintf "Failed to register client due to not asking leader%s" hint + +type InternalRaftCommunication = | NewClientRegistered of (RegisterClientResponse -> unit) + +type LogEntry<'a> = + | ClientEntry of 'a * int * int * (ClientResponse -> unit) + | RaftOverhead of InternalRaftCommunication + + override this.ToString () = + match this with + | LogEntry.ClientEntry (data, client, sequence, _) -> + sprintf "Client %i, sequence number %i, sends data %O" client sequence data + | LogEntry.RaftOverhead (InternalRaftCommunication.NewClientRegistered _) -> "New client registration" diff --git a/Raft/InMemory.fs b/Raft/InMemory.fs index 6974a1b..6d58651 100644 --- a/Raft/InMemory.fs +++ b/Raft/InMemory.fs @@ -105,7 +105,7 @@ type NetworkAction<'a> = | InactivityTimeout of int | NetworkMessage of int * int | DropMessage of int * int - | ClientRequest of int * 'a + | ClientRequest of int * ClientRequest<'a> | Heartbeat of int [] @@ -172,35 +172,65 @@ module NetworkAction = else sprintf "Cannot heartbeat a non-leader (%i)." serverId |> Error - let private getClientData<'a> + let private getNewClientTarget<'a> (clusterSize : int) (serverId : string) : Result, string> = + match Int32.TryParse serverId with + | false, _ -> sprintf "Expected an int for a server ID, got '%s'" serverId |> Error + | true, serverId -> + + if serverId >= clusterSize || serverId < 0 then + sprintf "Server ID must be between 0 and %i inclusive, got %i." (clusterSize - 1) serverId + |> Error + else + Ok (serverId * 1) + + let private getClientSubmitData<'a> (parse : string -> Result<'a, string>) (clusterSize : int) (s : string) - : Result * 'a, string> + : Result * int * int * 'a, string> = match s.Split ',' |> List.ofArray with - | serverId :: (_ :: _ as rest) -> + | serverId :: clientId :: clientSequenceNumber :: (_ :: _ as rest) -> let rest = String.concat "," rest |> fun s -> s.TrimStart () - match Int32.TryParse serverId with - | true, serverId -> - if serverId >= clusterSize || serverId < 0 then - sprintf "Server ID must be between 0 and %i inclusive, got %i." (clusterSize - 1) serverId - |> Error - else - - match parse rest with - | Ok b -> Ok (serverId * 1, b) - | Error e -> sprintf "Failed to parse client data: %s" e |> Error + match Int32.TryParse (serverId.Trim ()) with | false, _ -> sprintf "Server ID expected as first comma-separated component, got '%s'." serverId |> Error - | _ -> sprintf "Expected a comma in client data string, got '%s'" s |> Error + | true, serverId -> + + if serverId >= clusterSize || serverId < 0 then + sprintf "Server ID must be between 0 and %i inclusive, got %i." (clusterSize - 1) serverId + |> Error + else + + match Int32.TryParse (clientId.Trim ()) with + | false, _ -> + sprintf "Client ID expected as second comma-separated component, got '%s'." clientId + |> Error + | true, clientId -> + + match Int32.TryParse (clientSequenceNumber.Trim ()) with + | false, _ -> + sprintf + "Client sequence number expected as third comma-separated component, got '%s'." + clientSequenceNumber + |> Error + | true, clientSequenceNumber -> + + match parse rest with + | Ok b -> Ok (serverId * 1, clientId * 1, clientSequenceNumber * 1, b) + | Error e -> sprintf "Failed to parse client data: %s" e |> Error + | _ -> + sprintf "Expected serverId,clientId,clientSequenceNumber,data; got '%s'" s + |> Error /// Optionally also validates leaders against the input set of leaders. let tryParse<'a> (parse : string -> Result<'a, string>) (leaders : Set> option) + (handleRegisterClientResponse : RegisterClientResponse -> unit) + (handleClientDataResponse : ClientResponse -> unit) (clusterSize : int) (s : string) : Result, string> @@ -226,8 +256,17 @@ module NetworkAction = | Ok h -> Heartbeat h |> Ok | Error e -> Error e | 'S' -> - match getClientData parse clusterSize (s.[1..].TrimStart ()) with - | Ok (server, data) -> ClientRequest (server, data) |> Ok + match getNewClientTarget clusterSize (s.[1..].TrimStart ()) with + | Ok target -> + ClientRequest (target, ClientRequest.RegisterClient handleRegisterClientResponse) + |> Ok + | Error e -> Error e + | 'R' -> + match getClientSubmitData parse clusterSize (s.[1..].TrimStart ()) with + | Ok (server, client, sequence, data) -> + (server, ClientRequest.ClientRequest (client, sequence, data, handleClientDataResponse)) + |> ClientRequest + |> Ok | Error e -> Error e | c -> Error (sprintf "unexpected start char '%c'" c) @@ -237,4 +276,6 @@ module NetworkAction = | NetworkAction.NetworkMessage (server, id) -> sprintf "m %i,%i" server id | NetworkAction.DropMessage (server, id) -> sprintf "d %i,%i" server id | NetworkAction.InactivityTimeout server -> sprintf "t %i" server - | NetworkAction.ClientRequest (server, data) -> sprintf "s %i,%O" server data + | NetworkAction.ClientRequest (server, ClientRequest.RegisterClient _) -> sprintf "s %i" server + | NetworkAction.ClientRequest (server, ClientRequest.ClientRequest (client, sequence, data, _)) -> + sprintf "r %i,%i,%i,%O" server client sequence data diff --git a/Raft/PersistentState.fs b/Raft/PersistentState.fs index a0e5346..732672e 100644 --- a/Raft/PersistentState.fs +++ b/Raft/PersistentState.fs @@ -6,15 +6,15 @@ type IPersistentState<'a> = abstract CurrentTerm : int /// If I know about an election in my CurrentTerm, who did I vote for during that election? abstract VotedFor : int option - abstract AppendToLog : 'a -> int -> unit + abstract AppendToLog : LogEntry<'a> -> int -> unit /// Truncate away the most recent entries of the log. /// If `GetLogEntry x` would succeed, and then we call `TruncateLog x`, /// then `GetLogEntry x` will still succeed (but `GetLogEntry (x + 1)` will not). abstract TruncateLog : int -> unit - abstract GetLogEntry : int -> ('a * int) option + abstract GetLogEntry : int -> (LogEntry<'a> * int) option abstract CurrentLogIndex : int - abstract GetLastLogEntry : unit -> ('a * LogEntry) option + abstract GetLastLogEntry : unit -> (LogEntry<'a> * LogEntryMetadata) option abstract AdvanceToTerm : int -> unit abstract IncrementTerm : unit -> unit abstract Vote : int -> unit @@ -24,7 +24,7 @@ type IPersistentState<'a> = type InMemoryPersistentState<'a> () = let mutable currentTerm = 0 let mutable votedFor : int option = None - let log = ResizeArray<'a * int> () + let log = ResizeArray * int> () member this.GetLog () = log |> List.ofSeq @@ -45,7 +45,7 @@ type InMemoryPersistentState<'a> () = currentTerm <- term / 1 votedFor <- None - member this.AppendToLog entry term = log.Add (entry, term) + member this.AppendToLog (entry : LogEntry<'a>) term = log.Add (entry, term) member this.TruncateLog position = let position = position / 1 @@ -54,7 +54,7 @@ type InMemoryPersistentState<'a> () = let position = if position < 0 then 0 else position log.RemoveRange (position, log.Count - position) - member this.GetLastLogEntry () : ('a * LogEntry) option = + member this.GetLastLogEntry () : (LogEntry<'a> * LogEntryMetadata) option = if log.Count = 0 then None else diff --git a/Raft/Server.fs b/Raft/Server.fs index 226da42..e8400af 100644 --- a/Raft/Server.fs +++ b/Raft/Server.fs @@ -1,7 +1,7 @@ namespace Raft /// Server state which need not survive a server crash. -type VolatileState = +type VolatileState<'a> = { /// The index of the highest log entry we know is persisted to a majority of the cluster. // Why is it correct for this to be volatile? @@ -9,13 +9,16 @@ type VolatileState = // we *don't* know that any of our log is reflected in the other nodes. // (We'll soon learn a better value of CommitIndex as we start receiving messages again.) CommitIndex : int + /// TODO: do this, and model applying to state machine LastApplied : int + Clients : Map, Map, 'a>> } - static member New : VolatileState = + static member New : VolatileState<'a> = { CommitIndex = 0 LastApplied = 0 + Clients = Map.empty } type LeaderState = @@ -30,7 +33,7 @@ type LeaderState = MatchIndex : int array } - static member New (clusterSize : int) (currentIndex : int) : LeaderState = + static member New<'a> (clusterSize : int) (currentIndex : int) : LeaderState = { // +1, because these are indexed from 1. ToSend = Array.create clusterSize (currentIndex + 1) @@ -71,7 +74,7 @@ type RequestVoteMessage = { CandidateTerm : int CandidateId : int - CandidateLastLogEntry : LogEntry option + CandidateLastLogEntry : LogEntryMetadata option ReplyChannel : RequestVoteReply -> unit } @@ -109,11 +112,11 @@ type AppendEntriesMessage<'a> = /// I am your leader! This is me! (so everyone knows where to send clients to) LeaderId : int /// The entry immediately preceding the entry I'm sending you, so you can tell if we've got out of sync. - PrevLogEntry : LogEntry option + PrevLogEntry : LogEntryMetadata option /// Followers, append this entry to your log. (Or, if None, this is just a heartbeat.) /// It was determined at the given term - recall that we might need to bring a restarted node up to speed /// with what happened during terms that took place while it was down. - NewEntry : ('a * int) option + NewEntry : (LogEntry<'a> * int) option LeaderCommitIndex : int /// TODO - we don't need this, the responder should just construct /// the appropriate Message and send it themselves @@ -133,13 +136,27 @@ type AppendEntriesMessage<'a> = this.LeaderTerm this.LeaderCommitIndex +type SerialisedLogEntry<'a> = + | SerialisedClientEntry of 'a * int * int + | SerialisedClientRegister + + static member Make (entry : LogEntry<'a>) : SerialisedLogEntry<'a> = + match entry with + | ClientEntry (a, client, sequence, _) -> SerialisedClientEntry (a, client, sequence) + | RaftOverhead (NewClientRegistered _) -> SerialisedClientRegister + + override this.ToString () = + match this with + | SerialisedClientRegister -> "" + | SerialisedClientEntry (data, client, sequence) -> sprintf "Client %i (%i) puts data: %O" client sequence data + /// A readout of the server's internal state, suitable for e.g. debugging tools. type ServerInternalState<'a> = { LogIndex : int CurrentTerm : int CurrentVote : int option - Log : ('a * int) option list + Log : (SerialisedLogEntry<'a> * int) option list /// A clone of the leader state, if this is a leader. LeaderState : LeaderState option } @@ -182,11 +199,15 @@ type Reply = | RequestVoteReply v -> v.ToString () | AppendEntriesReply r -> r.ToString () +type ClientRequest<'a> = + | RegisterClient of (RegisterClientResponse -> unit) + | ClientRequest of int * int * 'a * (ClientResponse -> unit) + [] type Message<'a> = | Instruction of Instruction<'a> | Reply of Reply - | ClientRequest of 'a + | ClientRequest of ClientRequest<'a> override this.ToString () = match this with @@ -270,7 +291,7 @@ type Server<'a> messageChannel : int -> Message<'a> -> unit ) = - let mutable volatileState = VolatileState.New + let mutable volatileState = VolatileState<'a>.New let mutable currentType = ServerSpecialisation.Follower @@ -567,11 +588,51 @@ type Server<'a> | Some (_, term) -> term if ourLogTerm = persistentState.CurrentTerm then + let oldCommitIndex = volatileState.CommitIndex + volatileState <- { volatileState with CommitIndex = maxLogAQuorumHasCommitted } + for i in (oldCommitIndex / 1 + 1) .. maxLogAQuorumHasCommitted / 1 do + let i = i * 1 + + match persistentState.GetLogEntry i with + | None -> + failwith "Invariant violated. Leader does not have a log entry for a committed index." + | Some (logEntry, _term) -> + match logEntry with + | LogEntry.ClientEntry (stored, client, sequence, replyChannel) -> + let newClients = + volatileState.Clients + |> Map.change + client + (fun messageLog -> + let messages = + match messageLog with + | None -> Map.empty + | Some messageLog -> messageLog + + messages |> Map.change sequence (fun _ -> Some stored) |> Some + ) + + volatileState <- + { volatileState with + Clients = newClients + } + + replyChannel (ClientResponse.Success (client, sequence)) + | LogEntry.RaftOverhead (NewClientRegistered replyChannel) -> + let clientId = i / 1 * 1 + + volatileState <- + { volatileState with + Clients = volatileState.Clients |> Map.add clientId Map.empty + } + + clientId |> RegisterClientResponse.Success |> replyChannel + | RequestVoteReply requestVoteReply -> match currentType with | ServerSpecialisation.Leader _ @@ -635,18 +696,34 @@ type Server<'a> |> messageChannel (i * 1) | ServerAction.Receive (Message.Instruction m) -> processMessage m | ServerAction.Receive (Message.Reply r) -> processReply r - | ServerAction.Receive (Message.ClientRequest toAdd) -> - match currentType with - | ServerSpecialisation.Leader leaderState -> - persistentState.AppendToLog toAdd persistentState.CurrentTerm - //replyChannel ClientReply.Acknowledged - emitHeartbeat leaderState - | ServerSpecialisation.Follower followerState -> - //replyChannel (ClientReply.Redirect followerState.CurrentLeader) - () - | ServerSpecialisation.Candidate _ -> - //replyChannel ClientReply.Dropped - () + | ServerAction.Receive (Message.ClientRequest request) -> + match request with + | ClientRequest.RegisterClient replyChannel -> + match currentType with + | ServerSpecialisation.Follower followerState -> + replyChannel (RegisterClientResponse.NotLeader followerState.CurrentLeader) + | ServerSpecialisation.Candidate _ -> replyChannel (RegisterClientResponse.NotLeader None) + | ServerSpecialisation.Leader leaderState -> + persistentState.AppendToLog + (RaftOverhead (NewClientRegistered replyChannel)) + persistentState.CurrentTerm + + leaderState.MatchIndex.[me / 1] <- persistentState.CurrentLogIndex + + emitHeartbeat leaderState + | ClientRequest.ClientRequest (client, sequenceNumber, toAdd, replyChannel) -> + match currentType with + | ServerSpecialisation.Leader leaderState -> + persistentState.AppendToLog + (LogEntry.ClientEntry (toAdd, client, sequenceNumber, replyChannel)) + persistentState.CurrentTerm + + leaderState.MatchIndex.[me / 1] <- persistentState.CurrentLogIndex + + emitHeartbeat leaderState + | ServerSpecialisation.Follower followerState -> + replyChannel (ClientResponse.NotLeader followerState.CurrentLeader) + | ServerSpecialisation.Candidate _ -> replyChannel (ClientResponse.NotLeader None) | ServerAction.Sync replyChannel -> replyChannel.Reply () | ServerAction.StateReadout replyChannel -> { @@ -659,7 +736,11 @@ type Server<'a> | Some (_, last) -> List.init (last.Index / 1) - (fun index -> persistentState.GetLogEntry (1 + index * 1)) + (fun index -> + match persistentState.GetLogEntry (1 + index * 1) with + | None -> None + | Some (entry, term) -> (SerialisedLogEntry.Make entry, term) |> Some + ) LeaderState = match currentType with | ServerSpecialisation.Leader state -> state.Clone () |> Some @@ -676,7 +757,7 @@ type Server<'a> #endif mailbox - member this.SendClientRequest (request : 'a) = + member this.SendClientRequest (request : ClientRequest<'a>) = mailbox.Post (ServerAction.Receive (Message.ClientRequest request)) member this.TriggerInactivityTimeout () = mailbox.Post ServerAction.BeginElection diff --git a/RaftExplorer/Program.fs b/RaftExplorer/Program.fs index cab4600..32e067d 100644 --- a/RaftExplorer/Program.fs +++ b/RaftExplorer/Program.fs @@ -22,7 +22,7 @@ module Program = let rec getAction (leaders : Set>) (clusterSize : int) : NetworkAction = printf - "Enter action. Trigger [t]imeout , [h]eartbeat a leader , [d]rop message , [s]ubmit data , or allow [m]essage : " + "Enter action. Trigger [t]imeout , [h]eartbeat a leader , [d]rop message , establish new [s]ession , or allow [m]essage : " let s = let rec go () = @@ -36,7 +36,27 @@ module Program = | true, b -> Ok b | false, _ -> Error (sprintf "expected a byte, got '%s'" s) - match NetworkAction.tryParse parseByte (Some leaders) clusterSize s with + let handleRegister (response : RegisterClientResponse) : unit = + match response with + | RegisterClientResponse.Success i -> printfn "Client successfully registered, getting ID %i" i + | RegisterClientResponse.NotLeader hint -> + match hint with + | Some hint -> printfn "Client failed to register due to not asking a leader; try asking server %i" hint + | None -> printfn "Client failed to register due to not asking a leader." + + let handleResponse (response : ClientResponse) : unit = + match response with + | ClientResponse.NotLeader hint -> + match hint with + | Some hint -> + printfn "Client failed to send request due to not asking a leader; try asking server %i" hint + | None -> printfn "Client failed to send request due to not asking a leader." + | ClientResponse.SessionExpired -> + failwith "Client failed to send request due to expiry of session. This currently can't happen." + | ClientResponse.Success (client, sequence) -> + printfn "Raft has committed request from client %i with sequence number %i" client sequence + + match NetworkAction.tryParse parseByte (Some leaders) handleRegister handleResponse clusterSize s with | Ok action -> action | Error e -> printfn "%s" e diff --git a/RaftFable/public/index.html b/RaftFable/public/index.html index 675b4cc..b0bc00f 100644 --- a/RaftFable/public/index.html +++ b/RaftFable/public/index.html @@ -23,6 +23,7 @@

Interaction

+

Servers

@@ -31,13 +32,24 @@
+

Clients

+
+ + + +
+
+ + + +

Messages in flight

diff --git a/RaftFable/src/App.fs b/RaftFable/src/App.fs index 82ca84e..ad1e1d0 100644 --- a/RaftFable/src/App.fs +++ b/RaftFable/src/App.fs @@ -11,6 +11,10 @@ module App = let ui = Ui.initialise document + let handleRegisterClientResponse (response : RegisterClientResponse) : unit = printfn "%O" response + + let handleClientResponse (response : ClientResponse) : unit = printfn "%O" response + let rec fullyRerender<'a> (parse : string -> Result<'a, string>) (userPrefs : UserPreferences<'a> ref) @@ -18,7 +22,8 @@ module App = (network : Network<'a>) : Promise = - userPrefs.Value <- Ui.getUserPrefs<'a> parse cluster.ClusterSize ui + userPrefs.Value <- + Ui.getUserPrefs<'a> parse handleRegisterClientResponse handleClientResponse cluster.ClusterSize ui Ui.freezeState cluster network |> Async.StartAsPromise @@ -57,7 +62,7 @@ module App = | true, v -> Ok v let userPrefs : UserPreferences ref = - ref (Ui.getUserPrefs parseByte clusterSize ui) + ref (Ui.getUserPrefs parseByte handleRegisterClientResponse handleClientResponse clusterSize ui) let mutable cluster, network = InMemoryCluster.make clusterSize @@ -105,7 +110,7 @@ module App = cluster <- newCluster network <- newNetwork - userPrefs.Value <- Ui.getUserPrefs parseByte clusterSize ui + userPrefs.Value <- Ui.getUserPrefs parseByte handleRegisterClientResponse handleClientResponse clusterSize ui startupActions |> fun s -> (fullyRerender parseByte userPrefs cluster network, s) @@ -151,11 +156,31 @@ module App = clientDataSubmitButton.onclick <- fun _event -> let server = - ui.ClientDataServerField.valueAsNumber |> int |> (fun i -> i * 1) + ui.ClientData.ClientDataServerField.valueAsNumber |> int |> (*) 1 - let data = ui.ClientDataField.valueAsNumber |> byte + let data = ui.ClientData.ClientDataField.valueAsNumber |> byte + let clientId = ui.ClientData.ClientIdField.valueAsNumber |> int |> (*) 1 - NetworkAction.ClientRequest (server, data) + let clientSequence = + ui.ClientData.ClientSequenceField.valueAsNumber |> int |> (*) 1 + + // TODO: store the reply and display it + NetworkAction.ClientRequest ( + server, + ClientRequest.ClientRequest (clientId, clientSequence, data, handleClientResponse) + ) + + |> perform parseByte userPrefs cluster network + + let clientCreateButton = + document.querySelector ".client-create" :?> Browser.Types.HTMLButtonElement + + clientCreateButton.onclick <- + fun _event -> + let server = ui.ClientCreateServer.valueAsNumber |> int |> (*) 1 + + // TODO: store the reply and display it + NetworkAction.ClientRequest (server, ClientRequest.RegisterClient handleRegisterClientResponse) |> perform parseByte userPrefs cluster network ui.ShowConsumedMessages.onchange <- fun _event -> fullyRerender parseByte userPrefs cluster network diff --git a/RaftFable/src/Ui.fs b/RaftFable/src/Ui.fs index 7902a69..4c98d4e 100644 --- a/RaftFable/src/Ui.fs +++ b/RaftFable/src/Ui.fs @@ -24,6 +24,14 @@ type UiBackingState<'a> = UserPreferences : UserPreferences<'a> } +type ClientDataSection = + { + ClientDataField : Browser.Types.HTMLInputElement + ClientDataServerField : Browser.Types.HTMLInputElement + ClientIdField : Browser.Types.HTMLInputElement + ClientSequenceField : Browser.Types.HTMLInputElement + } + type UiElements = { Document : Browser.Types.Document @@ -32,12 +40,13 @@ type UiElements = MessageQueueArea : Browser.Types.HTMLTableElement LeaderStateTable : Browser.Types.HTMLTableElement TimeoutField : Browser.Types.HTMLInputElement - ClientDataField : Browser.Types.HTMLInputElement - ClientDataServerField : Browser.Types.HTMLInputElement HeartbeatField : Browser.Types.HTMLInputElement SelectedLeaderId : Browser.Types.HTMLInputElement ShowConsumedMessages : Browser.Types.HTMLInputElement ActionHistory : Browser.Types.HTMLTextAreaElement + ClientsList : Browser.Types.HTMLDivElement + ClientData : ClientDataSection + ClientCreateServer : Browser.Types.HTMLInputElement } type RequiresPopulation = @@ -72,6 +81,12 @@ module Ui = let clientDataField = document.querySelector ".client-data" :?> Browser.Types.HTMLInputElement + let clientSequenceField = + document.querySelector ".client-sequence" :?> Browser.Types.HTMLInputElement + + let clientIdField = + document.querySelector ".client-id" :?> Browser.Types.HTMLInputElement + let selectedLeaderId = document.querySelector ".leader-select" :?> Browser.Types.HTMLInputElement @@ -81,6 +96,19 @@ module Ui = let actionHistory = document.querySelector ".action-history" :?> Browser.Types.HTMLTextAreaElement + let clientsList = document.querySelector ".clients" :?> Browser.Types.HTMLDivElement + + let clientCreateServer = + document.querySelector ".create-client-server" :?> Browser.Types.HTMLInputElement + + let clientInfo = + { + ClientDataField = clientDataField + ClientDataServerField = clientDataServerField + ClientIdField = clientIdField + ClientSequenceField = clientSequenceField + } + { Document = document ServerStatusTable = serverStatuses @@ -88,12 +116,13 @@ module Ui = MessageQueueArea = messageQueueArea LeaderStateTable = leaderStateTable TimeoutField = timeoutField - ClientDataField = clientDataField - ClientDataServerField = clientDataServerField HeartbeatField = heartbeatField SelectedLeaderId = selectedLeaderId ShowConsumedMessages = showConsumed ActionHistory = actionHistory + ClientsList = clientsList + ClientData = clientInfo + ClientCreateServer = clientCreateServer } let reset (clusterSize : int) (ui : UiElements) : RequiresPopulation = @@ -119,15 +148,22 @@ module Ui = ui.SelectedLeaderId.min <- "0" ui.SelectedLeaderId.max <- sprintf "%i" (clusterSize - 1) ui.SelectedLeaderId.defaultValue <- "0" - ui.ClientDataField.max <- "255" - ui.ClientDataField.min <- "0" - ui.ClientDataField.defaultValue <- "0" + ui.ClientData.ClientDataField.max <- "255" + ui.ClientData.ClientDataField.min <- "0" + ui.ClientData.ClientDataField.defaultValue <- "0" + ui.ClientData.ClientDataServerField.max <- string (clusterSize - 1) + ui.ClientData.ClientDataServerField.min <- "0" + ui.ClientData.ClientDataServerField.defaultValue <- "0" + ui.ClientData.ClientIdField.min <- "0" + ui.ClientData.ClientIdField.defaultValue <- "0" + ui.ClientData.ClientSequenceField.min <- "0" + ui.ClientData.ClientSequenceField.defaultValue <- "0" + ui.ClientCreateServer.min <- "0" + ui.ClientCreateServer.defaultValue <- "0" + ui.ClientCreateServer.max <- string (clusterSize - 1) ui.HeartbeatField.max <- string (clusterSize - 1) ui.HeartbeatField.min <- "0" ui.HeartbeatField.defaultValue <- "0" - ui.ClientDataServerField.max <- string (clusterSize - 1) - ui.ClientDataServerField.min <- "0" - ui.ClientDataServerField.defaultValue <- "0" ui.TimeoutField.max <- string (clusterSize - 1) ui.TimeoutField.min <- "0" ui.TimeoutField.defaultValue <- "0" @@ -148,6 +184,8 @@ module Ui = ui.ShowConsumedMessages.defaultChecked <- false + ui.ClientsList.innerText <- "" + { ServerStatusNodes = serverStatusNodes } @@ -327,6 +365,8 @@ module Ui = let getUserPrefs<'a> (parse : string -> Result<'a, string>) + (handleRegisterClientResponse : RegisterClientResponse -> unit) + (handleClientDataResponse : ClientResponse -> unit) (clusterSize : int) (ui : UiElements) : UserPreferences<'a> @@ -338,7 +378,14 @@ module Ui = // TODO write these back out again, and give a button to Load ui.ActionHistory.textContent.Split "\n" |> Seq.filter (not << System.String.IsNullOrEmpty) - |> Seq.map (NetworkAction.tryParse<'a> parse None clusterSize) + |> Seq.map ( + NetworkAction.tryParse<'a> + parse + None + handleRegisterClientResponse + handleClientDataResponse + clusterSize + ) |> Result.allOkOrError // TODO handle this |> Result.get