Permit async ParallelQueue (#279)

This commit is contained in:
Patrick Stevens
2025-07-29 22:24:58 +01:00
committed by GitHub
parent 99e0fdff08
commit eeada219f6
4 changed files with 124 additions and 103 deletions

View File

@@ -4,16 +4,16 @@ open System
open System.Threading open System.Threading
open System.Threading.Tasks open System.Threading.Tasks
type private ThunkEvaluator<'ret> = type private AsyncThunkEvaluator<'ret> =
abstract Eval<'a> : (unit -> 'a) -> AsyncReplyChannel<Result<'a, exn>> -> 'ret abstract Eval<'a> : (unit -> Async<'a>) -> AsyncReplyChannel<Result<'a, exn>> -> 'ret
type private ThunkCrate = type private AsyncThunkCrate =
abstract Apply<'ret> : ThunkEvaluator<'ret> -> 'ret abstract Apply<'ret> : AsyncThunkEvaluator<'ret> -> 'ret
[<RequireQualifiedAccess>] [<RequireQualifiedAccess>]
module private ThunkCrate = module private AsyncThunkCrate =
let make<'a> (t : unit -> 'a) (rc : AsyncReplyChannel<Result<'a, exn>>) : ThunkCrate = let make<'a> (t : unit -> Async<'a>) (rc : AsyncReplyChannel<Result<'a, exn>>) : AsyncThunkCrate =
{ new ThunkCrate with { new AsyncThunkCrate with
member _.Apply e = e.Eval t rc member _.Apply e = e.Eval t rc
} }
@@ -41,7 +41,11 @@ type private MailboxMessage =
| Quit of AsyncReplyChannel<unit> | Quit of AsyncReplyChannel<unit>
/// Check current state, see if we need to start more tests, etc. /// Check current state, see if we need to start more tests, etc.
| Reconcile | Reconcile
| RunTest of within : TestFixture * Parallelizable<unit> option * test : ThunkCrate * context : ExecutionContext | RunTestAsync of
within : TestFixture *
Parallelizable<unit> option *
test : AsyncThunkCrate *
context : ExecutionContext
| BeginTestFixture of TestFixture * AsyncReplyChannel<TestFixtureRunningToken> | BeginTestFixture of TestFixture * AsyncReplyChannel<TestFixtureRunningToken>
| EndTestFixture of TestFixtureTearDownToken * AsyncReplyChannel<unit> | EndTestFixture of TestFixtureTearDownToken * AsyncReplyChannel<unit>
@@ -310,26 +314,31 @@ type ParallelQueue
rc.Reply () rc.Reply ()
m.Post MailboxMessage.Reconcile m.Post MailboxMessage.Reconcile
return! processTask (Running state) m return! processTask (Running state) m
| MailboxMessage.RunTest (withinFixture, par, message, capturedContext) -> | MailboxMessage.RunTestAsync (withinFixture, par, message, capturedContext) ->
let t () = let t () =
{ new ThunkEvaluator<_> with { new AsyncThunkEvaluator<_> with
member _.Eval<'b> (t : unit -> 'b) rc = member _.Eval<'b> (t : unit -> Async<'b>) rc =
let tcs = TaskCompletionSource TaskCreationOptions.RunContinuationsAsynchronously let tcs = TaskCompletionSource TaskCreationOptions.RunContinuationsAsynchronously
fun () -> fun () ->
ExecutionContext.Run ( ExecutionContext.Run (
capturedContext, capturedContext,
(fun _ -> (fun _ ->
let result = async {
let! result =
async {
try try
let r = t () let! r = t ()
Ok r return Ok r
with e -> with e ->
Error e return Error e
}
tcs.SetResult () tcs.SetResult ()
m.Post MailboxMessage.Reconcile m.Post MailboxMessage.Reconcile
rc.Reply result rc.Reply result
}
|> Async.StartImmediate
), ),
() ()
) )
@@ -353,19 +362,19 @@ type ParallelQueue
let mb = new MailboxProcessor<_> (processTask MailboxState.Idle) let mb = new MailboxProcessor<_> (processTask MailboxState.Idle)
do mb.Start () do mb.Start ()
/// Request to run the given action, freely in parallel with other running tests. /// Request to run the given async action, freely in parallel with other running tests.
/// The resulting Task will return when the action has completed. /// The resulting Task will return when the action has completed.
member _.Run<'a> member _.RunAsync<'a>
(TestFixtureSetupToken parent) (TestFixtureSetupToken parent)
(scope : Parallelizable<unit> option) (scope : Parallelizable<unit> option)
(action : unit -> 'a) (action : unit -> Async<'a>)
: 'a Task : 'a Task
= =
let ec = ExecutionContext.Capture () let ec = ExecutionContext.Capture ()
task { task {
let! result = let! result =
(fun rc -> MailboxMessage.RunTest (parent, scope, ThunkCrate.make action rc, ec)) (fun rc -> MailboxMessage.RunTestAsync (parent, scope, AsyncThunkCrate.make action rc, ec))
|> mb.PostAndAsyncReply |> mb.PostAndAsyncReply
|> Async.StartAsTask |> Async.StartAsTask
@@ -374,6 +383,16 @@ type ParallelQueue
| Error e -> return Exception.reraiseWithOriginalStackTrace e | Error e -> return Exception.reraiseWithOriginalStackTrace e
} }
/// Request to run the given action, freely in parallel with other running tests.
/// The resulting Task will return when the action has completed.
member this.Run<'a>
(parent : TestFixtureSetupToken)
(scope : Parallelizable<unit> option)
(action : unit -> 'a)
: 'a Task
=
this.RunAsync parent scope (fun () -> async.Return (action ()))
/// Declare that we wish to start the given test fixture. The resulting Task will return /// Declare that we wish to start the given test fixture. The resulting Task will return
/// when you are allowed to start running tests from that fixture. /// when you are allowed to start running tests from that fixture.
/// Once you've finished running tests from that fixture, call EndTestFixture. /// Once you've finished running tests from that fixture, call EndTestFixture.
@@ -396,7 +415,14 @@ type ParallelQueue
let ec = ExecutionContext.Capture () let ec = ExecutionContext.Capture ()
let! response = let! response =
(fun rc -> MailboxMessage.RunTest (parent, par, ThunkCrate.make action rc, ec)) (fun rc ->
MailboxMessage.RunTestAsync (
parent,
par,
AsyncThunkCrate.make (fun () -> async.Return (action ())) rc,
ec
)
)
|> mb.PostAndAsyncReply |> mb.PostAndAsyncReply
match response with match response with
@@ -422,7 +448,14 @@ type ParallelQueue
let ec = ExecutionContext.Capture () let ec = ExecutionContext.Capture ()
let! response = let! response =
(fun rc -> MailboxMessage.RunTest (parent, par, ThunkCrate.make action rc, ec)) (fun rc ->
MailboxMessage.RunTestAsync (
parent,
par,
AsyncThunkCrate.make (fun () -> async.Return (action ())) rc,
ec
)
)
|> mb.PostAndAsyncReply |> mb.PostAndAsyncReply
match response with match response with

View File

@@ -256,6 +256,7 @@ WoofWare.NUnitTestRunner.ParallelQueue inherit obj, implements IDisposable
WoofWare.NUnitTestRunner.ParallelQueue..ctor [constructor]: (int option, WoofWare.NUnitTestRunner.AssemblyParallelScope WoofWare.NUnitTestRunner.Parallelizable option, System.Threading.CancellationToken option) WoofWare.NUnitTestRunner.ParallelQueue..ctor [constructor]: (int option, WoofWare.NUnitTestRunner.AssemblyParallelScope WoofWare.NUnitTestRunner.Parallelizable option, System.Threading.CancellationToken option)
WoofWare.NUnitTestRunner.ParallelQueue.EndTestFixture [method]: WoofWare.NUnitTestRunner.TestFixtureTearDownToken -> unit System.Threading.Tasks.Task WoofWare.NUnitTestRunner.ParallelQueue.EndTestFixture [method]: WoofWare.NUnitTestRunner.TestFixtureTearDownToken -> unit System.Threading.Tasks.Task
WoofWare.NUnitTestRunner.ParallelQueue.Run [method]: WoofWare.NUnitTestRunner.TestFixtureSetupToken -> unit WoofWare.NUnitTestRunner.Parallelizable option -> (unit -> 'a) -> 'a System.Threading.Tasks.Task WoofWare.NUnitTestRunner.ParallelQueue.Run [method]: WoofWare.NUnitTestRunner.TestFixtureSetupToken -> unit WoofWare.NUnitTestRunner.Parallelizable option -> (unit -> 'a) -> 'a System.Threading.Tasks.Task
WoofWare.NUnitTestRunner.ParallelQueue.RunAsync [method]: WoofWare.NUnitTestRunner.TestFixtureSetupToken -> unit WoofWare.NUnitTestRunner.Parallelizable option -> (unit -> 'a Microsoft.FSharp.Control.FSharpAsync) -> 'a System.Threading.Tasks.Task
WoofWare.NUnitTestRunner.ParallelQueue.RunTestSetup [method]: WoofWare.NUnitTestRunner.TestFixtureRunningToken -> (unit -> 'a) -> ('a * WoofWare.NUnitTestRunner.TestFixtureSetupToken) System.Threading.Tasks.Task WoofWare.NUnitTestRunner.ParallelQueue.RunTestSetup [method]: WoofWare.NUnitTestRunner.TestFixtureRunningToken -> (unit -> 'a) -> ('a * WoofWare.NUnitTestRunner.TestFixtureSetupToken) System.Threading.Tasks.Task
WoofWare.NUnitTestRunner.ParallelQueue.RunTestTearDown [method]: WoofWare.NUnitTestRunner.TestFixtureSetupToken -> (unit -> 'a) -> ('a * WoofWare.NUnitTestRunner.TestFixtureTearDownToken) System.Threading.Tasks.Task WoofWare.NUnitTestRunner.ParallelQueue.RunTestTearDown [method]: WoofWare.NUnitTestRunner.TestFixtureSetupToken -> (unit -> 'a) -> ('a * WoofWare.NUnitTestRunner.TestFixtureTearDownToken) System.Threading.Tasks.Task
WoofWare.NUnitTestRunner.ParallelQueue.StartTestFixture [method]: WoofWare.NUnitTestRunner.TestFixture -> WoofWare.NUnitTestRunner.TestFixtureRunningToken System.Threading.Tasks.Task WoofWare.NUnitTestRunner.ParallelQueue.StartTestFixture [method]: WoofWare.NUnitTestRunner.TestFixture -> WoofWare.NUnitTestRunner.TestFixtureRunningToken System.Threading.Tasks.Task

View File

@@ -1,5 +1,5 @@
{ {
"version": "0.21", "version": "0.22",
"publicReleaseRefSpec": [ "publicReleaseRefSpec": [
"^refs/heads/main$" "^refs/heads/main$"
], ],

View File

@@ -243,17 +243,12 @@ module TestSynchronizationContext =
do! queue.EndTestFixture teardown do! queue.EndTestFixture teardown
} }
(*
[<Test>] [<Test>]
let ``ExecutionContext flows correctly through async operations`` () = let ``ExecutionContext flows correctly through async operations`` () =
task { task {
// Create a test fixture // Create a test fixture
let dummyFixture = let dummyFixture =
TestFixture.Empty TestFixture.Empty typeof<obj> (Some (Parallelizable.Yes ClassParallelScope.All)) [] []
typeof<obj>
(Some (Parallelizable.Yes ClassParallelScope.All))
[]
[]
use contexts = TestContexts.Empty () use contexts = TestContexts.Empty ()
use queue = new ParallelQueue (Some 4, None) use queue = new ParallelQueue (Some 4, None)
@@ -277,7 +272,10 @@ module TestSynchronizationContext =
// Run an async operation that checks the context at multiple points // Run an async operation that checks the context at multiple points
let! actualId = let! actualId =
queue.RunAsync setup None (fun () -> queue.RunAsync
setup
None
(fun () ->
async { async {
// Check context immediately // Check context immediately
let immediate = contexts.AsyncLocal.Value let immediate = contexts.AsyncLocal.Value
@@ -318,8 +316,8 @@ module TestSynchronizationContext =
allValues |> shouldHaveLength 30 // 3 checks per operation * 10 operations allValues |> shouldHaveLength 30 // 3 checks per operation * 10 operations
// Every captured value should match its expected value // Every captured value should match its expected value
for expected, actual in allValues do allValues
actual |> shouldEqual expected |> List.iter (fun (expected, actual) -> actual |> shouldEqual expected)
// Clean up // Clean up
let! _, teardown = queue.RunTestTearDown setup (fun () -> ()) let! _, teardown = queue.RunTestTearDown setup (fun () -> ())
@@ -330,11 +328,7 @@ module TestSynchronizationContext =
let ``ExecutionContext isolation between concurrent operations`` () = let ``ExecutionContext isolation between concurrent operations`` () =
task { task {
let dummyFixture = let dummyFixture =
TestFixture.Empty TestFixture.Empty typeof<obj> (Some (Parallelizable.Yes ClassParallelScope.All)) [] []
typeof<obj>
(Some (Parallelizable.Yes ClassParallelScope.All))
[]
[]
use contexts = TestContexts.Empty () use contexts = TestContexts.Empty ()
use queue = new ParallelQueue (Some 4, None) use queue = new ParallelQueue (Some 4, None)
@@ -344,7 +338,7 @@ module TestSynchronizationContext =
// Use a barrier to ensure operations run concurrently // Use a barrier to ensure operations run concurrently
let barrier = new Barrier (3) let barrier = new Barrier (3)
let seenValues = System.Collections.Concurrent.ConcurrentBag<int * Guid option>() let seenValues = System.Collections.Concurrent.ConcurrentBag<int * Guid> ()
// Create operations that will definitely run concurrently // Create operations that will definitely run concurrently
let tasks = let tasks =
@@ -356,27 +350,28 @@ module TestSynchronizationContext =
contexts.AsyncLocal.Value <- OutputStreamId myId contexts.AsyncLocal.Value <- OutputStreamId myId
let! result = let! result =
queue.RunAsync setup (Some (Parallelizable.Yes ())) (fun () -> queue.RunAsync
setup
(Some (Parallelizable.Yes ()))
(fun () ->
async { async {
// Wait for all tasks to reach this point // Wait for all tasks to reach this point
barrier.SignalAndWait () |> ignore barrier.SignalAndWait () |> ignore
// Now check what value we see // Now check what value we see
let currentValue = contexts.AsyncLocal.Value let currentValue = contexts.AsyncLocal.Value
match currentValue with match currentValue with
| OutputStreamId guid -> seenValues.Add(i, Some guid) | OutputStreamId guid -> seenValues.Add (i, guid)
| _ -> seenValues.Add(i, None)
// Do some async work // Do some async work
do! Async.Sleep 5 do! Async.Sleep 5
// Check again after async work // Check again after async work
let afterAsync = contexts.AsyncLocal.Value let afterAsync = contexts.AsyncLocal.Value
match afterAsync with match afterAsync with
| OutputStreamId guid -> | OutputStreamId guid -> return guid
return guid
| _ ->
return failwith "Lost context after async"
} }
) )
@@ -392,18 +387,10 @@ module TestSynchronizationContext =
let values = seenValues |> Seq.toList let values = seenValues |> Seq.toList
values |> shouldHaveLength 3 values |> shouldHaveLength 3
// Each task should have seen a value
for (taskId, value) in values do
value |> shouldNotEqual None
// All seen values should be different (no context bleeding) // All seen values should be different (no context bleeding)
let uniqueValues = let uniqueValues = values |> List.map snd |> List.distinct
values
|> List.choose snd
|> List.distinct
uniqueValues |> shouldHaveLength 3 uniqueValues |> shouldHaveLength 3
let! _, teardown = queue.RunTestTearDown setup (fun () -> ()) let! _, teardown = queue.RunTestTearDown setup (fun () -> ())
do! queue.EndTestFixture teardown do! queue.EndTestFixture teardown
} }
*)