Format with Fantomas, and add MIT licence

This commit is contained in:
Smaug123
2021-02-17 19:07:25 +00:00
parent c88d33aec1
commit a076af380c
9 changed files with 241 additions and 188 deletions

13
.editorconfig Normal file
View File

@@ -0,0 +1,13 @@
root = true
[*.fs]
fsharp_space_before_uppercase_invocation=true
fsharp_space_before_member=true
fsharp_space_before_parameter=true
fsharp_space_before_colon=true
fsharp_space_before_semicolon=true
fsharp_multiline_block_brackets_on_same_column=true
fsharp_newline_between_type_definition_and_members=true
fsharp_keep_if_then_in_same_line=true
fsharp_align_function_signature_to_indentation=true
fsharp_alternative_long_member_definitions=true

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2021 Patrick Stevens
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -11,9 +11,7 @@ type private ExternalInfoProviderMessage<'info> =
/// An entity which periodically pulls information from some external source /// An entity which periodically pulls information from some external source
/// and pushes it out to a collection of ServerAgents. /// and pushes it out to a collection of ServerAgents.
type ExternalInfoProvider<'info> = type ExternalInfoProvider<'info> = private ExternalInfoProvider of MailboxProcessor<ExternalInfoProviderMessage<'info>>
private
| ExternalInfoProvider of MailboxProcessor<ExternalInfoProviderMessage<'info>>
[<RequireQualifiedAccess>] [<RequireQualifiedAccess>]
module ExternalInfoProvider = module ExternalInfoProvider =
@@ -29,14 +27,18 @@ module ExternalInfoProvider =
(receivers : ServerAgent<'info> array) (receivers : ServerAgent<'info> array)
: Async<ExternalInfoProvider<'info>> : Async<ExternalInfoProvider<'info>>
= =
let rec loop (info : 'info option) (receivers : ServerAgent<'info> array) (mailbox : MailboxProcessor<ExternalInfoProviderMessage<'info>>) = let rec loop
(info : 'info option)
(receivers : ServerAgent<'info> array)
(mailbox : MailboxProcessor<ExternalInfoProviderMessage<'info>>)
=
async { async {
match! mailbox.Receive () with match! mailbox.Receive () with
| Get (channel, timeout) -> | Get (channel, timeout) ->
let! newInfo = get let! newInfo = get
match info with match info with
| Some info when newInfo = info -> | Some info when newInfo = info -> ()
()
| _ -> | _ ->
do! do!
receivers receivers
@@ -46,8 +48,8 @@ module ExternalInfoProvider =
match channel with match channel with
| None -> () | None -> ()
| Some channel -> | Some channel -> channel.Reply ()
channel.Reply ()
do! sleep (TimeSpan.FromMilliseconds (float timeout)) do! sleep (TimeSpan.FromMilliseconds (float timeout))
// There's a small inaccuracy here. We actually will wait until the end // There's a small inaccuracy here. We actually will wait until the end
// of a timeout cycle before we can process any new consumers. What we // of a timeout cycle before we can process any new consumers. What we
@@ -63,11 +65,11 @@ module ExternalInfoProvider =
} }
async { async {
let mailbox = MailboxProcessor.Start (loop None receivers) let mailbox =
MailboxProcessor.Start (loop None receivers)
do! mailbox.PostAndAsyncReply (fun channel -> Get (Some channel, timer)) do! mailbox.PostAndAsyncReply (fun channel -> Get (Some channel, timer))
return return mailbox |> ExternalInfoProvider
mailbox
|> ExternalInfoProvider
} }
/// Replace the collection of ServerAgents this ExternalInfoProvider is hooked up to. /// Replace the collection of ServerAgents this ExternalInfoProvider is hooked up to.

View File

@@ -4,7 +4,7 @@ type private ServerAgentMessage<'info> =
| Read of AsyncReplyChannel<unit> * ('info -> unit) | Read of AsyncReplyChannel<unit> * ('info -> unit)
| Write of AsyncReplyChannel<unit> * 'info | Write of AsyncReplyChannel<unit> * 'info
type ServerAgent<'info> = private | ServerAgent of MailboxProcessor<ServerAgentMessage<'info>> type ServerAgent<'info> = private ServerAgent of MailboxProcessor<ServerAgentMessage<'info>>
[<RequireQualifiedAccess>] [<RequireQualifiedAccess>]
module ServerAgent = module ServerAgent =
@@ -39,8 +39,11 @@ module ServerAgent =
/// the async returns once the ServerAgent has finished responding. /// the async returns once the ServerAgent has finished responding.
let giveNextResponse<'info> (ServerAgent agent) : Async<'info> = let giveNextResponse<'info> (ServerAgent agent) : Async<'info> =
let mutable answer = Unchecked.defaultof<'info> let mutable answer = Unchecked.defaultof<'info>
let result = agent.PostAndAsyncReply (fun channel -> Read (channel, fun info -> answer <- info))
let result =
agent.PostAndAsyncReply (fun channel -> Read (channel, (fun info -> answer <- info)))
async { async {
do! result do! result
return answer return answer
} }

View File

@@ -13,11 +13,9 @@ open Microsoft.Extensions.Logging
module Program = module Program =
let createHostBuilder args = let createHostBuilder args =
Host.CreateDefaultBuilder(args) Host
.ConfigureWebHostDefaults(fun webBuilder -> .CreateDefaultBuilder(args)
webBuilder.UseStartup<Startup>() .ConfigureWebHostDefaults(fun webBuilder -> webBuilder.UseStartup<Startup> () |> ignore)
|> ignore
)
[<EntryPoint>] [<EntryPoint>]
let main args = let main args =

View File

@@ -44,108 +44,142 @@ type private ParseOutput =
type Startup() = type Startup() =
let client = new WebClient() let client = new WebClient ()
let servers = Array.init 2 (fun _ -> ServerAgent.make (Some { Hour = 0uy ; Minute = 0uy ; Second = 0uy })) let servers =
Array.init
2
(fun _ ->
ServerAgent.make (
Some
{
Hour = 0uy
Minute = 0uy
Second = 0uy
}
))
/// Convert an input byte to the integer digit it is. /// Convert an input byte to the integer digit it is.
/// For example, ord('0') will match to Char(0). /// For example, ord('0') will match to Char(0).
let (|Char|_|) (b : byte) : byte option = let (|Char|_|) (b : byte) : byte option =
if byte '0' <= b && b <= byte '9' then if byte '0' <= b && b <= byte '9' then Some (b - byte '0') else None
Some (b - byte '0')
else None
/// Extremely rough-and-ready function to get a time out of a stream which contains /// Extremely rough-and-ready function to get a time out of a stream which contains
/// text. /// text.
/// We expect the time to be expressed as hh:mm:ss /// We expect the time to be expressed as hh:mm:ss
/// and we do not bother our pretty little heads with Unicode issues. /// and we do not bother our pretty little heads with Unicode issues.
let parseDateInner (buffer : byte array) (s : Stream) (state : State) : Async<ParseOutput> = async { let parseDateInner (buffer : byte array) (s : Stream) (state : State) : Async<ParseOutput> =
let! written = s.ReadAsync(buffer, 0, 1) |> Async.AwaitTask async {
if written = 0 then return StreamEnded else let! written = s.ReadAsync (buffer, 0, 1) |> Async.AwaitTask
match state with if written = 0 then
| Waiting -> return StreamEnded
match buffer.[0] with else
| Char b -> return State (State.ParsedHourFirstDigit b)
| _ -> return State Waiting
| ParsedHourFirstDigit hour ->
match buffer.[0] with
| Char b -> return State (State.ParsedHourAwaitingColon (hour * 10uy + b))
| _ -> return State Waiting
| ParsedHourAwaitingColon hour ->
match buffer.[0] with
| b when b = byte ':' -> return State (State.ParsedHour hour)
| _ -> return State Waiting
| ParsedHour hour ->
match buffer.[0] with
| Char b -> return State (State.ParsedMinuteFirstDigit (hour, b))
| _ -> return State Waiting
| ParsedMinuteFirstDigit (hour, min) ->
match buffer.[0] with
| Char b -> return State (State.ParsedMinuteAwaitingColon (hour, 10uy * b + min))
| _ -> return State Waiting
| ParsedMinuteAwaitingColon (hour, min) ->
match buffer.[0] with
| b when b = byte ':' -> return State (State.ParsedMinute (hour, min))
| _ -> return State Waiting
| ParsedMinute (hour, min) ->
match buffer.[0] with
| Char b -> return State (State.ParsedSecondFirstDigit (hour, min, b))
| _ -> return State Waiting
| ParsedSecondFirstDigit (hour, min, sec) ->
match buffer.[0] with
| Char b -> return Complete { Hour = hour ; Minute = min ; Second = 10uy * sec + b }
| _ -> return State Waiting
}
let parseDate (stream : Stream) : Async<Date option> = async { match state with
let buffer = [| 0uy |] | Waiting ->
let rec go (state : State) = async { match buffer.[0] with
match! parseDateInner buffer stream state with | Char b -> return State (State.ParsedHourFirstDigit b)
| StreamEnded -> return None | _ -> return State Waiting
| Complete d -> return Some d | ParsedHourFirstDigit hour ->
| State state -> match buffer.[0] with
return! go state | Char b -> return State (State.ParsedHourAwaitingColon (hour * 10uy + b))
| _ -> return State Waiting
| ParsedHourAwaitingColon hour ->
match buffer.[0] with
| b when b = byte ':' -> return State (State.ParsedHour hour)
| _ -> return State Waiting
| ParsedHour hour ->
match buffer.[0] with
| Char b -> return State (State.ParsedMinuteFirstDigit (hour, b))
| _ -> return State Waiting
| ParsedMinuteFirstDigit (hour, min) ->
match buffer.[0] with
| Char b -> return State (State.ParsedMinuteAwaitingColon (hour, 10uy * b + min))
| _ -> return State Waiting
| ParsedMinuteAwaitingColon (hour, min) ->
match buffer.[0] with
| b when b = byte ':' -> return State (State.ParsedMinute (hour, min))
| _ -> return State Waiting
| ParsedMinute (hour, min) ->
match buffer.[0] with
| Char b -> return State (State.ParsedSecondFirstDigit (hour, min, b))
| _ -> return State Waiting
| ParsedSecondFirstDigit (hour, min, sec) ->
match buffer.[0] with
| Char b ->
return
Complete
{
Hour = hour
Minute = min
Second = 10uy * sec + b
}
| _ -> return State Waiting
} }
return! go State.Waiting let parseDate (stream : Stream) : Async<Date option> =
} async {
let buffer = [| 0uy |]
let rec go (state : State) =
async {
match! parseDateInner buffer stream state with
| StreamEnded -> return None
| Complete d -> return Some d
| State state -> return! go state
}
return! go State.Waiting
}
let update : Async<Date option> = let update : Async<Date option> =
async { async {
// Note: there is absolutely no error handling here at all. // Note: there is absolutely no error handling here at all.
// Obviously that would be desirable. // Obviously that would be desirable.
let result = client.OpenRead (Uri "insert your URL here") let result =
client.OpenRead (Uri "https://www.timeanddate.com/worldclock/uk")
return! parseDate result return! parseDate result
} }
// Note that we haven't kicked this off yet - it's still an Async // Note that we haven't kicked this off yet - it's still an Async
let pulses = ExternalInfoProvider.make Async.Sleep update 500<ms> servers let pulses =
ExternalInfoProvider.make Async.Sleep update 500<ms> servers
// This method gets called by the runtime. Use this method to add services to the container. // This method gets called by the runtime. Use this method to add services to the container.
// For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940 // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
member _.ConfigureServices(services: IServiceCollection) = member _.ConfigureServices (services : IServiceCollection) =
pulses |> Async.RunSynchronously |> ignore pulses |> Async.RunSynchronously |> ignore
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline. // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
member _.Configure(app: IApplicationBuilder, env: IWebHostEnvironment) = member _.Configure (app : IApplicationBuilder, env : IWebHostEnvironment) =
if env.IsDevelopment() then if env.IsDevelopment () then
app.UseDeveloperExceptionPage() |> ignore app.UseDeveloperExceptionPage () |> ignore
app.UseRouting() app
.UseEndpoints(fun endpoints -> .UseRouting()
endpoints.MapGet("/", fun context -> .UseEndpoints(fun endpoints ->
async { endpoints.MapGet (
let! answer = ServerAgent.giveNextResponse servers.[0] "/",
match answer with fun context ->
| None -> do! context.Response.WriteAsync("Oh noes") |> Async.AwaitTask async {
| Some date -> let! answer = ServerAgent.giveNextResponse servers.[0]
do! context.Response.WriteAsync(sprintf "%i:%i:%i" date.Hour date.Minute date.Second) |> Async.AwaitTask
return () match answer with
} | None ->
|> Async.StartAsTask do!
:> Task context.Response.WriteAsync ("Oh noes")
|> Async.AwaitTask
| Some date ->
do!
context.Response.WriteAsync (sprintf "%i:%i:%i" date.Hour date.Minute date.Second)
|> Async.AwaitTask
return ()
}
|> Async.StartAsTask
:> Task
) )
|> ignore |> ignore)
) |> ignore
|> ignore

View File

@@ -1,36 +0,0 @@
namespace PulsingServer.Test
open PulsingServer
open NUnit.Framework
open FsUnitTyped
type AgentIndex = AgentIndex of int
type ReadIndex = ReadIndex of int
type Action<'info> =
| ChangeData of 'info
| BeginRead of AgentIndex
| AwaitRead of ReadIndex
[<TestFixture>]
module TestProperties =
let executeAction
(ext : ExternalInfoProvider<'info>)
(agents : ServerAgent<'info> array)
((readNumber : int), (awaitingRead : Map<ReadIndex, Async<'info>>))
(action : Action<'info>)
=
match action with
| BeginRead (AgentIndex i) ->
let mutable answer = None
let result = ServerAgent.giveNextResponse (fun resp -> answer <- Some resp) agents.[i]
let output =
async {
do! result
return Option.get answer
}
ext, agents, (readNumber + 1, Map.add (ReadIndex readNumber) output awaitingRead)
| AwaitRead index ->
awaitingRead.[index]
|> Async.RunSynchronously

View File

@@ -17,14 +17,19 @@ module TestPulsingServer =
let mutable info = "original info" let mutable info = "original info"
let count = ref 0 let count = ref 0
let getInfo = async { let getInfo =
System.Threading.Interlocked.Increment count |> ignore async {
let info = lock info (fun () -> sprintf "%s" info) System.Threading.Interlocked.Increment count
return info |> ignore
}
let info = lock info (fun () -> sprintf "%s" info)
return info
}
let dontSleep (_ : TimeSpan) = async { return () } let dontSleep (_ : TimeSpan) = async { return () }
let infoProvider = ExternalInfoProvider.make dontSleep getInfo 10<ms> [| responder1 ; responder2 |] let infoProvider =
ExternalInfoProvider.make dontSleep getInfo 10<ms> [| responder1 ; responder2 |]
// We're not getting new info, because we didn't await the construction of ExternalInfoProvider // We're not getting new info, because we didn't await the construction of ExternalInfoProvider
count.Value |> shouldEqual 0 count.Value |> shouldEqual 0
@@ -32,30 +37,28 @@ module TestPulsingServer =
// The two responders are ready, but have not received anything yet. // The two responders are ready, but have not received anything yet.
do do
let response = ServerAgent.giveNextResponse responder1 let response = ServerAgent.giveNextResponse responder1
response response
|> Async.RunSynchronously |> Async.RunSynchronously
|> shouldEqual "hi" |> shouldEqual "hi"
// Now start off the ExternalInfoProvider! // Now start off the ExternalInfoProvider!
let _ = let _ = infoProvider |> Async.RunSynchronously
infoProvider
|> Async.RunSynchronously
// Now we have definitely started pinging... // Now we have definitely started pinging...
count.Value |> shouldBeGreaterThan 0 count.Value |> shouldBeGreaterThan 0
// ... and at some point soon, the first responder will act on the info it receives. // The two responders are ready, but have not received anything yet.
do do
let response = ServerAgent.giveNextResponse responder1 let response = ServerAgent.giveNextResponse responder1
response response
|> Async.RunSynchronously |> Async.RunSynchronously
|> shouldEqual "original info" |> shouldEqual "original info"
// Update the info. responder1 is not going to fail on the `received` check, because that // Update the info. responder1 is not going to fail on the `received` check, because that
// was one-shot. // was one-shot.
lock info (fun () -> lock info (fun () -> info <- "new info!")
info <- "new info!"
)
// Get responder2 ready to act in a couple of different ways. // Get responder2 ready to act in a couple of different ways.
let response2 = ServerAgent.giveNextResponse responder2 let response2 = ServerAgent.giveNextResponse responder2
@@ -69,6 +72,7 @@ module TestPulsingServer =
// By design, we can't distinguish between these two cases. // By design, we can't distinguish between these two cases.
(info = "new info!" || info = "original info") (info = "new info!" || info = "original info")
|> shouldEqual true |> shouldEqual true
response2' response2'
|> Async.RunSynchronously |> Async.RunSynchronously
|> fun info -> |> fun info ->
@@ -78,16 +82,23 @@ module TestPulsingServer =
// Eventually, responder2 does pick up the new info. // Eventually, responder2 does pick up the new info.
let rec go () = let rec go () =
let response = ServerAgent.giveNextResponse responder2 |> Async.RunSynchronously let response =
if response <> "new info!" then go () ServerAgent.giveNextResponse responder2
|> Async.RunSynchronously
if response <> "new info!" then
go ()
go () go ()
[<TestCase (10000, 1)>] [<TestCase(10000, 1)>]
[<TestCase (10000, 3)>] [<TestCase(10000, 3)>]
let ``Stress test`` (n : int, queues : int) = let ``Stress test`` (n : int, queues : int) =
let responders = Array.init queues (fun _ -> ServerAgent.make "uninitialised") let responders =
Array.init queues (fun _ -> ServerAgent.make "uninitialised")
let mutable data = "" let mutable data = ""
let getInfo = let getInfo =
async { async {
// Simulate a slow network call // Simulate a slow network call
@@ -95,6 +106,7 @@ module TestPulsingServer =
let result = lock data (fun () -> sprintf "%s" data) let result = lock data (fun () -> sprintf "%s" data)
return result return result
} }
let _infoProvider = let _infoProvider =
ExternalInfoProvider.make Async.Sleep getInfo 10<ms> responders ExternalInfoProvider.make Async.Sleep getInfo 10<ms> responders
|> Async.RunSynchronously |> Async.RunSynchronously
@@ -107,13 +119,17 @@ module TestPulsingServer =
// n requests come in - note that we don't start them off yet, // n requests come in - note that we don't start them off yet,
// because we want to time them separately // because we want to time them separately
let requests = let requests =
Array.init n (fun i -> Array.init
async { n
let! answer = ServerAgent.giveNextResponse responders.[i % queues] (fun i ->
if answer <> "" then failwith "unexpected response!" async {
return () let! answer = ServerAgent.giveNextResponse responders.[i % queues]
}
) if answer <> "" then
failwith "unexpected response!"
return ()
})
|> Async.Parallel |> Async.Parallel
|> Async.Ignore |> Async.Ignore
@@ -122,8 +138,7 @@ module TestPulsingServer =
time.Restart () time.Restart ()
requests requests |> Async.RunSynchronously
|> Async.RunSynchronously
time.Stop () time.Stop ()
printfn "Time to execute: %i ms" time.ElapsedMilliseconds printfn "Time to execute: %i ms" time.ElapsedMilliseconds
@@ -131,42 +146,47 @@ module TestPulsingServer =
// Now prepare n more requests, but halfway through, we'll be changing the data. // Now prepare n more requests, but halfway through, we'll be changing the data.
// Again, don't kick them off right now; wait for the timer. // Again, don't kick them off right now; wait for the timer.
time.Restart () time.Restart ()
let requests = let requests =
Array.init n (fun i -> Array.init
if i = n / 2 then n
async { (fun i ->
lock data (fun () -> data <- "new data") if i = n / 2 then
return None async {
} lock data (fun () -> data <- "new data")
else return None
async { }
do! Async.Sleep (TimeSpan.FromMilliseconds (float i)) else
let! response = ServerAgent.giveNextResponse (responders.[i % queues]) async {
return Some response do! Async.Sleep (TimeSpan.FromMilliseconds (float i))
} let! response = ServerAgent.giveNextResponse (responders.[i % queues])
) return Some response
})
|> Async.Parallel |> Async.Parallel
time.Stop () time.Stop ()
printfn "Time to construct requests: %i ms" time.ElapsedMilliseconds printfn "Time to construct requests: %i ms" time.ElapsedMilliseconds
time.Restart () time.Restart ()
let results = let results = requests |> Async.RunSynchronously
requests
|> Async.RunSynchronously
time.Stop () time.Stop ()
printfn "Time to execute: %i ms" time.ElapsedMilliseconds printfn "Time to execute: %i ms" time.ElapsedMilliseconds
let grouped = let grouped =
results results |> Array.countBy id |> Map.ofArray
|> Array.countBy id
|> Map.ofArray
grouped.[None] |> shouldEqual 1 grouped.[None] |> shouldEqual 1
let pre = Map.tryFind (Some "") grouped |> Option.defaultValue 0
let post = Map.tryFind (Some "new data") grouped |> Option.defaultValue 0 let pre =
Map.tryFind (Some "") grouped
|> Option.defaultValue 0
let post =
Map.tryFind (Some "new data") grouped
|> Option.defaultValue 0
pre + post |> shouldEqual (n - 1) pre + post |> shouldEqual (n - 1)
printfn "Got old data: %i. Got new data: %i." pre post printfn "Got old data: %i. Got new data: %i." pre post

View File

@@ -1,17 +1,15 @@
open System.Net open System.Net
open System.Diagnostics open System.Diagnostics
use timer = new Stopwatch() let timer = new Stopwatch()
timer.Restart() timer.Restart()
[ [ for i in 1 .. 1000 do
for i in 1..1000 do yield
yield async {
async { let w = new WebClient()
let w = new WebClient() return w.DownloadString("http://localhost:5000/")
return w.DownloadString("http://localhost:5000/") } ]
}
]
|> Async.Parallel |> Async.Parallel
|> Async.RunSynchronously |> Async.RunSynchronously