Tidy up the interfaces

This commit is contained in:
Smaug123
2022-10-26 20:12:07 +01:00
parent 72ed658bf6
commit 8441e75fb2
3 changed files with 84 additions and 57 deletions

View File

@@ -69,18 +69,23 @@ module TestServer =
// We sent a message to every other server; process them.
for i in 1..4 do
network.InboundMessages.[i].Count |> shouldEqual 1
let message = network.InboundMessages.[i].[0]
network.InboundMessages.[i].Clear ()
cluster.SendMessageDirectly (i * 1<ServerId>) message
let server = i * 1<ServerId>
(network.AllInboundMessages server).Length |> shouldEqual 1
let message = network.InboundMessage server 0
network.DropMessage server 0
cluster.SendMessageDirectly server message
network.InboundMessages.[0].Count |> shouldEqual i
(network.AllInboundMessages 0<ServerId>).Length |> shouldEqual i
for i in 1..4 do
cluster.SendMessageDirectly 0<ServerId> network.InboundMessages.[0].[i - 1]
network.InboundMessage 0<ServerId> (i - 1)
|> cluster.SendMessageDirectly 0<ServerId>
network.DropMessage 0<ServerId> (i - 1)
// (the messages we've already processed)
network.InboundMessages.[0].Count |> shouldEqual 4
network.InboundMessages.[0].Clear ()
(network.AllInboundMessages 0<ServerId>).Length |> shouldEqual 4
(network.UndeliveredMessages 0<ServerId>).Length |> shouldEqual 0
cluster.Servers.[0].State |> shouldEqual ServerStatus.Leader
@@ -159,22 +164,24 @@ module TestServer =
cluster.Servers.[1].Sync ()
// Those two each sent a message to every other server.
network.InboundMessages.[0].Count |> shouldEqual 1
network.InboundMessages.[1].Count |> shouldEqual 1
(network.AllInboundMessages 0<ServerId>).Length |> shouldEqual 1
(network.AllInboundMessages 1<ServerId>).Length |> shouldEqual 1
for i in 2..4 do
network.InboundMessages.[i].Count |> shouldEqual 2
let server = i * 1<ServerId>
(network.AllInboundMessages server).Length |> shouldEqual 2
while network.InboundMessages |> Seq.concat |> Seq.isEmpty |> not do
while network.AllUndeliveredMessages () |> Seq.concat |> Seq.isEmpty |> not do
let allOrderings' =
network.InboundMessages |> List.ofArray |> List.map List.ofSeq |> allOrderings
network.AllUndeliveredMessages () |> List.map List.ofSeq |> allOrderings
network.InboundMessages |> Array.iter (fun arr -> arr.Clear ())
// Process the messages!
let ordering = randomChoice rand allOrderings'
for serverConsuming, message in ordering do
cluster.SendMessageDirectly (serverConsuming * 1<ServerId>) message
for serverConsuming, (messageId, message) in ordering do
let serverConsuming = serverConsuming * 1<ServerId>
cluster.SendMessageDirectly serverConsuming message
network.DropMessage serverConsuming messageId
(cluster.Servers.[0].State = Leader && cluster.Servers.[1].State = Leader)
|> shouldEqual false
@@ -202,9 +209,9 @@ module TestServer =
let apply (History history) (cluster : Cluster<'a>) (network : Network<'a>) : unit =
for pile, entry in history do
let messages = network.InboundMessages.[pile / 1<ServerId>]
let messages = network.AllInboundMessages pile
if entry < messages.Count then
if entry < messages.Length then
cluster.SendMessageDirectly pile messages.[entry]
[<Test>]
@@ -217,11 +224,11 @@ module TestServer =
cluster.Servers.[1].Sync ()
// Those two each sent a message to every other server.
network.InboundMessages.[0].Count |> shouldEqual 1
network.InboundMessages.[1].Count |> shouldEqual 1
(network.AllInboundMessages 0<ServerId>).Length |> shouldEqual 1
(network.AllInboundMessages 1<ServerId>).Length |> shouldEqual 1
for i in 2..4 do
network.InboundMessages.[i].Count |> shouldEqual 2
(network.AllInboundMessages (i * 1<ServerId>)).Length |> shouldEqual 2
let property (history : History) =
apply history cluster network

View File

