Add a basic interactive client
This commit is contained in:
@@ -4,6 +4,7 @@ open System.Threading
|
||||
open Raft
|
||||
open NUnit.Framework
|
||||
open FsUnitTyped
|
||||
open FsCheck
|
||||
|
||||
[<TestFixture>]
|
||||
module TestServer =
|
||||
@@ -60,7 +61,7 @@ module TestServer =
|
||||
calls.Value |> shouldEqual 0
|
||||
|
||||
[<Test>]
|
||||
let ``Startup sequence in prod`` () =
|
||||
let ``Startup sequence in prod, only one timeout takes place`` () =
|
||||
let cluster, network = InMemoryCluster.make<int> 5
|
||||
|
||||
cluster.Servers.[0].TriggerTimeout ()
|
||||
@@ -85,3 +86,152 @@ module TestServer =
|
||||
|
||||
for i in 1..4 do
|
||||
cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower
|
||||
|
||||
let popOne (queues : 'a list list) : ((int * 'a) * 'a list list) list =
|
||||
queues
|
||||
|> List.indexed
|
||||
|> List.filter (fun (index, l) -> not (List.isEmpty l))
|
||||
|> List.collect (fun (firstPlaceWithInstruction, entries) ->
|
||||
entries
|
||||
|> List.indexed
|
||||
|> List.map (fun (i, entry) -> (firstPlaceWithInstruction, entry), List.removeAt i entries)
|
||||
|> List.map (fun (removed, rest) ->
|
||||
let afterPop =
|
||||
queues
|
||||
|> List.removeAt firstPlaceWithInstruction
|
||||
|> List.insertAt firstPlaceWithInstruction rest
|
||||
|
||||
removed, afterPop
|
||||
)
|
||||
)
|
||||
|
||||
let rec allOrderings (queues : 'a list list) : (int * 'a) list list =
|
||||
let output = popOne queues
|
||||
|
||||
match output with
|
||||
| [] -> [ [] ]
|
||||
| output ->
|
||||
|
||||
output
|
||||
|> List.collect (fun (extracted, remaining) ->
|
||||
let sub = allOrderings remaining
|
||||
sub |> List.map (fun s -> extracted :: s)
|
||||
)
|
||||
|
||||
let factorial i =
|
||||
let rec go acc i =
|
||||
if i <= 0 then acc else go (acc * i) (i - 1)
|
||||
|
||||
go 1 i
|
||||
|
||||
[<TestCase(0, 1)>]
|
||||
[<TestCase(1, 1)>]
|
||||
[<TestCase(2, 2)>]
|
||||
[<TestCase(3, 6)>]
|
||||
[<TestCase(4, 24)>]
|
||||
let ``Test factorial`` (n : int, result : int) = factorial n |> shouldEqual result
|
||||
|
||||
[<Test>]
|
||||
let ``Test allOrderings`` () =
|
||||
let case = [ [ "a" ; "b" ] ; [ "c" ; "d" ; "e" ] ]
|
||||
let output = case |> allOrderings
|
||||
output |> shouldEqual (List.distinct output)
|
||||
|
||||
output
|
||||
|> List.length
|
||||
|> shouldEqual (factorial (List.concat case |> List.length))
|
||||
|
||||
let allElements = Set.ofList (List.concat case)
|
||||
|
||||
for output in output do
|
||||
output |> List.map snd |> Set.ofList |> shouldEqual allElements
|
||||
|
||||
let randomChoice<'a> (r : System.Random) (arr : 'a list) : 'a = arr.[r.Next (0, arr.Length)]
|
||||
|
||||
[<Test>]
|
||||
let ``Startup sequence in prod, two timeouts at once, random`` () =
|
||||
let rand = System.Random ()
|
||||
let cluster, network = InMemoryCluster.make<int> 5
|
||||
|
||||
cluster.Servers.[0].TriggerTimeout ()
|
||||
cluster.Servers.[0].Sync ()
|
||||
cluster.Servers.[1].TriggerTimeout ()
|
||||
cluster.Servers.[1].Sync ()
|
||||
|
||||
// Those two each sent a message to every other server.
|
||||
network.InboundMessages.[0].Count |> shouldEqual 1
|
||||
network.InboundMessages.[1].Count |> shouldEqual 1
|
||||
|
||||
for i in 2..4 do
|
||||
network.InboundMessages.[i].Count |> shouldEqual 2
|
||||
|
||||
while network.InboundMessages |> Seq.concat |> Seq.isEmpty |> not do
|
||||
let allOrderings' =
|
||||
network.InboundMessages |> List.ofArray |> List.map List.ofSeq |> allOrderings
|
||||
|
||||
network.InboundMessages |> Array.iter (fun arr -> arr.Clear ())
|
||||
// Process the messages!
|
||||
let ordering = randomChoice rand allOrderings'
|
||||
|
||||
for serverConsuming, message in ordering do
|
||||
cluster.SendMessageDirectly (serverConsuming * 1<ServerId>) message
|
||||
|
||||
(cluster.Servers.[0].State = Leader && cluster.Servers.[1].State = Leader)
|
||||
|> shouldEqual false
|
||||
|
||||
(cluster.Servers.[0].State = Candidate && cluster.Servers.[1].State = Candidate)
|
||||
|> shouldEqual false
|
||||
|
||||
((cluster.Servers.[0].State = Leader && cluster.Servers.[1].State = Candidate)
|
||||
|| (cluster.Servers.[1].State = Leader && cluster.Servers.[0].State = Candidate))
|
||||
|> shouldEqual true
|
||||
|
||||
for i in 2..4 do
|
||||
cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower
|
||||
|
||||
type History = History of (int<ServerId> * int) list
|
||||
|
||||
let historyGen (clusterSize : int) =
|
||||
gen {
|
||||
let! pile = Gen.choose (0, clusterSize - 1)
|
||||
let! entry = Arb.generate<int>
|
||||
return (pile * 1<ServerId>, abs entry)
|
||||
}
|
||||
|> Gen.listOf
|
||||
|> Gen.map History
|
||||
|
||||
let apply (History history) (cluster : Cluster<'a>) (network : Network<'a>) : unit =
|
||||
for pile, entry in history do
|
||||
let messages = network.InboundMessages.[pile / 1<ServerId>]
|
||||
|
||||
if entry < messages.Count then
|
||||
cluster.SendMessageDirectly pile messages.[entry]
|
||||
|
||||
[<Test>]
|
||||
let ``Startup sequence in prod, two timeouts at once, property-based: at most one leader is elected`` () =
|
||||
let cluster, network = InMemoryCluster.make<int> 5
|
||||
|
||||
cluster.Servers.[0].TriggerTimeout ()
|
||||
cluster.Servers.[0].Sync ()
|
||||
cluster.Servers.[1].TriggerTimeout ()
|
||||
cluster.Servers.[1].Sync ()
|
||||
|
||||
// Those two each sent a message to every other server.
|
||||
network.InboundMessages.[0].Count |> shouldEqual 1
|
||||
network.InboundMessages.[1].Count |> shouldEqual 1
|
||||
|
||||
for i in 2..4 do
|
||||
network.InboundMessages.[i].Count |> shouldEqual 2
|
||||
|
||||
let property (history : History) =
|
||||
apply history cluster network
|
||||
|
||||
(cluster.Servers.[0].State = Leader && cluster.Servers.[1].State = Leader)
|
||||
|> shouldEqual false
|
||||
|
||||
for i in 2..4 do
|
||||
cluster.Servers.[i].State |> shouldEqual ServerStatus.Follower
|
||||
|
||||
property
|
||||
|> Prop.forAll (Arb.fromGen (historyGen 5))
|
||||
|> Check.QuickThrowOnFailure
|
||||
|
6
Raft.sln
6
Raft.sln
@@ -4,6 +4,8 @@ Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Raft", "Raft\Raft.fsproj",
|
||||
EndProject
|
||||
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Raft.Test", "Raft.Test\Raft.Test.fsproj", "{2807D236-2D29-45A8-AC06-1662F9629B64}"
|
||||
EndProject
|
||||
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "RaftExplorer", "RaftExplorer\RaftExplorer.fsproj", "{204FDA9A-F3B7-46CC-97F1-A39B55AA7A7A}"
|
||||
EndProject
|
||||
Global
|
||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||
Debug|Any CPU = Debug|Any CPU
|
||||
@@ -18,5 +20,9 @@ Global
|
||||
{2807D236-2D29-45A8-AC06-1662F9629B64}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{2807D236-2D29-45A8-AC06-1662F9629B64}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{2807D236-2D29-45A8-AC06-1662F9629B64}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{204FDA9A-F3B7-46CC-97F1-A39B55AA7A7A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{204FDA9A-F3B7-46CC-97F1-A39B55AA7A7A}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{204FDA9A-F3B7-46CC-97F1-A39B55AA7A7A}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{204FDA9A-F3B7-46CC-97F1-A39B55AA7A7A}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
EndGlobalSection
|
||||
EndGlobal
|
||||
|
@@ -42,8 +42,14 @@ type RequestVoteReply =
|
||||
/// Whether I am happy for you to become leader. (For example, if my term is greater than yours, then you're
|
||||
/// out of date and I won't vote for you.)
|
||||
VoteGranted : bool
|
||||
/// The candidate I'm voting for or against.
|
||||
Candidate : int<ServerId>
|
||||
}
|
||||
|
||||
override this.ToString () =
|
||||
let decision = if this.VoteGranted then "in favour of" else "against"
|
||||
sprintf "Server %i voting %s %i in term %i" this.Voter decision this.Candidate this.VoterTerm
|
||||
|
||||
/// I am starting an election. Everyone, please vote.
|
||||
type RequestVoteMessage =
|
||||
{
|
||||
@@ -53,6 +59,9 @@ type RequestVoteMessage =
|
||||
ReplyChannel : RequestVoteReply -> unit
|
||||
}
|
||||
|
||||
override this.ToString () =
|
||||
sprintf "Vote request: %i in term %i" this.CandidateId this.CandidateTerm
|
||||
|
||||
/// I, a follower, acknowledge the leader's instruction to add an entry to my log.
|
||||
type AppendEntriesReply =
|
||||
{
|
||||
@@ -69,6 +78,9 @@ type LogEntry =
|
||||
Term : int<Term>
|
||||
}
|
||||
|
||||
override this.ToString () =
|
||||
sprintf "Log entry %i at subjective term %i" this.Index this.Term
|
||||
|
||||
/// I am the leader. Followers, update your state as follows.
|
||||
type AppendEntriesMessage<'a> =
|
||||
{
|
||||
@@ -86,21 +98,49 @@ type AppendEntriesMessage<'a> =
|
||||
ReplyChannel : AppendEntriesReply -> unit
|
||||
}
|
||||
|
||||
override this.ToString () =
|
||||
let description =
|
||||
match this.NewEntry with
|
||||
| None -> "Heartbeat"
|
||||
| Some (entry, term) -> sprintf "Append %+A (term %i)" entry term
|
||||
|
||||
sprintf
|
||||
"%s (leader %i at term %i whose commit index is %i)"
|
||||
description
|
||||
this.LeaderId
|
||||
this.LeaderTerm
|
||||
this.LeaderCommitIndex
|
||||
|
||||
type Instruction<'a> =
|
||||
| AppendEntries of AppendEntriesMessage<'a>
|
||||
| RequestVote of RequestVoteMessage
|
||||
|
||||
override this.ToString () =
|
||||
match this with
|
||||
| RequestVote v -> v.ToString ()
|
||||
| AppendEntries a -> a.ToString ()
|
||||
|
||||
member this.Term =
|
||||
match this with
|
||||
| AppendEntries m -> m.LeaderTerm
|
||||
| RequestVote m -> m.CandidateTerm
|
||||
|
||||
type Reply = RequestVoteReply of RequestVoteReply
|
||||
type Reply =
|
||||
| RequestVoteReply of RequestVoteReply
|
||||
|
||||
override this.ToString () =
|
||||
match this with
|
||||
| RequestVoteReply v -> v.ToString ()
|
||||
|
||||
type Message<'a> =
|
||||
| Instruction of Instruction<'a>
|
||||
| Reply of Reply
|
||||
|
||||
override this.ToString () =
|
||||
match this with
|
||||
| Instruction i -> i.ToString ()
|
||||
| Reply r -> r.ToString ()
|
||||
|
||||
type private CandidateState =
|
||||
{
|
||||
/// For each voter, the vote I received from them.
|
||||
@@ -220,6 +260,7 @@ type Server<'a>
|
||||
Voter = me
|
||||
VoterTerm = persistentState.CurrentTerm
|
||||
VoteGranted = true
|
||||
Candidate = message.CandidateId
|
||||
}
|
||||
|> message.ReplyChannel
|
||||
|
||||
@@ -344,8 +385,6 @@ type Server<'a>
|
||||
let mailbox =
|
||||
let rec loop (mailbox : MailboxProcessor<_>) =
|
||||
async {
|
||||
// TODO this should really wait for explicit Sync calls
|
||||
// if we're running in memory, for determinism.
|
||||
let! m = mailbox.Receive ()
|
||||
//let toPrint = sprintf "Processing message in server %i: %+A" me m
|
||||
//System.Console.WriteLine toPrint
|
||||
@@ -360,6 +399,7 @@ type Server<'a>
|
||||
// Start the election!
|
||||
currentType <- ServerSpecialisation.Candidate (CandidateState.New clusterSize me)
|
||||
persistentState.IncrementTerm ()
|
||||
persistentState.Vote me
|
||||
|
||||
for i in 0 .. clusterSize - 1 do
|
||||
if i * 1<ServerId> <> me then
|
||||
@@ -431,6 +471,12 @@ type Cluster<'a> =
|
||||
SendMessageDirectly : int<ServerId> -> Message<'a> -> unit
|
||||
}
|
||||
|
||||
member this.SendMessage (i : int<ServerId>) (m : Message<'a>) : unit = this.SendMessageDirectly i m
|
||||
|
||||
member this.Timeout (i : int<ServerId>) : unit =
|
||||
this.Servers.[i / 1<ServerId>].TriggerTimeout ()
|
||||
this.Servers.[i / 1<ServerId>].Sync ()
|
||||
|
||||
type Network<'a> =
|
||||
internal
|
||||
{
|
||||
@@ -439,6 +485,14 @@ type Network<'a> =
|
||||
InboundMessages : ResizeArray<Message<'a>>[]
|
||||
}
|
||||
|
||||
member this.AllInboundMessages (i : int<ServerId>) : Message<'a> list =
|
||||
this.InboundMessages.[i / 1<ServerId>] |> List.ofSeq
|
||||
|
||||
member this.InboundMessage (i : int<ServerId>) (id : int) : Message<'a> =
|
||||
this.InboundMessages.[i / 1<ServerId>].[id]
|
||||
|
||||
member this.Size = this.InboundMessages.Length
|
||||
|
||||
[<RequireQualifiedAccess>]
|
||||
module InMemoryCluster =
|
||||
|
||||
|
89
RaftExplorer/Program.fs
Normal file
89
RaftExplorer/Program.fs
Normal file
@@ -0,0 +1,89 @@
|
||||
namespace Raft.Explorer
|
||||
|
||||
open System
|
||||
open Raft
|
||||
|
||||
module Program =
|
||||
|
||||
let printNetworkState<'a> (network : Network<'a>) : unit =
|
||||
let mutable wroteAnything = false
|
||||
|
||||
for i in 0 .. network.Size - 1 do
|
||||
for count, message in Seq.indexed (network.AllInboundMessages (i * 1<ServerId>)) do
|
||||
printfn "Server %i, message %i: %O" i count message
|
||||
wroteAnything <- true
|
||||
|
||||
if not wroteAnything then
|
||||
printfn "<No messages in network>"
|
||||
|
||||
let rec getMessage (clusterSize : int) =
|
||||
printf "Enter <server ID, message ID>: "
|
||||
let s = Console.ReadLine ()
|
||||
|
||||
match s.Split ',' with
|
||||
| [| serverId ; messageId |] ->
|
||||
match Int32.TryParse serverId with
|
||||
| true, serverId ->
|
||||
match Int32.TryParse messageId with
|
||||
| true, messageId ->
|
||||
if serverId >= clusterSize || serverId < 0 then
|
||||
printf "Server ID must be between 0 and %i inclusive. " (clusterSize - 1)
|
||||
getMessage clusterSize
|
||||
else
|
||||
serverId * 1<ServerId>, messageId
|
||||
| false, _ ->
|
||||
printf "Non-integer input '%s' for message ID. " messageId
|
||||
getMessage clusterSize
|
||||
| false, _ ->
|
||||
printf "Non-integer input '%s' for server ID. " serverId
|
||||
getMessage clusterSize
|
||||
| _ ->
|
||||
printfn "Invalid input."
|
||||
getMessage clusterSize
|
||||
|
||||
let rec getTimeout (clusterSize : int) =
|
||||
printf "Enter server ID: "
|
||||
let serverId = Console.ReadLine ()
|
||||
|
||||
match Int32.TryParse serverId with
|
||||
| true, serverId ->
|
||||
if serverId >= clusterSize || serverId < 0 then
|
||||
printf "Server ID must be between 0 and %i inclusive. " (clusterSize - 1)
|
||||
getTimeout clusterSize
|
||||
else
|
||||
serverId * 1<ServerId>
|
||||
| false, _ ->
|
||||
printf "Unrecognised input. "
|
||||
getTimeout clusterSize
|
||||
|
||||
type UserAction =
|
||||
| Timeout of int<ServerId>
|
||||
| NetworkMessage of int<ServerId> * int
|
||||
|
||||
let rec getAction (clusterSize : int) =
|
||||
printf "Enter action. Trigger [t]imeout, or allow [m]essage: "
|
||||
let s = Console.ReadLine().ToUpperInvariant ()
|
||||
|
||||
match s with
|
||||
| "T" -> getTimeout clusterSize |> Timeout
|
||||
| "M" -> getMessage clusterSize |> NetworkMessage
|
||||
| _ ->
|
||||
printf "Unrecognised input. "
|
||||
getAction clusterSize
|
||||
|
||||
[<EntryPoint>]
|
||||
let main _argv =
|
||||
let clusterSize = 5
|
||||
let cluster, network = InMemoryCluster.make<int> clusterSize
|
||||
|
||||
while true do
|
||||
printNetworkState network
|
||||
|
||||
let action = getAction clusterSize
|
||||
|
||||
match action with
|
||||
| Timeout serverId -> cluster.Timeout serverId
|
||||
| NetworkMessage (serverId, messageId) ->
|
||||
network.InboundMessage serverId messageId |> cluster.SendMessage serverId
|
||||
|
||||
0
|
16
RaftExplorer/RaftExplorer.fsproj
Normal file
16
RaftExplorer/RaftExplorer.fsproj
Normal file
@@ -0,0 +1,16 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>net6.0</TargetFramework>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<Compile Include="Program.fs" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\Raft\Raft.fsproj" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
Reference in New Issue
Block a user