Fable app
This commit is contained in:
@@ -85,16 +85,17 @@ module InMemoryCluster =
|
||||
|
||||
cluster, network
|
||||
|
||||
type NetworkAction =
|
||||
type NetworkAction<'a> =
|
||||
| InactivityTimeout of int<ServerId>
|
||||
| NetworkMessage of int<ServerId> * int
|
||||
| DropMessage of int<ServerId> * int
|
||||
| ClientRequest of int<ServerId> * 'a * (ClientReply -> unit)
|
||||
| Heartbeat of int<ServerId>
|
||||
|
||||
[<RequireQualifiedAccess>]
|
||||
module NetworkAction =
|
||||
|
||||
let perform<'a> (cluster : Cluster<'a>) (network : Network<'a>) (action : NetworkAction) : unit =
|
||||
let perform<'a> (cluster : Cluster<'a>) (network : Network<'a>) (action : NetworkAction<'a>) : unit =
|
||||
match action with
|
||||
| InactivityTimeout serverId -> cluster.InactivityTimeout serverId
|
||||
| Heartbeat serverId -> cluster.HeartbeatTimeout serverId
|
||||
@@ -102,3 +103,5 @@ module NetworkAction =
|
||||
| NetworkMessage (serverId, messageId) ->
|
||||
network.InboundMessage serverId messageId |> cluster.SendMessage serverId
|
||||
network.DropMessage serverId messageId
|
||||
| ClientRequest (server, request, replyChannel) ->
|
||||
Message.ClientRequest (request, replyChannel) |> cluster.SendMessage server
|
||||
|
@@ -115,6 +115,21 @@ type AppendEntriesMessage<'a> =
|
||||
this.LeaderTerm
|
||||
this.LeaderCommitIndex
|
||||
|
||||
type ClientReply =
|
||||
/// You asked a node that isn't the leader. Here's a hint about whom you should ask instead.
|
||||
/// The hint may not be accurate even as of the time when we reply, and certainly it may not be
|
||||
/// accurate as of the time *you* receive this message.
|
||||
/// (Note also that an unreliable network could in principle deliver your original request
|
||||
/// again at some point, so this is not a guarantee that your message will never be committed.)
|
||||
| Redirect of int<ServerId> option
|
||||
/// The cluster was not in a good enough state to process your request at this time.
|
||||
/// (Note, though, that an unreliable network could in principle mean that your
|
||||
/// original request gets delivered again at some point, so this is not a guarantee
|
||||
/// that your message will never be committed.)
|
||||
| Dropped
|
||||
/// The cluster acknowledges your request. At some future time, it may be committed.
|
||||
| Acknowledged
|
||||
|
||||
type Instruction<'a> =
|
||||
| AppendEntries of AppendEntriesMessage<'a>
|
||||
| RequestVote of RequestVoteMessage
|
||||
@@ -141,11 +156,13 @@ type Reply =
|
||||
type Message<'a> =
|
||||
| Instruction of Instruction<'a>
|
||||
| Reply of Reply
|
||||
| ClientRequest of 'a * (ClientReply -> unit)
|
||||
|
||||
override this.ToString () =
|
||||
match this with
|
||||
| Instruction i -> i.ToString ()
|
||||
| Reply r -> r.ToString ()
|
||||
| ClientRequest (a, _) -> sprintf "Client requested insertion of: %O" a
|
||||
|
||||
type private CandidateState =
|
||||
{
|
||||
@@ -185,25 +202,9 @@ type ServerStatus =
|
||||
| Candidate term -> sprintf "Candidate in term %i" term
|
||||
| Follower -> "Follower"
|
||||
|
||||
type ClientReply =
|
||||
/// You asked a node that isn't the leader. Here's a hint about whom you should ask instead.
|
||||
/// The hint may not be accurate even as of the time when we reply, and certainly it may not be
|
||||
/// accurate as of the time *you* receive this message.
|
||||
/// (Note also that an unreliable network could in principle deliver your original request
|
||||
/// again at some point, so this is not a guarantee that your message will never be committed.)
|
||||
| Redirect of int<ServerId> option
|
||||
/// The cluster was not in a good enough state to process your request at this time.
|
||||
/// (Note, though, that an unreliable network could in principle mean that your
|
||||
/// original request gets delivered again at some point, so this is not a guarantee
|
||||
/// that your message will never be committed.)
|
||||
| Dropped
|
||||
/// The cluster acknowledges your request. At some future time, it may be committed.
|
||||
| Acknowledged
|
||||
|
||||
type private ServerAction<'a> =
|
||||
| BeginElection
|
||||
| EmitHeartbeat
|
||||
| ClientRequest of 'a * (ClientReply -> unit)
|
||||
| Receive of Message<'a>
|
||||
| Sync of AsyncReplyChannel<unit>
|
||||
|
||||
@@ -562,14 +563,6 @@ type Server<'a>
|
||||
|
||||
match m with
|
||||
| ServerAction.EmitHeartbeat -> emitHeartbeat ()
|
||||
| ServerAction.ClientRequest (toAdd, replyChannel) ->
|
||||
match currentType with
|
||||
| ServerSpecialisation.Leader _ ->
|
||||
persistentState.AppendToLog toAdd persistentState.CurrentTerm
|
||||
replyChannel ClientReply.Acknowledged
|
||||
| ServerSpecialisation.Follower followerState ->
|
||||
replyChannel (ClientReply.Redirect followerState.CurrentLeader)
|
||||
| ServerSpecialisation.Candidate _ -> replyChannel ClientReply.Dropped
|
||||
| ServerAction.BeginElection ->
|
||||
match currentType with
|
||||
| ServerSpecialisation.Leader _ -> ()
|
||||
@@ -594,6 +587,14 @@ type Server<'a>
|
||||
|> messageChannel (i * 1<ServerId>)
|
||||
| ServerAction.Receive (Instruction m) -> processMessage m
|
||||
| ServerAction.Receive (Reply r) -> processReply r
|
||||
| ServerAction.Receive (Message.ClientRequest (toAdd, replyChannel)) ->
|
||||
match currentType with
|
||||
| ServerSpecialisation.Leader _ ->
|
||||
persistentState.AppendToLog toAdd persistentState.CurrentTerm
|
||||
replyChannel ClientReply.Acknowledged
|
||||
| ServerSpecialisation.Follower followerState ->
|
||||
replyChannel (ClientReply.Redirect followerState.CurrentLeader)
|
||||
| ServerSpecialisation.Candidate _ -> replyChannel ClientReply.Dropped
|
||||
| ServerAction.Sync replyChannel -> replyChannel.Reply ()
|
||||
|
||||
return! loop mailbox
|
||||
@@ -605,6 +606,9 @@ type Server<'a>
|
||||
#endif
|
||||
mailbox
|
||||
|
||||
member this.SendClientRequest (request : 'a) (reply : ClientReply -> unit) =
|
||||
mailbox.Post (ServerAction.Receive (Message.ClientRequest (request, reply)))
|
||||
|
||||
member this.TriggerInactivityTimeout () = mailbox.Post ServerAction.BeginElection
|
||||
member this.TriggerHeartbeatTimeout () = mailbox.Post ServerAction.EmitHeartbeat
|
||||
|
||||
|
Reference in New Issue
Block a user