@@ -1,5 +1,7 @@
namespace Raft
open System.Collections.Generic
/// Server state which need not survive a server crash.
type VolatileState =
{
@@ -480,18 +482,37 @@ type Cluster<'a> =
type Network<'a> =
internal
{
/// InboundMessages.[i] is the collection of messages sent to
/// server `i` and waiting for you to allow them through.
InboundMessages : ResizeArray<Message<'a>>[]
/// CompleteMessageHistory.[i] is the collection of all messages
/// ever sent to server `i`.
CompleteMessageHistory : ResizeArray<Message<'a>>[]
MessagesDelivered : HashSet<int>[]
}
static member Make (clusterSize : int) =
{
CompleteMessageHistory = Array.init clusterSize (fun _ -> ResizeArray ())
MessagesDelivered = Array.init clusterSize (fun _ -> HashSet ())
}
member this.AllInboundMessages (i : int<ServerId>) : Message<'a> list =
this.InboundMessages.[i / 1<ServerId>] |> List.ofSeq
this.CompleteMessageHistory.[i / 1<ServerId>] |> List.ofSeq
member this.InboundMessage (i : int<ServerId>) (id : int) : Message<'a> =
this.InboundMessages.[i / 1<ServerId>].[id]
this.CompleteMessageHistory.[i / 1<ServerId>].[id]
member this.Size = this.InboundMessages.Length
member this.DropMessage (i : int<ServerId>) (id : int) =
this.MessagesDelivered.[i / 1<ServerId>].Add id |> ignore
member this.UndeliveredMessages (i : int<ServerId>) : (int * Message<'a>) list =
this.CompleteMessageHistory.[i / 1<ServerId>]
|> Seq.indexed
|> Seq.filter (fun (count, _) -> this.MessagesDelivered.[i / 1<ServerId>].Contains count |> not)
|> List.ofSeq
member this.AllUndeliveredMessages () : ((int * Message<'a>) list) list =
List.init this.CompleteMessageHistory.Length (fun i -> this.UndeliveredMessages (i * 1<ServerId>))
member this.ClusterSize = this.CompleteMessageHistory.Length
[<RequireQualifiedAccess>]
module InMemoryCluster =
@@ -500,15 +521,10 @@ module InMemoryCluster =
let make<'a> (count : int) : Cluster<'a> * Network<'a> =
let servers = Array.zeroCreate<Server<'a>> count
let network =
{
InboundMessages =
fun _ -> ResizeArray<Message<'a>> ()
|> Array.init count
}
let network = Network<int>.Make count
let messageChannelHold (serverId : int<ServerId>) (message : Message<'a>) : unit =
let arr = network.InboundMessages.[serverId / 1<ServerId>]
let arr = network.CompleteMessageHistory.[serverId / 1<ServerId>]
lock arr (fun () -> arr.Add message)
for s in 0 .. servers.Length - 1 do

View File

@@ -8,65 +8,68 @@ module Program =
let printNetworkState<'a> (network : Network<'a>) : unit =
let mutable wroteAnything = false
for i in 0 .. network.Size - 1 do
for count, message in Seq.indexed (network.AllInboundMessages (i * 1<ServerId>)) do
printfn "Server %i, message %i: %O" i count message
for i in 0 .. network.ClusterSize - 1 do
for messageId, message in network.UndeliveredMessages (i * 1<ServerId>) do
printfn "Server %i, message %i: %O" i messageId message
wroteAnything <- true
if not wroteAnything then
printfn "<No messages in network>"
let rec getMessage (clusterSize : int) =
printf "Enter <server ID, message ID>: "
let s = Console.ReadLine ()
let getMessage (clusterSize : int) (s : string) : (int<ServerId> * int) option =
match s.Split ',' with
| [| serverId ; messageId |] ->
let serverId = serverId.Trim ()
let messageId = messageId.Trim ()
match Int32.TryParse serverId with
| true, serverId ->
match Int32.TryParse messageId with
| true, messageId ->
if serverId >= clusterSize || serverId < 0 then
printf "Server ID must be between 0 and %i inclusive. " (clusterSize - 1)
getMessage clusterSize
None
else
serverId * 1<ServerId>, messageId
Some (serverId * 1<ServerId>, messageId)
| false, _ ->
printf "Non-integer input '%s' for message ID. " messageId
getMessage clusterSize
None
| false, _ ->
printf "Non-integer input '%s' for server ID. " serverId
getMessage clusterSize
None
| _ ->
printfn "Invalid input."
getMessage clusterSize
let rec getTimeout (clusterSize : int) =
printf "Enter server ID: "
let serverId = Console.ReadLine ()
None
let rec getTimeout (clusterSize : int) (serverId : string) =
match Int32.TryParse serverId with
| true, serverId ->
if serverId >= clusterSize || serverId < 0 then
printf "Server ID must be between 0 and %i inclusive. " (clusterSize - 1)
getTimeout clusterSize
None
else
serverId * 1<ServerId>
Some (serverId * 1<ServerId>)
| false, _ ->
printf "Unrecognised input. "
getTimeout clusterSize
None
type UserAction =
| Timeout of int<ServerId>
| NetworkMessage of int<ServerId> * int
let rec getAction (clusterSize : int) =
printf "Enter action. Trigger [t]imeout, or allow [m]essage: "
printf "Enter action. Trigger [t]imeout <server id>, or allow [m]essage <server id, message id>: "
let s = Console.ReadLine().ToUpperInvariant ()
match s with
| "T" -> getTimeout clusterSize |> Timeout
| "M" -> getMessage clusterSize |> NetworkMessage
match s.[0] with
| 'T' ->
match getTimeout clusterSize s.[1..] with
| Some t -> t |> Timeout
| None -> getAction clusterSize
| 'M' ->
match getMessage clusterSize s.[1..] with
| Some m -> m |> NetworkMessage
| None -> getAction clusterSize
| _ ->
printf "Unrecognised input. "
getAction clusterSize
@@ -85,5 +88,6 @@ module Program =
| Timeout serverId -> cluster.Timeout serverId
| NetworkMessage (serverId, messageId) ->
network.InboundMessage serverId messageId |> cluster.SendMessage serverId
network.DropMessage serverId messageId
0