Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,34 @@ module AsyncSeq =
| Choice1Of2 value -> return value
| Choice2Of2 ex -> return raise ex })
}

let mapAsyncUnorderedParallelThrottled (parallelism:int) (f:'a -> Async<'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> = asyncSeq {
use mb = MailboxProcessor.Start (fun _ -> async.Return())
use sm = new SemaphoreSlim(parallelism)
let! err =
s
|> iterAsync (fun a -> async {
do! sm.WaitAsync () |> Async.awaitTaskUnitCancellationAsError
let! b = Async.StartChild (async {
try
let! result = f a
sm.Release() |> ignore
return Choice1Of2 result
with ex ->
sm.Release() |> ignore
return Choice2Of2 ex
})
mb.Post (Some b) })
|> Async.map (fun _ -> mb.Post None)
|> Async.StartChildAsTask
yield!
replicateUntilNoneAsync (Task.chooseTask (err |> Task.taskFault) (async.Delay mb.Receive))
|> mapAsync (fun childAsync -> async {
let! result = childAsync
match result with
| Choice1Of2 value -> return value
| Choice2Of2 ex -> return raise ex })
}
#endif

let chooseAsync f (source:AsyncSeq<'T>) =
Expand Down
8 changes: 8 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,14 @@ module AsyncSeq =
/// Parallelism is bound by the ThreadPool.
val mapAsyncUnorderedParallel : mapping:('T -> Async<'U>) -> s:AsyncSeq<'T> -> AsyncSeq<'U>

/// Builds a new asynchronous sequence whose elements are generated by
/// applying the specified function to all elements of the input sequence,
/// with at most <c>parallelism</c> mapping operations running concurrently.
///
/// The function is applied to elements in parallel (throttled), and results are emitted
/// in the order they complete (unordered), without preserving the original order.
val mapAsyncUnorderedParallelThrottled : parallelism:int -> mapping:('T -> Async<'U>) -> s:AsyncSeq<'T> -> AsyncSeq<'U>

/// Applies a key-generating function to each element and returns an async sequence containing unique keys
/// and async sequences containing elements corresponding to the key.
///
Expand Down
53 changes: 53 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1717,6 +1717,59 @@ let ``AsyncSeq.mapAsyncUnorderedParallel should not preserve order`` () =
Assert.IsTrue(allPresent, "All input elements should be present in results")


[<Test>]
let ``AsyncSeq.mapAsyncUnorderedParallelThrottled should produce all results`` () =
let input = [1; 2; 3; 4; 5]
let expected = [2; 4; 6; 8; 10] |> Set.ofList

let actual =
input
|> AsyncSeq.ofSeq
|> AsyncSeq.mapAsyncUnorderedParallelThrottled 3 (fun x -> async {
do! Async.Sleep(10)
return x * 2
})
|> AsyncSeq.toListAsync
|> runTest
|> Set.ofList

Assert.AreEqual(expected, actual)

[<Test>]
let ``AsyncSeq.mapAsyncUnorderedParallelThrottled should propagate handler exception`` () =
let res =
AsyncSeq.init 100L id
|> AsyncSeq.mapAsyncUnorderedParallelThrottled 10 (fun i -> async {
if i = 50L then return failwith "oh no"
else return i * 2L
})
|> AsyncSeq.toListAsync
|> Async.Catch
|> (fun x -> Async.RunSynchronously (x, timeout = 10000))

match res with
| Choice2Of2 _ -> ()
| Choice1Of2 _ -> Assert.Fail ("error expected")

[<Test>]
let ``AsyncSeq.mapAsyncUnorderedParallelThrottled should throttle`` () =
let count = ref 0
let parallelism = 5

let result =
AsyncSeq.init 50L id
|> AsyncSeq.mapAsyncUnorderedParallelThrottled parallelism (fun i -> async {
let c = Interlocked.Increment count
if c > parallelism then
return failwith (sprintf "concurrency exceeded: %d > %d" c parallelism)
do! Async.Sleep 5
Interlocked.Decrement count |> ignore
return i * 2L })
|> AsyncSeq.toListAsync
|> Async.RunSynchronously

Assert.AreEqual(50, result.Length)

//[<Test>]
//let ``AsyncSeq.mapParallelAsyncBounded should maintain order`` () =
// let ls = List.init 500 id
Expand Down