Lift the declarative history definition to the library
This commit is contained in:
@@ -64,24 +64,22 @@ module TestInMemoryServer =
|
||||
let ``Startup sequence in prod, only one timeout takes place`` () =
|
||||
let cluster, network = InMemoryCluster.make<int> 5
|
||||
|
||||
cluster.Servers.[0].TriggerInactivityTimeout ()
|
||||
cluster.Servers.[0].Sync ()
|
||||
NetworkAction.InactivityTimeout 0<ServerId>
|
||||
|> NetworkAction.perform cluster network
|
||||
|
||||
// We sent a message to every other server; process them.
|
||||
for i in 1..4 do
|
||||
let server = i * 1<ServerId>
|
||||
(network.AllInboundMessages server).Length |> shouldEqual 1
|
||||
let message = network.InboundMessage server 0
|
||||
network.DropMessage server 0
|
||||
cluster.SendMessageDirectly server message
|
||||
|
||||
NetworkAction.NetworkMessage (server, 0)
|
||||
|> NetworkAction.perform cluster network
|
||||
|
||||
(network.AllInboundMessages 0<ServerId>).Length |> shouldEqual i
|
||||
|
||||
for i in 1..4 do
|
||||
network.InboundMessage 0<ServerId> (i - 1)
|
||||
|> cluster.SendMessageDirectly 0<ServerId>
|
||||
|
||||
network.DropMessage 0<ServerId> (i - 1)
|
||||
NetworkAction.NetworkMessage (0<ServerId>, (i - 1))
|
||||
|> NetworkAction.perform cluster network
|
||||
|
||||
// (the messages we've already processed)
|
||||
(network.AllInboundMessages 0<ServerId>).Length |> shouldEqual 4
|
||||
@@ -158,10 +156,11 @@ module TestInMemoryServer =
|
||||
let rand = System.Random ()
|
||||
let cluster, network = InMemoryCluster.make<int> 5
|
||||
|
||||
cluster.Servers.[0].TriggerInactivityTimeout ()
|
||||
cluster.Servers.[0].Sync ()
|
||||
cluster.Servers.[1].TriggerInactivityTimeout ()
|
||||
cluster.Servers.[1].Sync ()
|
||||
NetworkAction.InactivityTimeout 0<ServerId>
|
||||
|> NetworkAction.perform cluster network
|
||||
|
||||
NetworkAction.InactivityTimeout 1<ServerId>
|
||||
|> NetworkAction.perform cluster network
|
||||
|
||||
// Those two each sent a message to every other server.
|
||||
(network.AllInboundMessages 0<ServerId>).Length |> shouldEqual 1
|
||||
@@ -186,8 +185,8 @@ module TestInMemoryServer =
|
||||
match cluster.Servers.[0].State, cluster.Servers.[1].State with
|
||||
| Leader _, Leader _ -> failwith "Unexpectedly had two leaders"
|
||||
| Candidate _, Candidate _ -> failwith "Unexpectedly failed to elect a leader"
|
||||
| Leader 1<Term>, Candidate 1<Term>
|
||||
| Candidate 1<Term>, Leader 1<Term> -> ()
|
||||
| Leader 1<Term>, Follower
|
||||
| Follower, Leader 1<Term> -> ()
|
||||
| s1, s2 -> failwithf "Unexpected state: %O %O" s1 s2
|
||||
|
||||
for i in 2..4 do
|
||||
@@ -215,10 +214,11 @@ module TestInMemoryServer =
|
||||
let ``Startup sequence in prod, two timeouts at once, property-based: at most one leader is elected`` () =
|
||||
let cluster, network = InMemoryCluster.make<int> 5
|
||||
|
||||
cluster.Servers.[0].TriggerInactivityTimeout ()
|
||||
cluster.Servers.[0].Sync ()
|
||||
cluster.Servers.[1].TriggerInactivityTimeout ()
|
||||
cluster.Servers.[1].Sync ()
|
||||
NetworkAction.InactivityTimeout 0<ServerId>
|
||||
|> NetworkAction.perform cluster network
|
||||
|
||||
NetworkAction.InactivityTimeout 1<ServerId>
|
||||
|> NetworkAction.perform cluster network
|
||||
|
||||
// Those two each sent a message to every other server.
|
||||
(network.AllInboundMessages 0<ServerId>).Length |> shouldEqual 1
|
||||
|
@@ -84,3 +84,21 @@ module InMemoryCluster =
|
||||
}
|
||||
|
||||
cluster, network
|
||||
|
||||
type NetworkAction =
|
||||
| InactivityTimeout of int<ServerId>
|
||||
| NetworkMessage of int<ServerId> * int
|
||||
| DropMessage of int<ServerId> * int
|
||||
| Heartbeat of int<ServerId>
|
||||
|
||||
[<RequireQualifiedAccess>]
|
||||
module NetworkAction =
|
||||
|
||||
let perform<'a> (cluster : Cluster<'a>) (network : Network<'a>) (action : NetworkAction) : unit =
|
||||
match action with
|
||||
| InactivityTimeout serverId -> cluster.InactivityTimeout serverId
|
||||
| Heartbeat serverId -> cluster.HeartbeatTimeout serverId
|
||||
| DropMessage (serverId, messageId) -> network.DropMessage serverId messageId
|
||||
| NetworkMessage (serverId, messageId) ->
|
||||
network.InboundMessage serverId messageId |> cluster.SendMessage serverId
|
||||
network.DropMessage serverId messageId
|
||||
|
@@ -430,6 +430,26 @@ type Server<'a>
|
||||
let divideByTwoRoundingUp (n : int) =
|
||||
if n % 2 = 0 then n / 2 else (n / 2) + 1
|
||||
|
||||
let emitHeartbeat () =
|
||||
match currentType with
|
||||
| ServerSpecialisation.Candidate _
|
||||
| ServerSpecialisation.Follower -> ()
|
||||
| ServerSpecialisation.Leader _ ->
|
||||
for i in 0 .. clusterSize - 1 do
|
||||
if i * 1<ServerId> <> me then
|
||||
{
|
||||
LeaderTerm = persistentState.CurrentTerm
|
||||
LeaderId = me
|
||||
PrevLogEntry = persistentState.GetLastLogEntry () |> Option.map snd
|
||||
NewEntry = None
|
||||
LeaderCommitIndex = volatileState.CommitIndex
|
||||
ReplyChannel =
|
||||
fun reply -> messageChannel me (reply |> Reply.AppendEntriesReply |> Message.Reply)
|
||||
}
|
||||
|> Instruction.AppendEntries
|
||||
|> Message.Instruction
|
||||
|> messageChannel (i * 1<ServerId>)
|
||||
|
||||
let processReply (r : Reply) : unit =
|
||||
match r with
|
||||
| AppendEntriesReply appendEntriesReply ->
|
||||
@@ -492,6 +512,7 @@ type Server<'a>
|
||||
LeaderState.New clusterSize persistentState.CurrentLogIndex
|
||||
|> ServerSpecialisation.Leader
|
||||
|
||||
emitHeartbeat ()
|
||||
|
||||
let mailbox =
|
||||
let rec loop (mailbox : MailboxProcessor<_>) =
|
||||
@@ -501,26 +522,7 @@ type Server<'a>
|
||||
//System.Console.WriteLine toPrint
|
||||
|
||||
match m with
|
||||
| ServerAction.EmitHeartbeat ->
|
||||
match currentType with
|
||||
| ServerSpecialisation.Candidate _
|
||||
| ServerSpecialisation.Follower -> ()
|
||||
| ServerSpecialisation.Leader _ ->
|
||||
for i in 0 .. clusterSize - 1 do
|
||||
if i * 1<ServerId> <> me then
|
||||
{
|
||||
LeaderTerm = persistentState.CurrentTerm
|
||||
LeaderId = me
|
||||
PrevLogEntry = persistentState.GetLastLogEntry () |> Option.map snd
|
||||
NewEntry = None
|
||||
LeaderCommitIndex = volatileState.CommitIndex
|
||||
ReplyChannel =
|
||||
fun reply ->
|
||||
messageChannel me (reply |> Reply.AppendEntriesReply |> Message.Reply)
|
||||
}
|
||||
|> Instruction.AppendEntries
|
||||
|> Message.Instruction
|
||||
|> messageChannel (i * 1<ServerId>)
|
||||
| ServerAction.EmitHeartbeat -> emitHeartbeat ()
|
||||
| ServerAction.BeginElection ->
|
||||
match currentType with
|
||||
| ServerSpecialisation.Leader _ -> ()
|
||||
|
@@ -70,12 +70,6 @@ module Program =
|
||||
printf "Unrecognised input. "
|
||||
None
|
||||
|
||||
type UserAction =
|
||||
| InactivityTimeout of int<ServerId>
|
||||
| NetworkMessage of int<ServerId> * int
|
||||
| DropMessage of int<ServerId> * int
|
||||
| Heartbeat of int<ServerId>
|
||||
|
||||
let rec getAction (clusterSize : int) =
|
||||
printf
|
||||
"Enter action. Trigger [t]imeout <server id>, [h]eartbeat a leader <server id>, [d]rop message <server id, message id>, or allow [m]essage <server id, message id>: "
|
||||
@@ -108,15 +102,17 @@ module Program =
|
||||
printf "Unrecognised input. "
|
||||
getAction clusterSize
|
||||
|
||||
let processAction (cluster : Cluster<'a>) (network : Network<'a>) (action : UserAction) : unit =
|
||||
match action with
|
||||
| InactivityTimeout serverId -> cluster.InactivityTimeout serverId
|
||||
| Heartbeat serverId -> cluster.HeartbeatTimeout serverId
|
||||
| DropMessage (serverId, messageId) -> network.DropMessage serverId messageId
|
||||
| NetworkMessage (serverId, messageId) ->
|
||||
network.InboundMessage serverId messageId |> cluster.SendMessage serverId
|
||||
network.DropMessage serverId messageId
|
||||
|
||||
let electLeader =
|
||||
[
|
||||
NetworkAction.InactivityTimeout 0<ServerId>
|
||||
NetworkAction.NetworkMessage (1<ServerId>, 0)
|
||||
NetworkAction.NetworkMessage (2<ServerId>, 0)
|
||||
NetworkAction.DropMessage (3<ServerId>, 0)
|
||||
NetworkAction.DropMessage (4<ServerId>, 0)
|
||||
NetworkAction.NetworkMessage (0<ServerId>, 0)
|
||||
NetworkAction.NetworkMessage (0<ServerId>, 1)
|
||||
// At this point, server 0 is leader in an uncontested election.
|
||||
]
|
||||
|
||||
[<EntryPoint>]
|
||||
let main _argv =
|
||||
@@ -125,24 +121,45 @@ module Program =
|
||||
|
||||
let startupSequence =
|
||||
[
|
||||
UserAction.InactivityTimeout 0<ServerId>
|
||||
UserAction.NetworkMessage (1<ServerId>, 0)
|
||||
UserAction.NetworkMessage (2<ServerId>, 0)
|
||||
UserAction.DropMessage (3<ServerId>, 0)
|
||||
UserAction.DropMessage (4<ServerId>, 0)
|
||||
UserAction.NetworkMessage (0<ServerId>, 0)
|
||||
UserAction.NetworkMessage (0<ServerId>, 1)
|
||||
NetworkAction.InactivityTimeout 0<ServerId>
|
||||
NetworkAction.InactivityTimeout 1<ServerId>
|
||||
// Two servers vote for server 1...
|
||||
NetworkAction.NetworkMessage (2<ServerId>, 1)
|
||||
NetworkAction.NetworkMessage (3<ServerId>, 1)
|
||||
// One server votes for server 0...
|
||||
NetworkAction.NetworkMessage (4<ServerId>, 0)
|
||||
// and the other votes are processed and discarded
|
||||
NetworkAction.NetworkMessage (0<ServerId>, 0)
|
||||
NetworkAction.NetworkMessage (1<ServerId>, 0)
|
||||
NetworkAction.NetworkMessage (2<ServerId>, 0)
|
||||
NetworkAction.NetworkMessage (3<ServerId>, 0)
|
||||
NetworkAction.NetworkMessage (4<ServerId>, 1)
|
||||
// Server 0 process incoming votes
|
||||
NetworkAction.NetworkMessage (0<ServerId>, 1)
|
||||
// Server 1 processes incoming votes, and achieves majority, electing itself leader!
|
||||
NetworkAction.NetworkMessage (1<ServerId>, 1)
|
||||
NetworkAction.NetworkMessage (1<ServerId>, 2)
|
||||
// Get the followers' heartbeat processing out of the way
|
||||
NetworkAction.NetworkMessage (2<ServerId>, 2)
|
||||
NetworkAction.NetworkMessage (3<ServerId>, 2)
|
||||
NetworkAction.NetworkMessage (4<ServerId>, 2)
|
||||
NetworkAction.NetworkMessage (1<ServerId>, 3)
|
||||
NetworkAction.NetworkMessage (1<ServerId>, 4)
|
||||
NetworkAction.NetworkMessage (1<ServerId>, 5)
|
||||
// Server 0 processes the leader's heartbeat and drops out of the election.
|
||||
NetworkAction.NetworkMessage (0<ServerId>, 2)
|
||||
NetworkAction.NetworkMessage (1<ServerId>, 6)
|
||||
]
|
||||
|
||||
for action in startupSequence do
|
||||
processAction cluster network action
|
||||
NetworkAction.perform cluster network action
|
||||
|
||||
while true do
|
||||
printNetworkState network
|
||||
printClusterState cluster
|
||||
|
||||
let action = getAction clusterSize
|
||||
processAction cluster network action
|
||||
NetworkAction.perform cluster network action
|
||||
|
||||
// TODO: log out the committed state so that we can see whether the leader is correctly
|
||||
// processing heartbeat responses
|
||||
|
Reference in New Issue
Block a user