Skip to content

Commit e7e9975

Browse files
authored
Merge pull request #10 from grimmer0125/optioin_skipIfFull
Add option dropWhenReachLimit
2 parents 5761197 + b9d1021 commit e7e9975

File tree

5 files changed

+79
-25
lines changed

5 files changed

+79
-25
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,14 @@ All notable changes to this project will be documented in this file. See [standa
44

55
Those versions which only include documentation change might not be included here.
66

7+
### [1.7.0](https://github.com/grimmer0125/d4c-queue/compare/v1.6.9...v1.7.0) (2021-11-15)
8+
9+
Add option dropWhenReachLimit for better throtting effect.
10+
711
### [1.6.9](https://github.com/grimmer0125/d4c-queue/compare/v1.6.5...v1.6.9) (2021-10-03)
812

913
Update README. Fix potential security vulnerabilities in dependencies and gh-pages publishing.
14+
1015
### [1.6.5](https://github.com/grimmer0125/d4c-queue/compare/v1.6.4...v1.6.5) (2021-07-13)
1116

1217
Update README and fix potential security vulnerabilities in dependencies.

README.md

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@ Wrap an [async](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Referenc
1818
8. Well tested.
1919
9. Optional parameter, `inheritPreErr`. If current task is waiting for previous tasks, set it as `true` to inherit the error of the previous task and the task will not be executed and throw a custom error `new PreviousError(task.preError.message ?? task.preError)`. If this parameter is omitted or set as `false`, the task will continue whether previous tasks happen errors or not.
2020
10. Optional parameter, `noBlockCurr`. Set it as `true` to forcibly execute the current task in the another (microtask) execution of the event loop. This is useful if you pass a sync function as the first task but do not want it to block the current event loop.
21+
11. Optional parameter, `dropWhenReachLimit`. Set it as `true`. Then it will be dropped when it is called but the system detects the queue concurrency limit is reached. It is like a kind of throttle mechanism but not time interval based. The dropped function call will not be really executed and will throw a execption whose message is `QueueIsFull` and you need to catch it.
2122

2223
## Installation
2324

2425
This package includes two builds.
2526

2627
- ES6 build (ES2015) with CommonJS module for `main` build in package.json.
27-
- ES6 build (ES2015) with ES6 module for `module` build. Some tools will follow the `module` field in `package.json`, like Rollup, Webpack, or Parcel. It is good to let build tools can tree-shake this module build to import only the code they need.
28+
- ES6 build (ES2015) with ES6 module for `module` build. Some tools will follow the `module` field in `package.json`, like Rollup, Webpack, or Parcel.
2829

2930
Either `npm install d4c-queue` or `yarn add d4c-queue`. Then import this package.
3031

@@ -126,18 +127,18 @@ d4c.apply(syncFun, { args: ['syncFun_arg1'] })
126127

127128
#### Concurrency mode
128129

129-
Is it useful for rate-limiting tasks. For example, setup some concurrency limit to avoid send GitHub GraphQL API requests too fast, since it has rate limits control.
130+
Is it useful for rate-limiting or throttling tasks. For example, setup some concurrency limit to avoid send GitHub GraphQL API requests too fast, since it has rate limits control.
130131

131132
Default concurrency limit of D4C instance is `1` in this library.
132133

133134
Usage:
134135

135136
```ts
136137
/** change concurrency limit applied on default queues */
137-
const d4c = new D4C([{ concurrency: { limit: 100 }}])
138+
const d4c = new D4C([{ concurrency: { limit: 100 } }])
138139

139140
/** setup concurrency for specific queue: "2" */
140-
const d4c = new D4C([{ concurrency: { limit: 100, tag: '2' }}])
141+
const d4c = new D4C([{ concurrency: { limit: 100, tag: '2' } }])
141142
```
142143

143144
You can adjust concurrency via `setConcurrency`.
@@ -151,6 +152,20 @@ d4c.setConcurrency([{ limit: 10 }])
151152
d4c.setConcurrency([{ limit: 10, tag: 'queue2' }])
152153
```
153154

155+
When this async task function is called and the system detects the concurrency limit is reached, this tasks will not be really executed and will be enqueued. If you want to drop this task function call, you can set `dropWhenReachLimit` option when wrapping/applying the task function. e.g.
156+
157+
```ts
158+
const fn1 = d4c.wrap(taskFun, { dropWhenReachLimit: true })
159+
160+
try {
161+
await fn1()
162+
} catch (err) {
163+
// when the concurrency limit is reached at this moment.
164+
// err.message is QueueIsFull
165+
console.log({ err })
166+
}
167+
```
168+
154169
### Decorators usage
155170

156171
#### Synchronization mode
@@ -188,7 +203,7 @@ class TestController {
188203
@concurrent
189204
static async fetchData(url: string) {}
190205

191-
@concurrent({ tag: '2' })
206+
@concurrent({ tag: '2', dropWhenReachLimit: true })
192207
async fetchData2(url: string) {}
193208

194209
/** You can still use @synchronized, as long as
@@ -453,6 +468,7 @@ function concurrent(option?: {
453468
tag?: string | symbol
454469
inheritPreErr?: boolean
455470
noBlockCurr?: boolean
471+
dropWhenReachLimit?: boolean
456472
}) {}
457473
```
458474
@@ -467,7 +483,7 @@ Example:
467483
@concurrent
468484
@concurrent()
469485
@concurrent({ tag: "world", inheritPreErr: true })
470-
@concurrent({ inheritPreErr: true, noBlockCurr: true })
486+
@concurrent({ inheritPreErr: true, noBlockCurr: true, dropWhenReachLimit: true })
471487

472488
```
473489
@@ -490,10 +506,10 @@ usage:
490506
const d4c = new D4C()
491507

492508
/** concurrency limit 500 applied on default queues */
493-
const d4c = new D4C([{ concurrency: { limit: 500 }}])
509+
const d4c = new D4C([{ concurrency: { limit: 500 } }])
494510

495511
/** setup concurrency for specific queue: "2" */
496-
const d4c = new D4C([{ concurrency: { limit: 100, tag: '2' }}])
512+
const d4c = new D4C([{ concurrency: { limit: 100, tag: '2' } }])
497513
```
498514
499515
- setConcurrency
@@ -513,6 +529,7 @@ public wrap<T extends IAnyFn>(
513529
tag?: string | symbol;
514530
inheritPreErr?: boolean;
515531
noBlockCurr?: boolean;
532+
dropWhenReachLimit?: boolean;
516533
}
517534
)
518535
```
@@ -530,6 +547,7 @@ public apply<T extends IAnyFn>(
530547
tag?: string | symbol;
531548
inheritPreErr?: boolean;
532549
noBlockCurr?: boolean;
550+
dropWhenReachLimit?: boolean;
533551
args?: Parameters<typeof func>;
534552
}
535553
)

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "d4c-queue",
3-
"version": "1.6.9",
3+
"version": "1.7.0",
44
"description": "A task queue executes tasks sequentially or concurrently. Wrap an async/promise-returning/sync function as a queue-ready async function for easy reusing. Support passing arguments/getting return value, @synchronized/@concurrent decorator, Node.js/Browser.",
55
"main": "build/main/index.js",
66
"typings": "build/main/index.d.ts",
@@ -47,7 +47,7 @@
4747
"task-queue",
4848
"tasks",
4949
"task-runner",
50-
"microtask",
50+
"microtask",
5151
"angular"
5252
],
5353
"scripts": {

src/lib/D4C.spec.ts

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,18 @@ const funcPromise = (input: string[], input2: string): Promise<string> => {
2222
return Promise.resolve(input[0] + input2)
2323
}
2424

25-
const timeout = (seconds: number, target: { str: string }) => {
25+
const timeout = (seconds: number, target?: { str: string }) => {
2626
return new Promise<void>((resolve, _) =>
2727
setTimeout(() => {
28-
target.str += seconds
28+
if (target?.str != undefined && target?.str != null) {
29+
target.str += seconds
30+
}
2931
resolve()
3032
}, seconds * 100)
3133
)
3234
}
3335

34-
const timeoutError = (seconds: number, result) => {
36+
const timeoutError = (seconds: number, result: string | Error) => {
3537
return new Promise((_, reject) =>
3638
setTimeout(() => {
3739
reject(result)
@@ -606,13 +608,15 @@ test('Decorator usage', async (t) => {
606608
t.is(test.str, '0.10.5')
607609

608610
/** composite case: D4C instance on no autobind decorated method */
611+
let error = null
609612
try {
610613
const d4c = new D4C()
611614
const newFunc = d4c.wrap(testController.greet)
612615
const resp = await newFunc('')
613616
} catch (err) {
614-
t.is(err.message, ErrMsg.MissingThisDueBindIssue)
617+
error = err
615618
}
619+
t.is(error.message, ErrMsg.MissingThisDueBindIssue)
616620

617621
/** composite case: D4C instance on autobind decorated method */
618622
const d4c = new D4C()
@@ -647,7 +651,7 @@ test('Decorator usage', async (t) => {
647651
t.is(test.str, '0.10.5')
648652

649653
/** test invalid decorator */
650-
let error = null
654+
error = null
651655
try {
652656
class TestController4 {
653657
@synchronized({ tag: true } as any)
@@ -669,6 +673,7 @@ test('Decorator usage', async (t) => {
669673
// console.log(" err by purpose")
670674
}
671675
})()
676+
672677
error = null
673678
try {
674679
await testController.instanceTimeout(0.1, { str: '' })
@@ -840,3 +845,17 @@ test("Instance usage: option inheritPreErr enable: task2 inherit task1's error i
840845

841846
t.is(error.message, 'some_error')
842847
})
848+
849+
test('Instance usage: test option dropWhenReachLimit', async (t) => {
850+
const d4c = new D4C([{ concurrency: { limit: 2 } }])
851+
const fn1 = d4c.wrap(timeout, { dropWhenReachLimit: true })
852+
853+
let error = null
854+
try {
855+
await fn1(3)
856+
await Promise.all([fn1(3), fn1(3), fn1(3)])
857+
} catch (err) {
858+
error = err
859+
}
860+
t.is(error.message, ErrMsg.QueueIsFull)
861+
})

src/lib/D4C.ts

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ export enum ErrMsg {
4040
TwoDecoratorsIncompatible = 'TwoDecoratorsInCompatible',
4141
ClassAndMethodDecoratorsIncompatible = 'ClassAndMethodDecoratorsIncompatible',
4242
MissingThisDueBindIssue = 'missingThisDueBindIssue',
43+
QueueIsFull = 'QueueIsFull',
4344
}
4445

4546
const queueSymbol = Symbol('d4cQueues') // subQueue system
@@ -213,6 +214,8 @@ function checkIfDecoratorOptionObject(obj: any): boolean {
213214
(typeof obj.inheritPreErr === 'boolean' ||
214215
obj.inheritPreErr === undefined) &&
215216
(typeof obj.noBlockCurr === 'boolean' || obj.noBlockCurr === undefined) &&
217+
(typeof obj.dropWhenReachLimit === 'boolean' ||
218+
obj.dropWhenReachLimit === undefined) &&
216219
checkTag(obj.tag)
217220
) {
218221
return true
@@ -240,6 +243,7 @@ export function concurrent(option?: {
240243
tag?: string | symbol
241244
inheritPreErr?: boolean
242245
noBlockCurr?: boolean
246+
dropWhenReachLimit?: boolean
243247
}): MethodDecoratorType
244248
export function concurrent(
245249
targetOrOption?: any,
@@ -329,6 +333,7 @@ function _q<T extends IAnyFn>(
329333
tag?: QueueTag
330334
inheritPreErr?: boolean
331335
noBlockCurr?: boolean
336+
dropWhenReachLimit?: boolean
332337
}
333338
): (...args: Parameters<typeof func>) => Promise<UnwrapPromise<typeof func>> {
334339
return async function (...args: any[]): Promise<any> {
@@ -381,16 +386,21 @@ function _q<T extends IAnyFn>(
381386
let err: Error
382387
let task: Task
383388
if (taskQueue.runningTask === taskQueue.concurrency) {
384-
const promise = new Promise(function (resolve) {
385-
task = {
386-
unlock: resolve,
387-
preError: null,
388-
inheritPreErr: option?.inheritPreErr,
389-
}
390-
})
391-
taskQueue.queue.push(task)
392-
await promise
393-
taskQueue.runningTask += 1
389+
if (!option?.dropWhenReachLimit) {
390+
const promise = new Promise(function (resolve) {
391+
task = {
392+
unlock: resolve,
393+
preError: null,
394+
inheritPreErr: option?.inheritPreErr,
395+
}
396+
})
397+
taskQueue.queue.push(task)
398+
await promise
399+
taskQueue.runningTask += 1
400+
} else {
401+
// drop this time, throttle mechanism
402+
throw new Error(ErrMsg.QueueIsFull)
403+
}
394404
} else if (option?.noBlockCurr) {
395405
taskQueue.runningTask += 1
396406
await Promise.resolve()
@@ -528,6 +538,7 @@ export class D4C {
528538
tag?: string | symbol
529539
inheritPreErr?: boolean
530540
noBlockCurr?: boolean
541+
dropWhenReachLimit?: boolean
531542
args?: Parameters<typeof func>
532543
}
533544
): Promise<UnwrapPromise<typeof func>> {
@@ -542,6 +553,7 @@ export class D4C {
542553
tag?: string | symbol
543554
inheritPreErr?: boolean
544555
noBlockCurr?: boolean
556+
dropWhenReachLimit?: boolean
545557
}
546558
): (...args: Parameters<typeof func>) => Promise<UnwrapPromise<typeof func>> {
547559
if (!option || checkTag(option.tag)) {

0 commit comments

Comments
 (0)