Test that we elect a leader on startup

This commit is contained in:
Smaug123
2022-10-25 22:41:22 +01:00
parent 622280fb01
commit 72be3ebec0
6 changed files with 153 additions and 32 deletions

13
Raft.Test/Logger.fs Normal file
View File

@@ -0,0 +1,13 @@
namespace Raft.Test
[<RequireQualifiedAccess>]
module TestLogger =
let make () : (string -> unit) * (unit -> string list) =
let logs = ResizeArray ()
let logLine (s : string) = lock logs (fun () -> logs.Add s)
let freezeLogs () =
lock logs (fun () -> logs |> Seq.toList)
logLine, freezeLogs

View File

@@ -7,6 +7,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<Compile Include="Logger.fs" />
<Compile Include="TestServer.fs" /> <Compile Include="TestServer.fs" />
<Compile Include="TestInMemoryPersistentState.fs" /> <Compile Include="TestInMemoryPersistentState.fs" />
</ItemGroup> </ItemGroup>

View File

@@ -50,7 +50,7 @@ module TestInMemoryPersistentState =
| Some entry -> | Some entry ->
(uut :> IPersistentState<_>).TruncateLog truncate (uut :> IPersistentState<_>).TruncateLog truncate
(uut :> IPersistentState<_>).GetLastLogEntry () = Some entry (uut :> IPersistentState<_>).GetLastLogEntry () = Some (truncate, entry)
&& isPrefix (uut.GetLog ()) oldLog && isPrefix (uut.GetLog ()) oldLog
&& (uut :> IPersistentState<_>).CurrentLogIndex = truncate && (uut :> IPersistentState<_>).CurrentLogIndex = truncate

View File

@@ -9,17 +9,10 @@ open FsUnitTyped
module TestServer = module TestServer =
[<Test>] [<Test>]
let foo () = let ``Startup sequence, first fumbling steps`` () =
let cluster = InMemoryCluster.make<int> true 5 let cluster = InMemoryCluster.make<int> true 5
let logger, logs = let logger, logs = TestLogger.make ()
let logs = ResizeArray ()
let logLine (s : string) = lock logs (fun () -> logs.Add s)
let freezeLogs () =
lock logs (fun () -> logs |> Seq.toList)
logLine, freezeLogs
// Candidate 1 asks server 0 to vote for it. // Candidate 1 asks server 0 to vote for it.
@@ -62,3 +55,19 @@ module TestServer =
|> cluster.SendMessage 0<ServerId> |> cluster.SendMessage 0<ServerId>
calls.Value |> shouldEqual 0 calls.Value |> shouldEqual 0
[<Test>]
let ``Startup sequence in prod`` () =
let cluster = InMemoryCluster.make<int> false 5
cluster.Servers.[0].TriggerTimeout ()
cluster.Servers.[0].Sync ()
// We sent a message to every other server; process them.
for i in 1..4 do
cluster.Servers.[i].Sync ()
cluster.Servers.[0].State |> shouldEqual ServerStatus.Leader
for i in 1..4 do
cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower

View File

