namespace Raft open System open System.Collections.Generic type Cluster<'a> = internal { Servers : Server<'a> array SendMessageDirectly : int -> Message<'a> -> unit } member this.SendMessage (i : int) (m : Message<'a>) : unit = this.SendMessageDirectly i m member this.InactivityTimeout (i : int) : unit = this.Servers.[i / 1].TriggerInactivityTimeout () this.Servers.[i / 1].Sync () member this.HeartbeatTimeout (i : int) : unit = this.Servers.[i / 1].TriggerHeartbeatTimeout () this.Servers.[i / 1].Sync () member this.Status (i : int) : ServerStatus = this.Servers.[i / 1].State member this.GetCurrentInternalState (i : int) : ServerInternalState<'a> Async = this.Servers.[i / 1].GetCurrentInternalState () member this.ClusterSize : int = this.Servers.Length member this.Leaders : Set> = ((Set.empty, 0), this.Servers) ||> Array.fold (fun (leaders, count) server -> let leaders = match server.State with | ServerStatus.Leader _ -> Set.add count leaders | _ -> leaders leaders, count + 1 ) |> fst type Network<'a> = internal { /// CompleteMessageHistory.[i] is the collection of all messages /// ever sent to server `i`. CompleteMessageHistory : ResizeArray>[] MessagesDelivered : HashSet[] } static member Make (clusterSize : int) = { CompleteMessageHistory = Array.init clusterSize (fun _ -> ResizeArray ()) MessagesDelivered = Array.init clusterSize (fun _ -> HashSet ()) } member this.AllInboundMessages (i : int) : Message<'a> list = this.CompleteMessageHistory.[i / 1] |> List.ofSeq member this.InboundMessage (i : int) (id : int) : Message<'a> = this.CompleteMessageHistory.[i / 1].[id] member this.DropMessage (i : int) (id : int) : unit = this.MessagesDelivered.[i / 1].Add id |> ignore member this.UndeliveredMessages (i : int) : (int * Message<'a>) list = this.CompleteMessageHistory.[i / 1] |> Seq.indexed |> Seq.filter (fun (count, _) -> this.MessagesDelivered.[i / 1].Contains count |> not) |> List.ofSeq member this.AllUndeliveredMessages () : ((int * Message<'a>) list) list = List.init this.CompleteMessageHistory.Length (fun i -> this.UndeliveredMessages (i * 1)) member this.ClusterSize = this.CompleteMessageHistory.Length [] module InMemoryCluster = [] let make<'a> (count : int) : Cluster<'a> * Network<'a> = let servers = Array.zeroCreate> count let network = Network.Make count let messageChannelHold (serverId : int) (message : Message<'a>) : unit = let arr = network.CompleteMessageHistory.[serverId / 1] lock arr (fun () -> arr.Add message) for s in 0 .. servers.Length - 1 do servers.[s] <- Server (count, s * 1, InMemoryPersistentState (), messageChannelHold) let cluster = { Servers = servers SendMessageDirectly = fun i m -> servers.[i / 1].Message m servers.[i / 1].Sync () } cluster, network type NetworkAction<'a> = | InactivityTimeout of int | NetworkMessage of int * int | DropMessage of int * int | ClientRequest of int * ClientRequest<'a> | Heartbeat of int [] module NetworkAction = let perform<'a> (cluster : Cluster<'a>) (network : Network<'a>) (action : NetworkAction<'a>) : unit = match action with | InactivityTimeout serverId -> cluster.InactivityTimeout serverId | Heartbeat serverId -> cluster.HeartbeatTimeout serverId | DropMessage (serverId, messageId) -> network.DropMessage serverId messageId | NetworkMessage (serverId, messageId) -> network.InboundMessage serverId messageId |> cluster.SendMessage serverId network.DropMessage serverId messageId | ClientRequest (server, request) -> Message.ClientRequest request |> cluster.SendMessage server let private getMessage (clusterSize : int) (s : string) : Result * int, string> = match s.Split ',' with | [| serverId ; messageId |] -> let serverId = serverId.TrimEnd () let messageId = messageId.Trim () match Int32.TryParse serverId with | true, serverId -> match Int32.TryParse messageId with | true, messageId -> if serverId >= clusterSize || serverId < 0 then sprintf "Server ID must be between 0 and %i inclusive, got %i" (clusterSize - 1) serverId |> Error else Ok (serverId * 1, messageId) | false, _ -> sprintf "Non-integer input '%s' for message ID." messageId |> Error | false, _ -> sprintf "Non-integer input '%s' for server ID." serverId |> Error | _ -> Error "Expected a single comma." let private getTimeout (clusterSize : int) (serverId : string) : Result, string> = match Int32.TryParse serverId with | false, _ -> Error (sprintf "Expected an integer, got '%s'" serverId) | true, serverId -> if serverId >= clusterSize || serverId < 0 then sprintf "Server ID must be between 0 and %i inclusive, got %i." (clusterSize - 1) serverId |> Error else serverId * 1 |> Ok let private getHeartbeat (leaders : Set> option) (clusterSize : int) (serverId : string) = match Int32.TryParse serverId with | false, _ -> sprintf "Expected an integer server ID, got '%s'" serverId |> Error | true, serverId -> if serverId >= clusterSize || serverId < 0 then sprintf "Server ID must be between 0 and %i inclusive, got %i." (clusterSize - 1) serverId |> Error else let serverId = serverId * 1 match leaders with | None -> Ok serverId | Some leaders -> if leaders |> Set.contains serverId then Ok serverId else sprintf "Cannot heartbeat a non-leader (%i)." serverId |> Error let private getNewClientTarget<'a> (clusterSize : int) (serverId : string) : Result, string> = match Int32.TryParse serverId with | false, _ -> sprintf "Expected an int for a server ID, got '%s'" serverId |> Error | true, serverId -> if serverId >= clusterSize || serverId < 0 then sprintf "Server ID must be between 0 and %i inclusive, got %i." (clusterSize - 1) serverId |> Error else Ok (serverId * 1) let private getClientSubmitData<'a> (parse : string -> Result<'a, string>) (clusterSize : int) (s : string) : Result * int * int * 'a, string> = match s.Split ',' |> List.ofArray with | serverId :: clientId :: clientSequenceNumber :: (_ :: _ as rest) -> let rest = String.concat "," rest |> fun s -> s.TrimStart () match Int32.TryParse (serverId.Trim ()) with | false, _ -> sprintf "Server ID expected as first comma-separated component, got '%s'." serverId |> Error | true, serverId -> if serverId >= clusterSize || serverId < 0 then sprintf "Server ID must be between 0 and %i inclusive, got %i." (clusterSize - 1) serverId |> Error else match Int32.TryParse (clientId.Trim ()) with | false, _ -> sprintf "Client ID expected as second comma-separated component, got '%s'." clientId |> Error | true, clientId -> match Int32.TryParse (clientSequenceNumber.Trim ()) with | false, _ -> sprintf "Client sequence number expected as third comma-separated component, got '%s'." clientSequenceNumber |> Error | true, clientSequenceNumber -> match parse rest with | Ok b -> Ok (serverId * 1, clientId * 1, clientSequenceNumber * 1, b) | Error e -> sprintf "Failed to parse client data: %s" e |> Error | _ -> sprintf "Expected serverId,clientId,clientSequenceNumber,data; got '%s'" s |> Error /// Optionally also validates leaders against the input set of leaders. let tryParse<'a> (parse : string -> Result<'a, string>) (leaders : Set> option) (handleRegisterClientResponse : RegisterClientResponse -> unit) (handleClientDataResponse : ClientResponse -> unit) (clusterSize : int) (s : string) : Result, string> = if String.IsNullOrEmpty s then Error "Can't parse an empty string" else match Char.ToUpper s.[0] with | 'T' -> match getTimeout clusterSize (s.[1..].TrimStart ()) with | Ok t -> t |> InactivityTimeout |> Ok | Error e -> Error e | 'D' -> match getMessage clusterSize (s.[1..].TrimStart ()) with | Ok m -> m |> DropMessage |> Ok | Error e -> Error e | 'M' -> match getMessage clusterSize (s.[1..].TrimStart ()) with | Ok m -> m |> NetworkMessage |> Ok | Error e -> Error e | 'H' -> match getHeartbeat leaders clusterSize (s.[1..].TrimStart ()) with | Ok h -> Heartbeat h |> Ok | Error e -> Error e | 'S' -> match getNewClientTarget clusterSize (s.[1..].TrimStart ()) with | Ok target -> ClientRequest (target, ClientRequest.RegisterClient handleRegisterClientResponse) |> Ok | Error e -> Error e | 'R' -> match getClientSubmitData parse clusterSize (s.[1..].TrimStart ()) with | Ok (server, client, sequence, data) -> (server, ClientRequest.ClientRequest (client, sequence, data, handleClientDataResponse)) |> ClientRequest |> Ok | Error e -> Error e | c -> Error (sprintf "unexpected start char '%c'" c) let toString<'a> (action : NetworkAction<'a>) : string = match action with | NetworkAction.Heartbeat h -> sprintf "h %i" h | NetworkAction.NetworkMessage (server, id) -> sprintf "m %i,%i" server id | NetworkAction.DropMessage (server, id) -> sprintf "d %i,%i" server id | NetworkAction.InactivityTimeout server -> sprintf "t %i" server | NetworkAction.ClientRequest (server, ClientRequest.RegisterClient _) -> sprintf "s %i" server | NetworkAction.ClientRequest (server, ClientRequest.ClientRequest (client, sequence, data, _)) -> sprintf "r %i,%i,%i,%O" server client sequence data