diff --git a/Raft.Test/TestServer.fs b/Raft.Test/TestServer.fs index aae3bc6..568a411 100644 --- a/Raft.Test/TestServer.fs +++ b/Raft.Test/TestServer.fs @@ -10,16 +10,14 @@ module TestServer = [] let foo () = let cluster = InMemoryCluster.make 5 + let logger, logs = let logs = ResizeArray () - let logLine (s : string) = - lock logs (fun () -> - logs.Add s - ) + let logLine (s : string) = lock logs (fun () -> logs.Add s) + let freezeLogs () = - lock logs (fun () -> - logs |> Seq.toList - ) + lock logs (fun () -> logs |> Seq.toList) + logLine, freezeLogs let sendMessage = cluster.Servers.[0].OutboundMessageChannel @@ -29,15 +27,10 @@ module TestServer = { CandidateTerm = 0 CandidateId = 1 - ReplyChannel = - fun message -> - logger (sprintf "Received message for term %i" message.VoterTerm) + ReplyChannel = fun message -> logger (sprintf "Received message for term %i" message.VoterTerm) CandidateLastLogEntry = 0, 0 } |> Message.RequestVote |> sendMessage 0 - logs () - |> shouldEqual [ - "Received message for term 0" - ] \ No newline at end of file + logs () |> shouldEqual [ "Received message for term 0" ] diff --git a/Raft/AssemblyInfo.fs b/Raft/AssemblyInfo.fs index 4df2278..7d7f49e 100644 --- a/Raft/AssemblyInfo.fs +++ b/Raft/AssemblyInfo.fs @@ -2,6 +2,6 @@ namespace Raft.AssemblyInfo open System.Runtime.CompilerServices -[] +[] do () diff --git a/Raft/Server.fs b/Raft/Server.fs index 01ed04c..b906e3a 100644 --- a/Raft/Server.fs +++ b/Raft/Server.fs @@ -1,9 +1,14 @@ namespace Raft /// LogIndex is indexed from 1. We use 0 to indicate "before any history has started". -[] type LogIndex -[] type Term -[] type ServerId +[] +type LogIndex + +[] +type Term + +[] +type ServerId /// Server state which need not survive a server crash. type VolatileState = @@ -63,6 +68,12 @@ type AppendEntriesReply = Success : bool } +type LogEntry = + { + Index : int + Term : int + } + /// I am the leader. Followers, update your state as follows. type AppendEntriesMessage<'a> = { @@ -71,7 +82,7 @@ type AppendEntriesMessage<'a> = /// I am your leader! This is me! (so everyone knows where to send clients to) LeaderId : int /// The entry immediately preceding the entry I'm sending you, so you can tell if we've got out of sync. - PrevLogEntry : {| Index : int ; Term : int |} + PrevLogEntry : LogEntry /// Followers, append this entry to your log. (Or, if None, this is just a heartbeat.) /// It was determined at the given term - recall that we might need to bring a restarted node up to speed /// with what happened during terms that took place while it was down. @@ -83,6 +94,7 @@ type AppendEntriesMessage<'a> = type Message<'a> = | AppendEntries of AppendEntriesMessage<'a> | RequestVote of RequestVoteMessage + member this.Term = match this with | AppendEntries m -> m.LeaderTerm @@ -106,12 +118,10 @@ type Server<'a> = module Server = let inline private getLogEntry<'a> (index : int) (arr : 'a array) : 'a option = - arr - |> Array.tryItem ((index - 1) / 1) + arr |> Array.tryItem ((index - 1) / 1) let inline private truncateLog<'a> (finalIndex : int) (arr : 'a array) : 'a array = - arr - |> Array.truncate (finalIndex / 1) + arr |> Array.truncate (finalIndex / 1) let inline private replaceLog<'a> (index : int) (elt : 'a) (arr : 'a array) : 'a array = let toRet = Array.copy arr @@ -151,8 +161,9 @@ module Server = CurrentTerm = message.Term } } - // TODO when persistence is modelled: persist this - else s + // TODO when persistence is modelled: persist this + else + s match message with | RequestVote message -> @@ -166,8 +177,7 @@ module Server = // Is the candidate advertising a later term than our last-persisted write was made at? // (That would mean it's far in the future of us.) match Array.tryLast s.PersistentState.Log with - | Some (_, ourLastTerm) -> - snd message.CandidateLastLogEntry > ourLastTerm + | Some (_, ourLastTerm) -> snd message.CandidateLastLogEntry > ourLastTerm | None -> // We have persisted no history at all! true @@ -177,7 +187,8 @@ module Server = match Array.tryLast s.PersistentState.Log with | Some (_, ourLastTerm) -> snd message.CandidateLastLogEntry = ourLastTerm - && fst message.CandidateLastLogEntry >= (Array.length s.PersistentState.Log) * 1 + && fst message.CandidateLastLogEntry + >= (Array.length s.PersistentState.Log) * 1 | None -> // We've persisted no history; the candidate needs to also be at the start of history, // or else we'd have already considered them in the `messageSupersedesMe` check. @@ -257,7 +268,12 @@ module Server = let heartbeat (message : AppendEntriesMessage<'a>) (s : Server<'a>) = // Just a heartbeat; no change to our log is required. let toReturn = - { s with VolatileState = { s.VolatileState with CommitIndex = message.LeaderCommitIndex } } + { s with + VolatileState = + { s.VolatileState with + CommitIndex = message.LeaderCommitIndex + } + } { Success = true @@ -370,7 +386,10 @@ module Server = | Candidate -> // We've already verified that the message was sent from a leader in the current term, so we have // lost the election. - let s = { s with Type = Follower } + let s = + { s with + Type = Follower + } // TODO: why does this assertion hold? assert (logIsConsistent message s) acceptRequest s @@ -392,14 +411,16 @@ module InMemoryCluster = let locker = obj () let messageChannel (serverId : int) (message : Message<'a>) : unit = - lock locker (fun () -> - let newServer = Server.processMessage message servers.[serverId / 1] - servers.[serverId / 1] <- newServer - ) + lock + locker + (fun () -> + let newServer = Server.processMessage message servers.[serverId / 1] + servers.[serverId / 1] <- newServer + ) - for s in 0..servers.Length - 1 do + for s in 0 .. servers.Length - 1 do servers.[s] <- Server.create messageChannel { Servers = servers - } \ No newline at end of file + }