@@ -1,5 +1,7 @@
namespace Raft namespace Raft
open System.Threading
type IPersistentState<'a> = type IPersistentState<'a> =
abstract CurrentTerm : int<Term> abstract CurrentTerm : int<Term>
/// If I know about an election in my CurrentTerm, who did I vote for during that election? /// If I know about an election in my CurrentTerm, who did I vote for during that election?
@@ -12,26 +14,31 @@ type IPersistentState<'a> =
abstract TruncateLog : int<LogIndex> -> unit abstract TruncateLog : int<LogIndex> -> unit
abstract GetLogEntry : int<LogIndex> -> ('a * int<Term>) option abstract GetLogEntry : int<LogIndex> -> ('a * int<Term>) option
abstract CurrentLogIndex : int<LogIndex> abstract CurrentLogIndex : int<LogIndex>
abstract GetLastLogEntry : unit -> ('a * int<Term>) option abstract GetLastLogEntry : unit -> (int<LogIndex> * ('a * int<Term>)) option
abstract AdvanceToTerm : int<Term> -> unit abstract AdvanceToTerm : int<Term> -> unit
abstract IncrementTerm : unit -> unit
abstract Vote : int<ServerId> -> unit abstract Vote : int<ServerId> -> unit
/// Server state which must survive a server crash. /// Server state which must survive a server crash.
[<Class>] [<Class>]
type InMemoryPersistentState<'a> () = type InMemoryPersistentState<'a> () =
let mutable currentTerm = 0<Term> let mutable currentTerm = 0
let mutable votedFor : int<ServerId> option = None let mutable votedFor : int<ServerId> option = None
let log = ResizeArray<'a * int<Term>> () let log = ResizeArray<'a * int<Term>> ()
member this.GetLog () = log |> List.ofSeq member this.GetLog () = log |> List.ofSeq
interface IPersistentState<'a> with interface IPersistentState<'a> with
member this.CurrentTerm = currentTerm member this.CurrentTerm = currentTerm * 1<Term>
member this.IncrementTerm () =
Interlocked.Increment &currentTerm |> ignore
member this.VotedFor = votedFor member this.VotedFor = votedFor
member this.Vote id = votedFor <- Some id member this.Vote id = votedFor <- Some id
member this.AdvanceToTerm term = member this.AdvanceToTerm term =
currentTerm <- term currentTerm <- term / 1<Term>
votedFor <- None votedFor <- None
member this.AppendToLog entry term = log.Add (entry, term) member this.AppendToLog entry term = log.Add (entry, term)
@@ -44,7 +51,10 @@ type InMemoryPersistentState<'a> () =
log.RemoveRange (position, log.Count - position) log.RemoveRange (position, log.Count - position)
member this.GetLastLogEntry () = member this.GetLastLogEntry () =
if log.Count = 0 then None else Some log.[log.Count - 1] if log.Count = 0 then
None
else
Some (log.Count * 1<LogIndex>, log.[log.Count - 1])
member this.GetLogEntry position = member this.GetLogEntry position =
let position = position / 1<LogIndex> let position = position / 1<LogIndex>

View File

