Fix incomplete test

This commit is contained in:
Smaug123
2022-11-12 21:34:11 +00:00
parent 99d6e507d8
commit 7396979958
2 changed files with 120 additions and 50 deletions

View File

@@ -272,9 +272,9 @@ module TestInMemoryServer =
let requestResponse (response : ClientResponse) : unit =
response
|> shouldEqual (ClientResponse.Success (1<ClientId>, 1<ClientSequence>))
|> shouldEqual (ClientResponse.Success (1<ClientId>, 0<ClientSequence>))
Interlocked.Increment registeredSuccessfully |> ignore
Interlocked.Increment respondedSuccessfully |> ignore
let startupSequence =
[
@@ -299,15 +299,9 @@ module TestInMemoryServer =
NetworkAction.NetworkMessage (1<ServerId>, 5)
NetworkAction.NetworkMessage (1<ServerId>, 6)
NetworkAction.NetworkMessage (1<ServerId>, 7)
// Submit data to leader. This has the effect of heartbeating the other
// nodes, with a heartbeat that contains the new data.
NetworkAction.ClientRequest (1<ServerId>, ClientRequest.RegisterClient registerResponse)
NetworkAction.ClientRequest (
1<ServerId>,
ClientRequest.ClientRequest (1<ClientId>, 1<ClientSequence>, byte 3, requestResponse)
)
// Deliver the data messages.
// Create a client.
NetworkAction.ClientRequest (1<ServerId>, ClientRequest.RegisterClient registerResponse)
NetworkAction.NetworkMessage (0<ServerId>, 2)
NetworkAction.NetworkMessage (2<ServerId>, 2)
NetworkAction.NetworkMessage (3<ServerId>, 2)
@@ -317,39 +311,109 @@ module TestInMemoryServer =
for action in startupSequence do
NetworkAction.perform cluster network action
// The servers have all accepted the data.
network.UndeliveredMessages 1<ServerId>
|> List.map (fun (_index, message) ->
match message with
| Message.Reply (Reply.AppendEntriesReply reply) -> reply
| _ -> failwithf "Unexpected reply: %+A" message
)
|> shouldEqual
[
{
Success = Some 1<LogIndex>
Follower = 0<ServerId>
FollowerTerm = 1<Term>
}
{
Success = Some 1<LogIndex>
Follower = 2<ServerId>
FollowerTerm = 1<Term>
}
{
Success = Some 1<LogIndex>
Follower = 3<ServerId>
FollowerTerm = 1<Term>
}
{
Success = Some 1<LogIndex>
Follower = 4<ServerId>
FollowerTerm = 1<Term>
}
]
let leader = 1<ServerId>
respondedSuccessfully.Value |> shouldEqual 1
// Server 1 is the only leader.
cluster.Leaders |> Seq.exactlyOne |> shouldEqual leader
// No outstanding messages except to the leader.
for i in 0 .. clusterSize - 1 do
let i = i * 1<ServerId>
if i <> leader then
network.UndeliveredMessages i |> shouldBeEmpty
// The leader has yet to receive the acknowledgements.
let undelivered =
network.UndeliveredMessages leader
|> List.map (fun (i, message) ->
match message with
| Message.Reply (Reply.AppendEntriesReply r) ->
r.FollowerTerm |> shouldEqual 1<Term>
r.Success |> Option.get |> shouldEqual 1<LogIndex>
i, r.Follower
| _ -> failwith "oh no"
)
undelivered
|> List.map snd
|> shouldEqual (
[ 0 .. clusterSize - 1 ]
|> List.map ((*) 1<ServerId>)
|> List.filter ((<>) leader)
)
// The client has not received an acknowledgement.
respondedSuccessfully.Value |> shouldEqual 0
registeredSuccessfully.Value |> shouldEqual 0
// Now tell the leader that the followers have accepted the client.
undelivered
|> List.iter (fun (count, _) ->
NetworkAction.perform cluster network (NetworkAction.NetworkMessage (leader, count))
)
// The client now knows it exists!
registeredSuccessfully.Value |> shouldEqual 1
respondedSuccessfully.Value |> shouldEqual 0
// Submit some client data.
NetworkAction.ClientRequest (
1<ServerId>,
ClientRequest.ClientRequest (1<ClientId>, 0<ClientSequence>, 99uy, requestResponse)
)
|> NetworkAction.perform cluster network
// Perform data-propagating heartbeats.
for i in 0 .. clusterSize - 1 do
let server = i * 1<ServerId>
NetworkAction.NetworkMessage (server, 3)
|> NetworkAction.perform cluster network
// The client hasn't yet received a response, because the leader hasn't heard back from the cluster.
registeredSuccessfully.Value |> shouldEqual 1
respondedSuccessfully.Value |> shouldEqual 0
let awaiting =
network.UndeliveredMessages leader
|> List.map (fun (i, message) ->
match message with
| Message.Reply (Reply.AppendEntriesReply r) ->
r.FollowerTerm |> shouldEqual 1<Term>
// Note the increased log index from last time.
r.Success |> Option.get |> shouldEqual 2<LogIndex>
i, r.Follower
| _ -> failwith "oh no"
)
awaiting
|> List.head
|> fun (messageIndex, _) ->
NetworkAction.NetworkMessage (leader, messageIndex)
|> NetworkAction.perform cluster network
// Leader doesn't know a quorum has been reached, so does not reply to the client.
registeredSuccessfully.Value |> shouldEqual 1
respondedSuccessfully.Value |> shouldEqual 0
awaiting.[1]
|> fun (messageIndex, _) ->
NetworkAction.NetworkMessage (leader, messageIndex)
|> NetworkAction.perform cluster network
// Quorum achieved! Reply sent.
registeredSuccessfully.Value |> shouldEqual 1
respondedSuccessfully.Value |> shouldEqual 1
awaiting.[2..]
|> List.iter (fun (messageIndex, _) ->
NetworkAction.NetworkMessage (leader, messageIndex)
|> NetworkAction.perform cluster network
)
registeredSuccessfully.Value |> shouldEqual 1
respondedSuccessfully.Value |> shouldEqual 1
let freeze<'a> (cluster : Cluster<'a>) =
List.init

