namespace PureGym

open System
open System.Net.Http
open System.Threading
open System.Threading.Tasks

/// Messages understood by the mailbox that backs a `Cache`.
type private CacheMessage<'a> =
    /// Fetch a fresh value, store it, and schedule the next refresh.
    | TriggerUpdate
    /// Replace the stored value with the given task.
    | UpdateStored of 'a Task
    /// Reply with the stored value once its task has completed.
    | Get of AsyncReplyChannel<'a>
    /// Stop the message loop and acknowledge on the channel.
    | Quit of AsyncReplyChannel<unit>

/// Holds a value produced by `obtainNew` and refreshes it in the background.
///
/// `obtainNew` is called with a token that is cancelled when the cache is disposed.
/// `expiry` reports when a value stops being valid; a refresh is scheduled one minute
/// before that instant. If `expiry` returns `None`, no further refresh is scheduled.
///
/// Note that `obtainNew` is invoked twice during construction: once for the initial
/// value, and once by the first `TriggerUpdate`, which also sets up the refresh schedule.
type Cache<'a> (obtainNew : CancellationToken -> 'a Task, expiry : 'a -> DateTime option) =
    let cts = new CancellationTokenSource ()

    let initialValue = obtainNew cts.Token

    /// Message loop. `value` is the currently stored (possibly still running) task.
    let rec handle (value : 'a Task) (mailbox : MailboxProcessor<CacheMessage<'a>>) : Async<unit> =
        async {
            let! message = mailbox.Receive ()

            match message with
            | CacheMessage.Quit channel ->
                // Not recursing ends the loop.
                channel.Reply ()
                return ()
            | CacheMessage.UpdateStored newValue -> return! handle newValue mailbox
            | CacheMessage.TriggerUpdate ->
                let postQuietly (toPost : CacheMessage<'a>) : unit =
                    try
                        mailbox.Post toPost
                    with _ ->
                        // Post during shutdown sequence: drop it on the floor
                        ()

                // The refresh runs outside the message loop so that `Get` requests keep
                // being served from the old value while the new one is being fetched.
                async {
                    let fetched = obtainNew cts.Token
                    let! fresh = Async.AwaitTask fetched

                    // Publish the new value. Previously the fetched value was discarded,
                    // so the cache kept serving the initial value forever. Only a
                    // successfully completed task is ever published here.
                    postQuietly (CacheMessage.UpdateStored fetched)

                    match expiry fresh with
                    | None -> return ()
                    | Some expiresAt ->
                        // a bit sloppy but :shrug:
                        let untilRefresh : TimeSpan =
                            (expiresAt - DateTime.Now) - TimeSpan.FromMinutes 1.0

                        // Async.Sleep rejects negative durations, so a value that expires
                        // in under a minute must trigger an immediate refresh instead.
                        let untilRefresh =
                            if untilRefresh < TimeSpan.Zero then TimeSpan.Zero else untilRefresh

                        do! Async.Sleep untilRefresh

                        postQuietly CacheMessage.TriggerUpdate
                        return ()
                }
                |> fun refresh -> Async.Start (refresh, cancellationToken = cts.Token)

                return! handle value mailbox
            | CacheMessage.Get reply ->
                // NOTE(review): if the stored task has faulted, this await raises inside the
                // message loop and no reply is sent; callers then appear to wait forever.
                let! valueAwaited = Async.AwaitTask value
                reply.Reply valueAwaited
                return! handle value mailbox
        }

    let mailbox = new MailboxProcessor<_> (handle initialValue)

    do
        mailbox.Start ()
        mailbox.Post CacheMessage.TriggerUpdate

    // Counts Dispose calls; only the caller that moves it from 0 to 1 performs the teardown.
    let isDisposing = ref 0
    // Completed once teardown has finished, so that later Dispose calls can wait on it.
    let hasDisposed = TaskCompletionSource<unit> ()

    /// Asynchronously get the currently stored value.
    /// Raises ObjectDisposedException if posting to the mailbox fails with an
    /// InvalidOperationException.
    member this.GetCurrentValue () =
        try
            mailbox.PostAndAsyncReply CacheMessage.Get
        with
        // TODO I think this is the right exception...
        | :? InvalidOperationException -> raise (ObjectDisposedException (nameof (Cache)))

    interface IDisposable with
        /// Stop the message loop, cancel any pending refresh, and release resources.
        /// Calls after the first block until the first call has finished.
        member _.Dispose () =
            if Interlocked.Increment isDisposing = 1 then
                mailbox.PostAndReply CacheMessage.Quit
                (mailbox :> IDisposable).Dispose ()
                // We can't terminate the CTS until the mailbox has processed all client requests.
                // Otherwise we terminate the mailbox's state Task before it has finished querying that
                // task on behalf of clients.
                cts.Cancel ()
                cts.Dispose ()
                hasDisposed.SetResult ()
            else
                hasDisposed.Task.Result

/// Methods for interacting with the PureGym REST API.
[<RequireQualifiedAccess>]
module Api =
    /// Create a REST client, authenticated as the specified user. Creds will be refreshed if possible as long as
    /// the returned disposable is not disposed.
    ///
    /// With `Auth.Token` the token is used as given and the returned disposable does nothing.
    /// With `Auth.User` a `Cache` obtains the token, and each token lookup blocks on the cache.
    let make (auth : Auth) : (IPureGymApi * IDisposable) Task =
        let cache, getToken =
            match auth with
            | Auth.Token t ->
                { new IDisposable with
                    member _.Dispose () = ()
                },
                (fun () -> t)
            | Auth.User cred ->
                let cache = new Cache<_> (AuthToken.get cred, _.ExpiryTime)
                cache :> _, (fun () -> Async.RunSynchronously (cache.GetCurrentValue ()))

        task {
            let client = new HttpClient ()

            return PureGymApi.make (getToken >> _.AccessToken >> sprintf "Bearer %s") client, cache
        }

    /// Create a REST client, authenticated as the specified user. Do not refresh creds.
    ///
    /// The token is resolved once, using `ct` when credentials have to be exchanged for a
    /// token, and the same bearer header is used for the lifetime of the client.
    let makeWithoutRefresh (ct : CancellationToken) (auth : Auth) : IPureGymApi Task =
        task {
            let! token =
                match auth with
                | Auth.Token t -> Task.FromResult<_> t
                | Auth.User cred -> AuthToken.get cred ct

            let client = new HttpClient ()

            return PureGymApi.make (fun () -> $"Bearer %s{token.AccessToken}") client
        }