Skip to content

Commit a0a0ded

Browse files
authored
feat: Add exponential doubling capacity algorithm. (#21)
1 parent 084d6f0 commit a0a0ded

File tree

7 files changed

+161
-54
lines changed

7 files changed

+161
-54
lines changed

.circleci/config.yml

Lines changed: 0 additions & 43 deletions
This file was deleted.

.github/workflows/ci.yml

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
name: ci
2+
on:
3+
push:
4+
branches: [main, rlamb/exponential-doubling-capacity-scaling]
5+
paths-ignore:
6+
- '**.md' #Do not need to run CI for markdown changes.
7+
pull_request:
8+
branches: [main]
9+
paths-ignore:
10+
- '**.md'
11+
12+
jobs:
13+
build-test:
14+
runs-on: ubuntu-latest
15+
strategy:
16+
matrix:
17+
node: [14, latest]
18+
max-parallel: 1
19+
steps:
20+
- uses: actions/checkout@v3
21+
- uses: actions/setup-node@v3
22+
with:
23+
node-version: ${{ matrix.node }}
24+
- name: NPM Install
25+
run: npm install
26+
- name: Run Unit Tests
27+
run: npm run test
28+
- name: Build Contract Tests
29+
run: make build-contract-tests
30+
- name: Run Contract Tests
31+
run: |
32+
make start-contract-test-service > test-service.log 2>&1 & disown
33+
set -o pipefail
34+
make run-contract-tests | tee test-harness.log
35+
- name: Upload Test Service Logs
36+
uses: actions/upload-artifact@v3
37+
if: always()
38+
with:
39+
name: test-service-${{ matrix.node }}
40+
path: test-service.log
41+
- name: Upload Test Harness Logs
42+
uses: actions/upload-artifact@v3
43+
if: always()
44+
with:
45+
name: test-harness-${{ matrix.node }}
46+
path: test-harness.log

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ yarn.lock
55
.nyc_output
66
/coverage
77
package-lock.json
8+
tmp

lib/capacity.js

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/**
2+
* Determine if a new capacity is needed for a buffer, and if it is, then what
3+
* that new capacity should be.
4+
*
5+
* @param {number} currentCapacity The current capacity of the buffer.
6+
* @param {number} requiredCapacity The required capacity from the buffer.
7+
* @param {number} maxOverAllocation The maximum extra capacity to allocate.
8+
* This is how much the capacity can exceed the required capacity. If the over
9+
* allocation exceeds this amount (from doubling), then instead the amount
10+
* over allocated will be equal to maxOverAllocation.
11+
*
12+
* @returns {[boolean, number]} Either [false, 0] if no allocation is needed, or [true, <capacity>] if an
13+
* allocation is needed.
14+
*/
15+
function CalculateCapacity (currentCapacity, requiredCapacity, maxOverAllocation) {
16+
if (requiredCapacity > currentCapacity) {
17+
let newCapacity = requiredCapacity
18+
19+
// Might as well start with a buffer of the maximum size that can be pooled.
20+
// It is unlikely the initial buffer would be small enough to encounter
21+
// this case.
22+
if (newCapacity < Buffer.poolSize) {
23+
newCapacity = Buffer.poolSize
24+
}
25+
26+
// Exponential doubling capacity scaling.
27+
let doubleCapacity = currentCapacity * 2
28+
if (newCapacity < doubleCapacity) {
29+
newCapacity = doubleCapacity
30+
}
31+
32+
// Doubling could become problematic over a certain size, so limit the total
33+
// amount of extra capacity allocated.
34+
const overAllocation = newCapacity - requiredCapacity
35+
if (overAllocation > maxOverAllocation) {
36+
newCapacity = requiredCapacity + maxOverAllocation
37+
}
38+
return [true, newCapacity]
39+
}
40+
return [false, 0]
41+
}
42+
43+
module.exports = CalculateCapacity

lib/eventsource.js

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
var retryDelay = require('./retry-delay')
2+
const CalculateCapacity = require('./capacity')
23

34
var parse = require('url').parse
45
var URL = require('url').URL
@@ -18,6 +19,8 @@ var space = 32
1819
var lineFeed = 10
1920
var carriageReturn = 13
2021

22+
const MAX_OVER_ALLOCATION = 1024 * 1024 // 1 MiB
23+
2124
function hasBom (buf) {
2225
return bom.every(function (charCode, index) {
2326
return buf[index] === charCode
@@ -211,7 +214,7 @@ function EventSource (url, eventSourceInitDict) {
211214
data = ''
212215
eventName = ''
213216
eventId = undefined
214-
217+
215218
readyState = EventSource.OPEN
216219
res.on('close', function () {
217220
res.removeAllListeners('close')
@@ -232,16 +235,31 @@ function EventSource (url, eventSourceInitDict) {
232235
var buf
233236
var startingPos = 0
234237
var startingFieldLength = -1
238+
let sizeUsed = 0
235239

236240
res.on('data', function (chunk) {
237-
buf = buf ? Buffer.concat([buf, chunk]) : chunk
238-
if (isFirst && hasBom(buf)) {
239-
buf = buf.slice(bom.length)
241+
if (!buf) {
242+
buf = chunk
243+
if (isFirst && hasBom(buf)) {
244+
buf = buf.slice(bom.length)
245+
sizeUsed -= bom.length
246+
}
247+
} else {
248+
// allocate new buffer
249+
const [resize, newCapacity] = CalculateCapacity(buf.length, chunk.length + sizeUsed, MAX_OVER_ALLOCATION)
250+
if (resize) {
251+
let newBuffer = Buffer.alloc(newCapacity)
252+
buf.copy(newBuffer, 0, 0, sizeUsed)
253+
buf = newBuffer
254+
}
255+
256+
chunk.copy(buf, sizeUsed)
240257
}
241258

259+
sizeUsed += chunk.length
242260
isFirst = false
243-
var pos = 0
244-
var length = buf.length
261+
let pos = 0
262+
const length = sizeUsed
245263

246264
while (pos < length) {
247265
if (discardTrailingNewline) {
@@ -285,15 +303,17 @@ function EventSource (url, eventSourceInitDict) {
285303

286304
if (pos === length) {
287305
buf = void 0
306+
sizeUsed = 0
288307
} else if (pos > 0) {
289308
buf = buf.slice(pos)
309+
sizeUsed = buf.length
290310
}
291311
})
292312
}
293313
var api = isSecure ? https : http
294-
req = urlAndOptions.url ?
295-
api.request(urlAndOptions.url, urlAndOptions.options, callback) :
296-
api.request(urlAndOptions.options, callback)
314+
req = urlAndOptions.url
315+
? api.request(urlAndOptions.url, urlAndOptions.options, callback)
316+
: api.request(urlAndOptions.options, callback)
297317

298318
if (config.readTimeoutMillis) {
299319
req.setTimeout(config.readTimeoutMillis)
@@ -376,7 +396,7 @@ function EventSource (url, eventSourceInitDict) {
376396
} else if (field === 'event') {
377397
eventName = value
378398
} else if (field === 'id') {
379-
if (!value.includes("\u0000")) {
399+
if (!value.includes('\u0000')) {
380400
eventId = value
381401
}
382402
} else if (field === 'retry') {

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@
4444
"scripts": {
4545
"test": "mocha --reporter spec --exit && standard 'src/**/*.js' 'test/**/*.js'",
4646
"polyfill": "webpack lib/eventsource-polyfill.js example/eventsource-polyfill.js",
47-
"coverage": "nyc --reporter=html --reporter=text _mocha --reporter spec"
47+
"coverage": "nyc --reporter=html --reporter=text _mocha --reporter spec",
48+
"act": "gh act --artifact-server-path=./tmp --env ACTIONS_RUNTIME_TOKEN=foo --env ACTIONS_RUNTIME_URL=http://localhost:8080/"
4849
},
4950
"engines": {
5051
"node": ">=0.12.0"

test/capacity_test.js

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
const CalculateCapacity = require('../lib/capacity')
2+
3+
const mocha = require('mocha')
4+
const assert = require('assert')
5+
const it = mocha.it
6+
7+
it('Uses the minimum capacity', () => {
8+
const [resize, newCapacity] = CalculateCapacity(0, 1, 1024 * 1024)
9+
assert.equal(resize, true)
10+
assert.equal(newCapacity, Buffer.poolSize)
11+
})
12+
13+
it('Does not increase capacity when there is sufficient capacity', () => {
14+
const [resize, newCapacity] = CalculateCapacity(1024, 1023, 1024 * 1024)
15+
16+
assert.equal(resize, false)
17+
assert.equal(newCapacity, 0)
18+
})
19+
20+
it('Uses exponential doubling capacity scaling', () => {
21+
const res1 = CalculateCapacity(8192, 8193, 1024 * 1024)
22+
assert.deepEqual(res1, [true, 16384])
23+
const res2 = CalculateCapacity(16384, 16385, 1024 * 1024)
24+
assert.deepEqual(res2, [true, 32768])
25+
})
26+
27+
it('Uses required capacity when it exceeds doubling', () => {
28+
const [resize, newCapacity] = CalculateCapacity(1024, 16384, 1024 * 1024)
29+
assert.equal(resize, true)
30+
assert.equal(newCapacity, 16384)
31+
})
32+
33+
it('Does not exceed max over allocation', () => {
34+
const capacity = 1024 * 1024 * 3
35+
const maxOverAllocation = 1024 * 1024
36+
const [resize, newCapacity] = CalculateCapacity(capacity, capacity + 1, 1024 * 1024)
37+
assert.equal(resize, true)
38+
assert.equal(newCapacity, capacity + maxOverAllocation + 1)
39+
})

0 commit comments

Comments
 (0)