View File

@@ -118,9 +118,6 @@ type AppendEntriesMessage<'a> =
/// with what happened during terms that took place while it was down.
NewEntry : (LogEntry<'a> * int<Term>) option
LeaderCommitIndex : int<LogIndex>
/// TODO - we don't need this, the responder should just construct
/// the appropriate Message and send it themselves
ReplyChannel : AppendEntriesReply -> unit
}
override this.ToString () =
@@ -397,7 +394,9 @@ type Server<'a>
Success = None
Follower = me
}
|> message.ReplyChannel
|> Reply.AppendEntriesReply
|> Message.Reply
|> messageChannel message.LeaderId
else
@@ -416,7 +415,9 @@ type Server<'a>
FollowerTerm = persistentState.CurrentTerm
Follower = me
}
|> message.ReplyChannel
|> Reply.AppendEntriesReply
|> Message.Reply
|> messageChannel message.LeaderId
let acceptRequest () : unit =
match currentType with
@@ -449,7 +450,9 @@ type Server<'a>
FollowerTerm = persistentState.CurrentTerm
Follower = me
}
|> message.ReplyChannel
|> Reply.AppendEntriesReply
|> Message.Reply
|> messageChannel message.LeaderId
| None ->
// The leader knows what we've committed, so it won't try and give us anything further than
@@ -464,7 +467,9 @@ type Server<'a>
FollowerTerm = persistentState.CurrentTerm
Follower = me
}
|> message.ReplyChannel
|> Reply.AppendEntriesReply
|> Message.Reply
|> messageChannel message.LeaderId
let logIsConsistent (message : AppendEntriesMessage<'a>) : bool =
match message.PrevLogEntry with
@@ -502,7 +507,9 @@ type Server<'a>
Success = None
Follower = me
}
|> message.ReplyChannel
|> Reply.AppendEntriesReply
|> Message.Reply
|> messageChannel message.LeaderId
else
acceptRequest ()
@@ -537,7 +544,6 @@ type Server<'a>
|> Some
NewEntry = persistentState.GetLogEntry toSend
LeaderCommitIndex = volatileState.CommitIndex
ReplyChannel = fun reply -> reply |> Reply.AppendEntriesReply |> Message.Reply |> messageChannel me
}
|> Instruction.AppendEntries
|> Message.Instruction