diff --git a/Raft.Test/Raft.Test.fsproj b/Raft.Test/Raft.Test.fsproj index 7b94afa..aeba3ad 100644 --- a/Raft.Test/Raft.Test.fsproj +++ b/Raft.Test/Raft.Test.fsproj @@ -8,6 +8,7 @@ + diff --git a/Raft.Test/TestInMemoryServer.fs b/Raft.Test/TestInMemoryServer.fs index 29869be..6316106 100644 --- a/Raft.Test/TestInMemoryServer.fs +++ b/Raft.Test/TestInMemoryServer.fs @@ -70,6 +70,7 @@ module TestInMemoryServer = // We sent a message to every other server; process them. for i in 1..4 do let server = i * 1 + (network.AllInboundMessages server).Length |> shouldEqual 1 NetworkAction.NetworkMessage (server, 0) @@ -83,6 +84,7 @@ module TestInMemoryServer = // (the messages we've already processed) (network.AllInboundMessages 0).Length |> shouldEqual 4 + (network.UndeliveredMessages 0).Length |> shouldEqual 0 cluster.Servers.[0].State |> shouldEqual (ServerStatus.Leader 1) @@ -164,10 +166,12 @@ module TestInMemoryServer = // Those two each sent a message to every other server. (network.AllInboundMessages 0).Length |> shouldEqual 1 + (network.AllInboundMessages 1).Length |> shouldEqual 1 for i in 2..4 do let server = i * 1 + (network.AllInboundMessages server).Length |> shouldEqual 2 while network.AllUndeliveredMessages () |> Seq.concat |> Seq.isEmpty |> not do @@ -192,42 +196,56 @@ module TestInMemoryServer = for i in 2..4 do cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower - type History = History of (int * int) list + type NetworkMessageSelection = + | NetworkMessageSelection of (int * int) list - let historyGen (clusterSize : int) = + member this.Length = + match this with + | NetworkMessageSelection h -> List.length h + + let networkMessageSelectionGen (clusterSize : int) : Gen = gen { let! pile = Gen.choose (0, clusterSize - 1) let! entry = Arb.generate return (pile * 1, abs entry) } |> Gen.listOf - |> Gen.map History + |> Gen.map NetworkMessageSelection - let apply (History history) (cluster : Cluster<'a>) (network : Network<'a>) : unit = + let apply (NetworkMessageSelection history) (cluster : Cluster<'a>) (network : Network<'a>) : unit = for pile, entry in history do let messages = network.AllInboundMessages pile if entry < messages.Length then cluster.SendMessageDirectly pile messages.[entry] + let check (prop : Property) = + let config = + { Config.QuickThrowOnFailure with + MaxTest = 1000 + } + + Check.One (config, prop) + [] let ``Startup sequence in prod, two timeouts at once, property-based: at most one leader is elected`` () = - let cluster, network = InMemoryCluster.make 5 + let property (history : NetworkMessageSelection) = + let cluster, network = InMemoryCluster.make 5 - NetworkAction.InactivityTimeout 0 - |> NetworkAction.perform cluster network + NetworkAction.InactivityTimeout 0 + |> NetworkAction.perform cluster network - NetworkAction.InactivityTimeout 1 - |> NetworkAction.perform cluster network + NetworkAction.InactivityTimeout 1 + |> NetworkAction.perform cluster network - // Those two each sent a message to every other server. - (network.AllInboundMessages 0).Length |> shouldEqual 1 - (network.AllInboundMessages 1).Length |> shouldEqual 1 + // Those two each sent a message to every other server. + (network.AllInboundMessages 0).Length |> shouldEqual 1 - for i in 2..4 do - (network.AllInboundMessages (i * 1)).Length |> shouldEqual 2 + (network.AllInboundMessages 1).Length |> shouldEqual 1 + + for i in 2..4 do + (network.AllInboundMessages (i * 1)).Length |> shouldEqual 2 - let property (history : History) = apply history cluster network match cluster.Servers.[0].State, cluster.Servers.[1].State with @@ -237,15 +255,15 @@ module TestInMemoryServer = for i in 2..4 do cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower - property - |> Prop.forAll (Arb.fromGen (historyGen 5)) - |> Check.QuickThrowOnFailure + property |> Prop.forAll (Arb.fromGen (networkMessageSelectionGen 5)) |> check [] - let ``Heartbeat is rejected if an update hasn't propagated`` () = + let ``Data can propagate from the leader`` () = let clusterSize = 5 let cluster, network = InMemoryCluster.make clusterSize + let mutable replyChannel = None + let startupSequence = [ NetworkAction.InactivityTimeout 1 @@ -269,15 +287,11 @@ module TestInMemoryServer = NetworkAction.NetworkMessage (1, 5) NetworkAction.NetworkMessage (1, 6) NetworkAction.NetworkMessage (1, 7) - // Submit data to leader, then heartbeat leader, and process heartbeat on another node. - // This should come through as "follower did not apply leader entry". - // (This is correct: the network has effectively dropped all the leader's AppendEntries messages, - // and the protocol has correctly allowed the followers to recognise that their logs are inconsistent - // with the leader's.) - NetworkAction.ClientRequest (1, byte 3, printfn "processed: %O") - NetworkAction.Heartbeat 1 + // Submit data to leader. This has the effect of heartbeating the other + // nodes, with a heartbeat that contains the new data. + NetworkAction.ClientRequest (1, byte 3, (fun s -> replyChannel <- Some s)) - // Deliver the heartbeat messages. + // Deliver the data messages. NetworkAction.NetworkMessage (0, 2) NetworkAction.NetworkMessage (2, 2) NetworkAction.NetworkMessage (3, 2) @@ -287,7 +301,9 @@ module TestInMemoryServer = for action in startupSequence do NetworkAction.perform cluster network action - // The servers have all rejected the heartbeat. + replyChannel |> Option.get |> shouldEqual ClientReply.Acknowledged + + // The servers have all accepted the data. network.UndeliveredMessages 1 |> List.map (fun (_index, message) -> match message with @@ -297,23 +313,184 @@ module TestInMemoryServer = |> shouldEqual [ { - Success = None + Success = Some 1 Follower = 0 FollowerTerm = 1 } { - Success = None + Success = Some 1 Follower = 2 FollowerTerm = 1 } { - Success = None + Success = Some 1 Follower = 3 FollowerTerm = 1 } { - Success = None + Success = Some 1 Follower = 4 FollowerTerm = 1 } ] + + let freeze<'a> (cluster : Cluster<'a>) = + List.init + cluster.ClusterSize + (fun i -> + let i = i * 1 + Async.RunSynchronously (cluster.GetCurrentInternalState i), cluster.Status i + ) + + let replay<'a> (ValidHistory history : ValidHistory<'a>) (cluster : Cluster<'a>) (network : Network<'a>) : unit = + for h in history do + NetworkAction.perform cluster network h + + [] + let ``History can be replayed`` () = + let clusterSize = 5 + + let property (history : ValidHistory) = + let firstTime = + let cluster, network = InMemoryCluster.make clusterSize + replay history cluster network + freeze cluster + + let secondTime = + let cluster, network = InMemoryCluster.make clusterSize + replay history cluster network + freeze cluster + + firstTime = secondTime + + property |> Prop.forAll (ValidHistory.arb clusterSize) |> check + + + [] + let ``There is never more than one leader in the same term`` () = + let clusterSize = 5 + + let property (history : ValidHistory) : bool = + let cluster, network = InMemoryCluster.make clusterSize + replay history cluster network + + let leaders = + freeze cluster + |> List.choose (fun (_, status) -> + match status with + | ServerStatus.Leader term -> Some term + | _ -> None + ) + + List.distinct leaders = leaders + + property |> Prop.forAll (ValidHistory.arb clusterSize) |> check + + let duplicationProperty<'a when 'a : equality> + (clusterSize : int) + (beforeDuplication : ValidHistory<'a>, afterDuplication : ValidHistory<'a>) + : bool = + let withoutDuplicate = + let cluster, network = InMemoryCluster.make<'a> clusterSize + replay beforeDuplication cluster network + freeze cluster + + let withDuplicate = + let cluster, network = InMemoryCluster.make<'a> clusterSize + replay afterDuplication cluster network + freeze cluster + + withDuplicate = withoutDuplicate + + let possibleDuplicates<'a> (history : NetworkAction<'a> list) : (int * NetworkAction<'a>) list = + history + |> List.indexed + |> List.filter (fun (_, action) -> + match action with + | NetworkAction.DropMessage _ -> true + | NetworkAction.Heartbeat _ -> true + | NetworkAction.NetworkMessage _ -> true + | NetworkAction.InactivityTimeout _ -> + // This starts a new term, so is not safe to repeat. + false + | NetworkAction.ClientRequest _ -> + // Clients repeating requests may of course change state! + false + ) + + let allDuplicatedHistories<'a> + (clusterSize : int) + (ValidHistory historyList : ValidHistory<'a> as history) + : _ list = + let duplicateCandidates = possibleDuplicates historyList + + duplicateCandidates + |> List.collect (fun (index, itemToDuplicate) -> + [ index .. historyList.Length ] + |> List.choose (fun insertIndex -> + List.insertAt insertIndex itemToDuplicate historyList + |> ValidHistory.validate clusterSize + |> Option.map (fun withDuplicate -> history, withDuplicate) + ) + ) + + let rec withDuplicateGen<'a> (clusterSize : int) : Gen * ValidHistory<'a>> = + gen { + let! history = ValidHistory.gen clusterSize + let allDuplicatedHistories = allDuplicatedHistories<'a> clusterSize history + + match allDuplicatedHistories with + | [] -> return! withDuplicateGen clusterSize + | x -> return! Gen.elements x + } + + let duplicationArb<'a> (clusterSize : int) : Arbitrary * ValidHistory<'a>> = + { new Arbitrary<_>() with + member _.Generator = withDuplicateGen<'a> clusterSize + + member _.Shrinker ((before, _withDuplicate)) = + ValidHistory.shrink<'a> clusterSize before + |> Seq.collect (allDuplicatedHistories clusterSize) + } + +(* + TODO: the following tests are borked; see the "specific example" for why. + [] + let ``Duplicate messages don't change network state`` () = + let clusterSize = 5 + + duplicationProperty clusterSize + |> Prop.forAll (duplicationArb clusterSize) + |> check + + [] + let ``Specific example`` () = + let clusterSize = 5 + + let history = + [ + InactivityTimeout 4 + InactivityTimeout 3 + NetworkMessage (0, 1) + InactivityTimeout 4 + NetworkMessage (3, 2) + ] + |> ValidHistory.validate clusterSize + |> Option.get + + let withDuplicate = + [ + InactivityTimeout 4 + InactivityTimeout 3 + NetworkMessage (0, 1) + NetworkMessage (0, 1) + InactivityTimeout 4 + // TODO: this is the problem, 2 no longer refers to the + // same + NetworkMessage (3, 2) + ] + |> ValidHistory.validate clusterSize + |> Option.get + + duplicationProperty clusterSize (history, withDuplicate) |> shouldEqual true + *) diff --git a/Raft.Test/ValidHistory.fs b/Raft.Test/ValidHistory.fs new file mode 100644 index 0000000..7ddeed6 --- /dev/null +++ b/Raft.Test/ValidHistory.fs @@ -0,0 +1,108 @@ +namespace Raft.Test + +open Raft +open FsCheck + +type ValidHistory<'a> = ValidHistory of NetworkAction<'a> list + +[] +module ValidHistory = + let validate<'a> (clusterSize : int) (history : NetworkAction<'a> list) : ValidHistory<'a> option = + let cluster, network = InMemoryCluster.make<'a> clusterSize + + let mutable isValid = true + + try + for action in history do + NetworkAction.perform cluster network action + with _ -> + isValid <- false + + if isValid then Some (ValidHistory history) else None + + let rec private networkActionGenNoClientRequests<'a> (clusterSize : int) : Gen> = + gen { + let! choice = Arb.generate> + let! server = Gen.choose (0, clusterSize - 1) + let server = server * 1 + + match choice with + | NetworkAction.InactivityTimeout _ -> return NetworkAction.InactivityTimeout server + | NetworkAction.NetworkMessage (_, message) -> return NetworkAction.NetworkMessage (server, abs message) + | NetworkAction.DropMessage (_, message) -> return NetworkAction.DropMessage (server, abs message) + | NetworkAction.Heartbeat _ -> return NetworkAction.Heartbeat server + | NetworkAction.ClientRequest _ -> return! networkActionGenNoClientRequests clusterSize + } + + let private historyGenOfLength<'a> (clusterSize : int) (len : int) : Gen list> = + let cluster, network = InMemoryCluster.make<'a> clusterSize + // Note: takes a reversed list. + let permissibleNext () : NetworkAction<'a> list = + let state = network.CompleteMessageHistory + + [ + for i in 0 .. clusterSize - 1 do + let server = i * 1 + + for messageId in 0 .. state.[i].Count - 1 do + yield NetworkAction.DropMessage (server, messageId) + yield NetworkAction.NetworkMessage (server, messageId) + + yield NetworkAction.Heartbeat server + yield NetworkAction.InactivityTimeout server + ] + + let rec go (len : int) = + gen { + if len = 0 then + return [] + else + let! smaller = go (len - 1) + let! next = Gen.elements (permissibleNext ()) + NetworkAction.perform cluster network next + return next :: smaller + } + + go (abs len) + + let gen<'a> (clusterSize : int) : Gen> = + historyGenOfLength<'a> clusterSize + |> Gen.sized + |> Gen.map (List.rev >> ValidHistory) + + let shrink<'a> (clusterSize : int) (ValidHistory history : ValidHistory<'a>) = + let removeOne = + Seq.init history.Length (fun i -> List.removeAt i history) + |> Seq.choose (validate clusterSize) + + let shrinkMessageId = + history + |> Seq.indexed + |> Seq.choose (fun (i, action) -> + let newMessage = + match action with + | NetworkAction.DropMessage (server, i) -> + if i > 0 then + Some (NetworkAction.DropMessage (server, i - 1)) + else + None + | NetworkAction.NetworkMessage (server, i) -> + if i > 0 then + Some (NetworkAction.NetworkMessage (server, i - 1)) + else + None + | _ -> None + + newMessage + |> Option.map (fun message -> history |> List.removeAt i |> List.insertAt i message) + ) + |> Seq.choose (validate clusterSize) + + Seq.concat [ removeOne ; shrinkMessageId ] + + let arb<'a> (clusterSize : int) = + { new Arbitrary>() with + override _.Generator = gen clusterSize + + override _.Shrinker history = shrink clusterSize history + } diff --git a/RaftFable/public/index.html b/RaftFable/public/index.html index 5846f43..9b157fa 100644 --- a/RaftFable/public/index.html +++ b/RaftFable/public/index.html @@ -39,6 +39,10 @@

