Add client interaction and Fable support
This commit is contained in:
2
.gitignore
vendored
2
.gitignore
vendored
@@ -6,3 +6,5 @@ riderModule.iml
|
|||||||
.idea/
|
.idea/
|
||||||
*.user
|
*.user
|
||||||
*.DotSettings
|
*.DotSettings
|
||||||
|
# Fable output
|
||||||
|
*.fs.js
|
||||||
|
@@ -44,7 +44,7 @@ type Network<'a> =
|
|||||||
member this.InboundMessage (i : int<ServerId>) (id : int) : Message<'a> =
|
member this.InboundMessage (i : int<ServerId>) (id : int) : Message<'a> =
|
||||||
this.CompleteMessageHistory.[i / 1<ServerId>].[id]
|
this.CompleteMessageHistory.[i / 1<ServerId>].[id]
|
||||||
|
|
||||||
member this.DropMessage (i : int<ServerId>) (id : int) =
|
member this.DropMessage (i : int<ServerId>) (id : int) : unit =
|
||||||
this.MessagesDelivered.[i / 1<ServerId>].Add id |> ignore
|
this.MessagesDelivered.[i / 1<ServerId>].Add id |> ignore
|
||||||
|
|
||||||
member this.UndeliveredMessages (i : int<ServerId>) : (int * Message<'a>) list =
|
member this.UndeliveredMessages (i : int<ServerId>) : (int * Message<'a>) list =
|
||||||
|
@@ -32,7 +32,11 @@ type InMemoryPersistentState<'a> () =
|
|||||||
member this.CurrentTerm = currentTerm * 1<Term>
|
member this.CurrentTerm = currentTerm * 1<Term>
|
||||||
|
|
||||||
member this.IncrementTerm () =
|
member this.IncrementTerm () =
|
||||||
|
#if FABLE_COMPILER
|
||||||
|
currentTerm <- currentTerm + 1
|
||||||
|
#else
|
||||||
Interlocked.Increment ¤tTerm |> ignore
|
Interlocked.Increment ¤tTerm |> ignore
|
||||||
|
#endif
|
||||||
|
|
||||||
member this.VotedFor = votedFor
|
member this.VotedFor = votedFor
|
||||||
member this.Vote id = votedFor <- Some id
|
member this.Vote id = votedFor <- Some id
|
||||||
|
@@ -161,10 +161,17 @@ type private CandidateState =
|
|||||||
Votes = votes
|
Votes = votes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type private FollowerState =
|
||||||
|
{
|
||||||
|
/// This is certainly not canonical; it's just a hint so that we can
|
||||||
|
/// redirect clients to the right person.
|
||||||
|
CurrentLeader : int<ServerId> option
|
||||||
|
}
|
||||||
|
|
||||||
[<RequireQualifiedAccess>]
|
[<RequireQualifiedAccess>]
|
||||||
type private ServerSpecialisation =
|
type private ServerSpecialisation =
|
||||||
| Leader of LeaderState
|
| Leader of LeaderState
|
||||||
| Follower
|
| Follower of FollowerState
|
||||||
| Candidate of CandidateState
|
| Candidate of CandidateState
|
||||||
|
|
||||||
type ServerStatus =
|
type ServerStatus =
|
||||||
@@ -178,9 +185,25 @@ type ServerStatus =
|
|||||||
| Candidate term -> sprintf "Candidate in term %i" term
|
| Candidate term -> sprintf "Candidate in term %i" term
|
||||||
| Follower -> "Follower"
|
| Follower -> "Follower"
|
||||||
|
|
||||||
|
type ClientReply =
|
||||||
|
/// You asked a node that isn't the leader. Here's a hint about whom you should ask instead.
|
||||||
|
/// The hint may not be accurate even as of the time when we reply, and certainly it may not be
|
||||||
|
/// accurate as of the time *you* receive this message.
|
||||||
|
/// (Note also that an unreliable network could in principle deliver your original request
|
||||||
|
/// again at some point, so this is not a guarantee that your message will never be committed.)
|
||||||
|
| Redirect of int<ServerId> option
|
||||||
|
/// The cluster was not in a good enough state to process your request at this time.
|
||||||
|
/// (Note, though, that an unreliable network could in principle mean that your
|
||||||
|
/// original request gets delivered again at some point, so this is not a guarantee
|
||||||
|
/// that your message will never be committed.)
|
||||||
|
| Dropped
|
||||||
|
/// The cluster acknowledges your request. At some future time, it may be committed.
|
||||||
|
| Acknowledged
|
||||||
|
|
||||||
type private ServerAction<'a> =
|
type private ServerAction<'a> =
|
||||||
| BeginElection
|
| BeginElection
|
||||||
| EmitHeartbeat
|
| EmitHeartbeat
|
||||||
|
| ClientRequest of 'a * (ClientReply -> unit)
|
||||||
| Receive of Message<'a>
|
| Receive of Message<'a>
|
||||||
| Sync of AsyncReplyChannel<unit>
|
| Sync of AsyncReplyChannel<unit>
|
||||||
|
|
||||||
@@ -216,14 +239,24 @@ type Server<'a>
|
|||||||
)
|
)
|
||||||
=
|
=
|
||||||
let mutable volatileState = VolatileState.New
|
let mutable volatileState = VolatileState.New
|
||||||
let mutable currentType = ServerSpecialisation.Follower
|
|
||||||
|
let mutable currentType =
|
||||||
|
ServerSpecialisation.Follower
|
||||||
|
{
|
||||||
|
CurrentLeader = None
|
||||||
|
}
|
||||||
|
|
||||||
let processMessage (message : Instruction<'a>) : unit =
|
let processMessage (message : Instruction<'a>) : unit =
|
||||||
// First, see if this message comes from a future term.
|
// First, see if this message comes from a future term.
|
||||||
// (This is `UpdateTerm` from the TLA+.)
|
// (This is `UpdateTerm` from the TLA+.)
|
||||||
if message.Term > persistentState.CurrentTerm then
|
if message.Term > persistentState.CurrentTerm then
|
||||||
// We're definitely out of date. Switch to follower mode.
|
// We're definitely out of date. Switch to follower mode.
|
||||||
currentType <- ServerSpecialisation.Follower
|
currentType <-
|
||||||
|
ServerSpecialisation.Follower
|
||||||
|
{
|
||||||
|
CurrentLeader = None
|
||||||
|
}
|
||||||
|
|
||||||
persistentState.AdvanceToTerm message.Term
|
persistentState.AdvanceToTerm message.Term
|
||||||
// TODO - `DropStaleResponse` suggests we should do this
|
// TODO - `DropStaleResponse` suggests we should do this
|
||||||
//elif message.Term < persistentState.CurrentTerm then
|
//elif message.Term < persistentState.CurrentTerm then
|
||||||
@@ -333,7 +366,9 @@ type Server<'a>
|
|||||||
|> message.ReplyChannel
|
|> message.ReplyChannel
|
||||||
|
|
||||||
let acceptRequest () : unit =
|
let acceptRequest () : unit =
|
||||||
assert (currentType = ServerSpecialisation.Follower)
|
match currentType with
|
||||||
|
| ServerSpecialisation.Follower _ -> ()
|
||||||
|
| _ -> failwith "Invariant violation. A non-follower attempted to accept a leader request."
|
||||||
|
|
||||||
match message.NewEntry with
|
match message.NewEntry with
|
||||||
| None -> heartbeat message
|
| None -> heartbeat message
|
||||||
@@ -406,7 +441,7 @@ type Server<'a>
|
|||||||
|> String.concat "\n"
|
|> String.concat "\n"
|
||||||
|> failwithf "%s"
|
|> failwithf "%s"
|
||||||
|
|
||||||
| ServerSpecialisation.Follower ->
|
| ServerSpecialisation.Follower _ ->
|
||||||
if not (logIsConsistent message) then
|
if not (logIsConsistent message) then
|
||||||
// Reject the request, it's inconsistent with our history.
|
// Reject the request, it's inconsistent with our history.
|
||||||
{
|
{
|
||||||
@@ -422,7 +457,11 @@ type Server<'a>
|
|||||||
| ServerSpecialisation.Candidate _ ->
|
| ServerSpecialisation.Candidate _ ->
|
||||||
// We've already verified that the message was sent from a leader in the current term, so we have
|
// We've already verified that the message was sent from a leader in the current term, so we have
|
||||||
// lost the election.
|
// lost the election.
|
||||||
currentType <- ServerSpecialisation.Follower
|
currentType <-
|
||||||
|
ServerSpecialisation.Follower
|
||||||
|
{
|
||||||
|
CurrentLeader = Some message.LeaderId
|
||||||
|
}
|
||||||
// TODO: why does this assertion hold?
|
// TODO: why does this assertion hold?
|
||||||
assert (logIsConsistent message)
|
assert (logIsConsistent message)
|
||||||
acceptRequest ()
|
acceptRequest ()
|
||||||
@@ -433,7 +472,7 @@ type Server<'a>
|
|||||||
let emitHeartbeat () =
|
let emitHeartbeat () =
|
||||||
match currentType with
|
match currentType with
|
||||||
| ServerSpecialisation.Candidate _
|
| ServerSpecialisation.Candidate _
|
||||||
| ServerSpecialisation.Follower -> ()
|
| ServerSpecialisation.Follower _ -> ()
|
||||||
| ServerSpecialisation.Leader _ ->
|
| ServerSpecialisation.Leader _ ->
|
||||||
for i in 0 .. clusterSize - 1 do
|
for i in 0 .. clusterSize - 1 do
|
||||||
if i * 1<ServerId> <> me then
|
if i * 1<ServerId> <> me then
|
||||||
@@ -455,7 +494,7 @@ type Server<'a>
|
|||||||
| AppendEntriesReply appendEntriesReply ->
|
| AppendEntriesReply appendEntriesReply ->
|
||||||
match currentType with
|
match currentType with
|
||||||
| ServerSpecialisation.Candidate _
|
| ServerSpecialisation.Candidate _
|
||||||
| ServerSpecialisation.Follower -> ()
|
| ServerSpecialisation.Follower _ -> ()
|
||||||
| ServerSpecialisation.Leader leaderState ->
|
| ServerSpecialisation.Leader leaderState ->
|
||||||
|
|
||||||
if appendEntriesReply.FollowerTerm = persistentState.CurrentTerm then
|
if appendEntriesReply.FollowerTerm = persistentState.CurrentTerm then
|
||||||
@@ -491,7 +530,7 @@ type Server<'a>
|
|||||||
| RequestVoteReply requestVoteReply ->
|
| RequestVoteReply requestVoteReply ->
|
||||||
match currentType with
|
match currentType with
|
||||||
| ServerSpecialisation.Leader _
|
| ServerSpecialisation.Leader _
|
||||||
| ServerSpecialisation.Follower ->
|
| ServerSpecialisation.Follower _ ->
|
||||||
// We're not expecting any votes; drop them.
|
// We're not expecting any votes; drop them.
|
||||||
()
|
()
|
||||||
| ServerSpecialisation.Candidate state ->
|
| ServerSpecialisation.Candidate state ->
|
||||||
@@ -523,11 +562,19 @@ type Server<'a>
|
|||||||
|
|
||||||
match m with
|
match m with
|
||||||
| ServerAction.EmitHeartbeat -> emitHeartbeat ()
|
| ServerAction.EmitHeartbeat -> emitHeartbeat ()
|
||||||
|
| ServerAction.ClientRequest (toAdd, replyChannel) ->
|
||||||
|
match currentType with
|
||||||
|
| ServerSpecialisation.Leader _ ->
|
||||||
|
persistentState.AppendToLog toAdd persistentState.CurrentTerm
|
||||||
|
replyChannel ClientReply.Acknowledged
|
||||||
|
| ServerSpecialisation.Follower followerState ->
|
||||||
|
replyChannel (ClientReply.Redirect followerState.CurrentLeader)
|
||||||
|
| ServerSpecialisation.Candidate _ -> replyChannel ClientReply.Dropped
|
||||||
| ServerAction.BeginElection ->
|
| ServerAction.BeginElection ->
|
||||||
match currentType with
|
match currentType with
|
||||||
| ServerSpecialisation.Leader _ -> ()
|
| ServerSpecialisation.Leader _ -> ()
|
||||||
| ServerSpecialisation.Candidate _
|
| ServerSpecialisation.Candidate _
|
||||||
| ServerSpecialisation.Follower ->
|
| ServerSpecialisation.Follower _ ->
|
||||||
|
|
||||||
// Start the election!
|
// Start the election!
|
||||||
currentType <- ServerSpecialisation.Candidate (CandidateState.New clusterSize me)
|
currentType <- ServerSpecialisation.Candidate (CandidateState.New clusterSize me)
|
||||||
@@ -553,7 +600,9 @@ type Server<'a>
|
|||||||
}
|
}
|
||||||
|
|
||||||
let mailbox = loop |> MailboxProcessor.Start
|
let mailbox = loop |> MailboxProcessor.Start
|
||||||
|
#if !FABLE_COMPILER
|
||||||
mailbox.Error.Add raise
|
mailbox.Error.Add raise
|
||||||
|
#endif
|
||||||
mailbox
|
mailbox
|
||||||
|
|
||||||
member this.TriggerInactivityTimeout () = mailbox.Post ServerAction.BeginElection
|
member this.TriggerInactivityTimeout () = mailbox.Post ServerAction.BeginElection
|
||||||
@@ -561,10 +610,19 @@ type Server<'a>
|
|||||||
|
|
||||||
member this.Message (m : Message<'a>) = mailbox.Post (ServerAction.Receive m)
|
member this.Message (m : Message<'a>) = mailbox.Post (ServerAction.Receive m)
|
||||||
|
|
||||||
member this.Sync () = mailbox.PostAndReply ServerAction.Sync
|
member this.Sync () =
|
||||||
|
// This rather eccentric phrasing is so that Fable can run this mailbox.
|
||||||
|
// (Fable does not support `mailbox.PostAndReply`, nor does it support
|
||||||
|
// `Async.RunSynchronously`.)
|
||||||
|
mailbox.PostAndAsyncReply ServerAction.Sync
|
||||||
|
#if FABLE_COMPILER
|
||||||
|
|> Async.StartImmediate
|
||||||
|
#else
|
||||||
|
|> Async.RunSynchronously
|
||||||
|
#endif
|
||||||
|
|
||||||
member this.State =
|
member this.State =
|
||||||
match currentType with
|
match currentType with
|
||||||
| ServerSpecialisation.Leader _ -> ServerStatus.Leader persistentState.CurrentTerm
|
| ServerSpecialisation.Leader _ -> ServerStatus.Leader persistentState.CurrentTerm
|
||||||
| ServerSpecialisation.Candidate _ -> ServerStatus.Candidate persistentState.CurrentTerm
|
| ServerSpecialisation.Candidate _ -> ServerStatus.Candidate persistentState.CurrentTerm
|
||||||
| ServerSpecialisation.Follower -> ServerStatus.Follower
|
| ServerSpecialisation.Follower _ -> ServerStatus.Follower
|
||||||
|
Reference in New Issue
Block a user