From 5029d9d0544b192e652e443c149573377ca5b044 Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Mon, 14 Nov 2022 04:28:43 +0100 Subject: [PATCH 1/7] Initial attempt at implementing resumable code aware version of `For` in `Task` with `TaskSeq<'T>` --- src/FSharp.Control.TaskSeq/TaskSeq.fs | 32 +++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fs b/src/FSharp.Control.TaskSeq/TaskSeq.fs index 8e4ae80c..1ad258ea 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fs @@ -4,6 +4,8 @@ open System.Collections.Generic open System.Threading open System.Threading.Tasks +#nowarn "57" + module TaskSeq = // F# BUG: the following module is 'AutoOpen' and this isn't needed in the Tests project. Why do we need to open it? open FSharp.Control.TaskSeqBuilders @@ -324,3 +326,33 @@ module TaskSeq = let fold folder state source = Internal.fold (FolderAction folder) state source let foldAsync folder state source = Internal.fold (AsyncFolderAction folder) state source + +[] +module AsyncSeqExtensions = + open Microsoft.FSharp.Core.CompilerServices + + // Add asynchronous for loop to the 'async' computation builder + type Microsoft.FSharp.Control.AsyncBuilder with + + member x.For(tasksq: IAsyncEnumerable<'T>, action: 'T -> Async) = + tasksq + |> TaskSeq.iterAsync (action >> Async.StartAsTask) + |> Async.AwaitTask + + // Add asynchronous for loop to the 'task' computation builder + type Microsoft.FSharp.Control.TaskBuilder with + + member inline this.For + ( + tasksq: IAsyncEnumerable<'T>, + body: 'T -> TaskCode<'TOverall, unit> + ) : TaskCode<'TOverall, unit> = + TaskCode<'TOverall, unit>(fun sm -> + this + .Using( + tasksq.GetAsyncEnumerator(CancellationToken()), + (fun e -> + // TODO: fix 'true' with e.MoveNextAsync() + this.While((fun () -> true), (fun sm -> (body e.Current).Invoke(&sm)))) + ) + .Invoke(&sm)) From 16df03d3d78c6c97ee92486c38feba0df0674bfd Mon Sep 17 00:00:00 2001 From: Jimmy Byrd Date: Mon, 21 Nov 2022 22:34:31 -0500 Subject: [PATCH 2/7] WhileDynamic for task files --- .../FSharp.Control.TaskSeq.Test.fsproj | 9 +-- .../TaskSeq.Extensions.Tests.fs | 28 ++++++++ src/FSharp.Control.TaskSeq/TaskSeq.fs | 68 ++++++++++++++++++- src/FSharp.Control.TaskSeq/TaskSeq.fsi | 35 ++++++++++ 4 files changed, 131 insertions(+), 9 deletions(-) create mode 100644 src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs diff --git a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj index 0d0b6019..a43c5c44 100644 --- a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj +++ b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj @@ -1,13 +1,10 @@ - - + net6.0 - false false ..\..\assets\TaskSeq.ico - @@ -47,9 +44,9 @@ + - @@ -67,9 +64,7 @@ all - - diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs new file mode 100644 index 00000000..0b30e575 --- /dev/null +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs @@ -0,0 +1,28 @@ +module TaskSeq.Extenions + +open System +open Xunit +open FsUnit.Xunit +open FsToolkit.ErrorHandling + +open FSharp.Control + +// +// TaskSeq.except +// TaskSeq.exceptOfSeq +// + + +// module TaskBuilder = +// open TaskSeq.Tests + +// [)>] +// let ``TaskSeq-existsAsync happy path last item of seq`` variant = +// task { +// let values = Gen.getSeqImmutable variant +// let mutable sum = 0 +// for x in values do +// sum <- sum + x +// } + // |> TaskSeq.existsAsync (fun x -> task { return x = 10 }) + // |> Task.map (should be True) diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fs b/src/FSharp.Control.TaskSeq/TaskSeq.fs index 1ad258ea..2397c8eb 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fs @@ -327,8 +327,54 @@ module TaskSeq = let foldAsync folder state source = Internal.fold (AsyncFolderAction folder) state source +#nowarn "1204" +#nowarn "3513" + + [] module AsyncSeqExtensions = + + let rec WhileDynamic + ( + sm: byref>, + condition: unit -> ValueTask, + body: TaskCode<'Data, unit> + ) : bool = + let vt = condition () + TaskBuilderBase.BindDynamic(&sm, vt, fun result -> + TaskCode<_,_>(fun sm -> + if result then + if body.Invoke(&sm) then + WhileDynamic(&sm, condition, body) + else + let rf = sm.ResumptionDynamicInfo.ResumptionFunc + + sm.ResumptionDynamicInfo.ResumptionFunc <- + (TaskResumptionFunc<'Data>(fun sm -> WhileBodyDynamicAux(&sm, condition, body, rf))) + + false + else + true + ) + ) + + + and WhileBodyDynamicAux + ( + sm: byref>, + condition: unit -> ValueTask, + body: TaskCode<'Data, unit>, + rf: TaskResumptionFunc<_> + ) : bool = + if rf.Invoke(&sm) then + WhileDynamic(&sm, condition, body) + else + let rf = sm.ResumptionDynamicInfo.ResumptionFunc + + sm.ResumptionDynamicInfo.ResumptionFunc <- + (TaskResumptionFunc<'Data>(fun sm -> WhileBodyDynamicAux(&sm, condition, body, rf))) + + false open Microsoft.FSharp.Core.CompilerServices // Add asynchronous for loop to the 'async' computation builder @@ -342,17 +388,35 @@ module AsyncSeqExtensions = // Add asynchronous for loop to the 'task' computation builder type Microsoft.FSharp.Control.TaskBuilder with + + member inline this.While(condition : unit -> ValueTask, body : TaskCode<'TOverall,unit>) = + TaskCode<_,_>(fun sm -> + WhileDynamic(&sm, condition, body) + + ) + + + member inline this.For ( tasksq: IAsyncEnumerable<'T>, body: 'T -> TaskCode<'TOverall, unit> ) : TaskCode<'TOverall, unit> = TaskCode<'TOverall, unit>(fun sm -> + this .Using( tasksq.GetAsyncEnumerator(CancellationToken()), (fun e -> - // TODO: fix 'true' with e.MoveNextAsync() - this.While((fun () -> true), (fun sm -> (body e.Current).Invoke(&sm)))) + let next () = e.MoveNextAsync() + this.While(next, (fun sm -> (body e.Current).Invoke(&sm)))) ) .Invoke(&sm)) + + let foo () = + task { + let mutable sum = 0 + let xs = taskSeq { 1; 2; 3} + for x in xs do + sum <- sum + x + } diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fsi b/src/FSharp.Control.TaskSeq/TaskSeq.fsi index aeae538d..cfa4281f 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fsi +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fsi @@ -1,5 +1,7 @@ namespace FSharp.Control +#nowarn "1204" + module TaskSeq = open System.Collections.Generic open System.Threading.Tasks @@ -562,3 +564,36 @@ module TaskSeq = /// If the accumulator function does not need to be asynchronous, consider using . /// val foldAsync: folder: ('State -> 'T -> #Task<'State>) -> state: 'State -> source: taskSeq<'T> -> Task<'State> + + + +[] +module AsyncSeqExtensions = + + val WhileDynamic: + sm: byref> * + condition: (unit -> System.Threading.Tasks.ValueTask) * + body: TaskCode<'Data, unit> -> + bool + + val WhileBodyDynamicAux: + sm: byref> * + condition: (unit -> System.Threading.Tasks.ValueTask) * + body: TaskCode<'Data, unit> * + rf: TaskResumptionFunc<'Data> -> + bool + + type AsyncBuilder with + + member For: tasksq: System.Collections.Generic.IAsyncEnumerable<'T> * action: ('T -> Async) -> Async + + type TaskBuilder with + + member inline While: + condition: (unit -> System.Threading.Tasks.ValueTask) * body: TaskCode<'TOverall, unit> -> + TaskCode<'TOverall, 'a> + + member inline For: + tasksq: System.Collections.Generic.IAsyncEnumerable<'T> * body: ('T -> TaskCode<'TOverall, unit>) -> + TaskCode<'TOverall, unit> + From 9f6c91fd06da694f36b07acd5474911c98983d76 Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Mon, 14 Nov 2022 04:28:43 +0100 Subject: [PATCH 3/7] Initial attempt at implementing resumable code aware version of `For` in `Task` with `TaskSeq<'T>` --- src/FSharp.Control.TaskSeq/TaskSeq.fs | 32 +++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fs b/src/FSharp.Control.TaskSeq/TaskSeq.fs index 8e4ae80c..1ad258ea 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fs @@ -4,6 +4,8 @@ open System.Collections.Generic open System.Threading open System.Threading.Tasks +#nowarn "57" + module TaskSeq = // F# BUG: the following module is 'AutoOpen' and this isn't needed in the Tests project. Why do we need to open it? open FSharp.Control.TaskSeqBuilders @@ -324,3 +326,33 @@ module TaskSeq = let fold folder state source = Internal.fold (FolderAction folder) state source let foldAsync folder state source = Internal.fold (AsyncFolderAction folder) state source + +[] +module AsyncSeqExtensions = + open Microsoft.FSharp.Core.CompilerServices + + // Add asynchronous for loop to the 'async' computation builder + type Microsoft.FSharp.Control.AsyncBuilder with + + member x.For(tasksq: IAsyncEnumerable<'T>, action: 'T -> Async) = + tasksq + |> TaskSeq.iterAsync (action >> Async.StartAsTask) + |> Async.AwaitTask + + // Add asynchronous for loop to the 'task' computation builder + type Microsoft.FSharp.Control.TaskBuilder with + + member inline this.For + ( + tasksq: IAsyncEnumerable<'T>, + body: 'T -> TaskCode<'TOverall, unit> + ) : TaskCode<'TOverall, unit> = + TaskCode<'TOverall, unit>(fun sm -> + this + .Using( + tasksq.GetAsyncEnumerator(CancellationToken()), + (fun e -> + // TODO: fix 'true' with e.MoveNextAsync() + this.While((fun () -> true), (fun sm -> (body e.Current).Invoke(&sm)))) + ) + .Invoke(&sm)) From 71037130cdf47fd0bf099049aec17340f50ba260 Mon Sep 17 00:00:00 2001 From: Jimmy Byrd Date: Mon, 21 Nov 2022 22:34:31 -0500 Subject: [PATCH 4/7] WhileDynamic for task files --- .../FSharp.Control.TaskSeq.Test.fsproj | 6 +- .../TaskSeq.Extensions.Tests.fs | 28 ++++++++ src/FSharp.Control.TaskSeq/TaskSeq.fs | 68 ++++++++++++++++++- src/FSharp.Control.TaskSeq/TaskSeq.fsi | 35 ++++++++++ 4 files changed, 130 insertions(+), 7 deletions(-) create mode 100644 src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs diff --git a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj index 7eb542e8..86030a28 100644 --- a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj +++ b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj @@ -2,12 +2,10 @@ net6.0 - false false ..\..\assets\TaskSeq.ico - @@ -47,9 +45,9 @@ + - @@ -67,9 +65,7 @@ all - - diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs new file mode 100644 index 00000000..0b30e575 --- /dev/null +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs @@ -0,0 +1,28 @@ +module TaskSeq.Extenions + +open System +open Xunit +open FsUnit.Xunit +open FsToolkit.ErrorHandling + +open FSharp.Control + +// +// TaskSeq.except +// TaskSeq.exceptOfSeq +// + + +// module TaskBuilder = +// open TaskSeq.Tests + +// [)>] +// let ``TaskSeq-existsAsync happy path last item of seq`` variant = +// task { +// let values = Gen.getSeqImmutable variant +// let mutable sum = 0 +// for x in values do +// sum <- sum + x +// } + // |> TaskSeq.existsAsync (fun x -> task { return x = 10 }) + // |> Task.map (should be True) diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fs b/src/FSharp.Control.TaskSeq/TaskSeq.fs index 1ad258ea..2397c8eb 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fs @@ -327,8 +327,54 @@ module TaskSeq = let foldAsync folder state source = Internal.fold (AsyncFolderAction folder) state source +#nowarn "1204" +#nowarn "3513" + + [] module AsyncSeqExtensions = + + let rec WhileDynamic + ( + sm: byref>, + condition: unit -> ValueTask, + body: TaskCode<'Data, unit> + ) : bool = + let vt = condition () + TaskBuilderBase.BindDynamic(&sm, vt, fun result -> + TaskCode<_,_>(fun sm -> + if result then + if body.Invoke(&sm) then + WhileDynamic(&sm, condition, body) + else + let rf = sm.ResumptionDynamicInfo.ResumptionFunc + + sm.ResumptionDynamicInfo.ResumptionFunc <- + (TaskResumptionFunc<'Data>(fun sm -> WhileBodyDynamicAux(&sm, condition, body, rf))) + + false + else + true + ) + ) + + + and WhileBodyDynamicAux + ( + sm: byref>, + condition: unit -> ValueTask, + body: TaskCode<'Data, unit>, + rf: TaskResumptionFunc<_> + ) : bool = + if rf.Invoke(&sm) then + WhileDynamic(&sm, condition, body) + else + let rf = sm.ResumptionDynamicInfo.ResumptionFunc + + sm.ResumptionDynamicInfo.ResumptionFunc <- + (TaskResumptionFunc<'Data>(fun sm -> WhileBodyDynamicAux(&sm, condition, body, rf))) + + false open Microsoft.FSharp.Core.CompilerServices // Add asynchronous for loop to the 'async' computation builder @@ -342,17 +388,35 @@ module AsyncSeqExtensions = // Add asynchronous for loop to the 'task' computation builder type Microsoft.FSharp.Control.TaskBuilder with + + member inline this.While(condition : unit -> ValueTask, body : TaskCode<'TOverall,unit>) = + TaskCode<_,_>(fun sm -> + WhileDynamic(&sm, condition, body) + + ) + + + member inline this.For ( tasksq: IAsyncEnumerable<'T>, body: 'T -> TaskCode<'TOverall, unit> ) : TaskCode<'TOverall, unit> = TaskCode<'TOverall, unit>(fun sm -> + this .Using( tasksq.GetAsyncEnumerator(CancellationToken()), (fun e -> - // TODO: fix 'true' with e.MoveNextAsync() - this.While((fun () -> true), (fun sm -> (body e.Current).Invoke(&sm)))) + let next () = e.MoveNextAsync() + this.While(next, (fun sm -> (body e.Current).Invoke(&sm)))) ) .Invoke(&sm)) + + let foo () = + task { + let mutable sum = 0 + let xs = taskSeq { 1; 2; 3} + for x in xs do + sum <- sum + x + } diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fsi b/src/FSharp.Control.TaskSeq/TaskSeq.fsi index aeae538d..cfa4281f 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fsi +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fsi @@ -1,5 +1,7 @@ namespace FSharp.Control +#nowarn "1204" + module TaskSeq = open System.Collections.Generic open System.Threading.Tasks @@ -562,3 +564,36 @@ module TaskSeq = /// If the accumulator function does not need to be asynchronous, consider using . /// val foldAsync: folder: ('State -> 'T -> #Task<'State>) -> state: 'State -> source: taskSeq<'T> -> Task<'State> + + + +[] +module AsyncSeqExtensions = + + val WhileDynamic: + sm: byref> * + condition: (unit -> System.Threading.Tasks.ValueTask) * + body: TaskCode<'Data, unit> -> + bool + + val WhileBodyDynamicAux: + sm: byref> * + condition: (unit -> System.Threading.Tasks.ValueTask) * + body: TaskCode<'Data, unit> * + rf: TaskResumptionFunc<'Data> -> + bool + + type AsyncBuilder with + + member For: tasksq: System.Collections.Generic.IAsyncEnumerable<'T> * action: ('T -> Async) -> Async + + type TaskBuilder with + + member inline While: + condition: (unit -> System.Threading.Tasks.ValueTask) * body: TaskCode<'TOverall, unit> -> + TaskCode<'TOverall, 'a> + + member inline For: + tasksq: System.Collections.Generic.IAsyncEnumerable<'T> * body: ('T -> TaskCode<'TOverall, unit>) -> + TaskCode<'TOverall, unit> + From 135df8e26a3648d8ee1f57157b0d994cadd315ee Mon Sep 17 00:00:00 2001 From: Jimmy Byrd Date: Tue, 22 Nov 2022 00:27:30 -0500 Subject: [PATCH 5/7] WhileAsync statically compiled --- .../TaskSeq.Extensions.Tests.fs | 31 ++++++++------ src/FSharp.Control.TaskSeq/TaskSeq.fs | 42 ++++++++++++++++--- src/FSharp.Control.TaskSeq/TaskSeq.fsi | 4 +- 3 files changed, 57 insertions(+), 20 deletions(-) diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs index 0b30e575..6fc7ad6a 100644 --- a/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs @@ -13,16 +13,21 @@ open FSharp.Control // -// module TaskBuilder = -// open TaskSeq.Tests - -// [)>] -// let ``TaskSeq-existsAsync happy path last item of seq`` variant = -// task { -// let values = Gen.getSeqImmutable variant -// let mutable sum = 0 -// for x in values do -// sum <- sum + x -// } - // |> TaskSeq.existsAsync (fun x -> task { return x = 10 }) - // |> Task.map (should be True) +module TaskBuilder = + open TaskSeq.Tests + + [)>] + let ``TaskSeq-existsAsync happy path last item of seq`` variant = + task { + let values = Gen.getSeqImmutable variant + + let mutable sum = 0 + for x in values do + sum <- sum + x + + // let! expected = + // (0, values) + // ||> TaskSeq.fold((+)) + Assert.Equal(55, sum) + } + diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fs b/src/FSharp.Control.TaskSeq/TaskSeq.fs index 2397c8eb..7ea9c794 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fs @@ -376,6 +376,8 @@ module AsyncSeqExtensions = false open Microsoft.FSharp.Core.CompilerServices + open Microsoft.FSharp.Core.CompilerServices.StateMachineHelpers + open Microsoft.FSharp.Core.LanguagePrimitives.IntrinsicOperators // Add asynchronous for loop to the 'async' computation builder type Microsoft.FSharp.Control.AsyncBuilder with @@ -389,14 +391,44 @@ module AsyncSeqExtensions = type Microsoft.FSharp.Control.TaskBuilder with - member inline this.While(condition : unit -> ValueTask, body : TaskCode<'TOverall,unit>) = - TaskCode<_,_>(fun sm -> - WhileDynamic(&sm, condition, body) + member inline _.WhileAsync + ( + [] condition: unit -> ValueTask, + body: TaskCode<_,unit> + ) : TaskCode<_,_> = + let mutable condition_res = true + + ResumableCode.While( + (fun () -> condition_res), + ResumableCode<_, _>(fun sm -> + let mutable __stack_condition_fin = true + let __stack_vtask = condition () + + let mutable awaiter = __stack_vtask.GetAwaiter() + if awaiter.IsCompleted then + // logInfo "at WhileAsync: returning completed task" + + __stack_condition_fin <- true + condition_res <- __stack_vtask.Result + else + // logInfo "at WhileAsync: awaiting non-completed task" - ) + // This will yield with __stack_fin = false + // This will resume with __stack_fin = true + let __stack_yield_fin = ResumableCode.Yield().Invoke(&sm) + __stack_condition_fin <- __stack_yield_fin + if __stack_condition_fin then + condition_res <- awaiter.GetResult() + if __stack_condition_fin then + if condition_res then body.Invoke(&sm) else true + else + sm.Data.MethodBuilder.AwaitUnsafeOnCompleted(&awaiter, &sm) + false) + ) + member inline this.For ( tasksq: IAsyncEnumerable<'T>, @@ -409,7 +441,7 @@ module AsyncSeqExtensions = tasksq.GetAsyncEnumerator(CancellationToken()), (fun e -> let next () = e.MoveNextAsync() - this.While(next, (fun sm -> (body e.Current).Invoke(&sm)))) + this.WhileAsync(next, (fun sm -> (body e.Current).Invoke(&sm)))) ) .Invoke(&sm)) diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fsi b/src/FSharp.Control.TaskSeq/TaskSeq.fsi index cfa4281f..ef7ff24d 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fsi +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fsi @@ -589,9 +589,9 @@ module AsyncSeqExtensions = type TaskBuilder with - member inline While: + member inline WhileAsync: condition: (unit -> System.Threading.Tasks.ValueTask) * body: TaskCode<'TOverall, unit> -> - TaskCode<'TOverall, 'a> + TaskCode<'TOverall, unit> member inline For: tasksq: System.Collections.Generic.IAsyncEnumerable<'T> * body: ('T -> TaskCode<'TOverall, unit>) -> From 2355c6ee408faa06cc054d554921b24957ce19f5 Mon Sep 17 00:00:00 2001 From: Jimmy Byrd Date: Tue, 22 Nov 2022 20:43:36 -0500 Subject: [PATCH 6/7] 3 different ways to possibly implement WhileAsync --- src/FSharp.Control.TaskSeq/TaskSeq.fs | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fs b/src/FSharp.Control.TaskSeq/TaskSeq.fs index 7ea9c794..a75efba3 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fs @@ -400,7 +400,7 @@ module AsyncSeqExtensions = ResumableCode.While( (fun () -> condition_res), - ResumableCode<_, _>(fun sm -> + TaskCode<_, _>(fun sm -> let mutable __stack_condition_fin = true let __stack_vtask = condition () @@ -409,7 +409,7 @@ module AsyncSeqExtensions = // logInfo "at WhileAsync: returning completed task" __stack_condition_fin <- true - condition_res <- __stack_vtask.Result + condition_res <- awaiter.GetResult() else // logInfo "at WhileAsync: awaiting non-completed task" @@ -432,16 +432,31 @@ module AsyncSeqExtensions = member inline this.For ( tasksq: IAsyncEnumerable<'T>, - body: 'T -> TaskCode<'TOverall, unit> - ) : TaskCode<'TOverall, unit> = + body: 'T -> TaskCode<_, unit> + ) : TaskCode<_, unit> = + // tasksq + // |> TaskSeq.iterAsync (body >> task.Run) + // |> task.ReturnFrom + + // task.ReturnFrom <| + // task { + // let mutable continueWhile = true + // use e = tasksq.GetAsyncEnumerator() + // while continueWhile do + // let! next = e.MoveNextAsync() + // if next then + // do! task.Run(body e.Current) + // else + // continueWhile <- false + // } + TaskCode<'TOverall, unit>(fun sm -> this .Using( tasksq.GetAsyncEnumerator(CancellationToken()), (fun e -> - let next () = e.MoveNextAsync() - this.WhileAsync(next, (fun sm -> (body e.Current).Invoke(&sm)))) + this.WhileAsync(e.MoveNextAsync, (fun sm -> (body e.Current).Invoke(&sm)))) ) .Invoke(&sm)) From c2d117c75a0ff00b48539b62e0b7a39d7ef6ee79 Mon Sep 17 00:00:00 2001 From: Jimmy Byrd Date: Mon, 21 Nov 2022 22:34:31 -0500 Subject: [PATCH 7/7] WhileDynamic for task files --- src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs | 1 - src/FSharp.Control.TaskSeq/TaskSeq.fsi | 1 - 2 files changed, 2 deletions(-) diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs index 6fc7ad6a..7cbd1b08 100644 --- a/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs @@ -30,4 +30,3 @@ module TaskBuilder = // ||> TaskSeq.fold((+)) Assert.Equal(55, sum) } - diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fsi b/src/FSharp.Control.TaskSeq/TaskSeq.fsi index ef7ff24d..1a2e6ea2 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fsi +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fsi @@ -596,4 +596,3 @@ module AsyncSeqExtensions = member inline For: tasksq: System.Collections.Generic.IAsyncEnumerable<'T> * body: ('T -> TaskCode<'TOverall, unit>) -> TaskCode<'TOverall, unit> -