Messages in flight

+
+ + +
diff --git a/RaftFable/src/App.fs b/RaftFable/src/App.fs index 4649b0f..2f0a7d2 100644 --- a/RaftFable/src/App.fs +++ b/RaftFable/src/App.fs @@ -70,6 +70,7 @@ module App = NetworkAction.NetworkMessage (0, 2) NetworkAction.NetworkMessage (1, 6) ] + |> List.truncate 0 |> fun s -> (fullyRerender cluster network, s) ||> List.fold (fun (inPromise : Promise) action -> promise { @@ -118,3 +119,5 @@ module App = NetworkAction.ClientRequest (server, data, printfn "%O") |> perform cluster network ) + + ui.ShowConsumedMessages.onchange <- fun _event -> fullyRerender cluster network diff --git a/RaftFable/src/Ui.fs b/RaftFable/src/Ui.fs index c1f406f..9185815 100644 --- a/RaftFable/src/Ui.fs +++ b/RaftFable/src/Ui.fs @@ -7,12 +7,14 @@ type ClusterState<'a> = ClusterSize : int InternalState : ServerInternalState<'a> array Statuses : ServerStatus array + AllMessages : Message<'a> list array UndeliveredMessages : (int * Message<'a>) list array } type UserPreferences = { LeaderUnderConsideration : int + ShowConsumedMessages : bool } type UiBackingState<'a> = @@ -33,6 +35,7 @@ type UiElements = ClientDataServerField : Browser.Types.HTMLInputElement HeartbeatField : Browser.Types.HTMLInputElement SelectedLeaderId : Browser.Types.HTMLInputElement + ShowConsumedMessages : Browser.Types.HTMLInputElement } type RequiresPopulation = @@ -73,6 +76,9 @@ module Ui = let selectedLeaderId = document.querySelector ".leader-select" :?> Browser.Types.HTMLInputElement + let showConsumed = + document.querySelector ".show-consumed" :?> Browser.Types.HTMLInputElement + { Document = document ServerStatusTable = serverStatuses @@ -84,6 +90,7 @@ module Ui = ClientDataServerField = clientDataServerField HeartbeatField = heartbeatField SelectedLeaderId = selectedLeaderId + ShowConsumedMessages = showConsumed } let reset (clusterSize : int) (ui : UiElements) : RequiresPopulation = @@ -136,6 +143,8 @@ module Ui = ui.LogArea.border <- "1px" ui.LogArea.innerText <- "" + ui.ShowConsumedMessages.defaultChecked <- false + { ServerStatusNodes = serverStatusNodes } @@ -182,13 +191,18 @@ module Ui = let allButtons = [ 0 .. state.ClusterSize - 1 ] |> List.map (fun i -> - state.UndeliveredMessages.[i] + if userPrefs.ShowConsumedMessages then + state.AllMessages.[i] + |> List.indexed + else + state.UndeliveredMessages.[i] |> List.map (fun (messageId, message) -> Button.create document (sprintf "Server %i, message %i: %O" i messageId message) (fun button -> - button.remove () + if not userPrefs.ShowConsumedMessages then + button.remove () NetworkMessage (i * 1, messageId) |> perform ) @@ -289,6 +303,12 @@ module Ui = yield network.UndeliveredMessages (i * 1) |] + let allMessages = + [| + for i in 0 .. cluster.ClusterSize - 1 do + yield network.AllInboundMessages (i * 1) + |] + async { let! internalState = internalState @@ -298,10 +318,12 @@ module Ui = InternalState = internalState Statuses = statuses UndeliveredMessages = undeliveredMessages + AllMessages = allMessages } } let getUserPrefs (ui : UiElements) : UserPreferences = { LeaderUnderConsideration = ui.SelectedLeaderId.valueAsNumber |> int |> (fun i -> i * 1) + ShowConsumedMessages = ui.ShowConsumedMessages.checked }