From 7396979958cc1829f7d889bce3a570a03f70f701 Mon Sep 17 00:00:00 2001 From: Smaug123 Date: Sat, 12 Nov 2022 21:34:11 +0000 Subject: [PATCH] Fix incomplete test --- Raft.Test/TestInMemoryServer.fs | 146 +++++++++++++++++++++++--------- Raft/Server.fs | 24 ++++-- 2 files changed, 120 insertions(+), 50 deletions(-) diff --git a/Raft.Test/TestInMemoryServer.fs b/Raft.Test/TestInMemoryServer.fs index 58e7dcb..9feb209 100644 --- a/Raft.Test/TestInMemoryServer.fs +++ b/Raft.Test/TestInMemoryServer.fs @@ -272,9 +272,9 @@ module TestInMemoryServer = let requestResponse (response : ClientResponse) : unit = response - |> shouldEqual (ClientResponse.Success (1, 1)) + |> shouldEqual (ClientResponse.Success (1, 0)) - Interlocked.Increment registeredSuccessfully |> ignore + Interlocked.Increment respondedSuccessfully |> ignore let startupSequence = [ @@ -299,15 +299,9 @@ module TestInMemoryServer = NetworkAction.NetworkMessage (1, 5) NetworkAction.NetworkMessage (1, 6) NetworkAction.NetworkMessage (1, 7) - // Submit data to leader. This has the effect of heartbeating the other - // nodes, with a heartbeat that contains the new data. - NetworkAction.ClientRequest (1, ClientRequest.RegisterClient registerResponse) - NetworkAction.ClientRequest ( - 1, - ClientRequest.ClientRequest (1, 1, byte 3, requestResponse) - ) - // Deliver the data messages. + // Create a client. + NetworkAction.ClientRequest (1, ClientRequest.RegisterClient registerResponse) NetworkAction.NetworkMessage (0, 2) NetworkAction.NetworkMessage (2, 2) NetworkAction.NetworkMessage (3, 2) @@ -317,39 +311,109 @@ module TestInMemoryServer = for action in startupSequence do NetworkAction.perform cluster network action - // The servers have all accepted the data. - network.UndeliveredMessages 1 - |> List.map (fun (_index, message) -> - match message with - | Message.Reply (Reply.AppendEntriesReply reply) -> reply - | _ -> failwithf "Unexpected reply: %+A" message - ) - |> shouldEqual - [ - { - Success = Some 1 - Follower = 0 - FollowerTerm = 1 - } - { - Success = Some 1 - Follower = 2 - FollowerTerm = 1 - } - { - Success = Some 1 - Follower = 3 - FollowerTerm = 1 - } - { - Success = Some 1 - Follower = 4 - FollowerTerm = 1 - } - ] + let leader = 1 - respondedSuccessfully.Value |> shouldEqual 1 + // Server 1 is the only leader. + cluster.Leaders |> Seq.exactlyOne |> shouldEqual leader + + // No outstanding messages except to the leader. + for i in 0 .. clusterSize - 1 do + let i = i * 1 + + if i <> leader then + network.UndeliveredMessages i |> shouldBeEmpty + + // The leader has yet to receive the acknowledgements. + let undelivered = + network.UndeliveredMessages leader + |> List.map (fun (i, message) -> + match message with + | Message.Reply (Reply.AppendEntriesReply r) -> + r.FollowerTerm |> shouldEqual 1 + r.Success |> Option.get |> shouldEqual 1 + i, r.Follower + | _ -> failwith "oh no" + ) + + undelivered + |> List.map snd + |> shouldEqual ( + [ 0 .. clusterSize - 1 ] + |> List.map ((*) 1) + |> List.filter ((<>) leader) + ) + + // The client has not received an acknowledgement. + respondedSuccessfully.Value |> shouldEqual 0 + registeredSuccessfully.Value |> shouldEqual 0 + + // Now tell the leader that the followers have accepted the client. + undelivered + |> List.iter (fun (count, _) -> + NetworkAction.perform cluster network (NetworkAction.NetworkMessage (leader, count)) + ) + + // The client now knows it exists! registeredSuccessfully.Value |> shouldEqual 1 + respondedSuccessfully.Value |> shouldEqual 0 + + // Submit some client data. + NetworkAction.ClientRequest ( + 1, + ClientRequest.ClientRequest (1, 0, 99uy, requestResponse) + ) + |> NetworkAction.perform cluster network + + // Perform data-propagating heartbeats. + for i in 0 .. clusterSize - 1 do + let server = i * 1 + + NetworkAction.NetworkMessage (server, 3) + |> NetworkAction.perform cluster network + + // The client hasn't yet received a response, because the leader hasn't heard back from the cluster. + registeredSuccessfully.Value |> shouldEqual 1 + respondedSuccessfully.Value |> shouldEqual 0 + + let awaiting = + network.UndeliveredMessages leader + |> List.map (fun (i, message) -> + match message with + | Message.Reply (Reply.AppendEntriesReply r) -> + r.FollowerTerm |> shouldEqual 1 + // Note the increased log index from last time. + r.Success |> Option.get |> shouldEqual 2 + i, r.Follower + | _ -> failwith "oh no" + ) + + awaiting + |> List.head + |> fun (messageIndex, _) -> + NetworkAction.NetworkMessage (leader, messageIndex) + |> NetworkAction.perform cluster network + + // Leader doesn't know a quorum has been reached, so does not reply to the client. + registeredSuccessfully.Value |> shouldEqual 1 + respondedSuccessfully.Value |> shouldEqual 0 + + awaiting.[1] + |> fun (messageIndex, _) -> + NetworkAction.NetworkMessage (leader, messageIndex) + |> NetworkAction.perform cluster network + + // Quorum achieved! Reply sent. + registeredSuccessfully.Value |> shouldEqual 1 + respondedSuccessfully.Value |> shouldEqual 1 + + awaiting.[2..] + |> List.iter (fun (messageIndex, _) -> + NetworkAction.NetworkMessage (leader, messageIndex) + |> NetworkAction.perform cluster network + ) + + registeredSuccessfully.Value |> shouldEqual 1 + respondedSuccessfully.Value |> shouldEqual 1 let freeze<'a> (cluster : Cluster<'a>) = List.init diff --git a/Raft/Server.fs b/Raft/Server.fs index b780cdb..2ea300c 100644 --- a/Raft/Server.fs +++ b/Raft/Server.fs @@ -118,9 +118,6 @@ type AppendEntriesMessage<'a> = /// with what happened during terms that took place while it was down. NewEntry : (LogEntry<'a> * int) option LeaderCommitIndex : int - /// TODO - we don't need this, the responder should just construct - /// the appropriate Message and send it themselves - ReplyChannel : AppendEntriesReply -> unit } override this.ToString () = @@ -397,7 +394,9 @@ type Server<'a> Success = None Follower = me } - |> message.ReplyChannel + |> Reply.AppendEntriesReply + |> Message.Reply + |> messageChannel message.LeaderId else @@ -416,7 +415,9 @@ type Server<'a> FollowerTerm = persistentState.CurrentTerm Follower = me } - |> message.ReplyChannel + |> Reply.AppendEntriesReply + |> Message.Reply + |> messageChannel message.LeaderId let acceptRequest () : unit = match currentType with @@ -449,7 +450,9 @@ type Server<'a> FollowerTerm = persistentState.CurrentTerm Follower = me } - |> message.ReplyChannel + |> Reply.AppendEntriesReply + |> Message.Reply + |> messageChannel message.LeaderId | None -> // The leader knows what we've committed, so it won't try and give us anything further than @@ -464,7 +467,9 @@ type Server<'a> FollowerTerm = persistentState.CurrentTerm Follower = me } - |> message.ReplyChannel + |> Reply.AppendEntriesReply + |> Message.Reply + |> messageChannel message.LeaderId let logIsConsistent (message : AppendEntriesMessage<'a>) : bool = match message.PrevLogEntry with @@ -502,7 +507,9 @@ type Server<'a> Success = None Follower = me } - |> message.ReplyChannel + |> Reply.AppendEntriesReply + |> Message.Reply + |> messageChannel message.LeaderId else acceptRequest () @@ -537,7 +544,6 @@ type Server<'a> |> Some NewEntry = persistentState.GetLogEntry toSend LeaderCommitIndex = volatileState.CommitIndex - ReplyChannel = fun reply -> reply |> Reply.AppendEntriesReply |> Message.Reply |> messageChannel me } |> Instruction.AppendEntries |> Message.Instruction