Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions release-notes.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Release notes:

0.3.0 (unreleased)
- internal renames, improved doc comments, signature files for complex types, hide internal-only types, #111.
- adds support for static TaskLike, allowing the same let! and do! overloads that F# task supports, fixes #110.
- implements 'do!' for non-generic Task like with Task.Delay, fixes #43.
- adds support for 'for .. in ..' with task sequences in F# tasks and async, #75, #93 and #99 (with help from @theangrybyrd).
Expand Down
8 changes: 8 additions & 0 deletions src/FSharp.Control.TaskSeq/AssemblyInfo.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace TaskSeq.Tests

open System.Runtime.CompilerServices

// ensure the test project has access to the internal types
[<assembly: InternalsVisibleToAttribute("FSharp.Control.TaskSeq.Test")>]

do ()
55 changes: 55 additions & 0 deletions src/FSharp.Control.TaskSeq/DebugUtils.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
namespace FSharp.Control

open System.Threading.Tasks
open System
open System.Diagnostics
open System.Threading

type Debug =

[<DefaultValue(false)>]
static val mutable private verbose: bool option

/// Setting from environment variable TASKSEQ_LOG_VERBOSE, which,
/// when set, enables (very) verbose printing of flow and state
static member private getVerboseSetting() =
match Debug.verbose with
| None ->
let verboseEnv =
try
match Environment.GetEnvironmentVariable "TASKSEQ_LOG_VERBOSE" with
| null -> false
| x ->
match x.ToLowerInvariant().Trim() with
| "1"
| "true"
| "on"
| "yes" -> true
| _ -> false

with _ ->
false

Debug.verbose <- Some verboseEnv
verboseEnv

| Some setting -> setting

/// Private helper to log to stdout in DEBUG builds only
[<Conditional("DEBUG")>]
static member private print value =
match Debug.getVerboseSetting () with
| false -> ()
| true ->
// don't use ksprintf here, because the compiler does not remove all allocations due to
// the way PrintfFormat types are compiled, even if we set the Conditional attribute.
let ct = Thread.CurrentThread
printfn "%i (%b): %s" ct.ManagedThreadId ct.IsThreadPoolThread value

/// Log to stdout in DEBUG builds only
[<Conditional("DEBUG")>]
static member logInfo(str) = Debug.print str

/// Log to stdout in DEBUG builds only
[<Conditional("DEBUG")>]
static member logInfo(str, data) = Debug.print $"%s{str}{data}"
13 changes: 13 additions & 0 deletions src/FSharp.Control.TaskSeq/FSharp.Control.TaskSeq.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,27 @@ Generates optimized IL code through the new resumable state machines, and comes
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<OtherFlags></OtherFlags>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
<OtherFlags></OtherFlags>
<Tailcalls>True</Tailcalls>
</PropertyGroup>

<ItemGroup>
<Content Include="..\..\release-notes.txt" Link="release-notes.txt" />
<None Include="..\..\assets\taskseq-icon.png">
<Pack>True</Pack>
<PackagePath>\</PackagePath>
</None>
<None Include="..\..\assets\nuget-package-readme.md" Pack="true" PackagePath="" />
<Compile Include="AssemblyInfo.fs" />
<Compile Include="DebugUtils.fs" />
<Compile Include="Utils.fsi" />
<Compile Include="Utils.fs" />
<Compile Include="TaskSeqBuilder.fsi" />
<Compile Include="TaskSeqBuilder.fs" />
<Compile Include="TaskSeqInternal.fs" />
<Compile Include="TaskSeq.fsi" />
Expand Down
71 changes: 36 additions & 35 deletions src/FSharp.Control.TaskSeq/TaskSeqBuilder.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ open FSharp.Control
[<AutoOpen>]
module Internal = // cannot be marked with 'internal' scope

