Implement committal
This commit is contained in:
@@ -8,8 +8,9 @@
|
|||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<Compile Include="Logger.fs" />
|
<Compile Include="Logger.fs" />
|
||||||
<Compile Include="TestServer.fs" />
|
<Compile Include="TestInMemoryServer.fs" />
|
||||||
<Compile Include="TestInMemoryPersistentState.fs" />
|
<Compile Include="TestInMemoryPersistentState.fs" />
|
||||||
|
<Compile Include="TestServer.fs" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
@@ -47,10 +47,16 @@ module TestInMemoryPersistentState =
|
|||||||
| None ->
|
| None ->
|
||||||
(uut :> IPersistentState<_>).TruncateLog truncate
|
(uut :> IPersistentState<_>).TruncateLog truncate
|
||||||
uut.GetLog () = oldLog
|
uut.GetLog () = oldLog
|
||||||
| Some entry ->
|
| Some (itemStored, entry) ->
|
||||||
(uut :> IPersistentState<_>).TruncateLog truncate
|
(uut :> IPersistentState<_>).TruncateLog truncate
|
||||||
|
|
||||||
(uut :> IPersistentState<_>).GetLastLogEntry () = Some (truncate, entry)
|
(uut :> IPersistentState<_>).GetLastLogEntry () = Some (
|
||||||
|
itemStored,
|
||||||
|
{
|
||||||
|
Index = truncate
|
||||||
|
Term = entry
|
||||||
|
}
|
||||||
|
)
|
||||||
&& isPrefix (uut.GetLog ()) oldLog
|
&& isPrefix (uut.GetLog ()) oldLog
|
||||||
&& (uut :> IPersistentState<_>).CurrentLogIndex = truncate
|
&& (uut :> IPersistentState<_>).CurrentLogIndex = truncate
|
||||||
|
|
||||||
|
242
Raft.Test/TestInMemoryServer.fs
Normal file
242
Raft.Test/TestInMemoryServer.fs
Normal file
@@ -0,0 +1,242 @@
|
|||||||
|
namespace Raft.Test
|
||||||
|
|
||||||
|
open System.Threading
|
||||||
|
open Raft
|
||||||
|
open NUnit.Framework
|
||||||
|
open FsUnitTyped
|
||||||
|
open FsCheck
|
||||||
|
|
||||||
|
[<TestFixture>]
|
||||||
|
module TestInMemoryServer =
|
||||||
|
|
||||||
|
[<Test>]
|
||||||
|
let ``Startup sequence, first fumbling steps`` () =
|
||||||
|
let cluster, network = InMemoryCluster.make<int> 5
|
||||||
|
|
||||||
|
let logger, logs = TestLogger.make ()
|
||||||
|
|
||||||
|
// Candidate 1 asks server 0 to vote for it.
|
||||||
|
|
||||||
|
{
|
||||||
|
CandidateTerm = 0<Term>
|
||||||
|
CandidateId = 1<ServerId>
|
||||||
|
ReplyChannel = fun message -> logger (sprintf "Received message for term %i" message.VoterTerm)
|
||||||
|
CandidateLastLogEntry = None
|
||||||
|
}
|
||||||
|
|> Instruction.RequestVote
|
||||||
|
|> Message.Instruction
|
||||||
|
|> cluster.SendMessageDirectly 0<ServerId>
|
||||||
|
|
||||||
|
logs () |> shouldEqual [ "Received message for term 0" ]
|
||||||
|
|
||||||
|
// Candidate 1 asks to be elected again! This is fine, maybe the network is replaying requests
|
||||||
|
// and the network swallowed our reply, so we should reply in the same way.
|
||||||
|
{
|
||||||
|
CandidateTerm = 0<Term>
|
||||||
|
CandidateId = 1<ServerId>
|
||||||
|
ReplyChannel = fun message -> logger (sprintf "Received message for term %i" message.VoterTerm)
|
||||||
|
CandidateLastLogEntry = None
|
||||||
|
}
|
||||||
|
|> Instruction.RequestVote
|
||||||
|
|> Message.Instruction
|
||||||
|
|> cluster.SendMessageDirectly 0<ServerId>
|
||||||
|
|
||||||
|
logs ()
|
||||||
|
|> shouldEqual [ "Received message for term 0" ; "Received message for term 0" ]
|
||||||
|
|
||||||
|
// Candidate 2 asks to be elected. We won't vote for them, because we've already voted.
|
||||||
|
// and the network swallowed our reply, so we should reply in the same way.
|
||||||
|
let calls = ref 0
|
||||||
|
|
||||||
|
{
|
||||||
|
CandidateTerm = 0<Term>
|
||||||
|
CandidateId = 2<ServerId>
|
||||||
|
ReplyChannel = fun _ -> Interlocked.Increment calls |> ignore
|
||||||
|
CandidateLastLogEntry = None
|
||||||
|
}
|
||||||
|
|> Instruction.RequestVote
|
||||||
|
|> Message.Instruction
|
||||||
|
|> cluster.SendMessageDirectly 0<ServerId>
|
||||||
|
|
||||||
|
calls.Value |> shouldEqual 0
|
||||||
|
|
||||||
|
[<Test>]
|
||||||
|
let ``Startup sequence in prod, only one timeout takes place`` () =
|
||||||
|
let cluster, network = InMemoryCluster.make<int> 5
|
||||||
|
|
||||||
|
cluster.Servers.[0].TriggerInactivityTimeout ()
|
||||||
|
cluster.Servers.[0].Sync ()
|
||||||
|
|
||||||
|
// We sent a message to every other server; process them.
|
||||||
|
for i in 1..4 do
|
||||||
|
let server = i * 1<ServerId>
|
||||||
|
(network.AllInboundMessages server).Length |> shouldEqual 1
|
||||||
|
let message = network.InboundMessage server 0
|
||||||
|
network.DropMessage server 0
|
||||||
|
cluster.SendMessageDirectly server message
|
||||||
|
|
||||||
|
(network.AllInboundMessages 0<ServerId>).Length |> shouldEqual i
|
||||||
|
|
||||||
|
for i in 1..4 do
|
||||||
|
network.InboundMessage 0<ServerId> (i - 1)
|
||||||
|
|> cluster.SendMessageDirectly 0<ServerId>
|
||||||
|
|
||||||
|
network.DropMessage 0<ServerId> (i - 1)
|
||||||
|
|
||||||
|
// (the messages we've already processed)
|
||||||
|
(network.AllInboundMessages 0<ServerId>).Length |> shouldEqual 4
|
||||||
|
(network.UndeliveredMessages 0<ServerId>).Length |> shouldEqual 0
|
||||||
|
|
||||||
|
cluster.Servers.[0].State |> shouldEqual (ServerStatus.Leader 1<Term>)
|
||||||
|
|
||||||
|
for i in 1..4 do
|
||||||
|
cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower
|
||||||
|
|
||||||
|
let popOne (queues : 'a list list) : ((int * 'a) * 'a list list) list =
|
||||||
|
queues
|
||||||
|
|> List.indexed
|
||||||
|
|> List.filter (fun (index, l) -> not (List.isEmpty l))
|
||||||
|
|> List.collect (fun (firstPlaceWithInstruction, entries) ->
|
||||||
|
entries
|
||||||
|
|> List.indexed
|
||||||
|
|> List.map (fun (i, entry) -> (firstPlaceWithInstruction, entry), List.removeAt i entries)
|
||||||
|
|> List.map (fun (removed, rest) ->
|
||||||
|
let afterPop =
|
||||||
|
queues
|
||||||
|
|> List.removeAt firstPlaceWithInstruction
|
||||||
|
|> List.insertAt firstPlaceWithInstruction rest
|
||||||
|
|
||||||
|
removed, afterPop
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
let rec allOrderings (queues : 'a list list) : (int * 'a) list list =
|
||||||
|
let output = popOne queues
|
||||||
|
|
||||||
|
match output with
|
||||||
|
| [] -> [ [] ]
|
||||||
|
| output ->
|
||||||
|
|
||||||
|
output
|
||||||
|
|> List.collect (fun (extracted, remaining) ->
|
||||||
|
let sub = allOrderings remaining
|
||||||
|
sub |> List.map (fun s -> extracted :: s)
|
||||||
|
)
|
||||||
|
|
||||||
|
let factorial i =
|
||||||
|
let rec go acc i =
|
||||||
|
if i <= 0 then acc else go (acc * i) (i - 1)
|
||||||
|
|
||||||
|
go 1 i
|
||||||
|
|
||||||
|
[<TestCase(0, 1)>]
|
||||||
|
[<TestCase(1, 1)>]
|
||||||
|
[<TestCase(2, 2)>]
|
||||||
|
[<TestCase(3, 6)>]
|
||||||
|
[<TestCase(4, 24)>]
|
||||||
|
let ``Test factorial`` (n : int, result : int) = factorial n |> shouldEqual result
|
||||||
|
|
||||||
|
[<Test>]
|
||||||
|
let ``Test allOrderings`` () =
|
||||||
|
let case = [ [ "a" ; "b" ] ; [ "c" ; "d" ; "e" ] ]
|
||||||
|
let output = case |> allOrderings
|
||||||
|
output |> shouldEqual (List.distinct output)
|
||||||
|
|
||||||
|
output
|
||||||
|
|> List.length
|
||||||
|
|> shouldEqual (factorial (List.concat case |> List.length))
|
||||||
|
|
||||||
|
let allElements = Set.ofList (List.concat case)
|
||||||
|
|
||||||
|
for output in output do
|
||||||
|
output |> List.map snd |> Set.ofList |> shouldEqual allElements
|
||||||
|
|
||||||
|
let randomChoice<'a> (r : System.Random) (arr : 'a list) : 'a = arr.[r.Next (0, arr.Length)]
|
||||||
|
|
||||||
|
[<Test>]
|
||||||
|
let ``Startup sequence in prod, two timeouts at once, random`` () =
|
||||||
|
let rand = System.Random ()
|
||||||
|
let cluster, network = InMemoryCluster.make<int> 5
|
||||||
|
|
||||||
|
cluster.Servers.[0].TriggerInactivityTimeout ()
|
||||||
|
cluster.Servers.[0].Sync ()
|
||||||
|
cluster.Servers.[1].TriggerInactivityTimeout ()
|
||||||
|
cluster.Servers.[1].Sync ()
|
||||||
|
|
||||||
|
// Those two each sent a message to every other server.
|
||||||
|
(network.AllInboundMessages 0<ServerId>).Length |> shouldEqual 1
|
||||||
|
(network.AllInboundMessages 1<ServerId>).Length |> shouldEqual 1
|
||||||
|
|
||||||
|
for i in 2..4 do
|
||||||
|
let server = i * 1<ServerId>
|
||||||
|
(network.AllInboundMessages server).Length |> shouldEqual 2
|
||||||
|
|
||||||
|
while network.AllUndeliveredMessages () |> Seq.concat |> Seq.isEmpty |> not do
|
||||||
|
let allOrderings' =
|
||||||
|
network.AllUndeliveredMessages () |> List.map List.ofSeq |> allOrderings
|
||||||
|
|
||||||
|
// Process the messages!
|
||||||
|
let ordering = randomChoice rand allOrderings'
|
||||||
|
|
||||||
|
for serverConsuming, (messageId, message) in ordering do
|
||||||
|
let serverConsuming = serverConsuming * 1<ServerId>
|
||||||
|
cluster.SendMessageDirectly serverConsuming message
|
||||||
|
network.DropMessage serverConsuming messageId
|
||||||
|
|
||||||
|
match cluster.Servers.[0].State, cluster.Servers.[1].State with
|
||||||
|
| Leader _, Leader _ -> failwith "Unexpectedly had two leaders"
|
||||||
|
| Candidate _, Candidate _ -> failwith "Unexpectedly failed to elect a leader"
|
||||||
|
| Leader 1<Term>, Candidate 1<Term>
|
||||||
|
| Candidate 1<Term>, Leader 1<Term> -> ()
|
||||||
|
| s1, s2 -> failwithf "Unexpected state: %O %O" s1 s2
|
||||||
|
|
||||||
|
for i in 2..4 do
|
||||||
|
cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower
|
||||||
|
|
||||||
|
type History = History of (int<ServerId> * int) list
|
||||||
|
|
||||||
|
let historyGen (clusterSize : int) =
|
||||||
|
gen {
|
||||||
|
let! pile = Gen.choose (0, clusterSize - 1)
|
||||||
|
let! entry = Arb.generate<int>
|
||||||
|
return (pile * 1<ServerId>, abs entry)
|
||||||
|
}
|
||||||
|
|> Gen.listOf
|
||||||
|
|> Gen.map History
|
||||||
|
|
||||||
|
let apply (History history) (cluster : Cluster<'a>) (network : Network<'a>) : unit =
|
||||||
|
for pile, entry in history do
|
||||||
|
let messages = network.AllInboundMessages pile
|
||||||
|
|
||||||
|
if entry < messages.Length then
|
||||||
|
cluster.SendMessageDirectly pile messages.[entry]
|
||||||
|
|
||||||
|
[<Test>]
|
||||||
|
let ``Startup sequence in prod, two timeouts at once, property-based: at most one leader is elected`` () =
|
||||||
|
let cluster, network = InMemoryCluster.make<int> 5
|
||||||
|
|
||||||
|
cluster.Servers.[0].TriggerInactivityTimeout ()
|
||||||
|
cluster.Servers.[0].Sync ()
|
||||||
|
cluster.Servers.[1].TriggerInactivityTimeout ()
|
||||||
|
cluster.Servers.[1].Sync ()
|
||||||
|
|
||||||
|
// Those two each sent a message to every other server.
|
||||||
|
(network.AllInboundMessages 0<ServerId>).Length |> shouldEqual 1
|
||||||
|
(network.AllInboundMessages 1<ServerId>).Length |> shouldEqual 1
|
||||||
|
|
||||||
|
for i in 2..4 do
|
||||||
|
(network.AllInboundMessages (i * 1<ServerId>)).Length |> shouldEqual 2
|
||||||
|
|
||||||
|
let property (history : History) =
|
||||||
|
apply history cluster network
|
||||||
|
|
||||||
|
match cluster.Servers.[0].State, cluster.Servers.[1].State with
|
||||||
|
| Leader _, Leader _ -> failwith "Unexpectedly elected two leaders"
|
||||||
|
| _, _ -> ()
|
||||||
|
|
||||||
|
for i in 2..4 do
|
||||||
|
cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower
|
||||||
|
|
||||||
|
property
|
||||||
|
|> Prop.forAll (Arb.fromGen (historyGen 5))
|
||||||
|
|> Check.QuickThrowOnFailure
|
@@ -1,242 +1,34 @@
|
|||||||
namespace Raft.Test
|
namespace Raft.Test
|
||||||
|
|
||||||
open System.Threading
|
|
||||||
open Raft
|
|
||||||
open NUnit.Framework
|
|
||||||
open FsUnitTyped
|
open FsUnitTyped
|
||||||
open FsCheck
|
open NUnit.Framework
|
||||||
|
open Raft
|
||||||
|
|
||||||
[<TestFixture>]
|
[<TestFixture>]
|
||||||
module TestServer =
|
module TestServer =
|
||||||
|
|
||||||
[<Test>]
|
[<Test>]
|
||||||
let ``Startup sequence, first fumbling steps`` () =
|
let ``maxLogAQuorumHasCommitted tests`` () =
|
||||||
let cluster, network = InMemoryCluster.make<int> 5
|
for length in 1..7 do
|
||||||
|
for i in 0..10 do
|
||||||
|
let i = i * 1<LogIndex>
|
||||||
|
|
||||||
let logger, logs = TestLogger.make ()
|
i
|
||||||
|
|> Array.create length
|
||||||
|
|> ServerUtils.maxLogAQuorumHasCommitted
|
||||||
|
|> shouldEqual i
|
||||||
|
|
||||||
// Candidate 1 asks server 0 to vote for it.
|
ServerUtils.maxLogAQuorumHasCommitted [| 0<LogIndex> ; 0<LogIndex> ; 1<LogIndex> |]
|
||||||
|
|> shouldEqual 0<LogIndex>
|
||||||
|
|
||||||
{
|
ServerUtils.maxLogAQuorumHasCommitted [| 0<LogIndex> ; 1<LogIndex> ; 1<LogIndex> |]
|
||||||
CandidateTerm = 0<Term>
|
|> shouldEqual 1<LogIndex>
|
||||||
CandidateId = 1<ServerId>
|
|
||||||
ReplyChannel = fun message -> logger (sprintf "Received message for term %i" message.VoterTerm)
|
|
||||||
CandidateLastLogEntry = 0<LogIndex>, 0<Term>
|
|
||||||
}
|
|
||||||
|> Instruction.RequestVote
|
|
||||||
|> Message.Instruction
|
|
||||||
|> cluster.SendMessageDirectly 0<ServerId>
|
|
||||||
|
|
||||||
logs () |> shouldEqual [ "Received message for term 0" ]
|
ServerUtils.maxLogAQuorumHasCommitted [| 2<LogIndex> ; 1<LogIndex> ; 1<LogIndex> |]
|
||||||
|
|> shouldEqual 1<LogIndex>
|
||||||
|
|
||||||
// Candidate 1 asks to be elected again! This is fine, maybe the network is replaying requests
|
ServerUtils.maxLogAQuorumHasCommitted [| 2<LogIndex> ; 1<LogIndex> ; 2<LogIndex> |]
|
||||||
// and the network swallowed our reply, so we should reply in the same way.
|
|> shouldEqual 2<LogIndex>
|
||||||
{
|
|
||||||
CandidateTerm = 0<Term>
|
|
||||||
CandidateId = 1<ServerId>
|
|
||||||
ReplyChannel = fun message -> logger (sprintf "Received message for term %i" message.VoterTerm)
|
|
||||||
CandidateLastLogEntry = 0<LogIndex>, 0<Term>
|
|
||||||
}
|
|
||||||
|> Instruction.RequestVote
|
|
||||||
|> Message.Instruction
|
|
||||||
|> cluster.SendMessageDirectly 0<ServerId>
|
|
||||||
|
|
||||||
logs ()
|
ServerUtils.maxLogAQuorumHasCommitted [| 1<LogIndex> ; 2<LogIndex> ; 3<LogIndex> ; 4<LogIndex> |]
|
||||||
|> shouldEqual [ "Received message for term 0" ; "Received message for term 0" ]
|
|> shouldEqual 2<LogIndex>
|
||||||
|
|
||||||
// Candidate 2 asks to be elected. We won't vote for them, because we've already voted.
|
|
||||||
// and the network swallowed our reply, so we should reply in the same way.
|
|
||||||
let calls = ref 0
|
|
||||||
|
|
||||||
{
|
|
||||||
CandidateTerm = 0<Term>
|
|
||||||
CandidateId = 2<ServerId>
|
|
||||||
ReplyChannel = fun _ -> Interlocked.Increment calls |> ignore
|
|
||||||
CandidateLastLogEntry = 0<LogIndex>, 0<Term>
|
|
||||||
}
|
|
||||||
|> Instruction.RequestVote
|
|
||||||
|> Message.Instruction
|
|
||||||
|> cluster.SendMessageDirectly 0<ServerId>
|
|
||||||
|
|
||||||
calls.Value |> shouldEqual 0
|
|
||||||
|
|
||||||
[<Test>]
|
|
||||||
let ``Startup sequence in prod, only one timeout takes place`` () =
|
|
||||||
let cluster, network = InMemoryCluster.make<int> 5
|
|
||||||
|
|
||||||
cluster.Servers.[0].TriggerTimeout ()
|
|
||||||
cluster.Servers.[0].Sync ()
|
|
||||||
|
|
||||||
// We sent a message to every other server; process them.
|
|
||||||
for i in 1..4 do
|
|
||||||
let server = i * 1<ServerId>
|
|
||||||
(network.AllInboundMessages server).Length |> shouldEqual 1
|
|
||||||
let message = network.InboundMessage server 0
|
|
||||||
network.DropMessage server 0
|
|
||||||
cluster.SendMessageDirectly server message
|
|
||||||
|
|
||||||
(network.AllInboundMessages 0<ServerId>).Length |> shouldEqual i
|
|
||||||
|
|
||||||
for i in 1..4 do
|
|
||||||
network.InboundMessage 0<ServerId> (i - 1)
|
|
||||||
|> cluster.SendMessageDirectly 0<ServerId>
|
|
||||||
|
|
||||||
network.DropMessage 0<ServerId> (i - 1)
|
|
||||||
|
|
||||||
// (the messages we've already processed)
|
|
||||||
(network.AllInboundMessages 0<ServerId>).Length |> shouldEqual 4
|
|
||||||
(network.UndeliveredMessages 0<ServerId>).Length |> shouldEqual 0
|
|
||||||
|
|
||||||
cluster.Servers.[0].State |> shouldEqual (ServerStatus.Leader 1<Term>)
|
|
||||||
|
|
||||||
for i in 1..4 do
|
|
||||||
cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower
|
|
||||||
|
|
||||||
let popOne (queues : 'a list list) : ((int * 'a) * 'a list list) list =
|
|
||||||
queues
|
|
||||||
|> List.indexed
|
|
||||||
|> List.filter (fun (index, l) -> not (List.isEmpty l))
|
|
||||||
|> List.collect (fun (firstPlaceWithInstruction, entries) ->
|
|
||||||
entries
|
|
||||||
|> List.indexed
|
|
||||||
|> List.map (fun (i, entry) -> (firstPlaceWithInstruction, entry), List.removeAt i entries)
|
|
||||||
|> List.map (fun (removed, rest) ->
|
|
||||||
let afterPop =
|
|
||||||
queues
|
|
||||||
|> List.removeAt firstPlaceWithInstruction
|
|
||||||
|> List.insertAt firstPlaceWithInstruction rest
|
|
||||||
|
|
||||||
removed, afterPop
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
let rec allOrderings (queues : 'a list list) : (int * 'a) list list =
|
|
||||||
let output = popOne queues
|
|
||||||
|
|
||||||
match output with
|
|
||||||
| [] -> [ [] ]
|
|
||||||
| output ->
|
|
||||||
|
|
||||||
output
|
|
||||||
|> List.collect (fun (extracted, remaining) ->
|
|
||||||
let sub = allOrderings remaining
|
|
||||||
sub |> List.map (fun s -> extracted :: s)
|
|
||||||
)
|
|
||||||
|
|
||||||
let factorial i =
|
|
||||||
let rec go acc i =
|
|
||||||
if i <= 0 then acc else go (acc * i) (i - 1)
|
|
||||||
|
|
||||||
go 1 i
|
|
||||||
|
|
||||||
[<TestCase(0, 1)>]
|
|
||||||
[<TestCase(1, 1)>]
|
|
||||||
[<TestCase(2, 2)>]
|
|
||||||
[<TestCase(3, 6)>]
|
|
||||||
[<TestCase(4, 24)>]
|
|
||||||
let ``Test factorial`` (n : int, result : int) = factorial n |> shouldEqual result
|
|
||||||
|
|
||||||
[<Test>]
|
|
||||||
let ``Test allOrderings`` () =
|
|
||||||
let case = [ [ "a" ; "b" ] ; [ "c" ; "d" ; "e" ] ]
|
|
||||||
let output = case |> allOrderings
|
|
||||||
output |> shouldEqual (List.distinct output)
|
|
||||||
|
|
||||||
output
|
|
||||||
|> List.length
|
|
||||||
|> shouldEqual (factorial (List.concat case |> List.length))
|
|
||||||
|
|
||||||
let allElements = Set.ofList (List.concat case)
|
|
||||||
|
|
||||||
for output in output do
|
|
||||||
output |> List.map snd |> Set.ofList |> shouldEqual allElements
|
|
||||||
|
|
||||||
let randomChoice<'a> (r : System.Random) (arr : 'a list) : 'a = arr.[r.Next (0, arr.Length)]
|
|
||||||
|
|
||||||
[<Test>]
|
|
||||||
let ``Startup sequence in prod, two timeouts at once, random`` () =
|
|
||||||
let rand = System.Random ()
|
|
||||||
let cluster, network = InMemoryCluster.make<int> 5
|
|
||||||
|
|
||||||
cluster.Servers.[0].TriggerTimeout ()
|
|
||||||
cluster.Servers.[0].Sync ()
|
|
||||||
cluster.Servers.[1].TriggerTimeout ()
|
|
||||||
cluster.Servers.[1].Sync ()
|
|
||||||
|
|
||||||
// Those two each sent a message to every other server.
|
|
||||||
(network.AllInboundMessages 0<ServerId>).Length |> shouldEqual 1
|
|
||||||
(network.AllInboundMessages 1<ServerId>).Length |> shouldEqual 1
|
|
||||||
|
|
||||||
for i in 2..4 do
|
|
||||||
let server = i * 1<ServerId>
|
|
||||||
(network.AllInboundMessages server).Length |> shouldEqual 2
|
|
||||||
|
|
||||||
while network.AllUndeliveredMessages () |> Seq.concat |> Seq.isEmpty |> not do
|
|
||||||
let allOrderings' =
|
|
||||||
network.AllUndeliveredMessages () |> List.map List.ofSeq |> allOrderings
|
|
||||||
|
|
||||||
// Process the messages!
|
|
||||||
let ordering = randomChoice rand allOrderings'
|
|
||||||
|
|
||||||
for serverConsuming, (messageId, message) in ordering do
|
|
||||||
let serverConsuming = serverConsuming * 1<ServerId>
|
|
||||||
cluster.SendMessageDirectly serverConsuming message
|
|
||||||
network.DropMessage serverConsuming messageId
|
|
||||||
|
|
||||||
match cluster.Servers.[0].State, cluster.Servers.[1].State with
|
|
||||||
| Leader _, Leader _ -> failwith "Unexpectedly had two leaders"
|
|
||||||
| Candidate _, Candidate _ -> failwith "Unexpectedly failed to elect a leader"
|
|
||||||
| Leader 1<Term>, Candidate 1<Term>
|
|
||||||
| Candidate 1<Term>, Leader 1<Term> -> ()
|
|
||||||
| s1, s2 -> failwithf "Unexpected state: %O %O" s1 s2
|
|
||||||
|
|
||||||
for i in 2..4 do
|
|
||||||
cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower
|
|
||||||
|
|
||||||
type History = History of (int<ServerId> * int) list
|
|
||||||
|
|
||||||
let historyGen (clusterSize : int) =
|
|
||||||
gen {
|
|
||||||
let! pile = Gen.choose (0, clusterSize - 1)
|
|
||||||
let! entry = Arb.generate<int>
|
|
||||||
return (pile * 1<ServerId>, abs entry)
|
|
||||||
}
|
|
||||||
|> Gen.listOf
|
|
||||||
|> Gen.map History
|
|
||||||
|
|
||||||
let apply (History history) (cluster : Cluster<'a>) (network : Network<'a>) : unit =
|
|
||||||
for pile, entry in history do
|
|
||||||
let messages = network.AllInboundMessages pile
|
|
||||||
|
|
||||||
if entry < messages.Length then
|
|
||||||
cluster.SendMessageDirectly pile messages.[entry]
|
|
||||||
|
|
||||||
[<Test>]
|
|
||||||
let ``Startup sequence in prod, two timeouts at once, property-based: at most one leader is elected`` () =
|
|
||||||
let cluster, network = InMemoryCluster.make<int> 5
|
|
||||||
|
|
||||||
cluster.Servers.[0].TriggerTimeout ()
|
|
||||||
cluster.Servers.[0].Sync ()
|
|
||||||
cluster.Servers.[1].TriggerTimeout ()
|
|
||||||
cluster.Servers.[1].Sync ()
|
|
||||||
|
|
||||||
// Those two each sent a message to every other server.
|
|
||||||
(network.AllInboundMessages 0<ServerId>).Length |> shouldEqual 1
|
|
||||||
(network.AllInboundMessages 1<ServerId>).Length |> shouldEqual 1
|
|
||||||
|
|
||||||
for i in 2..4 do
|
|
||||||
(network.AllInboundMessages (i * 1<ServerId>)).Length |> shouldEqual 2
|
|
||||||
|
|
||||||
let property (history : History) =
|
|
||||||
apply history cluster network
|
|
||||||
|
|
||||||
match cluster.Servers.[0].State, cluster.Servers.[1].State with
|
|
||||||
| Leader _, Leader _ -> failwith "Unexpectedly elected two leaders"
|
|
||||||
| _, _ -> ()
|
|
||||||
|
|
||||||
for i in 2..4 do
|
|
||||||
cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower
|
|
||||||
|
|
||||||
property
|
|
||||||
|> Prop.forAll (Arb.fromGen (historyGen 5))
|
|
||||||
|> Check.QuickThrowOnFailure
|
|
||||||
|
20
Raft/Domain.fs
Normal file
20
Raft/Domain.fs
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
namespace Raft
|
||||||
|
|
||||||
|
/// LogIndex is indexed from 1. We use 0 to indicate "before any history has started".
|
||||||
|
[<Measure>]
|
||||||
|
type LogIndex
|
||||||
|
|
||||||
|
[<Measure>]
|
||||||
|
type Term
|
||||||
|
|
||||||
|
[<Measure>]
|
||||||
|
type ServerId
|
||||||
|
|
||||||
|
type LogEntry =
|
||||||
|
{
|
||||||
|
Index : int<LogIndex>
|
||||||
|
Term : int<Term>
|
||||||
|
}
|
||||||
|
|
||||||
|
override this.ToString () =
|
||||||
|
sprintf "Log entry %i at subjective term %i" this.Index this.Term
|
@@ -11,8 +11,12 @@ type Cluster<'a> =
|
|||||||
|
|
||||||
member this.SendMessage (i : int<ServerId>) (m : Message<'a>) : unit = this.SendMessageDirectly i m
|
member this.SendMessage (i : int<ServerId>) (m : Message<'a>) : unit = this.SendMessageDirectly i m
|
||||||
|
|
||||||
member this.Timeout (i : int<ServerId>) : unit =
|
member this.InactivityTimeout (i : int<ServerId>) : unit =
|
||||||
this.Servers.[i / 1<ServerId>].TriggerTimeout ()
|
this.Servers.[i / 1<ServerId>].TriggerInactivityTimeout ()
|
||||||
|
this.Servers.[i / 1<ServerId>].Sync ()
|
||||||
|
|
||||||
|
member this.HeartbeatTimeout (i : int<ServerId>) : unit =
|
||||||
|
this.Servers.[i / 1<ServerId>].TriggerHeartbeatTimeout ()
|
||||||
this.Servers.[i / 1<ServerId>].Sync ()
|
this.Servers.[i / 1<ServerId>].Sync ()
|
||||||
|
|
||||||
member this.State (i : int<ServerId>) : ServerStatus = this.Servers.[i / 1<ServerId>].State
|
member this.State (i : int<ServerId>) : ServerStatus = this.Servers.[i / 1<ServerId>].State
|
||||||
|
@@ -1,11 +0,0 @@
|
|||||||
namespace Raft
|
|
||||||
|
|
||||||
/// LogIndex is indexed from 1. We use 0 to indicate "before any history has started".
|
|
||||||
[<Measure>]
|
|
||||||
type LogIndex
|
|
||||||
|
|
||||||
[<Measure>]
|
|
||||||
type Term
|
|
||||||
|
|
||||||
[<Measure>]
|
|
||||||
type ServerId
|
|
@@ -14,7 +14,7 @@ type IPersistentState<'a> =
|
|||||||
abstract TruncateLog : int<LogIndex> -> unit
|
abstract TruncateLog : int<LogIndex> -> unit
|
||||||
abstract GetLogEntry : int<LogIndex> -> ('a * int<Term>) option
|
abstract GetLogEntry : int<LogIndex> -> ('a * int<Term>) option
|
||||||
abstract CurrentLogIndex : int<LogIndex>
|
abstract CurrentLogIndex : int<LogIndex>
|
||||||
abstract GetLastLogEntry : unit -> (int<LogIndex> * ('a * int<Term>)) option
|
abstract GetLastLogEntry : unit -> ('a * LogEntry) option
|
||||||
abstract AdvanceToTerm : int<Term> -> unit
|
abstract AdvanceToTerm : int<Term> -> unit
|
||||||
abstract IncrementTerm : unit -> unit
|
abstract IncrementTerm : unit -> unit
|
||||||
abstract Vote : int<ServerId> -> unit
|
abstract Vote : int<ServerId> -> unit
|
||||||
@@ -50,11 +50,19 @@ type InMemoryPersistentState<'a> () =
|
|||||||
let position = if position < 0 then 0 else position
|
let position = if position < 0 then 0 else position
|
||||||
log.RemoveRange (position, log.Count - position)
|
log.RemoveRange (position, log.Count - position)
|
||||||
|
|
||||||
member this.GetLastLogEntry () =
|
member this.GetLastLogEntry () : ('a * LogEntry) option =
|
||||||
if log.Count = 0 then
|
if log.Count = 0 then
|
||||||
None
|
None
|
||||||
else
|
else
|
||||||
Some (log.Count * 1<LogIndex>, log.[log.Count - 1])
|
let stored, term = log.[log.Count - 1]
|
||||||
|
|
||||||
|
Some (
|
||||||
|
stored,
|
||||||
|
{
|
||||||
|
Index = log.Count * 1<LogIndex>
|
||||||
|
Term = term
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
member this.GetLogEntry position =
|
member this.GetLogEntry position =
|
||||||
let position = position / 1<LogIndex>
|
let position = position / 1<LogIndex>
|
||||||
|
@@ -7,7 +7,7 @@
|
|||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<Compile Include="AssemblyInfo.fs" />
|
<Compile Include="AssemblyInfo.fs" />
|
||||||
<Compile Include="Measures.fs" />
|
<Compile Include="Domain.fs" />
|
||||||
<Compile Include="PersistentState.fs" />
|
<Compile Include="PersistentState.fs" />
|
||||||
<Compile Include="Server.fs" />
|
<Compile Include="Server.fs" />
|
||||||
<Compile Include="InMemory.fs" />
|
<Compile Include="InMemory.fs" />
|
||||||
|
261
Raft/Server.fs
261
Raft/Server.fs
@@ -23,7 +23,7 @@ type LeaderState =
|
|||||||
/// For each server, index of the next log entry to send to that server
|
/// For each server, index of the next log entry to send to that server
|
||||||
NextIndex : int<LogIndex> array
|
NextIndex : int<LogIndex> array
|
||||||
/// For each server, index of the highest log entry known to be replicated on that server
|
/// For each server, index of the highest log entry known to be replicated on that server
|
||||||
MatchIndex : int array
|
MatchIndex : int<LogIndex> array
|
||||||
}
|
}
|
||||||
|
|
||||||
static member New (clusterSize : int) (currentIndex : int<LogIndex>) : LeaderState =
|
static member New (clusterSize : int) (currentIndex : int<LogIndex>) : LeaderState =
|
||||||
@@ -55,7 +55,7 @@ type RequestVoteMessage =
|
|||||||
{
|
{
|
||||||
CandidateTerm : int<Term>
|
CandidateTerm : int<Term>
|
||||||
CandidateId : int<ServerId>
|
CandidateId : int<ServerId>
|
||||||
CandidateLastLogEntry : int<LogIndex> * int<Term>
|
CandidateLastLogEntry : LogEntry option
|
||||||
ReplyChannel : RequestVoteReply -> unit
|
ReplyChannel : RequestVoteReply -> unit
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -65,21 +65,25 @@ type RequestVoteMessage =
|
|||||||
/// I, a follower, acknowledge the leader's instruction to add an entry to my log.
|
/// I, a follower, acknowledge the leader's instruction to add an entry to my log.
|
||||||
type AppendEntriesReply =
|
type AppendEntriesReply =
|
||||||
{
|
{
|
||||||
|
/// Me, the follower who is replying
|
||||||
|
Follower : int<ServerId>
|
||||||
|
/// The term I, the follower, think it is
|
||||||
FollowerTerm : int<Term>
|
FollowerTerm : int<Term>
|
||||||
/// Reply with failure if the follower thinks the leader is out of date:
|
/// Reply with None if the follower thinks the leader is out of date:
|
||||||
/// that is, if the leader's believed term is smaller than the follower's,
|
/// that is, if the leader's believed term is smaller than the follower's,
|
||||||
/// or if the leader's declared previous log entry doesn't exist on the follower.
|
/// or if the leader's declared previous log entry doesn't exist on the follower.
|
||||||
Success : bool
|
/// If instead we accepted the update, this is the current head of the follower's log
|
||||||
}
|
/// after accepting the update.
|
||||||
|
Success : int<LogIndex> option
|
||||||
type LogEntry =
|
|
||||||
{
|
|
||||||
Index : int<LogIndex>
|
|
||||||
Term : int<Term>
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override this.ToString () =
|
override this.ToString () =
|
||||||
sprintf "Log entry %i at subjective term %i" this.Index this.Term
|
let description =
|
||||||
|
match this.Success with
|
||||||
|
| Some index -> sprintf "successfully applied leader entry, log length %i" index
|
||||||
|
| None -> "did not apply leader entry"
|
||||||
|
|
||||||
|
sprintf "Follower %i %s" this.FollowerTerm description
|
||||||
|
|
||||||
/// I am the leader. Followers, update your state as follows.
|
/// I am the leader. Followers, update your state as follows.
|
||||||
type AppendEntriesMessage<'a> =
|
type AppendEntriesMessage<'a> =
|
||||||
@@ -89,7 +93,7 @@ type AppendEntriesMessage<'a> =
|
|||||||
/// I am your leader! This is me! (so everyone knows where to send clients to)
|
/// I am your leader! This is me! (so everyone knows where to send clients to)
|
||||||
LeaderId : int<ServerId>
|
LeaderId : int<ServerId>
|
||||||
/// The entry immediately preceding the entry I'm sending you, so you can tell if we've got out of sync.
|
/// The entry immediately preceding the entry I'm sending you, so you can tell if we've got out of sync.
|
||||||
PrevLogEntry : LogEntry
|
PrevLogEntry : LogEntry option
|
||||||
/// Followers, append this entry to your log. (Or, if None, this is just a heartbeat.)
|
/// Followers, append this entry to your log. (Or, if None, this is just a heartbeat.)
|
||||||
/// It was determined at the given term - recall that we might need to bring a restarted node up to speed
|
/// It was determined at the given term - recall that we might need to bring a restarted node up to speed
|
||||||
/// with what happened during terms that took place while it was down.
|
/// with what happened during terms that took place while it was down.
|
||||||
@@ -127,10 +131,12 @@ type Instruction<'a> =
|
|||||||
|
|
||||||
type Reply =
|
type Reply =
|
||||||
| RequestVoteReply of RequestVoteReply
|
| RequestVoteReply of RequestVoteReply
|
||||||
|
| AppendEntriesReply of AppendEntriesReply
|
||||||
|
|
||||||
override this.ToString () =
|
override this.ToString () =
|
||||||
match this with
|
match this with
|
||||||
| RequestVoteReply v -> v.ToString ()
|
| RequestVoteReply v -> v.ToString ()
|
||||||
|
| AppendEntriesReply r -> r.ToString ()
|
||||||
|
|
||||||
type Message<'a> =
|
type Message<'a> =
|
||||||
| Instruction of Instruction<'a>
|
| Instruction of Instruction<'a>
|
||||||
@@ -174,9 +180,33 @@ type ServerStatus =
|
|||||||
|
|
||||||
type private ServerAction<'a> =
|
type private ServerAction<'a> =
|
||||||
| BeginElection
|
| BeginElection
|
||||||
|
| EmitHeartbeat
|
||||||
| Receive of Message<'a>
|
| Receive of Message<'a>
|
||||||
| Sync of AsyncReplyChannel<unit>
|
| Sync of AsyncReplyChannel<unit>
|
||||||
|
|
||||||
|
[<RequireQualifiedAccess>]
|
||||||
|
module internal ServerUtils =
|
||||||
|
|
||||||
|
/// Return the maximum log index which a quorum has committed.
|
||||||
|
/// Recall that 0 means "nothing committed".
|
||||||
|
let maxLogAQuorumHasCommitted (matchIndex : int<LogIndex>[]) : int<LogIndex> =
|
||||||
|
let numberWhoCommittedIndex = matchIndex |> Array.countBy id
|
||||||
|
|
||||||
|
numberWhoCommittedIndex |> Array.sortInPlaceBy fst
|
||||||
|
|
||||||
|
let rec go (numberCounted : int) (result : int<LogIndex>) (i : int) =
|
||||||
|
if i < 0 then
|
||||||
|
result
|
||||||
|
else
|
||||||
|
let numberCounted = numberCounted + snd numberWhoCommittedIndex.[i]
|
||||||
|
|
||||||
|
if numberCounted > matchIndex.Length / 2 then
|
||||||
|
fst numberWhoCommittedIndex.[i]
|
||||||
|
else
|
||||||
|
go numberCounted (fst numberWhoCommittedIndex.[i]) (i - 1)
|
||||||
|
|
||||||
|
go 0 0<LogIndex> (numberWhoCommittedIndex.Length - 1)
|
||||||
|
|
||||||
type Server<'a>
|
type Server<'a>
|
||||||
(
|
(
|
||||||
clusterSize : int,
|
clusterSize : int,
|
||||||
@@ -195,46 +225,46 @@ type Server<'a>
|
|||||||
// We're definitely out of date. Switch to follower mode.
|
// We're definitely out of date. Switch to follower mode.
|
||||||
currentType <- ServerSpecialisation.Follower
|
currentType <- ServerSpecialisation.Follower
|
||||||
persistentState.AdvanceToTerm message.Term
|
persistentState.AdvanceToTerm message.Term
|
||||||
|
// TODO - `DropStaleResponse` suggests we should do this
|
||||||
|
//elif message.Term < persistentState.CurrentTerm then
|
||||||
|
// // Drop the message, it's old.
|
||||||
|
// ()
|
||||||
|
//else
|
||||||
|
|
||||||
match message with
|
match message with
|
||||||
| RequestVote message ->
|
| RequestVote message ->
|
||||||
// This was guaranteed above.
|
// This was guaranteed above.
|
||||||
assert (message.CandidateTerm <= persistentState.CurrentTerm)
|
assert (message.CandidateTerm <= persistentState.CurrentTerm)
|
||||||
|
|
||||||
// The following clauses define either condition under which we accept that the candidate has more data
|
// The following clause defines the condition under which we accept that the candidate has more data
|
||||||
// than we do, and so could be a more suitable leader than us.
|
// than we do, and so could be a more suitable leader than us.
|
||||||
|
|
||||||
// TODO collapse these clauses, it'll be much neater
|
// (This is the `logOk` of the paper.)
|
||||||
|
let candidateSupersedesMe =
|
||||||
let messageSupersedesMe =
|
match persistentState.GetLastLogEntry (), message.CandidateLastLogEntry with
|
||||||
// Is the candidate advertising a later term than our last-persisted write was made at?
|
| Some (_, ourLastEntry), Some candidateLastLogEntry ->
|
||||||
// (That would mean it's far in the future of us.)
|
// The candidate wins if:
|
||||||
match persistentState.GetLastLogEntry () with
|
// * it's from so far in the future that an election has happened which we haven't heard about; or
|
||||||
| Some (_, (_, ourLastTerm)) -> snd message.CandidateLastLogEntry > ourLastTerm
|
// * it's from the same term as us, but it's logged more than we have.
|
||||||
| None ->
|
candidateLastLogEntry.Term > ourLastEntry.Term
|
||||||
// We have persisted no history at all!
|
|| (candidateLastLogEntry.Term = ourLastEntry.Term
|
||||||
|
&& candidateLastLogEntry.Index >= persistentState.CurrentLogIndex)
|
||||||
|
| Some _, None ->
|
||||||
|
// We've logged something, and they have not. We're ahead, and won't vote for them.
|
||||||
|
false
|
||||||
|
| None, _ ->
|
||||||
|
// We have persisted no history at all.
|
||||||
|
// They take precedence - either they've logged something (in which case they must be ahead of us),
|
||||||
|
// or they haven't logged anything (in which case we'll defer to them for beating us to the
|
||||||
|
// first election).
|
||||||
true
|
true
|
||||||
|
|
||||||
let messageExtendsMe =
|
|
||||||
// Do we agree what the current term is, is the candidate advertising a more advanced log than us?
|
|
||||||
match persistentState.GetLastLogEntry () with
|
|
||||||
| Some (_, (_, ourLastTerm)) ->
|
|
||||||
snd message.CandidateLastLogEntry = ourLastTerm
|
|
||||||
&& fst message.CandidateLastLogEntry >= persistentState.CurrentLogIndex
|
|
||||||
| None ->
|
|
||||||
// We've persisted no history; the candidate needs to also be at the start of history,
|
|
||||||
// or else we'd have already considered them in the `messageSupersedesMe` check.
|
|
||||||
snd message.CandidateLastLogEntry = 0<Term>
|
|
||||||
|
|
||||||
// (This is the `logOk` of the paper. It's true iff the candidate's declaration is ahead of us.)
|
|
||||||
let candidateIsAhead = messageSupersedesMe || messageExtendsMe
|
|
||||||
|
|
||||||
// But just because the candidate is ahead of us, doesn't mean we can vote for them.
|
// But just because the candidate is ahead of us, doesn't mean we can vote for them.
|
||||||
// We can only vote for one candidate per election.
|
// We can only vote for one candidate per election.
|
||||||
// (We can't rely on our own VotedFor property, because that may have been in a previous election.)
|
// (We can't rely on our own VotedFor property, because that may have been in a previous election.)
|
||||||
|
|
||||||
let shouldVoteFor =
|
let shouldVoteFor =
|
||||||
if message.CandidateTerm = persistentState.CurrentTerm && candidateIsAhead then
|
if message.CandidateTerm = persistentState.CurrentTerm && candidateSupersedesMe then
|
||||||
// We agree on which election we're taking part in, and moreover we agree that the candidate is
|
// We agree on which election we're taking part in, and moreover we agree that the candidate is
|
||||||
// suitable.
|
// suitable.
|
||||||
match persistentState.VotedFor with
|
match persistentState.VotedFor with
|
||||||
@@ -278,7 +308,8 @@ type Server<'a>
|
|||||||
// Reject the request: the "leader" is actually outdated, it was only a leader in the past.
|
// Reject the request: the "leader" is actually outdated, it was only a leader in the past.
|
||||||
{
|
{
|
||||||
FollowerTerm = persistentState.CurrentTerm
|
FollowerTerm = persistentState.CurrentTerm
|
||||||
Success = false
|
Success = None
|
||||||
|
Follower = me
|
||||||
}
|
}
|
||||||
|> message.ReplyChannel
|
|> message.ReplyChannel
|
||||||
|
|
||||||
@@ -295,8 +326,9 @@ type Server<'a>
|
|||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
Success = true
|
Success = Some persistentState.CurrentLogIndex
|
||||||
FollowerTerm = persistentState.CurrentTerm
|
FollowerTerm = persistentState.CurrentTerm
|
||||||
|
Follower = me
|
||||||
}
|
}
|
||||||
|> message.ReplyChannel
|
|> message.ReplyChannel
|
||||||
|
|
||||||
@@ -308,7 +340,10 @@ type Server<'a>
|
|||||||
|
|
||||||
| Some (toInsert, toInsertTerm) ->
|
| Some (toInsert, toInsertTerm) ->
|
||||||
|
|
||||||
let desiredLogInsertionPosition = message.PrevLogEntry.Index + 1<LogIndex>
|
let desiredLogInsertionPosition =
|
||||||
|
match message.PrevLogEntry with
|
||||||
|
| None -> 1<LogIndex>
|
||||||
|
| Some entry -> entry.Index + 1<LogIndex>
|
||||||
|
|
||||||
match persistentState.GetLogEntry desiredLogInsertionPosition with
|
match persistentState.GetLogEntry desiredLogInsertionPosition with
|
||||||
| Some (_, existingTerm) when toInsertTerm = existingTerm ->
|
| Some (_, existingTerm) when toInsertTerm = existingTerm ->
|
||||||
@@ -322,8 +357,9 @@ type Server<'a>
|
|||||||
persistentState.AppendToLog toInsert toInsertTerm
|
persistentState.AppendToLog toInsert toInsertTerm
|
||||||
|
|
||||||
{
|
{
|
||||||
Success = true
|
Success = Some desiredLogInsertionPosition
|
||||||
FollowerTerm = persistentState.CurrentTerm
|
FollowerTerm = persistentState.CurrentTerm
|
||||||
|
Follower = me
|
||||||
}
|
}
|
||||||
|> message.ReplyChannel
|
|> message.ReplyChannel
|
||||||
|
|
||||||
@@ -336,25 +372,27 @@ type Server<'a>
|
|||||||
persistentState.AppendToLog toInsert toInsertTerm
|
persistentState.AppendToLog toInsert toInsertTerm
|
||||||
|
|
||||||
{
|
{
|
||||||
Success = true
|
Success = Some desiredLogInsertionPosition
|
||||||
FollowerTerm = persistentState.CurrentTerm
|
FollowerTerm = persistentState.CurrentTerm
|
||||||
|
Follower = me
|
||||||
}
|
}
|
||||||
|> message.ReplyChannel
|
|> message.ReplyChannel
|
||||||
|
|
||||||
let logIsConsistent (message : AppendEntriesMessage<'a>) : bool =
|
let logIsConsistent (message : AppendEntriesMessage<'a>) : bool =
|
||||||
if message.PrevLogEntry.Index = 0<LogIndex> then
|
match message.PrevLogEntry with
|
||||||
|
| None ->
|
||||||
// The leader advertises that they have no committed history, so certainly it's consistent with
|
// The leader advertises that they have no committed history, so certainly it's consistent with
|
||||||
// us.
|
// us.
|
||||||
true
|
true
|
||||||
else
|
| Some entry ->
|
||||||
|
|
||||||
match persistentState.GetLogEntry message.PrevLogEntry.Index with
|
match persistentState.GetLogEntry entry.Index with
|
||||||
| None ->
|
| None ->
|
||||||
// The leader's advertised commit is ahead of our history.
|
// The leader's advertised commit is ahead of our history.
|
||||||
false
|
false
|
||||||
| Some (_, ourTermForThisEntry) ->
|
| Some (_, ourTermForThisEntry) ->
|
||||||
// The leader's advertised commit is in our history; do we agree with it?
|
// The leader's advertised commit is in our history; do we agree with it?
|
||||||
ourTermForThisEntry = message.PrevLogEntry.Term
|
ourTermForThisEntry = entry.Term
|
||||||
|
|
||||||
match currentType with
|
match currentType with
|
||||||
| ServerSpecialisation.Leader _ ->
|
| ServerSpecialisation.Leader _ ->
|
||||||
@@ -373,7 +411,8 @@ type Server<'a>
|
|||||||
// Reject the request, it's inconsistent with our history.
|
// Reject the request, it's inconsistent with our history.
|
||||||
{
|
{
|
||||||
FollowerTerm = persistentState.CurrentTerm
|
FollowerTerm = persistentState.CurrentTerm
|
||||||
Success = false
|
Success = None
|
||||||
|
Follower = me
|
||||||
}
|
}
|
||||||
|> message.ReplyChannel
|
|> message.ReplyChannel
|
||||||
|
|
||||||
@@ -388,46 +427,48 @@ type Server<'a>
|
|||||||
assert (logIsConsistent message)
|
assert (logIsConsistent message)
|
||||||
acceptRequest ()
|
acceptRequest ()
|
||||||
|
|
||||||
let mailbox =
|
let divideByTwoRoundingUp (n : int) =
|
||||||
let rec loop (mailbox : MailboxProcessor<_>) =
|
if n % 2 = 0 then n / 2 else (n / 2) + 1
|
||||||
async {
|
|
||||||
let! m = mailbox.Receive ()
|
|
||||||
//let toPrint = sprintf "Processing message in server %i: %+A" me m
|
|
||||||
//System.Console.WriteLine toPrint
|
|
||||||
|
|
||||||
match m with
|
let processReply (r : Reply) : unit =
|
||||||
| ServerAction.BeginElection ->
|
match r with
|
||||||
|
| AppendEntriesReply appendEntriesReply ->
|
||||||
match currentType with
|
match currentType with
|
||||||
| ServerSpecialisation.Leader _ -> ()
|
|
||||||
| ServerSpecialisation.Candidate _
|
| ServerSpecialisation.Candidate _
|
||||||
| ServerSpecialisation.Follower ->
|
| ServerSpecialisation.Follower -> ()
|
||||||
|
| ServerSpecialisation.Leader leaderState ->
|
||||||
|
|
||||||
// Start the election!
|
if appendEntriesReply.FollowerTerm = persistentState.CurrentTerm then
|
||||||
currentType <- ServerSpecialisation.Candidate (CandidateState.New clusterSize me)
|
match appendEntriesReply.Success with
|
||||||
persistentState.IncrementTerm ()
|
| Some matchIndex ->
|
||||||
persistentState.Vote me
|
leaderState.MatchIndex.[appendEntriesReply.Follower / 1<ServerId>] <- matchIndex
|
||||||
|
leaderState.NextIndex.[appendEntriesReply.Follower / 1<ServerId>] <- matchIndex + 1<LogIndex>
|
||||||
for i in 0 .. clusterSize - 1 do
|
|
||||||
if i * 1<ServerId> <> me then
|
|
||||||
{
|
|
||||||
CandidateTerm = persistentState.CurrentTerm
|
|
||||||
CandidateId = me
|
|
||||||
CandidateLastLogEntry =
|
|
||||||
match persistentState.GetLastLogEntry () with
|
|
||||||
| Some (index, (_, term)) -> (index, term)
|
|
||||||
| None ->
|
| None ->
|
||||||
// TODO this is almost certainly not right
|
leaderState.NextIndex.[appendEntriesReply.Follower / 1<ServerId>] <-
|
||||||
(0<LogIndex>, 0<Term>)
|
max
|
||||||
ReplyChannel =
|
(leaderState.NextIndex.[appendEntriesReply.Follower / 1<ServerId>] - 1<LogIndex>)
|
||||||
// TODO this is bypassing the network - stop it!
|
1<LogIndex>
|
||||||
fun reply -> messageChannel me (RequestVoteReply reply |> Message.Reply)
|
// Note that the decision to process this *here* means the algorithm doesn't work in clusters of
|
||||||
|
// only one node, because then there will never be any AppendEntries replies.
|
||||||
|
let maxLogAQuorumHasCommitted =
|
||||||
|
ServerUtils.maxLogAQuorumHasCommitted leaderState.MatchIndex
|
||||||
|
|
||||||
|
if maxLogAQuorumHasCommitted > 0<LogIndex> then
|
||||||
|
let ourLogTerm =
|
||||||
|
match persistentState.GetLogEntry maxLogAQuorumHasCommitted with
|
||||||
|
| None ->
|
||||||
|
failwithf
|
||||||
|
"Invariant violated. The leader has not logged an entry (%i) which it knows a quorum has logged."
|
||||||
|
maxLogAQuorumHasCommitted
|
||||||
|
| Some (_, term) -> term
|
||||||
|
|
||||||
|
if ourLogTerm = persistentState.CurrentTerm then
|
||||||
|
volatileState <-
|
||||||
|
{ volatileState with
|
||||||
|
CommitIndex = maxLogAQuorumHasCommitted
|
||||||
}
|
}
|
||||||
|> Instruction.RequestVote
|
|
||||||
|> Message.Instruction
|
| RequestVoteReply requestVoteReply ->
|
||||||
|> messageChannel (i * 1<ServerId>)
|
|
||||||
| ServerAction.Receive (Instruction m) -> return processMessage m
|
|
||||||
| ServerAction.Sync reply -> reply.Reply ()
|
|
||||||
| ServerAction.Receive (Reply (RequestVoteReply requestVoteReply)) ->
|
|
||||||
match currentType with
|
match currentType with
|
||||||
| ServerSpecialisation.Leader _
|
| ServerSpecialisation.Leader _
|
||||||
| ServerSpecialisation.Follower ->
|
| ServerSpecialisation.Follower ->
|
||||||
@@ -451,6 +492,61 @@ type Server<'a>
|
|||||||
LeaderState.New clusterSize persistentState.CurrentLogIndex
|
LeaderState.New clusterSize persistentState.CurrentLogIndex
|
||||||
|> ServerSpecialisation.Leader
|
|> ServerSpecialisation.Leader
|
||||||
|
|
||||||
|
|
||||||
|
let mailbox =
|
||||||
|
let rec loop (mailbox : MailboxProcessor<_>) =
|
||||||
|
async {
|
||||||
|
let! m = mailbox.Receive ()
|
||||||
|
//let toPrint = sprintf "Processing message in server %i: %+A" me m
|
||||||
|
//System.Console.WriteLine toPrint
|
||||||
|
|
||||||
|
match m with
|
||||||
|
| ServerAction.EmitHeartbeat ->
|
||||||
|
match currentType with
|
||||||
|
| ServerSpecialisation.Candidate _
|
||||||
|
| ServerSpecialisation.Follower -> ()
|
||||||
|
| ServerSpecialisation.Leader _ ->
|
||||||
|
for i in 0 .. clusterSize - 1 do
|
||||||
|
if i * 1<ServerId> <> me then
|
||||||
|
{
|
||||||
|
LeaderTerm = persistentState.CurrentTerm
|
||||||
|
LeaderId = me
|
||||||
|
PrevLogEntry = persistentState.GetLastLogEntry () |> Option.map snd
|
||||||
|
NewEntry = None
|
||||||
|
LeaderCommitIndex = volatileState.CommitIndex
|
||||||
|
ReplyChannel =
|
||||||
|
fun reply ->
|
||||||
|
messageChannel me (reply |> Reply.AppendEntriesReply |> Message.Reply)
|
||||||
|
}
|
||||||
|
|> Instruction.AppendEntries
|
||||||
|
|> Message.Instruction
|
||||||
|
|> messageChannel (i * 1<ServerId>)
|
||||||
|
| ServerAction.BeginElection ->
|
||||||
|
match currentType with
|
||||||
|
| ServerSpecialisation.Leader _ -> ()
|
||||||
|
| ServerSpecialisation.Candidate _
|
||||||
|
| ServerSpecialisation.Follower ->
|
||||||
|
|
||||||
|
// Start the election!
|
||||||
|
currentType <- ServerSpecialisation.Candidate (CandidateState.New clusterSize me)
|
||||||
|
persistentState.IncrementTerm ()
|
||||||
|
persistentState.Vote me
|
||||||
|
|
||||||
|
for i in 0 .. clusterSize - 1 do
|
||||||
|
if i * 1<ServerId> <> me then
|
||||||
|
{
|
||||||
|
CandidateTerm = persistentState.CurrentTerm
|
||||||
|
CandidateId = me
|
||||||
|
CandidateLastLogEntry = persistentState.GetLastLogEntry () |> Option.map snd
|
||||||
|
ReplyChannel = fun reply -> messageChannel me (RequestVoteReply reply |> Message.Reply)
|
||||||
|
}
|
||||||
|
|> Instruction.RequestVote
|
||||||
|
|> Message.Instruction
|
||||||
|
|> messageChannel (i * 1<ServerId>)
|
||||||
|
| ServerAction.Receive (Instruction m) -> processMessage m
|
||||||
|
| ServerAction.Receive (Reply r) -> processReply r
|
||||||
|
| ServerAction.Sync replyChannel -> replyChannel.Reply ()
|
||||||
|
|
||||||
return! loop mailbox
|
return! loop mailbox
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -458,7 +554,8 @@ type Server<'a>
|
|||||||
mailbox.Error.Add raise
|
mailbox.Error.Add raise
|
||||||
mailbox
|
mailbox
|
||||||
|
|
||||||
member this.TriggerTimeout () = mailbox.Post ServerAction.BeginElection
|
member this.TriggerInactivityTimeout () = mailbox.Post ServerAction.BeginElection
|
||||||
|
member this.TriggerHeartbeatTimeout () = mailbox.Post ServerAction.EmitHeartbeat
|
||||||
|
|
||||||
member this.Message (m : Message<'a>) = mailbox.Post (ServerAction.Receive m)
|
member this.Message (m : Message<'a>) = mailbox.Post (ServerAction.Receive m)
|
||||||
|
|
||||||
|
@@ -57,12 +57,28 @@ module Program =
|
|||||||
printf "Unrecognised input. "
|
printf "Unrecognised input. "
|
||||||
None
|
None
|
||||||
|
|
||||||
|
let rec getHeartbeater (clusterSize : int) (serverId : string) =
|
||||||
|
// TODO: restrict this to the leaders only
|
||||||
|
match Int32.TryParse serverId with
|
||||||
|
| true, serverId ->
|
||||||
|
if serverId >= clusterSize || serverId < 0 then
|
||||||
|
printf "Server ID must be between 0 and %i inclusive. " (clusterSize - 1)
|
||||||
|
None
|
||||||
|
else
|
||||||
|
Some (serverId * 1<ServerId>)
|
||||||
|
| false, _ ->
|
||||||
|
printf "Unrecognised input. "
|
||||||
|
None
|
||||||
|
|
||||||
type UserAction =
|
type UserAction =
|
||||||
| Timeout of int<ServerId>
|
| InactivityTimeout of int<ServerId>
|
||||||
| NetworkMessage of int<ServerId> * int
|
| NetworkMessage of int<ServerId> * int
|
||||||
|
| DropMessage of int<ServerId> * int
|
||||||
|
| Heartbeat of int<ServerId>
|
||||||
|
|
||||||
let rec getAction (clusterSize : int) =
|
let rec getAction (clusterSize : int) =
|
||||||
printf "Enter action. Trigger [t]imeout <server id>, or allow [m]essage <server id, message id>: "
|
printf
|
||||||
|
"Enter action. Trigger [t]imeout <server id>, [h]eartbeat a leader <server id>, [d]rop message <server id, message id>, or allow [m]essage <server id, message id>: "
|
||||||
|
|
||||||
let s =
|
let s =
|
||||||
let rec go () =
|
let rec go () =
|
||||||
@@ -74,31 +90,62 @@ module Program =
|
|||||||
match s.[0] with
|
match s.[0] with
|
||||||
| 'T' ->
|
| 'T' ->
|
||||||
match getTimeout clusterSize s.[1..] with
|
match getTimeout clusterSize s.[1..] with
|
||||||
| Some t -> t |> Timeout
|
| Some t -> t |> InactivityTimeout
|
||||||
|
| None -> getAction clusterSize
|
||||||
|
| 'D' ->
|
||||||
|
match getMessage clusterSize s.[1..] with
|
||||||
|
| Some m -> m |> DropMessage
|
||||||
| None -> getAction clusterSize
|
| None -> getAction clusterSize
|
||||||
| 'M' ->
|
| 'M' ->
|
||||||
match getMessage clusterSize s.[1..] with
|
match getMessage clusterSize s.[1..] with
|
||||||
| Some m -> m |> NetworkMessage
|
| Some m -> m |> NetworkMessage
|
||||||
| None -> getAction clusterSize
|
| None -> getAction clusterSize
|
||||||
|
| 'H' ->
|
||||||
|
match getHeartbeater clusterSize s.[1..] with
|
||||||
|
| Some h -> Heartbeat h
|
||||||
|
| None -> getAction clusterSize
|
||||||
| _ ->
|
| _ ->
|
||||||
printf "Unrecognised input. "
|
printf "Unrecognised input. "
|
||||||
getAction clusterSize
|
getAction clusterSize
|
||||||
|
|
||||||
|
let processAction (cluster : Cluster<'a>) (network : Network<'a>) (action : UserAction) : unit =
|
||||||
|
match action with
|
||||||
|
| InactivityTimeout serverId -> cluster.InactivityTimeout serverId
|
||||||
|
| Heartbeat serverId -> cluster.HeartbeatTimeout serverId
|
||||||
|
| DropMessage (serverId, messageId) -> network.DropMessage serverId messageId
|
||||||
|
| NetworkMessage (serverId, messageId) ->
|
||||||
|
network.InboundMessage serverId messageId |> cluster.SendMessage serverId
|
||||||
|
network.DropMessage serverId messageId
|
||||||
|
|
||||||
|
|
||||||
[<EntryPoint>]
|
[<EntryPoint>]
|
||||||
let main _argv =
|
let main _argv =
|
||||||
let clusterSize = 5
|
let clusterSize = 5
|
||||||
let cluster, network = InMemoryCluster.make<int> clusterSize
|
let cluster, network = InMemoryCluster.make<int> clusterSize
|
||||||
|
|
||||||
|
let startupSequence =
|
||||||
|
[
|
||||||
|
UserAction.InactivityTimeout 0<ServerId>
|
||||||
|
UserAction.NetworkMessage (1<ServerId>, 0)
|
||||||
|
UserAction.NetworkMessage (2<ServerId>, 0)
|
||||||
|
UserAction.DropMessage (3<ServerId>, 0)
|
||||||
|
UserAction.DropMessage (4<ServerId>, 0)
|
||||||
|
UserAction.NetworkMessage (0<ServerId>, 0)
|
||||||
|
UserAction.NetworkMessage (0<ServerId>, 1)
|
||||||
|
]
|
||||||
|
|
||||||
|
for action in startupSequence do
|
||||||
|
processAction cluster network action
|
||||||
|
|
||||||
while true do
|
while true do
|
||||||
printNetworkState network
|
printNetworkState network
|
||||||
printClusterState cluster
|
printClusterState cluster
|
||||||
|
|
||||||
let action = getAction clusterSize
|
let action = getAction clusterSize
|
||||||
|
processAction cluster network action
|
||||||
|
|
||||||
match action with
|
// TODO: log out the committed state so that we can see whether the leader is correctly
|
||||||
| Timeout serverId -> cluster.Timeout serverId
|
// processing heartbeat responses
|
||||||
| NetworkMessage (serverId, messageId) ->
|
// TODO: allow client queries!
|
||||||
network.InboundMessage serverId messageId |> cluster.SendMessage serverId
|
|
||||||
network.DropMessage serverId messageId
|
|
||||||
|
|
||||||
0
|
0
|
||||||
|
Reference in New Issue
Block a user