Skip to content

Commit cf6beab

Browse files
author
mochatek
committed
fix(createReadStream): Default batch size and change method name
1 parent 9846a4d commit cf6beab

File tree

3 files changed

+13
-6
lines changed

3 files changed

+13
-6
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ const readableStream = Readable.fromWeb(
5252
const streamer = await createReadStream(readableStream);
5353

5454
// Stream JSON array elements in batches of 100
55-
for await (const chunk of streamer.stream(100)) {
55+
for await (const elements of streamer.batch(100)) {
5656
// Your processing logic here
5757
}
5858
```
@@ -67,8 +67,8 @@ const streamer = await createReadStream<{ offer: boolean; price: number }>(
6767
{ encoding: "utf-8" }
6868
);
6969

70-
// Add filter to the stream to fetch only relevant elements
71-
for await (const chunk of streamer.stream(
70+
// Add filter to the batch to fetch only relevant elements
71+
for await (const elements of streamer.batch(
7272
100,
7373
(element) => element.price < 500 || element.offer
7474
)) {

package-lock.json

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/modules/JsonArrayStreamer.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ class JsonArrayStreamer<T> {
138138
}
139139
}
140140

141-
public async *stream(chunkSize: number, filter?: (element: T) => boolean) {
141+
public async *batch(batchSize?: number, filter?: (element: T) => boolean) {
142142
try {
143143
characterStream: for await (const chunk of this.chunkGenerator()) {
144144
for (let char of chunk) {
@@ -190,9 +190,13 @@ class JsonArrayStreamer<T> {
190190

191191
this.elementParser(char, filter);
192192

193-
if (this.resultBuffer.length === chunkSize) {
193+
const targetSize = Math.max(
194+
1,
195+
batchSize || this.resultBuffer.length
196+
);
197+
if (this.resultBuffer.length === targetSize) {
194198
if (!this.readStream?.closed) this.readStream?.pause();
195-
yield this.resultBuffer.splice(0, chunkSize);
199+
yield this.resultBuffer.splice(0, targetSize);
196200
if (!this.readStream?.closed) this.readStream?.resume();
197201
}
198202
}

0 commit comments

Comments
 (0)