Move to mailboxes

This commit is contained in:
Smaug123
2022-10-25 21:29:09 +01:00
parent 8b30ea45b8
commit 622280fb01
2 changed files with 104 additions and 86 deletions

View File

@@ -10,7 +10,7 @@ module TestServer =
[<Test>]
let foo () =
let cluster = InMemoryCluster.make<int> 5
let cluster = InMemoryCluster.make<int> true 5
let logger, logs =
let logs = ResizeArray ()
@@ -21,8 +21,6 @@ module TestServer =
logLine, freezeLogs
let sendMessage = cluster.Servers.[0].OutboundMessageChannel
// Candidate 1 asks server 0 to vote for it.
{
@@ -32,7 +30,7 @@ module TestServer =
CandidateLastLogEntry = 0<LogIndex>, 0<Term>
}
|> Message.RequestVote
|> sendMessage 0<ServerId>
|> cluster.SendMessage 0<ServerId>
logs () |> shouldEqual [ "Received message for term 0" ]
@@ -45,7 +43,7 @@ module TestServer =
CandidateLastLogEntry = 0<LogIndex>, 0<Term>
}
|> Message.RequestVote
|> sendMessage 0<ServerId>
|> cluster.SendMessage 0<ServerId>
logs ()
|> shouldEqual [ "Received message for term 0" ; "Received message for term 0" ]
@@ -61,6 +59,6 @@ module TestServer =
CandidateLastLogEntry = 0<LogIndex>, 0<Term>
}
|> Message.RequestVote
|> sendMessage 0<ServerId>
|> cluster.SendMessage 0<ServerId>
calls.Value |> shouldEqual 0

View File

