Format with Fantomas
This commit is contained in:
@@ -10,16 +10,14 @@ module TestServer =
|
||||
[<Test>]
|
||||
let foo () =
|
||||
let cluster = InMemoryCluster.make<int> 5
|
||||
|
||||
let logger, logs =
|
||||
let logs = ResizeArray ()
|
||||
let logLine (s : string) =
|
||||
lock logs (fun () ->
|
||||
logs.Add s
|
||||
)
|
||||
let logLine (s : string) = lock logs (fun () -> logs.Add s)
|
||||
|
||||
let freezeLogs () =
|
||||
lock logs (fun () ->
|
||||
logs |> Seq.toList
|
||||
)
|
||||
lock logs (fun () -> logs |> Seq.toList)
|
||||
|
||||
logLine, freezeLogs
|
||||
|
||||
let sendMessage = cluster.Servers.[0].OutboundMessageChannel
|
||||
@@ -29,15 +27,10 @@ module TestServer =
|
||||
{
|
||||
CandidateTerm = 0<Term>
|
||||
CandidateId = 1<ServerId>
|
||||
ReplyChannel =
|
||||
fun message ->
|
||||
logger (sprintf "Received message for term %i" message.VoterTerm)
|
||||
ReplyChannel = fun message -> logger (sprintf "Received message for term %i" message.VoterTerm)
|
||||
CandidateLastLogEntry = 0<LogIndex>, 0<Term>
|
||||
}
|
||||
|> Message.RequestVote
|
||||
|> sendMessage 0<ServerId>
|
||||
|
||||
logs ()
|
||||
|> shouldEqual [
|
||||
"Received message for term 0"
|
||||
]
|
||||
logs () |> shouldEqual [ "Received message for term 0" ]
|
||||
|
@@ -2,6 +2,6 @@ namespace Raft.AssemblyInfo
|
||||
|
||||
open System.Runtime.CompilerServices
|
||||
|
||||
[<assembly: InternalsVisibleTo("Raft.Test")>]
|
||||
[<assembly : InternalsVisibleTo("Raft.Test")>]
|
||||
|
||||
do ()
|
||||
|
@@ -1,9 +1,14 @@
|
||||
namespace Raft
|
||||
|
||||
/// LogIndex is indexed from 1. We use 0 to indicate "before any history has started".
|
||||
[<Measure>] type LogIndex
|
||||
[<Measure>] type Term
|
||||
[<Measure>] type ServerId
|
||||
[<Measure>]
|
||||
type LogIndex
|
||||
|
||||
[<Measure>]
|
||||
type Term
|
||||
|
||||
[<Measure>]
|
||||
type ServerId
|
||||
|
||||
/// Server state which need not survive a server crash.
|
||||
type VolatileState =
|
||||
@@ -63,6 +68,12 @@ type AppendEntriesReply =
|
||||
Success : bool
|
||||
}
|
||||
|
||||
type LogEntry =
|
||||
{
|
||||
Index : int<LogIndex>
|
||||
Term : int<Term>
|
||||
}
|
||||
|
||||
/// I am the leader. Followers, update your state as follows.
|
||||
type AppendEntriesMessage<'a> =
|
||||
{
|
||||
@@ -71,7 +82,7 @@ type AppendEntriesMessage<'a> =
|
||||
/// I am your leader! This is me! (so everyone knows where to send clients to)
|
||||
LeaderId : int<ServerId>
|
||||
/// The entry immediately preceding the entry I'm sending you, so you can tell if we've got out of sync.
|
||||
PrevLogEntry : {| Index : int<LogIndex> ; Term : int<Term> |}
|
||||
PrevLogEntry : LogEntry
|
||||
/// Followers, append this entry to your log. (Or, if None, this is just a heartbeat.)
|
||||
/// It was determined at the given term - recall that we might need to bring a restarted node up to speed
|
||||
/// with what happened during terms that took place while it was down.
|
||||
@@ -83,6 +94,7 @@ type AppendEntriesMessage<'a> =
|
||||
type Message<'a> =
|
||||
| AppendEntries of AppendEntriesMessage<'a>
|
||||
| RequestVote of RequestVoteMessage
|
||||
|
||||
member this.Term =
|
||||
match this with
|
||||
| AppendEntries m -> m.LeaderTerm
|
||||
@@ -106,12 +118,10 @@ type Server<'a> =
|
||||
module Server =
|
||||
|
||||
let inline private getLogEntry<'a> (index : int<LogIndex>) (arr : 'a array) : 'a option =
|
||||
arr
|
||||
|> Array.tryItem ((index - 1<LogIndex>) / 1<LogIndex>)
|
||||
arr |> Array.tryItem ((index - 1<LogIndex>) / 1<LogIndex>)
|
||||
|
||||
let inline private truncateLog<'a> (finalIndex : int<LogIndex>) (arr : 'a array) : 'a array =
|
||||
arr
|
||||
|> Array.truncate (finalIndex / 1<LogIndex>)
|
||||
arr |> Array.truncate (finalIndex / 1<LogIndex>)
|
||||
|
||||
let inline private replaceLog<'a> (index : int<LogIndex>) (elt : 'a) (arr : 'a array) : 'a array =
|
||||
let toRet = Array.copy arr
|
||||
@@ -151,8 +161,9 @@ module Server =
|
||||
CurrentTerm = message.Term
|
||||
}
|
||||
}
|
||||
// TODO when persistence is modelled: persist this
|
||||
else s
|
||||
// TODO when persistence is modelled: persist this
|
||||
else
|
||||
s
|
||||
|
||||
match message with
|
||||
| RequestVote message ->
|
||||
@@ -166,8 +177,7 @@ module Server =
|
||||
// Is the candidate advertising a later term than our last-persisted write was made at?
|
||||
// (That would mean it's far in the future of us.)
|
||||
match Array.tryLast s.PersistentState.Log with
|
||||
| Some (_, ourLastTerm) ->
|
||||
snd message.CandidateLastLogEntry > ourLastTerm
|
||||
| Some (_, ourLastTerm) -> snd message.CandidateLastLogEntry > ourLastTerm
|
||||
| None ->
|
||||
// We have persisted no history at all!
|
||||
true
|
||||
@@ -177,7 +187,8 @@ module Server =
|
||||
match Array.tryLast s.PersistentState.Log with
|
||||
| Some (_, ourLastTerm) ->
|
||||
snd message.CandidateLastLogEntry = ourLastTerm
|
||||
&& fst message.CandidateLastLogEntry >= (Array.length s.PersistentState.Log) * 1<LogIndex>
|
||||
&& fst message.CandidateLastLogEntry
|
||||
>= (Array.length s.PersistentState.Log) * 1<LogIndex>
|
||||
| None ->
|
||||
// We've persisted no history; the candidate needs to also be at the start of history,
|
||||
// or else we'd have already considered them in the `messageSupersedesMe` check.
|
||||
@@ -257,7 +268,12 @@ module Server =
|
||||
let heartbeat (message : AppendEntriesMessage<'a>) (s : Server<'a>) =
|
||||
// Just a heartbeat; no change to our log is required.
|
||||
let toReturn =
|
||||
{ s with VolatileState = { s.VolatileState with CommitIndex = message.LeaderCommitIndex } }
|
||||
{ s with
|
||||
VolatileState =
|
||||
{ s.VolatileState with
|
||||
CommitIndex = message.LeaderCommitIndex
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
Success = true
|
||||
@@ -370,7 +386,10 @@ module Server =
|
||||
| Candidate ->
|
||||
// We've already verified that the message was sent from a leader in the current term, so we have
|
||||
// lost the election.
|
||||
let s = { s with Type = Follower }
|
||||
let s =
|
||||
{ s with
|
||||
Type = Follower
|
||||
}
|
||||
// TODO: why does this assertion hold?
|
||||
assert (logIsConsistent message s)
|
||||
acceptRequest s
|
||||
@@ -392,14 +411,16 @@ module InMemoryCluster =
|
||||
let locker = obj ()
|
||||
|
||||
let messageChannel (serverId : int<ServerId>) (message : Message<'a>) : unit =
|
||||
lock locker (fun () ->
|
||||
let newServer = Server.processMessage message servers.[serverId / 1<ServerId>]
|
||||
servers.[serverId / 1<ServerId>] <- newServer
|
||||
)
|
||||
lock
|
||||
locker
|
||||
(fun () ->
|
||||
let newServer = Server.processMessage message servers.[serverId / 1<ServerId>]
|
||||
servers.[serverId / 1<ServerId>] <- newServer
|
||||
)
|
||||
|
||||
for s in 0..servers.Length - 1 do
|
||||
for s in 0 .. servers.Length - 1 do
|
||||
servers.[s] <- Server.create messageChannel
|
||||
|
||||
{
|
||||
Servers = servers
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user