diff --git a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj index 7eb542e8..86030a28 100644 --- a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj +++ b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj @@ -2,12 +2,10 @@ net6.0 - false false ..\..\assets\TaskSeq.ico - @@ -47,9 +45,9 @@ + - @@ -67,9 +65,7 @@ all - - diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs new file mode 100644 index 00000000..7cbd1b08 --- /dev/null +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs @@ -0,0 +1,32 @@ +module TaskSeq.Extenions + +open System +open Xunit +open FsUnit.Xunit +open FsToolkit.ErrorHandling + +open FSharp.Control + +// +// TaskSeq.except +// TaskSeq.exceptOfSeq +// + + +module TaskBuilder = + open TaskSeq.Tests + + [)>] + let ``TaskSeq-existsAsync happy path last item of seq`` variant = + task { + let values = Gen.getSeqImmutable variant + + let mutable sum = 0 + for x in values do + sum <- sum + x + + // let! expected = + // (0, values) + // ||> TaskSeq.fold((+)) + Assert.Equal(55, sum) + } diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fs b/src/FSharp.Control.TaskSeq/TaskSeq.fs index 8e4ae80c..a75efba3 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fs @@ -4,6 +4,8 @@ open System.Collections.Generic open System.Threading open System.Threading.Tasks +#nowarn "57" + module TaskSeq = // F# BUG: the following module is 'AutoOpen' and this isn't needed in the Tests project. Why do we need to open it? open FSharp.Control.TaskSeqBuilders @@ -324,3 +326,144 @@ module TaskSeq = let fold folder state source = Internal.fold (FolderAction folder) state source let foldAsync folder state source = Internal.fold (AsyncFolderAction folder) state source + +#nowarn "1204" +#nowarn "3513" + + +[] +module AsyncSeqExtensions = + + let rec WhileDynamic + ( + sm: byref>, + condition: unit -> ValueTask, + body: TaskCode<'Data, unit> + ) : bool = + let vt = condition () + TaskBuilderBase.BindDynamic(&sm, vt, fun result -> + TaskCode<_,_>(fun sm -> + if result then + if body.Invoke(&sm) then + WhileDynamic(&sm, condition, body) + else + let rf = sm.ResumptionDynamicInfo.ResumptionFunc + + sm.ResumptionDynamicInfo.ResumptionFunc <- + (TaskResumptionFunc<'Data>(fun sm -> WhileBodyDynamicAux(&sm, condition, body, rf))) + + false + else + true + ) + ) + + + and WhileBodyDynamicAux + ( + sm: byref>, + condition: unit -> ValueTask, + body: TaskCode<'Data, unit>, + rf: TaskResumptionFunc<_> + ) : bool = + if rf.Invoke(&sm) then + WhileDynamic(&sm, condition, body) + else + let rf = sm.ResumptionDynamicInfo.ResumptionFunc + + sm.ResumptionDynamicInfo.ResumptionFunc <- + (TaskResumptionFunc<'Data>(fun sm -> WhileBodyDynamicAux(&sm, condition, body, rf))) + + false + open Microsoft.FSharp.Core.CompilerServices + open Microsoft.FSharp.Core.CompilerServices.StateMachineHelpers + open Microsoft.FSharp.Core.LanguagePrimitives.IntrinsicOperators + + // Add asynchronous for loop to the 'async' computation builder + type Microsoft.FSharp.Control.AsyncBuilder with + + member x.For(tasksq: IAsyncEnumerable<'T>, action: 'T -> Async) = + tasksq + |> TaskSeq.iterAsync (action >> Async.StartAsTask) + |> Async.AwaitTask + + // Add asynchronous for loop to the 'task' computation builder + type Microsoft.FSharp.Control.TaskBuilder with + + + member inline _.WhileAsync + ( + [] condition: unit -> ValueTask, + body: TaskCode<_,unit> + ) : TaskCode<_,_> = + let mutable condition_res = true + + ResumableCode.While( + (fun () -> condition_res), + TaskCode<_, _>(fun sm -> + let mutable __stack_condition_fin = true + let __stack_vtask = condition () + + let mutable awaiter = __stack_vtask.GetAwaiter() + if awaiter.IsCompleted then + // logInfo "at WhileAsync: returning completed task" + + __stack_condition_fin <- true + condition_res <- awaiter.GetResult() + else + // logInfo "at WhileAsync: awaiting non-completed task" + + // This will yield with __stack_fin = false + // This will resume with __stack_fin = true + let __stack_yield_fin = ResumableCode.Yield().Invoke(&sm) + __stack_condition_fin <- __stack_yield_fin + + if __stack_condition_fin then + condition_res <- awaiter.GetResult() + + + if __stack_condition_fin then + if condition_res then body.Invoke(&sm) else true + else + sm.Data.MethodBuilder.AwaitUnsafeOnCompleted(&awaiter, &sm) + false) + ) + + member inline this.For + ( + tasksq: IAsyncEnumerable<'T>, + body: 'T -> TaskCode<_, unit> + ) : TaskCode<_, unit> = + // tasksq + // |> TaskSeq.iterAsync (body >> task.Run) + // |> task.ReturnFrom + + // task.ReturnFrom <| + // task { + // let mutable continueWhile = true + // use e = tasksq.GetAsyncEnumerator() + // while continueWhile do + // let! next = e.MoveNextAsync() + // if next then + // do! task.Run(body e.Current) + // else + // continueWhile <- false + // } + + TaskCode<'TOverall, unit>(fun sm -> + + this + .Using( + tasksq.GetAsyncEnumerator(CancellationToken()), + (fun e -> + this.WhileAsync(e.MoveNextAsync, (fun sm -> (body e.Current).Invoke(&sm)))) + ) + .Invoke(&sm)) + + let foo () = + task { + let mutable sum = 0 + let xs = taskSeq { 1; 2; 3} + for x in xs do + sum <- sum + x + } diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fsi b/src/FSharp.Control.TaskSeq/TaskSeq.fsi index aeae538d..1a2e6ea2 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fsi +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fsi @@ -1,5 +1,7 @@ namespace FSharp.Control +#nowarn "1204" + module TaskSeq = open System.Collections.Generic open System.Threading.Tasks @@ -562,3 +564,35 @@ module TaskSeq = /// If the accumulator function does not need to be asynchronous, consider using . /// val foldAsync: folder: ('State -> 'T -> #Task<'State>) -> state: 'State -> source: taskSeq<'T> -> Task<'State> + + + +[] +module AsyncSeqExtensions = + + val WhileDynamic: + sm: byref> * + condition: (unit -> System.Threading.Tasks.ValueTask) * + body: TaskCode<'Data, unit> -> + bool + + val WhileBodyDynamicAux: + sm: byref> * + condition: (unit -> System.Threading.Tasks.ValueTask) * + body: TaskCode<'Data, unit> * + rf: TaskResumptionFunc<'Data> -> + bool + + type AsyncBuilder with + + member For: tasksq: System.Collections.Generic.IAsyncEnumerable<'T> * action: ('T -> Async) -> Async + + type TaskBuilder with + + member inline WhileAsync: + condition: (unit -> System.Threading.Tasks.ValueTask) * body: TaskCode<'TOverall, unit> -> + TaskCode<'TOverall, unit> + + member inline For: + tasksq: System.Collections.Generic.IAsyncEnumerable<'T> * body: ('T -> TaskCode<'TOverall, unit>) -> + TaskCode<'TOverall, unit>