@@ -12,6 +12,12 @@ type VolatileState =
LastApplied : int<LogIndex>
}
static member New : VolatileState =
{
CommitIndex = 0<LogIndex>
LastApplied = 0<LogIndex>
}
type LeaderState =
{
/// For each server, index of the next log entry to send to that server
@@ -86,55 +92,33 @@ type ServerSpecialisation =
| Follower
| Candidate
type Server<'a> =
{
mutable VolatileState : VolatileState
PersistentState : IPersistentState<'a>
mutable Type : ServerSpecialisation
Timeout : unit -> unit
OutboundMessageChannel : int<ServerId> -> Message<'a> -> unit
}
type ServerAction<'a> =
| BeginElection
| Receive of Message<'a>
| Sync of AsyncReplyChannel<unit>
[<RequireQualifiedAccess>]
module Server =
type Server<'a>
(
clusterSize : int,
persistentState : IPersistentState<'a>,
messageChannel : int<ServerId> -> Message<'a> -> unit
)
as this=
let mutable volatileState = VolatileState.New
let mutable currentType = Follower
let inline private getLogEntry<'a> (index : int<LogIndex>) (arr : 'a array) : 'a option =
arr |> Array.tryItem ((index - 1<LogIndex>) / 1<LogIndex>)
let inline private truncateLog<'a> (finalIndex : int<LogIndex>) (arr : 'a array) : 'a array =
arr |> Array.truncate (finalIndex / 1<LogIndex>)
let inline private replaceLog<'a> (index : int<LogIndex>) (elt : 'a) (arr : 'a array) : 'a array =
let toRet = Array.copy arr
toRet.[(index - 1<LogIndex>) / 1<LogIndex>] <- elt
toRet
let create<'a> (messageChannel : int<ServerId> -> Message<'a> -> unit) : Server<'a> =
{
OutboundMessageChannel = messageChannel
Type = Follower
VolatileState =
{
CommitIndex = 0<LogIndex>
LastApplied = 0<LogIndex>
}
PersistentState = InMemoryPersistentState ()
Timeout = fun () -> ()
}
/// Returns the new state of the same server.
let processMessage<'a> (message : Message<'a>) (s : Server<'a>) : unit =
let processMessage (message : Message<'a>) : unit =
// First, see if this message comes from a future term.
// (This is `UpdateTerm` from the TLA+.)
if message.Term > s.PersistentState.CurrentTerm then
if message.Term > persistentState.CurrentTerm then
// We're definitely out of date. Switch to follower mode.
s.Type <- Follower
s.PersistentState.AdvanceToTerm message.Term
currentType <- Follower
persistentState.AdvanceToTerm message.Term
match message with
| RequestVote message ->
// This was guaranteed above.
assert (message.CandidateTerm <= s.PersistentState.CurrentTerm)
assert (message.CandidateTerm <= persistentState.CurrentTerm)
// The following clauses define either condition under which we accept that the candidate has more data
// than we do, and so could be a more suitable leader than us.
@@ -144,7 +128,7 @@ module Server =
let messageSupersedesMe =
// Is the candidate advertising a later term than our last-persisted write was made at?
// (That would mean it's far in the future of us.)
match s.PersistentState.GetLastLogEntry () with
match persistentState.GetLastLogEntry () with
| Some (_, ourLastTerm) -> snd message.CandidateLastLogEntry > ourLastTerm
| None ->
// We have persisted no history at all!
@@ -152,10 +136,10 @@ module Server =
let messageExtendsMe =
// Do we agree what the current term is, is the candidate advertising a more advanced log than us?
match s.PersistentState.GetLastLogEntry () with
match persistentState.GetLastLogEntry () with
| Some (_, ourLastTerm) ->
snd message.CandidateLastLogEntry = ourLastTerm
&& fst message.CandidateLastLogEntry >= s.PersistentState.CurrentLogIndex
&& fst message.CandidateLastLogEntry >= persistentState.CurrentLogIndex
| None ->
// We've persisted no history; the candidate needs to also be at the start of history,
// or else we'd have already considered them in the `messageSupersedesMe` check.
@@ -169,10 +153,10 @@ module Server =
// (We can't rely on our own VotedFor property, because that may have been in a previous election.)
let shouldVoteFor =
if message.CandidateTerm = s.PersistentState.CurrentTerm && candidateIsAhead then
if message.CandidateTerm = persistentState.CurrentTerm && candidateIsAhead then
// We agree on which election we're taking part in, and moreover we agree that the candidate is
// suitable.
match s.PersistentState.VotedFor with
match persistentState.VotedFor with
| None ->
// We haven't voted in this election before.
true
@@ -194,24 +178,23 @@ module Server =
// getting a chance to persist.
// (Better for us to wrongly think we've voted than to wrongly think we've yet to vote. In the worst
// case we just end up not participating in an election.)
// TODO when this is made mutable etc: call Persist here
s.PersistentState.Vote message.CandidateId
persistentState.Vote message.CandidateId
{
VoterTerm = s.PersistentState.CurrentTerm
VoterTerm = persistentState.CurrentTerm
VoteGranted = true
}
|> message.ReplyChannel
| AppendEntries message ->
// This was guaranteed above.
assert (message.LeaderTerm <= s.PersistentState.CurrentTerm)
assert (message.LeaderTerm <= persistentState.CurrentTerm)
if message.LeaderTerm < s.PersistentState.CurrentTerm then
if message.LeaderTerm < persistentState.CurrentTerm then
// Reject the request: the "leader" is actually outdated, it was only a leader in the past.
{
FollowerTerm = s.PersistentState.CurrentTerm
FollowerTerm = persistentState.CurrentTerm
Success = false
}
|> message.ReplyChannel
@@ -219,45 +202,45 @@ module Server =
else
// This was guaranteed immediately above: we agree that the message is being sent by the current leader.
assert (message.LeaderTerm = s.PersistentState.CurrentTerm)
assert (message.LeaderTerm = persistentState.CurrentTerm)
let heartbeat (message : AppendEntriesMessage<'a>) (s : Server<'a>) : unit =
let heartbeat (message : AppendEntriesMessage<'a>) : unit =
// Just a heartbeat; no change to our log is required.
s.VolatileState <-
{ s.VolatileState with
volatileState <-
{ volatileState with
CommitIndex = message.LeaderCommitIndex
}
{
Success = true
FollowerTerm = s.PersistentState.CurrentTerm
FollowerTerm = persistentState.CurrentTerm
}
|> message.ReplyChannel
let acceptRequest (s : Server<'a>) : unit =
assert (s.Type = Follower)
let acceptRequest () : unit =
assert (currentType = Follower)
match message.NewEntry with
| None -> heartbeat message s
| None -> heartbeat message
| Some (toInsert, toInsertTerm) ->
let desiredLogInsertionPosition = message.PrevLogEntry.Index + 1<LogIndex>
match s.PersistentState.GetLogEntry desiredLogInsertionPosition with
match persistentState.GetLogEntry desiredLogInsertionPosition with
| Some (_, existingTerm) when toInsertTerm = existingTerm ->
// This is already persisted. Moreover, the value that we persisted came from the term we're
// currently processing, so in particular came from the same leader and hence won't conflict
// with what we received from that leader just now.
heartbeat message s
| Some (_, existingTerm) ->
heartbeat message
| Some _ ->
// The leader's message conflicts with what we persisted. Defer to the leader.
s.PersistentState.TruncateLog (desiredLogInsertionPosition - 1<LogIndex>)
s.PersistentState.AppendToLog toInsert toInsertTerm
persistentState.TruncateLog (desiredLogInsertionPosition - 1<LogIndex>)
persistentState.AppendToLog toInsert toInsertTerm
{
Success = true
FollowerTerm = s.PersistentState.CurrentTerm
FollowerTerm = persistentState.CurrentTerm
}
|> message.ReplyChannel
@@ -265,24 +248,24 @@ module Server =
// The leader knows what we've committed, so it won't try and give us anything further than
// the element immediately past our persisted log.
// TODO - why can't this be -1?
assert (desiredLogInsertionPosition = 1<LogIndex> + s.PersistentState.CurrentLogIndex)
assert (desiredLogInsertionPosition = 1<LogIndex> + persistentState.CurrentLogIndex)
// The leader's message is after our log. Append.
s.PersistentState.AppendToLog toInsert toInsertTerm
persistentState.AppendToLog toInsert toInsertTerm
{
Success = true
FollowerTerm = s.PersistentState.CurrentTerm
FollowerTerm = persistentState.CurrentTerm
}
|> message.ReplyChannel
let logIsConsistent (message : AppendEntriesMessage<'a>) (s : Server<'a>) : bool =
let logIsConsistent (message : AppendEntriesMessage<'a>) : bool =
if message.PrevLogEntry.Index = 0<LogIndex> then
// The leader advertises that they have no committed history, so certainly it's consistent with
// us.
true
else
match s.PersistentState.GetLogEntry message.PrevLogEntry.Index with
match persistentState.GetLogEntry message.PrevLogEntry.Index with
| None ->
// The leader's advertised commit is ahead of our history.
false
@@ -290,7 +273,7 @@ module Server =
// The leader's advertised commit is in our history; do we agree with it?
ourTermForThisEntry = message.PrevLogEntry.Term
match s.Type with
match currentType with
| Leader _ ->
[
"Violation of invariant."
@@ -303,47 +286,84 @@ module Server =
|> failwithf "%s"
| Follower ->
if not (logIsConsistent message s) then
if not (logIsConsistent message) then
// Reject the request, it's inconsistent with our history.
{
FollowerTerm = s.PersistentState.CurrentTerm
FollowerTerm = persistentState.CurrentTerm
Success = false
}
|> message.ReplyChannel
else
acceptRequest s
acceptRequest ()
| Candidate ->
// We've already verified that the message was sent from a leader in the current term, so we have
// lost the election.
s.Type <- Follower
currentType <- Follower
// TODO: why does this assertion hold?
assert (logIsConsistent message s)
acceptRequest s
assert (logIsConsistent message)
acceptRequest ()
let mailbox =
let rec loop (mailbox : MailboxProcessor<_>) =
async {
let! m = mailbox.Receive ()
match m with
| ServerAction.BeginElection -> return failwith "not yet implemented"
| ServerAction.Receive m -> return processMessage m
| ServerAction.Sync reply -> reply.Reply ()
return! loop mailbox
}
loop |> MailboxProcessor.Start
member this.TriggerTimeout () = mailbox.Post ServerAction.BeginElection
member this.Message (m : Message<'a>) = mailbox.Post (ServerAction.Receive m)
member this.Sync () = mailbox.PostAndReply ServerAction.Sync
// {
// ClusterSize : int
// mutable VolatileState : VolatileState
// PersistentState : IPersistentState<'a>
// mutable Type : ServerSpecialisation
// TriggerTimeout : unit -> unit
// Mailbox : MailboxProcessor<ServerAction<'a>>
// }
type Cluster<'a> =
internal
{
Servers : Server<'a> array
SendMessage : int<ServerId> -> Message<'a> -> unit
}
[<RequireQualifiedAccess>]
module InMemoryCluster =
[<RequiresExplicitTypeArguments>]
let make<'a> (count : int) : Cluster<'a> =
let make<'a> (immediateFlush : bool) (count : int) : Cluster<'a> =
let servers = Array.zeroCreate<Server<'a>> count
let locker = obj ()
let messageChannel (serverId : int<ServerId>) (message : Message<'a>) : unit =
lock locker (fun () -> Server.processMessage message servers.[serverId / 1<ServerId>])
lock locker (fun () -> servers.[serverId / 1<ServerId>].Message message)
for s in 0 .. servers.Length - 1 do
servers.[s] <- Server.create messageChannel
servers.[s] <- Server (count, InMemoryPersistentState (), messageChannel)
{
Servers = servers
SendMessage =
if immediateFlush then
fun i m ->
servers.[i / 1<ServerId>].Message m
servers.[i / 1<ServerId>].Sync ()
else
fun i -> servers.[i / 1<ServerId>].Message
}