@@ -26,9 +26,17 @@ type LeaderState =
MatchIndex : int array MatchIndex : int array
} }
static member New (clusterSize : int) (currentIndex : int<LogIndex>) : LeaderState =
{
NextIndex = Array.create clusterSize (currentIndex + 1<LogIndex>)
MatchIndex = Array.zeroCreate clusterSize
}
/// You asked me to vote for you to become leader. Here is my response. /// You asked me to vote for you to become leader. Here is my response.
type RequestVoteReply = type RequestVoteReply =
{ {
/// Me!
Voter : int<ServerId>
/// The term I think it is. /// The term I think it is.
VoterTerm : int<Term> VoterTerm : int<Term>
/// Whether I am happy for you to become leader. (For example, if my term is greater than yours, then you're /// Whether I am happy for you to become leader. (For example, if my term is greater than yours, then you're
@@ -87,32 +95,54 @@ type Message<'a> =
| AppendEntries m -> m.LeaderTerm | AppendEntries m -> m.LeaderTerm
| RequestVote m -> m.CandidateTerm | RequestVote m -> m.CandidateTerm
type ServerSpecialisation = type private CandidateState =
{
/// For each voter, the vote I received from them.
Votes : bool option array
}
static member New (count : int) (self : int<ServerId>) =
let votes = Array.zeroCreate<_> count
votes.[self / 1<ServerId>] <- Some true
{
Votes = votes
}
[<RequireQualifiedAccess>]
type private ServerSpecialisation =
| Leader of LeaderState | Leader of LeaderState
| Follower | Follower
| Candidate of CandidateState
type ServerStatus =
| Leader
| Follower
| Candidate | Candidate
type ServerAction<'a> = type private ServerAction<'a> =
| BeginElection | BeginElection
| Receive of Message<'a> | Receive of Message<'a>
| ReceiveReply of RequestVoteReply
| Sync of AsyncReplyChannel<unit> | Sync of AsyncReplyChannel<unit>
type Server<'a> type Server<'a>
( (
clusterSize : int, clusterSize : int,
me : int<ServerId>,
persistentState : IPersistentState<'a>, persistentState : IPersistentState<'a>,
messageChannel : int<ServerId> -> Message<'a> -> unit messageChannel : int<ServerId> -> Message<'a> -> unit
) )
as this= as this=
let mutable volatileState = VolatileState.New let mutable volatileState = VolatileState.New
let mutable currentType = Follower let mutable currentType = ServerSpecialisation.Follower
let processMessage (message : Message<'a>) : unit = let processMessage (message : Message<'a>) : unit =
// First, see if this message comes from a future term. // First, see if this message comes from a future term.
// (This is `UpdateTerm` from the TLA+.) // (This is `UpdateTerm` from the TLA+.)
if message.Term > persistentState.CurrentTerm then if message.Term > persistentState.CurrentTerm then
// We're definitely out of date. Switch to follower mode. // We're definitely out of date. Switch to follower mode.
currentType <- Follower currentType <- ServerSpecialisation.Follower
persistentState.AdvanceToTerm message.Term persistentState.AdvanceToTerm message.Term
match message with match message with
@@ -129,7 +159,7 @@ type Server<'a>
// Is the candidate advertising a later term than our last-persisted write was made at? // Is the candidate advertising a later term than our last-persisted write was made at?
// (That would mean it's far in the future of us.) // (That would mean it's far in the future of us.)
match persistentState.GetLastLogEntry () with match persistentState.GetLastLogEntry () with
| Some (_, ourLastTerm) -> snd message.CandidateLastLogEntry > ourLastTerm | Some (_, (_, ourLastTerm)) -> snd message.CandidateLastLogEntry > ourLastTerm
| None -> | None ->
// We have persisted no history at all! // We have persisted no history at all!
true true
@@ -137,7 +167,7 @@ type Server<'a>
let messageExtendsMe = let messageExtendsMe =
// Do we agree what the current term is, is the candidate advertising a more advanced log than us? // Do we agree what the current term is, is the candidate advertising a more advanced log than us?
match persistentState.GetLastLogEntry () with match persistentState.GetLastLogEntry () with
| Some (_, ourLastTerm) -> | Some (_, (_, ourLastTerm)) ->
snd message.CandidateLastLogEntry = ourLastTerm snd message.CandidateLastLogEntry = ourLastTerm
&& fst message.CandidateLastLogEntry >= persistentState.CurrentLogIndex && fst message.CandidateLastLogEntry >= persistentState.CurrentLogIndex
| None -> | None ->
@@ -182,6 +212,7 @@ type Server<'a>
persistentState.Vote message.CandidateId persistentState.Vote message.CandidateId
{ {
Voter = me
VoterTerm = persistentState.CurrentTerm VoterTerm = persistentState.CurrentTerm
VoteGranted = true VoteGranted = true
} }
@@ -218,7 +249,7 @@ type Server<'a>
|> message.ReplyChannel |> message.ReplyChannel
let acceptRequest () : unit = let acceptRequest () : unit =
assert (currentType = Follower) assert (currentType = ServerSpecialisation.Follower)
match message.NewEntry with match message.NewEntry with
| None -> heartbeat message | None -> heartbeat message
@@ -274,7 +305,7 @@ type Server<'a>
ourTermForThisEntry = message.PrevLogEntry.Term ourTermForThisEntry = message.PrevLogEntry.Term
match currentType with match currentType with
| Leader _ -> | ServerSpecialisation.Leader _ ->
[ [
"Violation of invariant." "Violation of invariant."
"This is a logic error that cannot happen unless there is a bug in this Raft implementation." "This is a logic error that cannot happen unless there is a bug in this Raft implementation."
@@ -285,7 +316,7 @@ type Server<'a>
|> String.concat "\n" |> String.concat "\n"
|> failwithf "%s" |> failwithf "%s"
| Follower -> | ServerSpecialisation.Follower ->
if not (logIsConsistent message) then if not (logIsConsistent message) then
// Reject the request, it's inconsistent with our history. // Reject the request, it's inconsistent with our history.
{ {
@@ -297,10 +328,10 @@ type Server<'a>
else else
acceptRequest () acceptRequest ()
| Candidate -> | ServerSpecialisation.Candidate _ ->
// We've already verified that the message was sent from a leader in the current term, so we have // We've already verified that the message was sent from a leader in the current term, so we have
// lost the election. // lost the election.
currentType <- Follower currentType <- ServerSpecialisation.Follower
// TODO: why does this assertion hold? // TODO: why does this assertion hold?
assert (logIsConsistent message) assert (logIsConsistent message)
acceptRequest () acceptRequest ()
@@ -308,17 +339,70 @@ type Server<'a>
let mailbox = let mailbox =
let rec loop (mailbox : MailboxProcessor<_>) = let rec loop (mailbox : MailboxProcessor<_>) =
async { async {
// TODO this should really wait for explicit Sync calls
// if we're running in memory, for determinism.
let! m = mailbox.Receive () let! m = mailbox.Receive ()
let toPrint = sprintf "Processing message in server %i: %+A" me m
System.Console.WriteLine toPrint
match m with match m with
| ServerAction.BeginElection -> return failwith "not yet implemented" | ServerAction.BeginElection ->
match currentType with
| ServerSpecialisation.Leader _ -> ()
| ServerSpecialisation.Candidate _
| ServerSpecialisation.Follower ->
// Start the election!
currentType <- ServerSpecialisation.Candidate (CandidateState.New clusterSize me)
persistentState.IncrementTerm ()
for i in 0 .. clusterSize - 1 do
if i * 1<ServerId> <> me then
{
CandidateTerm = persistentState.CurrentTerm
CandidateId = me
CandidateLastLogEntry =
match persistentState.GetLastLogEntry () with
| Some (index, (_, term)) -> (index, term)
| None ->
// TODO this is almost certainly not right
(0<LogIndex>, 0<Term>)
ReplyChannel = fun reply -> mailbox.Post (ReceiveReply reply)
}
|> Message.RequestVote
|> messageChannel (i * 1<ServerId>)
| ServerAction.Receive m -> return processMessage m | ServerAction.Receive m -> return processMessage m
| ServerAction.Sync reply -> reply.Reply () | ServerAction.Sync reply -> reply.Reply ()
| ServerAction.ReceiveReply requestVoteReply ->
match currentType with
| ServerSpecialisation.Leader _
| ServerSpecialisation.Follower ->
// We're not expecting any votes; drop them.
()
| ServerSpecialisation.Candidate state ->
if requestVoteReply.VoterTerm = persistentState.CurrentTerm then
state.Votes.[requestVoteReply.Voter / 1<ServerId>] <- Some requestVoteReply.VoteGranted
// Inefficient, but :shrug:
if
Array.sumBy
(function
| Some true -> 1
| _ -> 0)
state.Votes > clusterSize / 2
then
// Become the leader!
currentType <-
LeaderState.New clusterSize persistentState.CurrentLogIndex
|> ServerSpecialisation.Leader
return! loop mailbox return! loop mailbox
} }
loop |> MailboxProcessor.Start let mailbox = loop |> MailboxProcessor.Start
mailbox.Error.Add raise
mailbox
member this.TriggerTimeout () = mailbox.Post ServerAction.BeginElection member this.TriggerTimeout () = mailbox.Post ServerAction.BeginElection
@@ -326,6 +410,12 @@ type Server<'a>
member this.Sync () = mailbox.PostAndReply ServerAction.Sync member this.Sync () = mailbox.PostAndReply ServerAction.Sync
member this.State =
match currentType with
| ServerSpecialisation.Leader _ -> ServerStatus.Leader
| ServerSpecialisation.Candidate _ -> ServerStatus.Candidate
| ServerSpecialisation.Follower -> ServerStatus.Follower
// { // {
// ClusterSize : int // ClusterSize : int
// mutable VolatileState : VolatileState // mutable VolatileState : VolatileState
@@ -349,13 +439,11 @@ module InMemoryCluster =
let make<'a> (immediateFlush : bool) (count : int) : Cluster<'a> = let make<'a> (immediateFlush : bool) (count : int) : Cluster<'a> =
let servers = Array.zeroCreate<Server<'a>> count let servers = Array.zeroCreate<Server<'a>> count
let locker = obj ()
let messageChannel (serverId : int<ServerId>) (message : Message<'a>) : unit = let messageChannel (serverId : int<ServerId>) (message : Message<'a>) : unit =
lock locker (fun () -> servers.[serverId / 1<ServerId>].Message message) servers.[serverId / 1<ServerId>].Message message
for s in 0 .. servers.Length - 1 do for s in 0 .. servers.Length - 1 do
servers.[s] <- Server (count, InMemoryPersistentState (), messageChannel) servers.[s] <- Server (count, s * 1<ServerId>, InMemoryPersistentState (), messageChannel)
{ {
Servers = servers Servers = servers