/// Setting from environment variable TASKSEQ_LOG_VERBOSE, which,
/// when set, enables (very) verbose printing of flow and state
let initVerbose () =
try
match Environment.GetEnvironmentVariable "TASKSEQ_LOG_VERBOSE" with
Expand All @@ -37,10 +35,8 @@ module Internal = // cannot be marked with 'internal' scope
false


/// Call MoveNext on an IAsyncStateMachine by reference
let inline moveNextRef (x: byref<'T> when 'T :> IAsyncStateMachine) = x.MoveNext()

// F# requires that we implement interfaces even on an abstract class
let inline raiseNotImpl () =
NotImplementedException "Abstract Class: method or property not implemented"
|> raise
Expand Down Expand Up @@ -79,7 +75,7 @@ type TaskSeqStateMachineData<'T>() =

/// A reference to 'self', because otherwise we can't use byref in the resumable code.
[<DefaultValue(false)>]
val mutable boxedSelf: TaskSeq<'T>
val mutable boxedSelf: TaskSeqBase<'T>

member data.PushDispose(disposer: unit -> Task) =
if isNull data.disposalStack then
Expand All @@ -91,7 +87,7 @@ type TaskSeqStateMachineData<'T>() =
if not (isNull data.disposalStack) then
data.disposalStack.RemoveAt(data.disposalStack.Count - 1)

and [<AbstractClass; NoEquality; NoComparison>] TaskSeq<'T>() =
and [<AbstractClass; NoEquality; NoComparison>] TaskSeqBase<'T>() =

abstract MoveNextAsyncResult: unit -> ValueTask<bool>

Expand Down Expand Up @@ -119,7 +115,7 @@ and [<AbstractClass; NoEquality; NoComparison>] TaskSeq<'T>() =

and [<NoComparison; NoEquality>] TaskSeq<'Machine, 'T
when 'Machine :> IAsyncStateMachine and 'Machine :> IResumableStateMachine<TaskSeqStateMachineData<'T>>>() =
inherit TaskSeq<'T>()
inherit TaskSeqBase<'T>()
let initialThreadId = Environment.CurrentManagedThreadId

/// Shadows the initial machine, just after it is initialized by the F# compiler-generated state.
Expand Down Expand Up @@ -304,16 +300,16 @@ and [<NoComparison; NoEquality>] TaskSeq<'Machine, 'T
// assume it's a possibly new, not yet supported case, treat as default
ValueTask.ofIValueTaskSource this version

and TaskSeqCode<'T> = ResumableCode<TaskSeqStateMachineData<'T>, unit>
and ResumableTSC<'T> = ResumableCode<TaskSeqStateMachineData<'T>, unit>
and TaskSeqStateMachine<'T> = ResumableStateMachine<TaskSeqStateMachineData<'T>>
and TaskSeqResumptionFunc<'T> = ResumptionFunc<TaskSeqStateMachineData<'T>>
and TaskSeqResumptionDynamicInfo<'T> = ResumptionDynamicInfo<TaskSeqStateMachineData<'T>>

type TaskSeqBuilder() =

