From cb1b7cfd1a1c800e6fd8a1d07ef032ba7771aa4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ramirez=20Vargas=2C=20Jos=C3=A9=20Pablo?= Date: Wed, 24 Sep 2025 00:05:14 -0600 Subject: [PATCH] fix: Use duck typing to differentiate between a dedicated and a shared worker --- README.md | 70 +++++++++++++++++++++++++++----------- src/workers/AsyncWorker.ts | 19 ++++++++++- 2 files changed, 69 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index 6af63f1..6d9d734 100644 --- a/README.md +++ b/README.md @@ -1,25 +1,16 @@ # @wjfe/async-workers -> Provides thread-safe and atomic synchronization objects, and wrappers to easily use web workers with async/await -> syntax. +> Thread-safe, atomic synchronization objects and asynchronous worker wrappers for the browser and Node environments [Live Demo](https://wjsoftware.github.io/wjfe-async-workers) -> [!CAUTION] -> This NPM package is in its experimental phase. Features may be incomplete or still requiring thorough testing. +> **⚠️ Caution!** +> This NPM package has had minimal testing under NodeJS + web-worker. -## Introduction +## Objectives -Using web workers imply a call to `Worker.postMessge()` to signal the worker you want the work done, and then expect -some notification back via a listener in `Worker.onmessage` to at least know that the work completed, but usually to -get a result back in the form of data (the result of the work). This is just the core, though. You should also add -a listener to `Worker.onerror` just in case the worker has issues processing your request. Otherwise you'll be waiting -forever for the notification, ignorant that an error has occurred and nothing will ever be returned. - -Oh, but this is just on the user interface side. Then there's the matter of doing the web worker side. No point in -continuing the explanation. The point is made: This is incredibly cumbersome. Multi-threaded runtimes like .Net can -use `async/await` with threads and is far more convenient. The whole point of this NPM package is to bring this -convenience to the web workers world. +1. To provide friendly `async/await` syntax to the Node and web workers world. +2. To provide thread-safe, atomic synchronization objects like the ones found in other runtimes like .Net ## Quickstart @@ -30,7 +21,7 @@ to incoming messages. 2. Export the tasks worker object. 3. Create a new instance of `Worker` the way is recommended by your bundler, usually with the syntax `new Worker("./myworker.js", impot.meta.url)`. However, this forces you to write the worker in JavaScript, at least -for Vite-powered projects. +in Vite-powered projects. 4. Create a new instance of `AsyncWorker` (from this package) by passing the worker object and the tasks object from the previous points. 5. Start worker tasks by using the `AsyncWorker.enqueue` property. The functions found in this object return an object @@ -83,11 +74,13 @@ self.onmessage = workerListener(myWorker); This is a 3-step worker. The worker simply waits to be informed which step to run from the user interface thread. +> ℹ️ Worker tasks may take zero arguments. This is perfectly valid. + ### The Async Worker in the UI Thread This is what needs to be done in order to obtain an object that commands the worker: -> [!IMPORTANT] +> **⚡ Important** > This example is using TypeScript and the following assumes a Vite-powered project. We are deviating from the > recommended way of obtaining a worker because the recommended way requires the worker to be written in JavaScript > while in serve mode (`npm run dev`). @@ -120,6 +113,17 @@ Yes! The above is valid: You may queue up as many tasks as you wish without ha previous ones, even if the worker is asynchronous (uses `async/await`). The worker controller will keep perfect record of the order in which the tasks must be run. +This table shows through examples how various call signatures change from from worker to enqueue object: + +| Worker Function | Enqueue Function | +| - | - | +| `init(config: Configuration): void;` | `init(payload: Configuration, options?: QueueingOptions): WorkItem;` | +| `sort(array: MyData[]): MyData[];` | `sort(payload: MyData[], options?: QueueingOptions): WorkItem;` | +| `shutdown(): void;` | `shutdown(payload: void, options?: QueueingOptions): WorkItem;` | +| `primesBetween(payload: { a: number; b: number; }): number[];` | `primesBetween(payload: { a: number; b: number }): WorkItem;` | + +Task functions can only take zero or 1 parameter, so if more than one piece of data is needed as payload, pass an object with all the data, as the last example does. + ## Shared Workers Shared workers are also supported through the same `AsyncWorker` class. Note, however, the following key differences: @@ -139,6 +143,9 @@ self.onconnect = (ev) => { ## Bi-Directional Communication +> **🕓 TL;DR** +> It's OK for workers to transmit intermediate results like progress reports and partial results. It is not recommended for the main thread to have to send data to a paused task. Promises in work item objects resolve once the `QueueingOptions.processMessage()` function returns `true`. + The default functionality is fine for many cases: A worker task is started, the user interface waits for its completion and when the task finishes, the work item's `promise` property spits out the resultant object when awaited. @@ -214,7 +221,7 @@ const defaultRunningTotalWorkItem = myWorkerController.enqueue.calculateRunningT }); ``` -This is it. Bi-directional communcation is fully set up. +This is it. Bi-directional communication is fully set up. ### How About Sending Something Back? @@ -246,6 +253,31 @@ export const myWorker = { Inside `processMessage`, do `myWorkerController.enqueue.supplyMissingOrUpdatedDataWhileInTheAir(theData, { outOfOrder: true })` and hope for the best. +## Worker Task Cancellation + +To fully understand, read [this topic](#cancellationsource) in the section about synchronization objects (coming up next), and also [this other topic about WorkItem class](#the-workitem-class). + +To use a cancellation token, specify the `cancellable` option when the task is enqueued: + +```typescript +const workItem = myWorkerController.enqueue.doSomething(somePayload, { cancellable: true }); +... +// If so needed, cancel at some point. +workItem.cancel(); +... +// At any point where the promise is awaited, an error will be thrown. +try { + const result = await workItem.promise; +} +catch (err: unknown) { + if (err instanceof CancelledMessage) { + // The task was cancelled. + } +} +``` + +To learn about the implementation in the worker side, keep reading. + ## Synchronization Objects This package provides synchronization objects that use `Atomics` to cross the thread boundary safely. @@ -286,7 +318,7 @@ is already simplified. Taking one line from the quickstart example: const defaultRunningTotalWorkItem = myWorkerController.enqueue.calculateRunningTotal(undefined, { cancellable: true }); ``` -By adding the `cancellable` option, a cancellation token will be avilable to `calculateRunningTotal` in its third +By adding the `cancellable` option, a cancellation token will be available to `calculateRunningTotal` in its third parameter, as seen in the previous section. Whenever cancellation is desired, simply call the work item's `cancel()` method. For more information about this diff --git a/src/workers/AsyncWorker.ts b/src/workers/AsyncWorker.ts index e7a3720..c0741a3 100644 --- a/src/workers/AsyncWorker.ts +++ b/src/workers/AsyncWorker.ts @@ -6,6 +6,23 @@ import { InternalWorker } from "./InternalWorker.js"; import { WorkItem } from "./WorkItem.js"; import { WorkItemInternal } from "./WorkItemInternal.js"; +/** + * Determines if a worker-like object is a SharedWorker using duck typing. + * @param worker The worker object to test + * @returns true if the worker appears to be a SharedWorker, false otherwise + */ +function isSharedWorker(worker: any): worker is SharedWorker { + // SharedWorker has a 'port' property (MessagePort) with the required methods + return worker && + typeof worker === 'object' && + 'port' in worker && + worker.port && + typeof worker.port === 'object' && + typeof worker.port.postMessage === 'function' && + typeof worker.port.addEventListener === 'function' && + typeof worker.port.removeEventListener === 'function'; +} + export type EnqueueFn any) = (() => any)> = (payload: Fn extends () => any ? void : Parameters[0], options?: QueueingOptions) => WorkItem>; @@ -55,7 +72,7 @@ export class AsyncWorker any>> #taskRunning; #enqueue; constructor(worker: Worker | SharedWorker, tasks: Tasks) { - this.#iWorker = Object.getPrototypeOf(worker).name === 'Worker' ? new InternalWorker(worker as Worker) : new InternalSharedWorker(worker as SharedWorker); + this.#iWorker = isSharedWorker(worker) ? new InternalSharedWorker(worker) : new InternalWorker(worker as Worker); this.#queue = new Queue(); this.#taskRunning = false; this.#enqueue = Object.keys(tasks).reduce((prev, curr) => {