From 72785eda0623017aa99b21f53eb59e2e23fef31c Mon Sep 17 00:00:00 2001 From: Smaug123 Date: Sun, 13 Nov 2022 21:44:19 +0000 Subject: [PATCH] Use spans for Efficiency --- Raft.Test/NetworkAction.fs | 33 +++++ Raft.Test/Raft.Test.fsproj | 3 + Raft.Test/Result.fs | 9 ++ Raft.Test/TestInMemoryServer.fs | 47 +++++-- Raft.Test/TestNetworkAction.fs | 37 ++++++ Raft.Test/ValidHistory.fs | 14 -- Raft/EfficientString.fs | 44 +++++++ Raft/InMemory.fs | 221 +++++++++++++++++--------------- Raft/PersistentState.fs | 4 +- Raft/Raft.fsproj | 7 +- Raft/Server.fs | 45 +++---- RaftExplorer/Program.fs | 1 - 12 files changed, 315 insertions(+), 150 deletions(-) create mode 100644 Raft.Test/NetworkAction.fs create mode 100644 Raft.Test/Result.fs create mode 100644 Raft.Test/TestNetworkAction.fs create mode 100644 Raft/EfficientString.fs diff --git a/Raft.Test/NetworkAction.fs b/Raft.Test/NetworkAction.fs new file mode 100644 index 0000000..abb97f4 --- /dev/null +++ b/Raft.Test/NetworkAction.fs @@ -0,0 +1,33 @@ +namespace Raft.Test + +open FsCheck +open Raft + +[] +module NetworkAction = + + let generate<'a> (clusterSize : int) : Gen> = + gen { + let! choice = Arb.generate> + let! server = Gen.choose (0, clusterSize - 1) + let server = server * 1 + + match choice with + | NetworkAction.InactivityTimeout _ -> return NetworkAction.InactivityTimeout server + | NetworkAction.NetworkMessage (_, message) -> return NetworkAction.NetworkMessage (server, abs message) + | NetworkAction.DropMessage (_, message) -> return NetworkAction.DropMessage (server, abs message) + | NetworkAction.Heartbeat _ -> return NetworkAction.Heartbeat server + | NetworkAction.ClientRequest (_, ClientRequest.ClientRequest (client, sequence, data, func)) -> + return + NetworkAction.ClientRequest (server, ClientRequest.ClientRequest (client, sequence, data, ignore)) + | NetworkAction.ClientRequest (_, ClientRequest.RegisterClient _) -> + return NetworkAction.ClientRequest (server, ClientRequest.RegisterClient ignore) + } + + let rec genNoClientRequests<'a> (clusterSize : int) : Gen> = + generate clusterSize + |> Gen.filter (fun action -> + match action with + | NetworkAction.ClientRequest _ -> false + | _ -> true + ) diff --git a/Raft.Test/Raft.Test.fsproj b/Raft.Test/Raft.Test.fsproj index 2d41ddc..d81f07c 100644 --- a/Raft.Test/Raft.Test.fsproj +++ b/Raft.Test/Raft.Test.fsproj @@ -9,6 +9,9 @@ + + + diff --git a/Raft.Test/Result.fs b/Raft.Test/Result.fs new file mode 100644 index 0000000..6bc7070 --- /dev/null +++ b/Raft.Test/Result.fs @@ -0,0 +1,9 @@ +namespace Raft.Test + +[] +module Result = + + let get<'a, 'b> (r : Result<'a, 'b>) : 'a = + match r with + | Ok a -> a + | Error e -> failwithf "%+A" e diff --git a/Raft.Test/TestInMemoryServer.fs b/Raft.Test/TestInMemoryServer.fs index 0e6179c..9373601 100644 --- a/Raft.Test/TestInMemoryServer.fs +++ b/Raft.Test/TestInMemoryServer.fs @@ -9,6 +9,45 @@ open FsCheck [] module TestInMemoryServer = + let check<'T> (prop : 'T) = + let config = + { Config.QuickThrowOnFailure with + MaxTest = 1000 + } + + Check.One (config, prop) + + let parseByte (s : string) : Result = + match System.Byte.TryParse s with + | false, _ -> Error (sprintf "oh no: %s" s) + | true, v -> Ok v + + [] + let ``Can round-trip NetworkAction`` () = + let property (action : NetworkAction) = + let roundTripped = + NetworkAction.toString action + |> NetworkAction.tryParse parseByte None ignore ignore 5 + |> Result.get + + match roundTripped, action with + | NetworkAction.ClientRequest (server1, request1), NetworkAction.ClientRequest (server2, request2) -> + match request1, request2 with + | ClientRequest.ClientRequest (client1, seq1, data1, _), + ClientRequest.ClientRequest (client2, seq2, data2, _) -> + server1 = server2 && client1 = client2 && seq1 = seq2 && data1 = data2 + | ClientRequest.RegisterClient _, ClientRequest.RegisterClient _ -> server1 = server2 + | _, _ -> false + | NetworkAction.InactivityTimeout server1, NetworkAction.InactivityTimeout server2 -> server1 = server2 + | NetworkAction.Heartbeat server1, NetworkAction.Heartbeat server2 -> server1 = server2 + | NetworkAction.DropMessage (server1, message1), NetworkAction.DropMessage (server2, message2) -> + server1 = server2 && message1 = message2 + | NetworkAction.NetworkMessage (server1, message1), NetworkAction.NetworkMessage (server2, message2) -> + server1 = server2 && message1 = message2 + | _, _ -> false + + property |> Prop.forAll (NetworkAction.generate 5 |> Arb.fromGen) |> check + [] let ``Startup sequence in prod, only one timeout takes place`` () = let cluster, network = InMemoryCluster.make 5 @@ -168,14 +207,6 @@ module TestInMemoryServer = if entry < messages.Length then cluster.SendMessageDirectly pile messages.[entry] - let check (prop : Property) = - let config = - { Config.QuickThrowOnFailure with - MaxTest = 1000 - } - - Check.One (config, prop) - [] let ``Startup sequence in prod, two timeouts at once, property-based: at most one leader is elected`` () = let property (history : NetworkMessageSelection) = diff --git a/Raft.Test/TestNetworkAction.fs b/Raft.Test/TestNetworkAction.fs new file mode 100644 index 0000000..a472016 --- /dev/null +++ b/Raft.Test/TestNetworkAction.fs @@ -0,0 +1,37 @@ +namespace Raft.Test + +open Raft +open System.Collections.Generic +open NUnit.Framework +open FsCheck +open FsUnitTyped + +[] +module TestNetworkAction = + + [] + let ``Generator hits all cases`` () = + let populate (cases : HashSet<_>) (a : NetworkAction<'a>) : bool = + cases.Add (a.GetType ()) |> ignore + true + + let authoritativeCases = HashSet () + Check.QuickThrowOnFailure (populate authoritativeCases) + + let authoritativeCases = + authoritativeCases |> Seq.map (fun ty -> ty.FullName) |> Set.ofSeq + + let ourCases = HashSet () + + populate ourCases + |> Prop.forAll (Arb.fromGen (NetworkAction.generate 5)) + |> Check.QuickThrowOnFailure + + let ourCases = ourCases |> Seq.map (fun ty -> ty.FullName) |> Set.ofSeq + Set.difference authoritativeCases ourCases |> shouldBeEmpty + + // Sanity check that we are actually measuring what we think we're measuring + ourCases + |> Seq.map (fun name -> name.Split ('[') |> Array.head) + |> Set.ofSeq + |> shouldContain "Raft.NetworkAction`1+DropMessage" diff --git a/Raft.Test/ValidHistory.fs b/Raft.Test/ValidHistory.fs index 0772a3d..00366ed 100644 --- a/Raft.Test/ValidHistory.fs +++ b/Raft.Test/ValidHistory.fs @@ -20,20 +20,6 @@ module ValidHistory = if isValid then Some (ValidHistory history) else None - let rec private networkActionGenNoClientRequests<'a> (clusterSize : int) : Gen> = - gen { - let! choice = Arb.generate> - let! server = Gen.choose (0, clusterSize - 1) - let server = server * 1 - - match choice with - | NetworkAction.InactivityTimeout _ -> return NetworkAction.InactivityTimeout server - | NetworkAction.NetworkMessage (_, message) -> return NetworkAction.NetworkMessage (server, abs message) - | NetworkAction.DropMessage (_, message) -> return NetworkAction.DropMessage (server, abs message) - | NetworkAction.Heartbeat _ -> return NetworkAction.Heartbeat server - | NetworkAction.ClientRequest _ -> return! networkActionGenNoClientRequests clusterSize - } - let private historyGenOfLength<'a> (elementGen : Gen<'a>) (clusterSize : int) diff --git a/Raft/EfficientString.fs b/Raft/EfficientString.fs new file mode 100644 index 0000000..8887f68 --- /dev/null +++ b/Raft/EfficientString.fs @@ -0,0 +1,44 @@ +namespace Raft + +open System + +type internal EfficientString = +#if NETSTANDARD2_0 + string +#else + System.ReadOnlySpan +#endif + +[] +module internal EfficientString = + + let inline ofString (s : string) : EfficientString = +#if NETSTANDARD2_0 + s +#else + s.AsSpan () +#endif + + let inline trimStart (s : EfficientString) : EfficientString = s.TrimStart () + + let inline slice (start : int) (length : int) (s : EfficientString) : EfficientString = +#if NETSTANDARD2_0 + s.[start .. start + length - 1] +#else + s.Slice (start, length) +#endif + + /// Mutates the input to drop up to the first instance of the input char, + /// and returns what was dropped. + /// If the char is not present, deletes the input. + let takeUntil<'a> (c : char) (s : EfficientString byref) : EfficientString = + let first = s.IndexOf c + + if first < 0 then + let toRet = s + s <- EfficientString.Empty + toRet + else + let toRet = slice 0 first s + s <- slice (first + 1) (s.Length - first - 1) s + toRet diff --git a/Raft/InMemory.fs b/Raft/InMemory.fs index 6d58651..1b2d4b6 100644 --- a/Raft/InMemory.fs +++ b/Raft/InMemory.fs @@ -13,17 +13,17 @@ type Cluster<'a> = member this.SendMessage (i : int) (m : Message<'a>) : unit = this.SendMessageDirectly i m member this.InactivityTimeout (i : int) : unit = - this.Servers.[i / 1].TriggerInactivityTimeout () - this.Servers.[i / 1].Sync () + this.Servers[ i / 1 ].TriggerInactivityTimeout () + this.Servers[ i / 1 ].Sync () member this.HeartbeatTimeout (i : int) : unit = - this.Servers.[i / 1].TriggerHeartbeatTimeout () - this.Servers.[i / 1].Sync () + this.Servers[ i / 1 ].TriggerHeartbeatTimeout () + this.Servers[ i / 1 ].Sync () - member this.Status (i : int) : ServerStatus = this.Servers.[i / 1].State + member this.Status (i : int) : ServerStatus = this.Servers[i / 1].State member this.GetCurrentInternalState (i : int) : ServerInternalState<'a> Async = - this.Servers.[i / 1].GetCurrentInternalState () + this.Servers[ i / 1 ].GetCurrentInternalState () member this.ClusterSize : int = this.Servers.Length @@ -55,21 +55,21 @@ type Network<'a> = } member this.AllInboundMessages (i : int) : Message<'a> list = - this.CompleteMessageHistory.[i / 1] |> List.ofSeq + this.CompleteMessageHistory[i / 1] |> List.ofSeq member this.InboundMessage (i : int) (id : int) : Message<'a> = - this.CompleteMessageHistory.[i / 1].[id] + this.CompleteMessageHistory[i / 1].[id] member this.DropMessage (i : int) (id : int) : unit = - this.MessagesDelivered.[i / 1].Add id |> ignore + this.MessagesDelivered[ i / 1 ].Add id |> ignore member this.UndeliveredMessages (i : int) : (int * Message<'a>) list = - this.CompleteMessageHistory.[i / 1] + this.CompleteMessageHistory[i / 1] |> Seq.indexed - |> Seq.filter (fun (count, _) -> this.MessagesDelivered.[i / 1].Contains count |> not) + |> Seq.filter (fun (count, _) -> this.MessagesDelivered[ i / 1 ].Contains count |> not) |> List.ofSeq - member this.AllUndeliveredMessages () : ((int * Message<'a>) list) list = + member this.AllUndeliveredMessages () : (int * Message<'a>) list list = List.init this.CompleteMessageHistory.Length (fun i -> this.UndeliveredMessages (i * 1)) member this.ClusterSize = this.CompleteMessageHistory.Length @@ -84,19 +84,19 @@ module InMemoryCluster = let network = Network.Make count let messageChannelHold (serverId : int) (message : Message<'a>) : unit = - let arr = network.CompleteMessageHistory.[serverId / 1] + let arr = network.CompleteMessageHistory[serverId / 1] lock arr (fun () -> arr.Add message) for s in 0 .. servers.Length - 1 do - servers.[s] <- Server (count, s * 1, InMemoryPersistentState (), messageChannelHold) + servers[s] <- Server (count, s * 1, InMemoryPersistentState (), messageChannelHold) let cluster = { Servers = servers SendMessageDirectly = fun i m -> - servers.[i / 1].Message m - servers.[i / 1].Sync () + servers[ i / 1 ].Message m + servers[ i / 1 ].Sync () } cluster, network @@ -121,28 +121,33 @@ module NetworkAction = network.DropMessage serverId messageId | ClientRequest (server, request) -> Message.ClientRequest request |> cluster.SendMessage server - let private getMessage (clusterSize : int) (s : string) : Result * int, string> = - match s.Split ',' with - | [| serverId ; messageId |] -> - let serverId = serverId.TrimEnd () - let messageId = messageId.Trim () + let private getMessage (clusterSize : int) (s : EfficientString) : Result * int, string> = + let mutable messageId = s + let serverId = EfficientString.takeUntil ',' &messageId - match Int32.TryParse serverId with - | true, serverId -> - match Int32.TryParse messageId with - | true, messageId -> - if serverId >= clusterSize || serverId < 0 then - sprintf "Server ID must be between 0 and %i inclusive, got %i" (clusterSize - 1) serverId - |> Error - else - Ok (serverId * 1, messageId) - | false, _ -> sprintf "Non-integer input '%s' for message ID." messageId |> Error - | false, _ -> sprintf "Non-integer input '%s' for server ID." serverId |> Error - | _ -> Error "Expected a single comma." + let serverId = serverId.TrimEnd () + let messageId = messageId.Trim () - let private getTimeout (clusterSize : int) (serverId : string) : Result, string> = match Int32.TryParse serverId with - | false, _ -> Error (sprintf "Expected an integer, got '%s'" serverId) + | false, _ -> sprintf "Non-integer input '%s' for server ID." (serverId.ToString ()) |> Error + | true, serverId -> + + match Int32.TryParse messageId with + | false, _ -> + sprintf "Non-integer input '%s' for message ID." (messageId.ToString ()) + |> Error + + | true, messageId -> + + if serverId >= clusterSize || serverId < 0 then + sprintf "Server ID must be between 0 and %i inclusive, got %i" (clusterSize - 1) serverId + |> Error + else + Ok (serverId * 1, messageId) + + let private getTimeout (clusterSize : int) (serverId : EfficientString) : Result, string> = + match Int32.TryParse serverId with + | false, _ -> Error (sprintf "Expected an integer, got '%s'" (serverId.ToString ())) | true, serverId -> if serverId >= clusterSize || serverId < 0 then @@ -151,9 +156,11 @@ module NetworkAction = else serverId * 1 |> Ok - let private getHeartbeat (leaders : Set> option) (clusterSize : int) (serverId : string) = + let private getHeartbeat (leaders : Set> option) (clusterSize : int) (serverId : EfficientString) = match Int32.TryParse serverId with - | false, _ -> sprintf "Expected an integer server ID, got '%s'" serverId |> Error + | false, _ -> + sprintf "Expected an integer server ID, got '%s'" (serverId.ToString ()) + |> Error | true, serverId -> if serverId >= clusterSize || serverId < 0 then @@ -172,9 +179,15 @@ module NetworkAction = else sprintf "Cannot heartbeat a non-leader (%i)." serverId |> Error - let private getNewClientTarget<'a> (clusterSize : int) (serverId : string) : Result, string> = + let private getNewClientTarget<'a> + (clusterSize : int) + (serverId : EfficientString) + : Result, string> + = match Int32.TryParse serverId with - | false, _ -> sprintf "Expected an int for a server ID, got '%s'" serverId |> Error + | false, _ -> + sprintf "Expected an int for a server ID, got '%s'" (serverId.ToString ()) + |> Error | true, serverId -> if serverId >= clusterSize || serverId < 0 then @@ -183,47 +196,43 @@ module NetworkAction = else Ok (serverId * 1) + /// Mutates the input byref to contain the result. let private getClientSubmitData<'a> - (parse : string -> Result<'a, string>) (clusterSize : int) - (s : string) - : Result * int * int * 'a, string> + (s : byref) + : Result * int * int, string> = - match s.Split ',' |> List.ofArray with - | serverId :: clientId :: clientSequenceNumber :: (_ :: _ as rest) -> - let rest = String.concat "," rest |> fun s -> s.TrimStart () + let serverId = EfficientString.takeUntil ',' &s + let clientId = EfficientString.takeUntil ',' &s + let clientSequenceNumber = EfficientString.takeUntil ',' &s - match Int32.TryParse (serverId.Trim ()) with - | false, _ -> - sprintf "Server ID expected as first comma-separated component, got '%s'." serverId - |> Error - | true, serverId -> - - if serverId >= clusterSize || serverId < 0 then - sprintf "Server ID must be between 0 and %i inclusive, got %i." (clusterSize - 1) serverId - |> Error - else - - match Int32.TryParse (clientId.Trim ()) with - | false, _ -> - sprintf "Client ID expected as second comma-separated component, got '%s'." clientId - |> Error - | true, clientId -> - - match Int32.TryParse (clientSequenceNumber.Trim ()) with - | false, _ -> - sprintf - "Client sequence number expected as third comma-separated component, got '%s'." - clientSequenceNumber - |> Error - | true, clientSequenceNumber -> - - match parse rest with - | Ok b -> Ok (serverId * 1, clientId * 1, clientSequenceNumber * 1, b) - | Error e -> sprintf "Failed to parse client data: %s" e |> Error - | _ -> - sprintf "Expected serverId,clientId,clientSequenceNumber,data; got '%s'" s + match Int32.TryParse (serverId.Trim ()) with + | false, _ -> + sprintf "Server ID expected as first comma-separated component, got '%s'." (serverId.ToString ()) |> Error + | true, serverId -> + + if serverId >= clusterSize || serverId < 0 then + sprintf "Server ID must be between 0 and %i inclusive, got %i." (clusterSize - 1) serverId + |> Error + else + + match Int32.TryParse (clientId.Trim ()) with + | false, _ -> + sprintf "Client ID expected as second comma-separated component, got '%s'." (clientId.ToString ()) + |> Error + | true, clientId -> + + match Int32.TryParse (clientSequenceNumber.Trim ()) with + | false, _ -> + sprintf + "Client sequence number expected as third comma-separated component, got '%s'." + (clientSequenceNumber.ToString ()) + |> Error + | true, clientSequenceNumber -> + + (serverId * 1, clientId * 1, clientSequenceNumber * 1) + |> Ok /// Optionally also validates leaders against the input set of leaders. let tryParse<'a> @@ -238,37 +247,45 @@ module NetworkAction = if String.IsNullOrEmpty s then Error "Can't parse an empty string" else - match Char.ToUpper s.[0] with - | 'T' -> - match getTimeout clusterSize (s.[1..].TrimStart ()) with - | Ok t -> t |> InactivityTimeout |> Ok - | Error e -> Error e - | 'D' -> - match getMessage clusterSize (s.[1..].TrimStart ()) with - | Ok m -> m |> DropMessage |> Ok - | Error e -> Error e - | 'M' -> - match getMessage clusterSize (s.[1..].TrimStart ()) with - | Ok m -> m |> NetworkMessage |> Ok - | Error e -> Error e - | 'H' -> - match getHeartbeat leaders clusterSize (s.[1..].TrimStart ()) with - | Ok h -> Heartbeat h |> Ok - | Error e -> Error e - | 'S' -> - match getNewClientTarget clusterSize (s.[1..].TrimStart ()) with - | Ok target -> - ClientRequest (target, ClientRequest.RegisterClient handleRegisterClientResponse) - |> Ok - | Error e -> Error e - | 'R' -> - match getClientSubmitData parse clusterSize (s.[1..].TrimStart ()) with - | Ok (server, client, sequence, data) -> + + let rest = EfficientString.slice 1 (s.Length - 1) (EfficientString.ofString s) + + match Char.ToUpper s[0] with + | 'T' -> + match getTimeout clusterSize (EfficientString.trimStart rest) with + | Ok t -> t |> InactivityTimeout |> Ok + | Error e -> Error e + | 'D' -> + match getMessage clusterSize (EfficientString.trimStart rest) with + | Ok m -> m |> DropMessage |> Ok + | Error e -> Error e + | 'M' -> + match getMessage clusterSize (EfficientString.trimStart rest) with + | Ok m -> m |> NetworkMessage |> Ok + | Error e -> Error e + | 'H' -> + match getHeartbeat leaders clusterSize (EfficientString.trimStart rest) with + | Ok h -> Heartbeat h |> Ok + | Error e -> Error e + | 'S' -> + match getNewClientTarget clusterSize (EfficientString.trimStart rest) with + | Ok target -> + ClientRequest (target, ClientRequest.RegisterClient handleRegisterClientResponse) + |> Ok + | Error e -> Error e + | 'R' -> + let mutable rest = EfficientString.trimStart rest + + match getClientSubmitData clusterSize &rest with + | Ok (server, client, sequence) -> + match parse (rest.ToString ()) with + | Ok data -> (server, ClientRequest.ClientRequest (client, sequence, data, handleClientDataResponse)) |> ClientRequest |> Ok | Error e -> Error e - | c -> Error (sprintf "unexpected start char '%c'" c) + | Error e -> Error e + | c -> Error (sprintf "unexpected start char '%c'" c) let toString<'a> (action : NetworkAction<'a>) : string = match action with diff --git a/Raft/PersistentState.fs b/Raft/PersistentState.fs index 732672e..b8da259 100644 --- a/Raft/PersistentState.fs +++ b/Raft/PersistentState.fs @@ -58,7 +58,7 @@ type InMemoryPersistentState<'a> () = if log.Count = 0 then None else - let stored, term = log.[log.Count - 1] + let stored, term = log[log.Count - 1] Some ( stored, @@ -73,6 +73,6 @@ type InMemoryPersistentState<'a> () = if log.Count < position then None elif position <= 0 then None - else Some log.[position - 1] + else Some log[position - 1] member this.CurrentLogIndex = log.Count * 1 diff --git a/Raft/Raft.fsproj b/Raft/Raft.fsproj index 2fd7788..df2f0b4 100644 --- a/Raft/Raft.fsproj +++ b/Raft/Raft.fsproj @@ -1,16 +1,21 @@  - netstandard2.0 + netstandard2.0;netstandard2.1 true + + + + + diff --git a/Raft/Server.fs b/Raft/Server.fs index 946187a..1d31515 100644 --- a/Raft/Server.fs +++ b/Raft/Server.fs @@ -219,7 +219,7 @@ type private CandidateState = static member New (count : int) (self : int) = let votes = Array.zeroCreate<_> count - votes.[self / 1] <- Some true + votes[self / 1] <- Some true { Votes = votes @@ -270,12 +270,12 @@ module internal ServerUtils = if i < 0 then result else - let numberCounted = numberCounted + snd numberWhoCommittedIndex.[i] + let numberCounted = numberCounted + snd numberWhoCommittedIndex[i] if numberCounted > matchIndex.Length / 2 then - fst numberWhoCommittedIndex.[i] + fst numberWhoCommittedIndex[i] else - go numberCounted (fst numberWhoCommittedIndex.[i]) (i - 1) + go numberCounted (fst numberWhoCommittedIndex[i]) (i - 1) go 0 0 (numberWhoCommittedIndex.Length - 1) @@ -528,7 +528,7 @@ type Server<'a> acceptRequest () let sendAppendEntries (leaderState : LeaderState) (j : int) = - let toSend = leaderState.ToSend.[j / 1] + let toSend = leaderState.ToSend[j / 1] let prevLogTerm = persistentState.GetLogEntry (toSend - 1) { @@ -572,14 +572,14 @@ type Server<'a> match appendEntriesReply.Success with | Some matchIndex -> // They applied our request. Update our record of what we know they have applied... - leaderState.MatchIndex.[appendEntriesReply.Follower / 1] <- matchIndex + leaderState.MatchIndex[appendEntriesReply.Follower / 1] <- matchIndex // ... and update our record of what we'll be sending them next. - leaderState.ToSend.[appendEntriesReply.Follower / 1] <- matchIndex + 1 + leaderState.ToSend[appendEntriesReply.Follower / 1] <- matchIndex + 1 | None -> // They failed to apply our request. Next time, we'll be trying one message further // back in our history. - leaderState.ToSend.[appendEntriesReply.Follower / 1] <- - max (leaderState.ToSend.[appendEntriesReply.Follower / 1] - 1) 1 + leaderState.ToSend[appendEntriesReply.Follower / 1] <- + max (leaderState.ToSend[appendEntriesReply.Follower / 1] - 1) 1 // Note that the decision to process this *here* means the algorithm doesn't work in clusters of // only one node, because then there will never be any AppendEntries replies. let maxLogAQuorumHasCommitted = @@ -612,17 +612,18 @@ type Server<'a> match logEntry with | LogEntry.ClientEntry (stored, client, sequence, replyChannel) -> let newClients = - volatileState.Clients - |> Map.change - client - (fun messageLog -> - let messages = - match messageLog with - | None -> Map.empty - | Some messageLog -> messageLog + match Map.tryFind client volatileState.Clients with + | None -> Map.add client Map.empty volatileState.Clients + | Some messageLog -> + let messageLog = + match Map.tryFind sequence messageLog with + | None -> messageLog |> Map.add sequence stored + | Some _ -> + // Here we'd assert that existing was equal to stored, + // if we had the equality constraint + messageLog - messages |> Map.change sequence (fun _ -> Some stored) |> Some - ) + volatileState.Clients |> Map.add client messageLog volatileState <- { volatileState with @@ -649,7 +650,7 @@ type Server<'a> | ServerSpecialisation.Candidate state -> if requestVoteReply.VoterTerm = persistentState.CurrentTerm then - state.Votes.[requestVoteReply.Voter / 1] <- Some requestVoteReply.VoteGranted + state.Votes[requestVoteReply.Voter / 1] <- Some requestVoteReply.VoteGranted // Inefficient, but :shrug: if @@ -714,7 +715,7 @@ type Server<'a> (RaftOverhead (NewClientRegistered replyChannel)) persistentState.CurrentTerm - leaderState.MatchIndex.[me / 1] <- persistentState.CurrentLogIndex + leaderState.MatchIndex[me / 1] <- persistentState.CurrentLogIndex emitHeartbeat leaderState | ClientRequest.ClientRequest (client, sequenceNumber, toAdd, replyChannel) -> @@ -724,7 +725,7 @@ type Server<'a> (LogEntry.ClientEntry (toAdd, client, sequenceNumber, replyChannel)) persistentState.CurrentTerm - leaderState.MatchIndex.[me / 1] <- persistentState.CurrentLogIndex + leaderState.MatchIndex[me / 1] <- persistentState.CurrentLogIndex emitHeartbeat leaderState | ServerSpecialisation.Follower followerState -> diff --git a/RaftExplorer/Program.fs b/RaftExplorer/Program.fs index 32e067d..dfdd397 100644 --- a/RaftExplorer/Program.fs +++ b/RaftExplorer/Program.fs @@ -140,5 +140,4 @@ module Program = // TODO: log out the committed state so that we can see whether the leader is correctly // processing heartbeat responses - 0