Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 51 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,25 +1,16 @@
# @wjfe/async-workers

> Provides thread-safe and atomic synchronization objects, and wrappers to easily use web workers with async/await
> syntax.
> Thread-safe, atomic synchronization objects and asynchronous worker wrappers for the browser and Node environments

[Live Demo](https://wjsoftware.github.io/wjfe-async-workers)

> [!CAUTION]
> This NPM package is in its experimental phase. Features may be incomplete or still requiring thorough testing.
> **⚠️ Caution!**
> This NPM package has had minimal testing under NodeJS + web-worker.

## Introduction
## Objectives

Using web workers imply a call to `Worker.postMessge()` to signal the worker you want the work done, and then expect
some notification back via a listener in `Worker.onmessage` to at least know that the work completed, but usually to
get a result back in the form of data (the result of the work). This is just the core, though. You should also add
a listener to `Worker.onerror` just in case the worker has issues processing your request. Otherwise you'll be waiting
forever for the notification, ignorant that an error has occurred and nothing will ever be returned.

Oh, but this is just on the user interface side. Then there's the matter of doing the web worker side. No point in
continuing the explanation. The point is made: This is incredibly cumbersome. Multi-threaded runtimes like .Net can
use `async/await` with threads and is far more convenient. The whole point of this NPM package is to bring this
convenience to the web workers world.
1. To provide friendly `async/await` syntax to the Node and web workers world.
2. To provide thread-safe, atomic synchronization objects like the ones found in other runtimes like .Net

## Quickstart

Expand All @@ -30,7 +21,7 @@ to incoming messages.
2. Export the tasks worker object.
3. Create a new instance of `Worker` the way is recommended by your bundler, usually with the syntax
`new Worker("./myworker.js", impot.meta.url)`. However, this forces you to write the worker in JavaScript, at least
for Vite-powered projects.
in Vite-powered projects.
4. Create a new instance of `AsyncWorker` (from this package) by passing the worker object and the tasks object from
the previous points.
5. Start worker tasks by using the `AsyncWorker.enqueue` property. The functions found in this object return an object
Expand Down Expand Up @@ -83,11 +74,13 @@ self.onmessage = workerListener(myWorker);

This is a 3-step worker. The worker simply waits to be informed which step to run from the user interface thread.

> ℹ️ Worker tasks may take zero arguments. This is perfectly valid.

### The Async Worker in the UI Thread

This is what needs to be done in order to obtain an object that commands the worker:

> [!IMPORTANT]
> **⚡ Important**
> This example is using TypeScript and the following assumes a Vite-powered project. We are deviating from the
> recommended way of obtaining a worker because the recommended way requires the worker to be written in JavaScript
> while in serve mode (`npm run dev`).
Expand Down Expand Up @@ -120,6 +113,17 @@ Yes! The above is valid: You may queue up as many tasks as you wish without ha
previous ones, even if the worker is asynchronous (uses `async/await`). The worker controller will keep perfect record
of the order in which the tasks must be run.

This table shows through examples how various call signatures change from from worker to enqueue object:

| Worker Function | Enqueue Function |
| - | - |
| `init(config: Configuration): void;` | `init(payload: Configuration, options?: QueueingOptions): WorkItem<void>;` |
| `sort(array: MyData[]): MyData[];` | `sort(payload: MyData[], options?: QueueingOptions): WorkItem<MyData[]>;` |
| `shutdown(): void;` | `shutdown(payload: void, options?: QueueingOptions): WorkItem<void>;` |
| `primesBetween(payload: { a: number; b: number; }): number[];` | `primesBetween(payload: { a: number; b: number }): WorkItem<number[]>;` |

Task functions can only take zero or 1 parameter, so if more than one piece of data is needed as payload, pass an object with all the data, as the last example does.

## Shared Workers

Shared workers are also supported through the same `AsyncWorker` class. Note, however, the following key differences:
Expand All @@ -139,6 +143,9 @@ self.onconnect = (ev) => {

## Bi-Directional Communication

> **🕓 TL;DR**
> It's OK for workers to transmit intermediate results like progress reports and partial results. It is not recommended for the main thread to have to send data to a paused task. Promises in work item objects resolve once the `QueueingOptions.processMessage()` function returns `true`.

The default functionality is fine for many cases: A worker task is started, the user interface waits for its
completion and when the task finishes, the work item's `promise` property spits out the resultant object when awaited.

Expand Down Expand Up @@ -214,7 +221,7 @@ const defaultRunningTotalWorkItem = myWorkerController.enqueue.calculateRunningT
});
```

This is it. Bi-directional communcation is fully set up.
This is it. Bi-directional communication is fully set up.

### How About Sending Something Back?

Expand Down Expand Up @@ -246,6 +253,31 @@ export const myWorker = {
Inside `processMessage`, do `myWorkerController.enqueue.supplyMissingOrUpdatedDataWhileInTheAir(theData, { outOfOrder: true })`
and hope for the best.

## Worker Task Cancellation

To fully understand, read [this topic](#cancellationsource) in the section about synchronization objects (coming up next), and also [this other topic about WorkItem class](#the-workitem-class).

To use a cancellation token, specify the `cancellable` option when the task is enqueued:

```typescript
const workItem = myWorkerController.enqueue.doSomething(somePayload, { cancellable: true });
...
// If so needed, cancel at some point.
workItem.cancel();
...
// At any point where the promise is awaited, an error will be thrown.
try {
const result = await workItem.promise;
}
catch (err: unknown) {
if (err instanceof CancelledMessage) {
// The task was cancelled.
}
}
```

To learn about the implementation in the worker side, keep reading.

## Synchronization Objects

This package provides synchronization objects that use `Atomics` to cross the thread boundary safely.
Expand Down Expand Up @@ -286,7 +318,7 @@ is already simplified. Taking one line from the quickstart example:
const defaultRunningTotalWorkItem = myWorkerController.enqueue.calculateRunningTotal(undefined, { cancellable: true });
```