member inline _.Delay(f: unit -> TaskSeqCode<'T>) : TaskSeqCode<'T> = TaskSeqCode<'T>(fun sm -> f().Invoke(&sm))
member inline _.Delay(f: unit -> ResumableTSC<'T>) : ResumableTSC<'T> = ResumableTSC<'T>(fun sm -> f().Invoke(&sm))

member inline _.Run(code: TaskSeqCode<'T>) : IAsyncEnumerable<'T> =
member inline _.Run(code: ResumableTSC<'T>) : IAsyncEnumerable<'T> =
if __useResumableCode then
// This is the static implementation. A new struct type is created.
__stateMachine<TaskSeqStateMachineData<'T>, IAsyncEnumerable<'T>>
Expand Down Expand Up @@ -386,11 +382,11 @@ type TaskSeqBuilder() =
|> raise


member inline _.Zero() : TaskSeqCode<'T> =
member inline _.Zero() : ResumableTSC<'T> =
Debug.logInfo "at Zero()"
ResumableCode.Zero()

member inline _.Combine(task1: TaskSeqCode<'T>, task2: TaskSeqCode<'T>) : TaskSeqCode<'T> =
member inline _.Combine(task1: ResumableTSC<'T>, task2: ResumableTSC<'T>) : ResumableTSC<'T> =
Debug.logInfo "at Combine(.., ..)"

ResumableCode.Combine(task1, task2)
Expand All @@ -399,8 +395,8 @@ type TaskSeqBuilder() =
member inline _.WhileAsync
(
[<InlineIfLambda>] condition: unit -> ValueTask<bool>,
body: TaskSeqCode<'T>
) : TaskSeqCode<'T> =
body: ResumableTSC<'T>
) : ResumableTSC<'T> =
let mutable condition_res = true

ResumableCode.While(
Expand Down Expand Up @@ -436,17 +432,17 @@ type TaskSeqBuilder() =
false)
)

member inline b.While([<InlineIfLambda>] condition: unit -> bool, body: TaskSeqCode<'T>) : TaskSeqCode<'T> =
member inline b.While([<InlineIfLambda>] condition: unit -> bool, body: ResumableTSC<'T>) : ResumableTSC<'T> =
Debug.logInfo "at While(...)"
ResumableCode.While(condition, body)

member inline _.TryWith(body: TaskSeqCode<'T>, catch: exn -> TaskSeqCode<'T>) : TaskSeqCode<'T> =
member inline _.TryWith(body: ResumableTSC<'T>, catch: exn -> ResumableTSC<'T>) : ResumableTSC<'T> =
ResumableCode.TryWith(body, catch)

member inline _.TryFinallyAsync(body: TaskSeqCode<'T>, compensation: unit -> Task) : TaskSeqCode<'T> =
member inline _.TryFinallyAsync(body: ResumableTSC<'T>, compensation: unit -> Task) : ResumableTSC<'T> =
ResumableCode.TryFinallyAsync(

TaskSeqCode<'T>(fun sm ->
ResumableTSC<'T>(fun sm ->
sm.Data.PushDispose(fun () -> compensation ())
body.Invoke(&sm)),

Expand All @@ -467,9 +463,9 @@ type TaskSeqBuilder() =
__stack_condition_fin)
)

member inline _.TryFinally(body: TaskSeqCode<'T>, compensation: unit -> unit) : TaskSeqCode<'T> =
member inline _.TryFinally(body: ResumableTSC<'T>, compensation: unit -> unit) : ResumableTSC<'T> =
ResumableCode.TryFinally(
TaskSeqCode<'T>(fun sm ->
ResumableTSC<'T>(fun sm ->
sm.Data.PushDispose(fun () ->
compensation ()
Task.CompletedTask)
Expand All @@ -482,7 +478,7 @@ type TaskSeqBuilder() =
true)
)

member inline this.Using(disp: #IAsyncDisposable, body: #IAsyncDisposable -> TaskSeqCode<'T>) : TaskSeqCode<'T> =
member inline this.Using(disp: #IAsyncDisposable, body: #IAsyncDisposable -> ResumableTSC<'T>) : ResumableTSC<'T> =

// A using statement is just a try/finally with the finally block disposing if non-null.
this.TryFinallyAsync(
Expand All @@ -494,8 +490,8 @@ type TaskSeqBuilder() =
Task.CompletedTask)
)

member inline _.Yield(v: 'T) : TaskSeqCode<'T> =
TaskSeqCode<'T>(fun sm ->
member inline _.Yield(v: 'T) : ResumableTSC<'T> =
ResumableTSC<'T>(fun sm ->
// This will yield with __stack_fin = false
// This will resume with __stack_fin = true
Debug.logInfo "at Yield"
Expand Down Expand Up @@ -546,10 +542,10 @@ module LowPriority =
and ^Awaiter: (member GetResult: unit -> 'TResult1)>
(
task: ^TaskLike,
continuation: ('TResult1 -> TaskSeqCode<'TResult2>)
) : TaskSeqCode<'TResult2> =
continuation: ('TResult1 -> ResumableTSC<'TResult2>)
) : ResumableTSC<'TResult2> =

TaskSeqCode<'TResult2>(fun sm ->
ResumableTSC<'TResult2>(fun sm ->
let mutable awaiter = (^TaskLike: (member GetAwaiter: unit -> ^Awaiter) (task))
let mutable __stack_fin = true

Expand Down Expand Up @@ -581,7 +577,7 @@ module LowPriority =
module MediumPriority =
type TaskSeqBuilder with

member inline this.Using(disp: #IDisposable, body: #IDisposable -> TaskSeqCode<'T>) : TaskSeqCode<'T> =
member inline this.Using(disp: #IDisposable, body: #IDisposable -> ResumableTSC<'T>) : ResumableTSC<'T> =

// A using statement is just a try/finally with the finally block disposing if non-null.
this.TryFinally(
Expand All @@ -592,22 +588,22 @@ module MediumPriority =
disp.Dispose())
)

member inline this.For(sequence: seq<'TElement>, body: 'TElement -> TaskSeqCode<'T>) : TaskSeqCode<'T> =
member inline this.For(sequence: seq<'TElement>, body: 'TElement -> ResumableTSC<'T>) : ResumableTSC<'T> =
// A for loop is just a using statement on the sequence's enumerator...
this.Using(
sequence.GetEnumerator(),
// ... and its body is a while loop that advances the enumerator and runs the body on each element.
(fun e -> this.While((fun () -> e.MoveNext()), (fun sm -> (body e.Current).Invoke(&sm))))
)

member inline this.YieldFrom(source: seq<'T>) : TaskSeqCode<'T> = this.For(source, (fun v -> this.Yield(v)))
member inline this.YieldFrom(source: seq<'T>) : ResumableTSC<'T> = this.For(source, (fun v -> this.Yield(v)))

member inline this.For
(
source: #IAsyncEnumerable<'TElement>,
body: 'TElement -> TaskSeqCode<'T>
) : TaskSeqCode<'T> =
TaskSeqCode<'T>(fun sm ->
body: 'TElement -> ResumableTSC<'T>
) : ResumableTSC<'T> =
ResumableTSC<'T>(fun sm ->
this
.Using(
source.GetAsyncEnumerator(sm.Data.cancellationToken),
Expand All @@ -616,7 +612,7 @@ module MediumPriority =
)
.Invoke(&sm))

member inline this.YieldFrom(source: IAsyncEnumerable<'T>) : TaskSeqCode<'T> =
member inline this.YieldFrom(source: IAsyncEnumerable<'T>) : ResumableTSC<'T> =
this.For(source, (fun v -> this.Yield(v)))

[<AutoOpen>]
Expand All @@ -633,8 +629,8 @@ module HighPriority =
// - In contrast, ValueTask<_> *does have* GetResult() -> 'TResult
// - Conclusion: we do not need an extra overload anymore for ValueTask
//
member inline _.Bind(task: Task<'TResult1>, continuation: ('TResult1 -> TaskSeqCode<'T>)) : TaskSeqCode<'T> =
TaskSeqCode<'T>(fun sm ->
member inline _.Bind(task: Task<'TResult1>, continuation: ('TResult1 -> ResumableTSC<'T>)) : ResumableTSC<'T> =
ResumableTSC<'T>(fun sm ->
let mutable awaiter = task.GetAwaiter()
let mutable __stack_fin = true

Expand All @@ -660,3 +656,8 @@ module HighPriority =
sm.Data.awaiter <- awaiter
sm.Data.current <- ValueNone
false)

[<AutoOpen>]
module TaskSeqBuilder =
/// Builds an asynchronous task sequence based on IAsyncEnumerable<'T> using computation expression syntax.
let taskSeq = TaskSeqBuilder()
Loading