Plumb through client requests to UI

This commit is contained in:
Smaug123
2022-11-07 09:40:18 +00:00
parent e7b2f65596
commit 5df2910b7f
11 changed files with 413 additions and 86 deletions

View File

@@ -10,7 +10,13 @@ type Term
[<Measure>]
type ServerId
type LogEntry =
[<Measure>]
type ClientId
[<Measure>]
type ClientSequence
type LogEntryMetadata =
{
Index : int<LogIndex>
Term : int<Term>
@@ -18,3 +24,47 @@ type LogEntry =
override this.ToString () =
sprintf "Log entry %i at subjective term %i" this.Index this.Term
type ClientResponse =
| NotLeader of leaderHint : int<ServerId> option
| SessionExpired
| Success of int<ClientId> * int<ClientSequence>
override this.ToString () =
match this with
| ClientResponse.NotLeader hint ->
let hint =
match hint with
| None -> ""
| Some leader -> sprintf " (leader hint: %i)" leader
sprintf "Failed to send data due to not asking leader%s" hint
| ClientResponse.SessionExpired -> "Failed to send data as session expired"
| ClientResponse.Success (client, sequence) -> sprintf "Client %i's request %i succeeded" client sequence
type RegisterClientResponse =
| NotLeader of leaderHint : int<ServerId> option
| Success of int<ClientId>
override this.ToString () =
match this with
| RegisterClientResponse.Success client -> sprintf "Registered client with ID %i" client
| RegisterClientResponse.NotLeader hint ->
let hint =
match hint with
| None -> ""
| Some leader -> sprintf " (leader hint: %i)" leader
sprintf "Failed to register client due to not asking leader%s" hint
type InternalRaftCommunication = | NewClientRegistered of (RegisterClientResponse -> unit)
type LogEntry<'a> =
| ClientEntry of 'a * int<ClientId> * int<ClientSequence> * (ClientResponse -> unit)
| RaftOverhead of InternalRaftCommunication
override this.ToString () =
match this with
| LogEntry.ClientEntry (data, client, sequence, _) ->
sprintf "Client %i, sequence number %i, sends data %O" client sequence data
| LogEntry.RaftOverhead (InternalRaftCommunication.NewClientRegistered _) -> "New client registration"

View File

@@ -105,7 +105,7 @@ type NetworkAction<'a> =
| InactivityTimeout of int<ServerId>
| NetworkMessage of int<ServerId> * int
| DropMessage of int<ServerId> * int
| ClientRequest of int<ServerId> * 'a
| ClientRequest of int<ServerId> * ClientRequest<'a>
| Heartbeat of int<ServerId>
[<RequireQualifiedAccess>]
@@ -172,35 +172,65 @@ module NetworkAction =
else
sprintf "Cannot heartbeat a non-leader (%i)." serverId |> Error
let private getClientData<'a>
let private getNewClientTarget<'a> (clusterSize : int) (serverId : string) : Result<int<ServerId>, string> =
match Int32.TryParse serverId with
| false, _ -> sprintf "Expected an int for a server ID, got '%s'" serverId |> Error
| true, serverId ->
if serverId >= clusterSize || serverId < 0 then
sprintf "Server ID must be between 0 and %i inclusive, got %i." (clusterSize - 1) serverId
|> Error
else
Ok (serverId * 1<ServerId>)
let private getClientSubmitData<'a>
(parse : string -> Result<'a, string>)
(clusterSize : int)
(s : string)
: Result<int<ServerId> * 'a, string>
: Result<int<ServerId> * int<ClientId> * int<ClientSequence> * 'a, string>
=
match s.Split ',' |> List.ofArray with
| serverId :: (_ :: _ as rest) ->
| serverId :: clientId :: clientSequenceNumber :: (_ :: _ as rest) ->
let rest = String.concat "," rest |> fun s -> s.TrimStart ()
match Int32.TryParse serverId with
| true, serverId ->
if serverId >= clusterSize || serverId < 0 then
sprintf "Server ID must be between 0 and %i inclusive, got %i." (clusterSize - 1) serverId
|> Error
else
match parse rest with
| Ok b -> Ok (serverId * 1<ServerId>, b)
| Error e -> sprintf "Failed to parse client data: %s" e |> Error
match Int32.TryParse (serverId.Trim ()) with
| false, _ ->
sprintf "Server ID expected as first comma-separated component, got '%s'." serverId
|> Error
| _ -> sprintf "Expected a comma in client data string, got '%s'" s |> Error
| true, serverId ->
if serverId >= clusterSize || serverId < 0 then
sprintf "Server ID must be between 0 and %i inclusive, got %i." (clusterSize - 1) serverId
|> Error
else
match Int32.TryParse (clientId.Trim ()) with
| false, _ ->
sprintf "Client ID expected as second comma-separated component, got '%s'." clientId
|> Error
| true, clientId ->
match Int32.TryParse (clientSequenceNumber.Trim ()) with
| false, _ ->
sprintf
"Client sequence number expected as third comma-separated component, got '%s'."
clientSequenceNumber
|> Error
| true, clientSequenceNumber ->
match parse rest with
| Ok b -> Ok (serverId * 1<ServerId>, clientId * 1<ClientId>, clientSequenceNumber * 1<ClientSequence>, b)
| Error e -> sprintf "Failed to parse client data: %s" e |> Error
| _ ->
sprintf "Expected serverId,clientId,clientSequenceNumber,data; got '%s'" s
|> Error
/// Optionally also validates leaders against the input set of leaders.
let tryParse<'a>
(parse : string -> Result<'a, string>)
(leaders : Set<int<ServerId>> option)
(handleRegisterClientResponse : RegisterClientResponse -> unit)
(handleClientDataResponse : ClientResponse -> unit)
(clusterSize : int)
(s : string)
: Result<NetworkAction<'a>, string>
@@ -226,8 +256,17 @@ module NetworkAction =
| Ok h -> Heartbeat h |> Ok
| Error e -> Error e
| 'S' ->
match getClientData parse clusterSize (s.[1..].TrimStart ()) with
| Ok (server, data) -> ClientRequest (server, data) |> Ok
match getNewClientTarget clusterSize (s.[1..].TrimStart ()) with
| Ok target ->
ClientRequest (target, ClientRequest.RegisterClient handleRegisterClientResponse)
|> Ok
| Error e -> Error e
| 'R' ->
match getClientSubmitData parse clusterSize (s.[1..].TrimStart ()) with
| Ok (server, client, sequence, data) ->
(server, ClientRequest.ClientRequest (client, sequence, data, handleClientDataResponse))
|> ClientRequest
|> Ok
| Error e -> Error e
| c -> Error (sprintf "unexpected start char '%c'" c)
@@ -237,4 +276,6 @@ module NetworkAction =
| NetworkAction.NetworkMessage (server, id) -> sprintf "m %i,%i" server id
| NetworkAction.DropMessage (server, id) -> sprintf "d %i,%i" server id
| NetworkAction.InactivityTimeout server -> sprintf "t %i" server
| NetworkAction.ClientRequest (server, data) -> sprintf "s %i,%O" server data
| NetworkAction.ClientRequest (server, ClientRequest.RegisterClient _) -> sprintf "s %i" server
| NetworkAction.ClientRequest (server, ClientRequest.ClientRequest (client, sequence, data, _)) ->
sprintf "r %i,%i,%i,%O" server client sequence data

View File

@@ -6,15 +6,15 @@ type IPersistentState<'a> =
abstract CurrentTerm : int<Term>
/// If I know about an election in my CurrentTerm, who did I vote for during that election?
abstract VotedFor : int<ServerId> option
abstract AppendToLog : 'a -> int<Term> -> unit
abstract AppendToLog : LogEntry<'a> -> int<Term> -> unit
/// Truncate away the most recent entries of the log.
/// If `GetLogEntry x` would succeed, and then we call `TruncateLog x`,
/// then `GetLogEntry x` will still succeed (but `GetLogEntry (x + 1)` will not).
abstract TruncateLog : int<LogIndex> -> unit
abstract GetLogEntry : int<LogIndex> -> ('a * int<Term>) option
abstract GetLogEntry : int<LogIndex> -> (LogEntry<'a> * int<Term>) option
abstract CurrentLogIndex : int<LogIndex>
abstract GetLastLogEntry : unit -> ('a * LogEntry) option
abstract GetLastLogEntry : unit -> (LogEntry<'a> * LogEntryMetadata) option
abstract AdvanceToTerm : int<Term> -> unit
abstract IncrementTerm : unit -> unit
abstract Vote : int<ServerId> -> unit
@@ -24,7 +24,7 @@ type IPersistentState<'a> =
type InMemoryPersistentState<'a> () =
let mutable currentTerm = 0
let mutable votedFor : int<ServerId> option = None
let log = ResizeArray<'a * int<Term>> ()
let log = ResizeArray<LogEntry<'a> * int<Term>> ()
member this.GetLog () = log |> List.ofSeq
@@ -45,7 +45,7 @@ type InMemoryPersistentState<'a> () =
currentTerm <- term / 1<Term>
votedFor <- None
member this.AppendToLog entry term = log.Add (entry, term)
member this.AppendToLog (entry : LogEntry<'a>) term = log.Add (entry, term)
member this.TruncateLog position =
let position = position / 1<LogIndex>
@@ -54,7 +54,7 @@ type InMemoryPersistentState<'a> () =
let position = if position < 0 then 0 else position
log.RemoveRange (position, log.Count - position)
member this.GetLastLogEntry () : ('a * LogEntry) option =
member this.GetLastLogEntry () : (LogEntry<'a> * LogEntryMetadata) option =
if log.Count = 0 then
None
else

View File

@@ -1,7 +1,7 @@
namespace Raft
/// Server state which need not survive a server crash.
type VolatileState =
type VolatileState<'a> =
{
/// The index of the highest log entry we know is persisted to a majority of the cluster.
// Why is it correct for this to be volatile?
@@ -9,13 +9,16 @@ type VolatileState =
// we *don't* know that any of our log is reflected in the other nodes.
// (We'll soon learn a better value of CommitIndex as we start receiving messages again.)
CommitIndex : int<LogIndex>
/// TODO: do this, and model applying to state machine
LastApplied : int<LogIndex>
Clients : Map<int<ClientId>, Map<int<ClientSequence>, 'a>>
}
static member New : VolatileState =
static member New : VolatileState<'a> =
{
CommitIndex = 0<LogIndex>
LastApplied = 0<LogIndex>
Clients = Map.empty
}
type LeaderState =
@@ -30,7 +33,7 @@ type LeaderState =
MatchIndex : int<LogIndex> array
}
static member New (clusterSize : int) (currentIndex : int<LogIndex>) : LeaderState =
static member New<'a> (clusterSize : int) (currentIndex : int<LogIndex>) : LeaderState =
{
// +1, because these are indexed from 1.
ToSend = Array.create clusterSize (currentIndex + 1<LogIndex>)
@@ -71,7 +74,7 @@ type RequestVoteMessage =
{
CandidateTerm : int<Term>
CandidateId : int<ServerId>
CandidateLastLogEntry : LogEntry option
CandidateLastLogEntry : LogEntryMetadata option
ReplyChannel : RequestVoteReply -> unit
}
@@ -109,11 +112,11 @@ type AppendEntriesMessage<'a> =
/// I am your leader! This is me! (so everyone knows where to send clients to)
LeaderId : int<ServerId>
/// The entry immediately preceding the entry I'm sending you, so you can tell if we've got out of sync.
PrevLogEntry : LogEntry option
PrevLogEntry : LogEntryMetadata option
/// Followers, append this entry to your log. (Or, if None, this is just a heartbeat.)
/// It was determined at the given term - recall that we might need to bring a restarted node up to speed
/// with what happened during terms that took place while it was down.
NewEntry : ('a * int<Term>) option
NewEntry : (LogEntry<'a> * int<Term>) option
LeaderCommitIndex : int<LogIndex>
/// TODO - we don't need this, the responder should just construct
/// the appropriate Message and send it themselves
@@ -133,13 +136,27 @@ type AppendEntriesMessage<'a> =
this.LeaderTerm
this.LeaderCommitIndex
type SerialisedLogEntry<'a> =
| SerialisedClientEntry of 'a * int<ClientId> * int<ClientSequence>
| SerialisedClientRegister
static member Make (entry : LogEntry<'a>) : SerialisedLogEntry<'a> =
match entry with
| ClientEntry (a, client, sequence, _) -> SerialisedClientEntry (a, client, sequence)
| RaftOverhead (NewClientRegistered _) -> SerialisedClientRegister
override this.ToString () =
match this with
| SerialisedClientRegister -> "<client registration>"
| SerialisedClientEntry (data, client, sequence) -> sprintf "Client %i (%i) puts data: %O" client sequence data
/// A readout of the server's internal state, suitable for e.g. debugging tools.
type ServerInternalState<'a> =
{
LogIndex : int<LogIndex>
CurrentTerm : int<Term>
CurrentVote : int<ServerId> option
Log : ('a * int<Term>) option list
Log : (SerialisedLogEntry<'a> * int<Term>) option list
/// A clone of the leader state, if this is a leader.
LeaderState : LeaderState option
}
@@ -182,11 +199,15 @@ type Reply =
| RequestVoteReply v -> v.ToString ()
| AppendEntriesReply r -> r.ToString ()
type ClientRequest<'a> =
| RegisterClient of (RegisterClientResponse -> unit)
| ClientRequest of int<ClientId> * int<ClientSequence> * 'a * (ClientResponse -> unit)
[<RequireQualifiedAccess>]
type Message<'a> =
| Instruction of Instruction<'a>
| Reply of Reply
| ClientRequest of 'a
| ClientRequest of ClientRequest<'a>
override this.ToString () =
match this with
@@ -270,7 +291,7 @@ type Server<'a>
messageChannel : int<ServerId> -> Message<'a> -> unit
)
=
let mutable volatileState = VolatileState.New
let mutable volatileState = VolatileState<'a>.New
let mutable currentType =
ServerSpecialisation.Follower
@@ -567,11 +588,51 @@ type Server<'a>
| Some (_, term) -> term
if ourLogTerm = persistentState.CurrentTerm then
let oldCommitIndex = volatileState.CommitIndex
volatileState <-
{ volatileState with
CommitIndex = maxLogAQuorumHasCommitted
}
for i in (oldCommitIndex / 1<LogIndex> + 1) .. maxLogAQuorumHasCommitted / 1<LogIndex> do
let i = i * 1<LogIndex>
match persistentState.GetLogEntry i with
| None ->
failwith "Invariant violated. Leader does not have a log entry for a committed index."
| Some (logEntry, _term) ->
match logEntry with
| LogEntry.ClientEntry (stored, client, sequence, replyChannel) ->
let newClients =
volatileState.Clients
|> Map.change
client
(fun messageLog ->
let messages =
match messageLog with
| None -> Map.empty
| Some messageLog -> messageLog
messages |> Map.change sequence (fun _ -> Some stored) |> Some
)
volatileState <-
{ volatileState with
Clients = newClients
}
replyChannel (ClientResponse.Success (client, sequence))
| LogEntry.RaftOverhead (NewClientRegistered replyChannel) ->
let clientId = i / 1<LogIndex> * 1<ClientId>
volatileState <-
{ volatileState with
Clients = volatileState.Clients |> Map.add clientId Map.empty
}
clientId |> RegisterClientResponse.Success |> replyChannel
| RequestVoteReply requestVoteReply ->
match currentType with
| ServerSpecialisation.Leader _
@@ -635,18 +696,34 @@ type Server<'a>
|> messageChannel (i * 1<ServerId>)
| ServerAction.Receive (Message.Instruction m) -> processMessage m
| ServerAction.Receive (Message.Reply r) -> processReply r
| ServerAction.Receive (Message.ClientRequest toAdd) ->
match currentType with
| ServerSpecialisation.Leader leaderState ->
persistentState.AppendToLog toAdd persistentState.CurrentTerm
//replyChannel ClientReply.Acknowledged
emitHeartbeat leaderState
| ServerSpecialisation.Follower followerState ->
//replyChannel (ClientReply.Redirect followerState.CurrentLeader)
()
| ServerSpecialisation.Candidate _ ->
//replyChannel ClientReply.Dropped
()
| ServerAction.Receive (Message.ClientRequest request) ->
match request with
| ClientRequest.RegisterClient replyChannel ->
match currentType with
| ServerSpecialisation.Follower followerState ->
replyChannel (RegisterClientResponse.NotLeader followerState.CurrentLeader)
| ServerSpecialisation.Candidate _ -> replyChannel (RegisterClientResponse.NotLeader None)
| ServerSpecialisation.Leader leaderState ->
persistentState.AppendToLog
(RaftOverhead (NewClientRegistered replyChannel))
persistentState.CurrentTerm
leaderState.MatchIndex.[me / 1<ServerId>] <- persistentState.CurrentLogIndex
emitHeartbeat leaderState
| ClientRequest.ClientRequest (client, sequenceNumber, toAdd, replyChannel) ->
match currentType with
| ServerSpecialisation.Leader leaderState ->
persistentState.AppendToLog
(LogEntry.ClientEntry (toAdd, client, sequenceNumber, replyChannel))
persistentState.CurrentTerm
leaderState.MatchIndex.[me / 1<ServerId>] <- persistentState.CurrentLogIndex
emitHeartbeat leaderState
| ServerSpecialisation.Follower followerState ->
replyChannel (ClientResponse.NotLeader followerState.CurrentLeader)
| ServerSpecialisation.Candidate _ -> replyChannel (ClientResponse.NotLeader None)
| ServerAction.Sync replyChannel -> replyChannel.Reply ()
| ServerAction.StateReadout replyChannel ->
{
@@ -659,7 +736,11 @@ type Server<'a>
| Some (_, last) ->
List.init
(last.Index / 1<LogIndex>)
(fun index -> persistentState.GetLogEntry (1<LogIndex> + index * 1<LogIndex>))
(fun index ->
match persistentState.GetLogEntry (1<LogIndex> + index * 1<LogIndex>) with
| None -> None
| Some (entry, term) -> (SerialisedLogEntry.Make entry, term) |> Some
)
LeaderState =
match currentType with
| ServerSpecialisation.Leader state -> state.Clone () |> Some
@@ -676,7 +757,7 @@ type Server<'a>
#endif
mailbox
member this.SendClientRequest (request : 'a) =
member this.SendClientRequest (request : ClientRequest<'a>) =
mailbox.Post (ServerAction.Receive (Message.ClientRequest request))
member this.TriggerInactivityTimeout () = mailbox.Post ServerAction.BeginElection