By adding the `cancellable` option, a cancellation token will be avilable to `calculateRunningTotal` in its third
By adding the `cancellable` option, a cancellation token will be available to `calculateRunningTotal` in its third
parameter, as seen in the previous section.

Whenever cancellation is desired, simply call the work item's `cancel()` method. For more information about this
Expand Down
19 changes: 18 additions & 1 deletion src/workers/AsyncWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,23 @@ import { InternalWorker } from "./InternalWorker.js";
import { WorkItem } from "./WorkItem.js";
import { WorkItemInternal } from "./WorkItemInternal.js";

/**
* Determines if a worker-like object is a SharedWorker using duck typing.
* @param worker The worker object to test
* @returns true if the worker appears to be a SharedWorker, false otherwise
*/
function isSharedWorker(worker: any): worker is SharedWorker {
// SharedWorker has a 'port' property (MessagePort) with the required methods
return worker &&
typeof worker === 'object' &&
'port' in worker &&
worker.port &&
typeof worker.port === 'object' &&
typeof worker.port.postMessage === 'function' &&
typeof worker.port.addEventListener === 'function' &&
typeof worker.port.removeEventListener === 'function';
}

export type EnqueueFn<Fn extends ((...args: any[]) => any) = (() => any)> =
(payload: Fn extends () => any ? void : Parameters<Fn>[0], options?: QueueingOptions) => WorkItem<ReturnType<Fn>>;

Expand Down Expand Up @@ -55,7 +72,7 @@ export class AsyncWorker<Tasks extends Record<string, (...args: any[]) => any>>
#taskRunning;
#enqueue;
constructor(worker: Worker | SharedWorker, tasks: Tasks) {
this.#iWorker = Object.getPrototypeOf(worker).name === 'Worker' ? new InternalWorker(worker as Worker) : new InternalSharedWorker(worker as SharedWorker);
this.#iWorker = isSharedWorker(worker) ? new InternalSharedWorker(worker) : new InternalWorker(worker as Worker);
this.#queue = new Queue<WorkItemInternal>();
this.#taskRunning = false;
this.#enqueue = Object.keys(tasks).reduce((prev, curr) => {
Expand Down