diff --git a/.gitignore b/.gitignore index ee9ec4a..634a3a9 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,5 @@ riderModule.iml .idea/ *.user *.DotSettings +# Fable output +*.fs.js diff --git a/Raft/InMemory.fs b/Raft/InMemory.fs index 830b927..1436577 100644 --- a/Raft/InMemory.fs +++ b/Raft/InMemory.fs @@ -44,7 +44,7 @@ type Network<'a> = member this.InboundMessage (i : int) (id : int) : Message<'a> = this.CompleteMessageHistory.[i / 1].[id] - member this.DropMessage (i : int) (id : int) = + member this.DropMessage (i : int) (id : int) : unit = this.MessagesDelivered.[i / 1].Add id |> ignore member this.UndeliveredMessages (i : int) : (int * Message<'a>) list = diff --git a/Raft/PersistentState.fs b/Raft/PersistentState.fs index afa57e8..a0e5346 100644 --- a/Raft/PersistentState.fs +++ b/Raft/PersistentState.fs @@ -32,7 +32,11 @@ type InMemoryPersistentState<'a> () = member this.CurrentTerm = currentTerm * 1 member this.IncrementTerm () = +#if FABLE_COMPILER + currentTerm <- currentTerm + 1 +#else Interlocked.Increment ¤tTerm |> ignore +#endif member this.VotedFor = votedFor member this.Vote id = votedFor <- Some id diff --git a/Raft/Server.fs b/Raft/Server.fs index bbcdd2e..468c547 100644 --- a/Raft/Server.fs +++ b/Raft/Server.fs @@ -161,10 +161,17 @@ type private CandidateState = Votes = votes } +type private FollowerState = + { + /// This is certainly not canonical; it's just a hint so that we can + /// redirect clients to the right person. + CurrentLeader : int option + } + [] type private ServerSpecialisation = | Leader of LeaderState - | Follower + | Follower of FollowerState | Candidate of CandidateState type ServerStatus = @@ -178,9 +185,25 @@ type ServerStatus = | Candidate term -> sprintf "Candidate in term %i" term | Follower -> "Follower" +type ClientReply = + /// You asked a node that isn't the leader. Here's a hint about whom you should ask instead. + /// The hint may not be accurate even as of the time when we reply, and certainly it may not be + /// accurate as of the time *you* receive this message. + /// (Note also that an unreliable network could in principle deliver your original request + /// again at some point, so this is not a guarantee that your message will never be committed.) + | Redirect of int option + /// The cluster was not in a good enough state to process your request at this time. + /// (Note, though, that an unreliable network could in principle mean that your + /// original request gets delivered again at some point, so this is not a guarantee + /// that your message will never be committed.) + | Dropped + /// The cluster acknowledges your request. At some future time, it may be committed. + | Acknowledged + type private ServerAction<'a> = | BeginElection | EmitHeartbeat + | ClientRequest of 'a * (ClientReply -> unit) | Receive of Message<'a> | Sync of AsyncReplyChannel @@ -216,14 +239,24 @@ type Server<'a> ) = let mutable volatileState = VolatileState.New - let mutable currentType = ServerSpecialisation.Follower + + let mutable currentType = + ServerSpecialisation.Follower + { + CurrentLeader = None + } let processMessage (message : Instruction<'a>) : unit = // First, see if this message comes from a future term. // (This is `UpdateTerm` from the TLA+.) if message.Term > persistentState.CurrentTerm then // We're definitely out of date. Switch to follower mode. - currentType <- ServerSpecialisation.Follower + currentType <- + ServerSpecialisation.Follower + { + CurrentLeader = None + } + persistentState.AdvanceToTerm message.Term // TODO - `DropStaleResponse` suggests we should do this //elif message.Term < persistentState.CurrentTerm then @@ -333,7 +366,9 @@ type Server<'a> |> message.ReplyChannel let acceptRequest () : unit = - assert (currentType = ServerSpecialisation.Follower) + match currentType with + | ServerSpecialisation.Follower _ -> () + | _ -> failwith "Invariant violation. A non-follower attempted to accept a leader request." match message.NewEntry with | None -> heartbeat message @@ -406,7 +441,7 @@ type Server<'a> |> String.concat "\n" |> failwithf "%s" - | ServerSpecialisation.Follower -> + | ServerSpecialisation.Follower _ -> if not (logIsConsistent message) then // Reject the request, it's inconsistent with our history. { @@ -422,7 +457,11 @@ type Server<'a> | ServerSpecialisation.Candidate _ -> // We've already verified that the message was sent from a leader in the current term, so we have // lost the election. - currentType <- ServerSpecialisation.Follower + currentType <- + ServerSpecialisation.Follower + { + CurrentLeader = Some message.LeaderId + } // TODO: why does this assertion hold? assert (logIsConsistent message) acceptRequest () @@ -433,7 +472,7 @@ type Server<'a> let emitHeartbeat () = match currentType with | ServerSpecialisation.Candidate _ - | ServerSpecialisation.Follower -> () + | ServerSpecialisation.Follower _ -> () | ServerSpecialisation.Leader _ -> for i in 0 .. clusterSize - 1 do if i * 1 <> me then @@ -455,7 +494,7 @@ type Server<'a> | AppendEntriesReply appendEntriesReply -> match currentType with | ServerSpecialisation.Candidate _ - | ServerSpecialisation.Follower -> () + | ServerSpecialisation.Follower _ -> () | ServerSpecialisation.Leader leaderState -> if appendEntriesReply.FollowerTerm = persistentState.CurrentTerm then @@ -491,7 +530,7 @@ type Server<'a> | RequestVoteReply requestVoteReply -> match currentType with | ServerSpecialisation.Leader _ - | ServerSpecialisation.Follower -> + | ServerSpecialisation.Follower _ -> // We're not expecting any votes; drop them. () | ServerSpecialisation.Candidate state -> @@ -523,11 +562,19 @@ type Server<'a> match m with | ServerAction.EmitHeartbeat -> emitHeartbeat () + | ServerAction.ClientRequest (toAdd, replyChannel) -> + match currentType with + | ServerSpecialisation.Leader _ -> + persistentState.AppendToLog toAdd persistentState.CurrentTerm + replyChannel ClientReply.Acknowledged + | ServerSpecialisation.Follower followerState -> + replyChannel (ClientReply.Redirect followerState.CurrentLeader) + | ServerSpecialisation.Candidate _ -> replyChannel ClientReply.Dropped | ServerAction.BeginElection -> match currentType with | ServerSpecialisation.Leader _ -> () | ServerSpecialisation.Candidate _ - | ServerSpecialisation.Follower -> + | ServerSpecialisation.Follower _ -> // Start the election! currentType <- ServerSpecialisation.Candidate (CandidateState.New clusterSize me) @@ -553,7 +600,9 @@ type Server<'a> } let mailbox = loop |> MailboxProcessor.Start +#if !FABLE_COMPILER mailbox.Error.Add raise +#endif mailbox member this.TriggerInactivityTimeout () = mailbox.Post ServerAction.BeginElection @@ -561,10 +610,19 @@ type Server<'a> member this.Message (m : Message<'a>) = mailbox.Post (ServerAction.Receive m) - member this.Sync () = mailbox.PostAndReply ServerAction.Sync + member this.Sync () = + // This rather eccentric phrasing is so that Fable can run this mailbox. + // (Fable does not support `mailbox.PostAndReply`, nor does it support + // `Async.RunSynchronously`.) + mailbox.PostAndAsyncReply ServerAction.Sync +#if FABLE_COMPILER + |> Async.StartImmediate +#else + |> Async.RunSynchronously +#endif member this.State = match currentType with | ServerSpecialisation.Leader _ -> ServerStatus.Leader persistentState.CurrentTerm | ServerSpecialisation.Candidate _ -> ServerStatus.Candidate persistentState.CurrentTerm - | ServerSpecialisation.Follower -> ServerStatus.Follower + | ServerSpecialisation.Follower _ -> ServerStatus.Follower