diff --git a/Raft.Test/TestInMemoryServer.fs b/Raft.Test/TestInMemoryServer.fs index 46cb1d0..c5e6795 100644 --- a/Raft.Test/TestInMemoryServer.fs +++ b/Raft.Test/TestInMemoryServer.fs @@ -64,24 +64,22 @@ module TestInMemoryServer = let ``Startup sequence in prod, only one timeout takes place`` () = let cluster, network = InMemoryCluster.make 5 - cluster.Servers.[0].TriggerInactivityTimeout () - cluster.Servers.[0].Sync () + NetworkAction.InactivityTimeout 0 + |> NetworkAction.perform cluster network // We sent a message to every other server; process them. for i in 1..4 do let server = i * 1 (network.AllInboundMessages server).Length |> shouldEqual 1 - let message = network.InboundMessage server 0 - network.DropMessage server 0 - cluster.SendMessageDirectly server message + + NetworkAction.NetworkMessage (server, 0) + |> NetworkAction.perform cluster network (network.AllInboundMessages 0).Length |> shouldEqual i for i in 1..4 do - network.InboundMessage 0 (i - 1) - |> cluster.SendMessageDirectly 0 - - network.DropMessage 0 (i - 1) + NetworkAction.NetworkMessage (0, (i - 1)) + |> NetworkAction.perform cluster network // (the messages we've already processed) (network.AllInboundMessages 0).Length |> shouldEqual 4 @@ -158,10 +156,11 @@ module TestInMemoryServer = let rand = System.Random () let cluster, network = InMemoryCluster.make 5 - cluster.Servers.[0].TriggerInactivityTimeout () - cluster.Servers.[0].Sync () - cluster.Servers.[1].TriggerInactivityTimeout () - cluster.Servers.[1].Sync () + NetworkAction.InactivityTimeout 0 + |> NetworkAction.perform cluster network + + NetworkAction.InactivityTimeout 1 + |> NetworkAction.perform cluster network // Those two each sent a message to every other server. (network.AllInboundMessages 0).Length |> shouldEqual 1 @@ -186,8 +185,8 @@ module TestInMemoryServer = match cluster.Servers.[0].State, cluster.Servers.[1].State with | Leader _, Leader _ -> failwith "Unexpectedly had two leaders" | Candidate _, Candidate _ -> failwith "Unexpectedly failed to elect a leader" - | Leader 1, Candidate 1 - | Candidate 1, Leader 1 -> () + | Leader 1, Follower + | Follower, Leader 1 -> () | s1, s2 -> failwithf "Unexpected state: %O %O" s1 s2 for i in 2..4 do @@ -215,10 +214,11 @@ module TestInMemoryServer = let ``Startup sequence in prod, two timeouts at once, property-based: at most one leader is elected`` () = let cluster, network = InMemoryCluster.make 5 - cluster.Servers.[0].TriggerInactivityTimeout () - cluster.Servers.[0].Sync () - cluster.Servers.[1].TriggerInactivityTimeout () - cluster.Servers.[1].Sync () + NetworkAction.InactivityTimeout 0 + |> NetworkAction.perform cluster network + + NetworkAction.InactivityTimeout 1 + |> NetworkAction.perform cluster network // Those two each sent a message to every other server. (network.AllInboundMessages 0).Length |> shouldEqual 1 diff --git a/Raft/InMemory.fs b/Raft/InMemory.fs index a2d1b40..830b927 100644 --- a/Raft/InMemory.fs +++ b/Raft/InMemory.fs @@ -84,3 +84,21 @@ module InMemoryCluster = } cluster, network + +type NetworkAction = + | InactivityTimeout of int + | NetworkMessage of int * int + | DropMessage of int * int + | Heartbeat of int + +[] +module NetworkAction = + + let perform<'a> (cluster : Cluster<'a>) (network : Network<'a>) (action : NetworkAction) : unit = + match action with + | InactivityTimeout serverId -> cluster.InactivityTimeout serverId + | Heartbeat serverId -> cluster.HeartbeatTimeout serverId + | DropMessage (serverId, messageId) -> network.DropMessage serverId messageId + | NetworkMessage (serverId, messageId) -> + network.InboundMessage serverId messageId |> cluster.SendMessage serverId + network.DropMessage serverId messageId diff --git a/Raft/Server.fs b/Raft/Server.fs index 0367195..bbcdd2e 100644 --- a/Raft/Server.fs +++ b/Raft/Server.fs @@ -430,6 +430,26 @@ type Server<'a> let divideByTwoRoundingUp (n : int) = if n % 2 = 0 then n / 2 else (n / 2) + 1 + let emitHeartbeat () = + match currentType with + | ServerSpecialisation.Candidate _ + | ServerSpecialisation.Follower -> () + | ServerSpecialisation.Leader _ -> + for i in 0 .. clusterSize - 1 do + if i * 1 <> me then + { + LeaderTerm = persistentState.CurrentTerm + LeaderId = me + PrevLogEntry = persistentState.GetLastLogEntry () |> Option.map snd + NewEntry = None + LeaderCommitIndex = volatileState.CommitIndex + ReplyChannel = + fun reply -> messageChannel me (reply |> Reply.AppendEntriesReply |> Message.Reply) + } + |> Instruction.AppendEntries + |> Message.Instruction + |> messageChannel (i * 1) + let processReply (r : Reply) : unit = match r with | AppendEntriesReply appendEntriesReply -> @@ -492,6 +512,7 @@ type Server<'a> LeaderState.New clusterSize persistentState.CurrentLogIndex |> ServerSpecialisation.Leader + emitHeartbeat () let mailbox = let rec loop (mailbox : MailboxProcessor<_>) = @@ -501,26 +522,7 @@ type Server<'a> //System.Console.WriteLine toPrint match m with - | ServerAction.EmitHeartbeat -> - match currentType with - | ServerSpecialisation.Candidate _ - | ServerSpecialisation.Follower -> () - | ServerSpecialisation.Leader _ -> - for i in 0 .. clusterSize - 1 do - if i * 1 <> me then - { - LeaderTerm = persistentState.CurrentTerm - LeaderId = me - PrevLogEntry = persistentState.GetLastLogEntry () |> Option.map snd - NewEntry = None - LeaderCommitIndex = volatileState.CommitIndex - ReplyChannel = - fun reply -> - messageChannel me (reply |> Reply.AppendEntriesReply |> Message.Reply) - } - |> Instruction.AppendEntries - |> Message.Instruction - |> messageChannel (i * 1) + | ServerAction.EmitHeartbeat -> emitHeartbeat () | ServerAction.BeginElection -> match currentType with | ServerSpecialisation.Leader _ -> () diff --git a/RaftExplorer/Program.fs b/RaftExplorer/Program.fs index c5dea0d..5d4d4a2 100644 --- a/RaftExplorer/Program.fs +++ b/RaftExplorer/Program.fs @@ -70,12 +70,6 @@ module Program = printf "Unrecognised input. " None - type UserAction = - | InactivityTimeout of int - | NetworkMessage of int * int - | DropMessage of int * int - | Heartbeat of int - let rec getAction (clusterSize : int) = printf "Enter action. Trigger [t]imeout , [h]eartbeat a leader , [d]rop message , or allow [m]essage : " @@ -108,15 +102,17 @@ module Program = printf "Unrecognised input. " getAction clusterSize - let processAction (cluster : Cluster<'a>) (network : Network<'a>) (action : UserAction) : unit = - match action with - | InactivityTimeout serverId -> cluster.InactivityTimeout serverId - | Heartbeat serverId -> cluster.HeartbeatTimeout serverId - | DropMessage (serverId, messageId) -> network.DropMessage serverId messageId - | NetworkMessage (serverId, messageId) -> - network.InboundMessage serverId messageId |> cluster.SendMessage serverId - network.DropMessage serverId messageId - + let electLeader = + [ + NetworkAction.InactivityTimeout 0 + NetworkAction.NetworkMessage (1, 0) + NetworkAction.NetworkMessage (2, 0) + NetworkAction.DropMessage (3, 0) + NetworkAction.DropMessage (4, 0) + NetworkAction.NetworkMessage (0, 0) + NetworkAction.NetworkMessage (0, 1) + // At this point, server 0 is leader in an uncontested election. + ] [] let main _argv = @@ -125,24 +121,45 @@ module Program = let startupSequence = [ - UserAction.InactivityTimeout 0 - UserAction.NetworkMessage (1, 0) - UserAction.NetworkMessage (2, 0) - UserAction.DropMessage (3, 0) - UserAction.DropMessage (4, 0) - UserAction.NetworkMessage (0, 0) - UserAction.NetworkMessage (0, 1) + NetworkAction.InactivityTimeout 0 + NetworkAction.InactivityTimeout 1 + // Two servers vote for server 1... + NetworkAction.NetworkMessage (2, 1) + NetworkAction.NetworkMessage (3, 1) + // One server votes for server 0... + NetworkAction.NetworkMessage (4, 0) + // and the other votes are processed and discarded + NetworkAction.NetworkMessage (0, 0) + NetworkAction.NetworkMessage (1, 0) + NetworkAction.NetworkMessage (2, 0) + NetworkAction.NetworkMessage (3, 0) + NetworkAction.NetworkMessage (4, 1) + // Server 0 process incoming votes + NetworkAction.NetworkMessage (0, 1) + // Server 1 processes incoming votes, and achieves majority, electing itself leader! + NetworkAction.NetworkMessage (1, 1) + NetworkAction.NetworkMessage (1, 2) + // Get the followers' heartbeat processing out of the way + NetworkAction.NetworkMessage (2, 2) + NetworkAction.NetworkMessage (3, 2) + NetworkAction.NetworkMessage (4, 2) + NetworkAction.NetworkMessage (1, 3) + NetworkAction.NetworkMessage (1, 4) + NetworkAction.NetworkMessage (1, 5) + // Server 0 processes the leader's heartbeat and drops out of the election. + NetworkAction.NetworkMessage (0, 2) + NetworkAction.NetworkMessage (1, 6) ] for action in startupSequence do - processAction cluster network action + NetworkAction.perform cluster network action while true do printNetworkState network printClusterState cluster let action = getAction clusterSize - processAction cluster network action + NetworkAction.perform cluster network action // TODO: log out the committed state so that we can see whether the leader is correctly // processing heartbeat responses