namespace Raft.Test

open System.Threading
open Raft
open NUnit.Framework
open FsUnitTyped
open FsCheck

// NOTE(review): the copy of this file that was reviewed had lost its layout and had empty
// attribute brackets (`[]`). The NUnit attributes below were restored from context (test-style
// names, `open NUnit.Framework`). Generic type arguments and unit-of-measure annotations on
// literals (e.g. `i * 1`, `Leader 1`, `InMemoryCluster.make 5`) may have been lost the same way;
// they are reproduced exactly as they appeared - confirm against the Raft library's signatures.

/// Tests which drive a five-node in-memory cluster by sending messages directly to servers
/// or by performing `NetworkAction`s against the in-memory network.
[<TestFixture>]
module TestInMemoryServer =

    /// Sends RequestVote instructions straight to server 0 and checks which of them are answered.
    [<Test>]
    let ``Startup sequence, first fumbling steps`` () =
        let cluster, _network = InMemoryCluster.make 5

        let logger, logs = TestLogger.make ()

        // Candidate 1 asks server 0 to vote for it.
        {
            CandidateTerm = 0
            CandidateId = 1
            ReplyChannel = fun message -> logger (sprintf "Received message for term %i" message.VoterTerm)
            CandidateLastLogEntry = None
        }
        |> Instruction.RequestVote
        |> Message.Instruction
        |> cluster.SendMessageDirectly 0

        logs () |> shouldEqual [ "Received message for term 0" ]

        // Candidate 1 asks to be elected again! This is fine, maybe the network is replaying requests
        // and the network swallowed our reply, so we should reply in the same way.
        {
            CandidateTerm = 0
            CandidateId = 1
            ReplyChannel = fun message -> logger (sprintf "Received message for term %i" message.VoterTerm)
            CandidateLastLogEntry = None
        }
        |> Instruction.RequestVote
        |> Message.Instruction
        |> cluster.SendMessageDirectly 0

        logs ()
        |> shouldEqual [ "Received message for term 0" ; "Received message for term 0" ]

        // Candidate 2 asks to be elected. We won't vote for them, because we've already voted.
        // The assertion below pins that the reply channel is never invoked for this request.
        let calls = ref 0

        {
            CandidateTerm = 0
            CandidateId = 2
            ReplyChannel = fun _ -> Interlocked.Increment calls |> ignore
            CandidateLastLogEntry = None
        }
        |> Instruction.RequestVote
        |> Message.Instruction
        |> cluster.SendMessageDirectly 0

        calls.Value |> shouldEqual 0

    /// Server 0 times out, every other server processes the resulting message, server 0 processes
    /// the four messages it receives back, and the final server states are checked.
    [<Test>]
    let ``Startup sequence in prod, only one timeout takes place`` () =
        let cluster, network = InMemoryCluster.make 5

        NetworkAction.InactivityTimeout 0 |> NetworkAction.perform cluster network

        // We sent a message to every other server; process them.
        for i in 1..4 do
            let server = i * 1
            (network.AllInboundMessages server).Length |> shouldEqual 1

            NetworkAction.NetworkMessage (server, 0)
            |> NetworkAction.perform cluster network

            // Each processed request puts exactly one more message into server 0's inbox.
            (network.AllInboundMessages 0).Length |> shouldEqual i

        for i in 1..4 do
            NetworkAction.NetworkMessage (0, (i - 1))
            |> NetworkAction.perform cluster network

        // (the messages we've already processed)
        (network.AllInboundMessages 0).Length |> shouldEqual 4
        (network.UndeliveredMessages 0).Length |> shouldEqual 0

        cluster.Servers.[0].State |> shouldEqual (ServerStatus.Leader 1)

        for i in 1..4 do
            cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower

    /// Every way of removing exactly one element (from any position) of any one of the queues.
    /// Each result pairs `(index of the queue, removed element)` with the queues as they look
    /// after the removal. Empty queues contribute no results.
    let popOne (queues : 'a list list) : ((int * 'a) * 'a list list) list =
        queues
        |> List.indexed
        |> List.collect (fun (queueIndex, entries) ->
            entries
            |> List.indexed
            |> List.map (fun (i, entry) ->
                // Swap the chosen queue for a copy of itself lacking element `i`.
                let afterPop = queues |> List.updateAt queueIndex (List.removeAt i entries)
                (queueIndex, entry), afterPop
            )
        )

    /// Every sequence in which all elements of all queues can be removed one at a time via
    /// `popOne`; each element is tagged with the index of the queue it came from.
    /// When nothing can be popped the result is the single empty ordering.
    let rec allOrderings (queues : 'a list list) : (int * 'a) list list =
        match popOne queues with
        | [] -> [ [] ]
        | pops ->
            pops
            |> List.collect (fun (extracted, remaining) ->
                allOrderings remaining |> List.map (fun rest -> extracted :: rest)
            )

    /// `i!` computed with an accumulator; any `i <= 0` yields 1.
    let factorial i =
        let rec go acc i =
            if i <= 0 then acc else go (acc * i) (i - 1)

        go 1 i

    // NOTE(review): the original file had five attributes here whose arguments were not visible;
    // the cases below are reconstructed - replace them with the original values if they differ.
    [<TestCase(0, 1)>]
    [<TestCase(1, 1)>]
    [<TestCase(2, 2)>]
    [<TestCase(3, 6)>]
    [<TestCase(4, 24)>]
    let ``Test factorial`` (n : int, result : int) =
        factorial n |> shouldEqual result

    /// `allOrderings` yields distinct orderings, `n!` of them for `n` elements in total,
    /// and every ordering contains every element.
    [<Test>]
    let ``Test allOrderings`` () =
        let case = [ [ "a" ; "b" ] ; [ "c" ; "d" ; "e" ] ]
        let output = case |> allOrderings
        output |> shouldEqual (List.distinct output)

        output
        |> List.length
        |> shouldEqual (factorial (List.concat case |> List.length))

        let allElements = Set.ofList (List.concat case)

        for ordering in output do
            ordering |> List.map snd |> Set.ofList |> shouldEqual allElements

    /// Picks one element of `arr` using `r`. Indexing fails if `arr` is empty.
    let randomChoice<'a> (r : System.Random) (arr : 'a list) : 'a =
        arr.[r.Next (0, arr.Length)]

    /// Servers 0 and 1 both time out; undelivered messages are then delivered in a randomly
    /// chosen order until none remain, after which exactly one of the two must be leader.
    [<Test>]
    let ``Startup sequence in prod, two timeouts at once, random`` () =
        // Print the seed so that a failing run can be replayed with `System.Random seed`.
        let seed = System.Random().Next()
        printfn "Random seed: %i" seed
        let rand = System.Random seed

        let cluster, network = InMemoryCluster.make 5

        NetworkAction.InactivityTimeout 0 |> NetworkAction.perform cluster network
        NetworkAction.InactivityTimeout 1 |> NetworkAction.perform cluster network

        // Those two each sent a message to every other server.
        (network.AllInboundMessages 0).Length |> shouldEqual 1
        (network.AllInboundMessages 1).Length |> shouldEqual 1

        for i in 2..4 do
            let server = i * 1
            (network.AllInboundMessages server).Length |> shouldEqual 2

        // Delivering a message may enqueue further messages, hence the outer loop.
        while network.AllUndeliveredMessages () |> Seq.concat |> Seq.isEmpty |> not do
            let allOrderings' =
                network.AllUndeliveredMessages () |> List.map List.ofSeq |> allOrderings

            // Process the messages!
            let ordering = randomChoice rand allOrderings'

            for serverConsuming, (messageId, message) in ordering do
                let serverConsuming = serverConsuming * 1
                cluster.SendMessageDirectly serverConsuming message
                network.DropMessage serverConsuming messageId

        match cluster.Servers.[0].State, cluster.Servers.[1].State with
        | Leader _, Leader _ -> failwith "Unexpectedly had two leaders"
        | Candidate _, Candidate _ -> failwith "Unexpectedly failed to elect a leader"
        | Leader 1, Follower
        | Follower, Leader 1 -> ()
        | s1, s2 -> failwithf "Unexpected state: %O %O" s1 s2

        for i in 2..4 do
            cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower

    /// A sequence of delivery attempts: each pair is (server whose inbox to read, index into
    /// that inbox), as consumed by `apply`.
    type History = History of (int * int) list

    /// Generates `History` values whose server component lies in `0 .. clusterSize - 1` and
    /// whose inbox index is a non-negative integer.
    let historyGen (clusterSize : int) =
        gen {
            let! pile = Gen.choose (0, clusterSize - 1)
            let! entry = Arb.generate<int>
            return (pile * 1, abs entry)
        }
        |> Gen.listOf
        |> Gen.map History

    /// Replays `history`: for each (server, index) pair, sends that server the message at
    /// `index` of its inbound messages. Pairs whose index is out of range are skipped.
    let apply (History history) (cluster : Cluster<'a>) (network : Network<'a>) : unit =
        for pile, entry in history do
            let messages = network.AllInboundMessages pile

            if entry < messages.Length then
                cluster.SendMessageDirectly pile messages.[entry]

    /// After servers 0 and 1 both time out, no generated delivery history may leave both of
    /// them as leader, and servers 2..4 must remain followers.
    [<Test>]
    let ``Startup sequence in prod, two timeouts at once, property-based: at most one leader is elected``
        ()
        =
        let property (history : History) =
            // Build a fresh cluster for every generated history: the cluster is mutable, so
            // sharing one across cases would make each case depend on all earlier ones and
            // would make a shrunk counterexample impossible to replay in isolation.
            let cluster, network = InMemoryCluster.make 5

            NetworkAction.InactivityTimeout 0 |> NetworkAction.perform cluster network
            NetworkAction.InactivityTimeout 1 |> NetworkAction.perform cluster network

            // Those two each sent a message to every other server.
            (network.AllInboundMessages 0).Length |> shouldEqual 1
            (network.AllInboundMessages 1).Length |> shouldEqual 1

            for i in 2..4 do
                (network.AllInboundMessages (i * 1)).Length |> shouldEqual 2

            apply history cluster network

            match cluster.Servers.[0].State, cluster.Servers.[1].State with
            | Leader _, Leader _ -> failwith "Unexpectedly elected two leaders"
            | _, _ -> ()

            for i in 2..4 do
                cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower

        property
        |> Prop.forAll (Arb.fromGen (historyGen 5))
        |> Check.QuickThrowOnFailure

    /// Performs a scripted sequence of actions (see the inline comments) ending with a client
    /// request and a heartbeat on server 1, then checks that the four undelivered replies for
    /// server 1 are AppendEntries replies with `Success = None`.
    [<Test>]
    let ``Heartbeat is rejected if an update hasn't propagated`` () =
        let clusterSize = 5
        let cluster, network = InMemoryCluster.make clusterSize

        let startupSequence =
            [
                NetworkAction.InactivityTimeout 1
                // Two servers vote for server 1...
                NetworkAction.NetworkMessage (2, 0)
                NetworkAction.NetworkMessage (3, 0)
                // Server 1 processes incoming votes, and achieves majority, electing itself leader!
                NetworkAction.NetworkMessage (1, 0)
                NetworkAction.NetworkMessage (1, 1)
                // and the other votes are processed and discarded
                NetworkAction.NetworkMessage (0, 0)
                NetworkAction.NetworkMessage (4, 0)
                NetworkAction.NetworkMessage (1, 2)
                NetworkAction.NetworkMessage (1, 3)
                // Get the followers' heartbeat processing out of the way
                NetworkAction.NetworkMessage (0, 1)
                NetworkAction.NetworkMessage (2, 1)
                NetworkAction.NetworkMessage (3, 1)
                NetworkAction.NetworkMessage (4, 1)
                NetworkAction.NetworkMessage (1, 4)
                NetworkAction.NetworkMessage (1, 5)
                NetworkAction.NetworkMessage (1, 6)
                NetworkAction.NetworkMessage (1, 7)

                // Submit data to leader, then heartbeat leader, and process heartbeat on another node.
                // This should come through as "follower did not apply leader entry".
                // (This is correct: the network has effectively dropped all the leader's AppendEntries messages,
                // and the protocol has correctly allowed the followers to recognise that their logs are inconsistent
                // with the leader's.)
                NetworkAction.ClientRequest (1, byte 3, printfn "processed: %O")
                NetworkAction.Heartbeat 1

                // Deliver the heartbeat messages.
                NetworkAction.NetworkMessage (0, 2)
                NetworkAction.NetworkMessage (2, 2)
                NetworkAction.NetworkMessage (3, 2)
                NetworkAction.NetworkMessage (4, 2)
            ]

        for action in startupSequence do
            NetworkAction.perform cluster network action

        // The servers have all rejected the heartbeat.
        network.UndeliveredMessages 1
        |> List.map (fun (_index, message) ->
            match message with
            | Message.Reply (Reply.AppendEntriesReply reply) -> reply
            | _ -> failwithf "Unexpected reply: %+A" message
        )
        |> shouldEqual
            [
                {
                    Success = None
                    Follower = 0
                    FollowerTerm = 1
                }
                {
                    Success = None
                    Follower = 2
                    FollowerTerm = 1
                }
                {
                    Success = None
                    Follower = 3
                    FollowerTerm = 1
                }
                {
                    Success = None
                    Follower = 4
                    FollowerTerm = 1
                }
            ]