All messages now go over the network
This commit is contained in:
@@ -10,7 +10,7 @@ module TestServer =
|
||||
|
||||
[<Test>]
|
||||
let ``Startup sequence, first fumbling steps`` () =
|
||||
let cluster = InMemoryCluster.make<int> true 5
|
||||
let cluster, network = InMemoryCluster.make<int> 5
|
||||
|
||||
let logger, logs = TestLogger.make ()
|
||||
|
||||
@@ -22,8 +22,9 @@ module TestServer =
|
||||
ReplyChannel = fun message -> logger (sprintf "Received message for term %i" message.VoterTerm)
|
||||
CandidateLastLogEntry = 0<LogIndex>, 0<Term>
|
||||
}
|
||||
|> Message.RequestVote
|
||||
|> cluster.SendMessage 0<ServerId>
|
||||
|> Instruction.RequestVote
|
||||
|> Message.Instruction
|
||||
|> cluster.SendMessageDirectly 0<ServerId>
|
||||
|
||||
logs () |> shouldEqual [ "Received message for term 0" ]
|
||||
|
||||
@@ -35,8 +36,9 @@ module TestServer =
|
||||
ReplyChannel = fun message -> logger (sprintf "Received message for term %i" message.VoterTerm)
|
||||
CandidateLastLogEntry = 0<LogIndex>, 0<Term>
|
||||
}
|
||||
|> Message.RequestVote
|
||||
|> cluster.SendMessage 0<ServerId>
|
||||
|> Instruction.RequestVote
|
||||
|> Message.Instruction
|
||||
|> cluster.SendMessageDirectly 0<ServerId>
|
||||
|
||||
logs ()
|
||||
|> shouldEqual [ "Received message for term 0" ; "Received message for term 0" ]
|
||||
@@ -51,21 +53,33 @@ module TestServer =
|
||||
ReplyChannel = fun _ -> Interlocked.Increment calls |> ignore
|
||||
CandidateLastLogEntry = 0<LogIndex>, 0<Term>
|
||||
}
|
||||
|> Message.RequestVote
|
||||
|> cluster.SendMessage 0<ServerId>
|
||||
|> Instruction.RequestVote
|
||||
|> Message.Instruction
|
||||
|> cluster.SendMessageDirectly 0<ServerId>
|
||||
|
||||
calls.Value |> shouldEqual 0
|
||||
|
||||
[<Test>]
|
||||
let ``Startup sequence in prod`` () =
|
||||
let cluster = InMemoryCluster.make<int> false 5
|
||||
let cluster, network = InMemoryCluster.make<int> 5
|
||||
|
||||
cluster.Servers.[0].TriggerTimeout ()
|
||||
cluster.Servers.[0].Sync ()
|
||||
|
||||
// We sent a message to every other server; process them.
|
||||
for i in 1..4 do
|
||||
cluster.Servers.[i].Sync ()
|
||||
network.InboundMessages.[i].Count |> shouldEqual 1
|
||||
let message = network.InboundMessages.[i].[0]
|
||||
network.InboundMessages.[i].Clear ()
|
||||
cluster.SendMessageDirectly (i * 1<ServerId>) message
|
||||
|
||||
network.InboundMessages.[0].Count |> shouldEqual i
|
||||
|
||||
for i in 1..4 do
|
||||
cluster.SendMessageDirectly 0<ServerId> network.InboundMessages.[0].[i - 1]
|
||||
// (the messages we've already processed)
|
||||
network.InboundMessages.[0].Count |> shouldEqual 4
|
||||
network.InboundMessages.[0].Clear ()
|
||||
|
||||
cluster.Servers.[0].State |> shouldEqual ServerStatus.Leader
|
||||
|
||||
|
@@ -1,14 +1,15 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net6.0</TargetFramework>
|
||||
<TargetFramework>net6.0</TargetFramework>
|
||||
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<Compile Include="AssemblyInfo.fs" />
|
||||
<Compile Include="Measures.fs" />
|
||||
<Compile Include="PersistentState.fs" />
|
||||
<Compile Include="Server.fs" />
|
||||
<Compile Include="AssemblyInfo.fs" />
|
||||
<Compile Include="Measures.fs" />
|
||||
<Compile Include="PersistentState.fs" />
|
||||
<Compile Include="Server.fs" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
@@ -86,7 +86,7 @@ type AppendEntriesMessage<'a> =
|
||||
ReplyChannel : AppendEntriesReply -> unit
|
||||
}
|
||||
|
||||
type Message<'a> =
|
||||
type Instruction<'a> =
|
||||
| AppendEntries of AppendEntriesMessage<'a>
|
||||
| RequestVote of RequestVoteMessage
|
||||
|
||||
@@ -95,6 +95,12 @@ type Message<'a> =
|
||||
| AppendEntries m -> m.LeaderTerm
|
||||
| RequestVote m -> m.CandidateTerm
|
||||
|
||||
type Reply = RequestVoteReply of RequestVoteReply
|
||||
|
||||
type Message<'a> =
|
||||
| Instruction of Instruction<'a>
|
||||
| Reply of Reply
|
||||
|
||||
type private CandidateState =
|
||||
{
|
||||
/// For each voter, the vote I received from them.
|
||||
@@ -123,7 +129,6 @@ type ServerStatus =
|
||||
type private ServerAction<'a> =
|
||||
| BeginElection
|
||||
| Receive of Message<'a>
|
||||
| ReceiveReply of RequestVoteReply
|
||||
| Sync of AsyncReplyChannel<unit>
|
||||
|
||||
type Server<'a>
|
||||
@@ -133,11 +138,11 @@ type Server<'a>
|
||||
persistentState : IPersistentState<'a>,
|
||||
messageChannel : int<ServerId> -> Message<'a> -> unit
|
||||
)
|
||||
as this=
|
||||
=
|
||||
let mutable volatileState = VolatileState.New
|
||||
let mutable currentType = ServerSpecialisation.Follower
|
||||
|
||||
let processMessage (message : Message<'a>) : unit =
|
||||
let processMessage (message : Instruction<'a>) : unit =
|
||||
// First, see if this message comes from a future term.
|
||||
// (This is `UpdateTerm` from the TLA+.)
|
||||
if message.Term > persistentState.CurrentTerm then
|
||||
@@ -342,8 +347,8 @@ type Server<'a>
|
||||
// TODO this should really wait for explicit Sync calls
|
||||
// if we're running in memory, for determinism.
|
||||
let! m = mailbox.Receive ()
|
||||
let toPrint = sprintf "Processing message in server %i: %+A" me m
|
||||
System.Console.WriteLine toPrint
|
||||
//let toPrint = sprintf "Processing message in server %i: %+A" me m
|
||||
//System.Console.WriteLine toPrint
|
||||
|
||||
match m with
|
||||
| ServerAction.BeginElection ->
|
||||
@@ -367,13 +372,16 @@ type Server<'a>
|
||||
| None ->
|
||||
// TODO this is almost certainly not right
|
||||
(0<LogIndex>, 0<Term>)
|
||||
ReplyChannel = fun reply -> mailbox.Post (ReceiveReply reply)
|
||||
ReplyChannel =
|
||||
// TODO this is bypassing the network - stop it!
|
||||
fun reply -> messageChannel me (RequestVoteReply reply |> Message.Reply)
|
||||
}
|
||||
|> Message.RequestVote
|
||||
|> Instruction.RequestVote
|
||||
|> Message.Instruction
|
||||
|> messageChannel (i * 1<ServerId>)
|
||||
| ServerAction.Receive m -> return processMessage m
|
||||
| ServerAction.Receive (Instruction m) -> return processMessage m
|
||||
| ServerAction.Sync reply -> reply.Reply ()
|
||||
| ServerAction.ReceiveReply requestVoteReply ->
|
||||
| ServerAction.Receive (Reply (RequestVoteReply requestVoteReply)) ->
|
||||
match currentType with
|
||||
| ServerSpecialisation.Leader _
|
||||
| ServerSpecialisation.Follower ->
|
||||
@@ -416,42 +424,49 @@ type Server<'a>
|
||||
| ServerSpecialisation.Candidate _ -> ServerStatus.Candidate
|
||||
| ServerSpecialisation.Follower -> ServerStatus.Follower
|
||||
|
||||
// {
|
||||
// ClusterSize : int
|
||||
// mutable VolatileState : VolatileState
|
||||
// PersistentState : IPersistentState<'a>
|
||||
// mutable Type : ServerSpecialisation
|
||||
// TriggerTimeout : unit -> unit
|
||||
// Mailbox : MailboxProcessor<ServerAction<'a>>
|
||||
// }
|
||||
|
||||
type Cluster<'a> =
|
||||
internal
|
||||
{
|
||||
Servers : Server<'a> array
|
||||
SendMessage : int<ServerId> -> Message<'a> -> unit
|
||||
SendMessageDirectly : int<ServerId> -> Message<'a> -> unit
|
||||
}
|
||||
|
||||
type Network<'a> =
|
||||
internal
|
||||
{
|
||||
/// InboundMessages.[i] is the collection of messages sent to
|
||||
/// server `i` and waiting for you to allow them through.
|
||||
InboundMessages : ResizeArray<Message<'a>>[]
|
||||
}
|
||||
|
||||
[<RequireQualifiedAccess>]
|
||||
module InMemoryCluster =
|
||||
|
||||
[<RequiresExplicitTypeArguments>]
|
||||
let make<'a> (immediateFlush : bool) (count : int) : Cluster<'a> =
|
||||
let make<'a> (count : int) : Cluster<'a> * Network<'a> =
|
||||
let servers = Array.zeroCreate<Server<'a>> count
|
||||
|
||||
let messageChannel (serverId : int<ServerId>) (message : Message<'a>) : unit =
|
||||
servers.[serverId / 1<ServerId>].Message message
|
||||
let network =
|
||||
{
|
||||
InboundMessages =
|
||||
fun _ -> ResizeArray<Message<'a>> ()
|
||||
|> Array.init count
|
||||
}
|
||||
|
||||
let messageChannelHold (serverId : int<ServerId>) (message : Message<'a>) : unit =
|
||||
let arr = network.InboundMessages.[serverId / 1<ServerId>]
|
||||
lock arr (fun () -> arr.Add message)
|
||||
|
||||
for s in 0 .. servers.Length - 1 do
|
||||
servers.[s] <- Server (count, s * 1<ServerId>, InMemoryPersistentState (), messageChannel)
|
||||
servers.[s] <- Server (count, s * 1<ServerId>, InMemoryPersistentState (), messageChannelHold)
|
||||
|
||||
{
|
||||
Servers = servers
|
||||
SendMessage =
|
||||
if immediateFlush then
|
||||
let cluster =
|
||||
{
|
||||
Servers = servers
|
||||
SendMessageDirectly =
|
||||
fun i m ->
|
||||
servers.[i / 1<ServerId>].Message m
|
||||
servers.[i / 1<ServerId>].Sync ()
|
||||
else
|
||||
fun i -> servers.[i / 1<ServerId>].Message
|
||||
}
|
||||
}
|
||||
|
||||
cluster, network
|
||||
|
Reference in New Issue
Block a user