Commit 3468baa

[storage] wrapper stream destroys source when needed (Azure#12899)
* RetriableReadableStream release source
* _destroy callback with error
* fix tests as in Node 8 close isn't emitted after calling readable.destroy()
* retry on "error" and do not listen on "close"
* remove event listener when needed
* remove abortSignal support in RetriableReadableStream as it's handled by core-http
* nit: increase timeout for syncUploadFromURL large content case
* nit: use parallel upload and increase timeout for syncUploadFromURL large content case
* changelog
1 parent 06be1a1 commit 3468baa
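
For context, below is a minimal consumer-side sketch of what the Issue #11850 fix enables: destroying a partially consumed download stream now also tears down the wrapped network stream instead of leaving it open. The container name, blob name, and connection-string environment variable are illustrative placeholders, not part of this commit.

```ts
import { Readable } from "stream";
import { BlobServiceClient } from "@azure/storage-blob";

async function peekFirstChunk(): Promise<void> {
  // Placeholder connection details; any existing blob works for this sketch.
  const serviceClient = BlobServiceClient.fromConnectionString(
    process.env.AZURE_STORAGE_CONNECTION_STRING!
  );
  const blobClient = serviceClient
    .getContainerClient("my-container")
    .getBlobClient("large-blob.bin");

  const response = await blobClient.download();
  const body = response.readableStreamBody!; // Node.js only

  body.once("data", (chunk: Buffer) => {
    console.log(`Received ${chunk.length} bytes, stopping early.`);
    // With this change, destroying the wrapper stream propagates to the
    // underlying network stream instead of leaving it open.
    (body as Readable).destroy();
  });
  body.once("error", (err) => console.error("Stream error:", err));
}

peekFirstChunk().catch(console.error);
```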

File tree

7 files changed: +257 −102 lines changed


sdk/storage/storage-blob/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -3,6 +3,7 @@
 ## 12.4.1 (Unreleased)
 
 - Fixed a compile failure due to "Can't resolve 'crypto'" in Angular. [Issue #13267](https://github.com/Azure/azure-sdk-for-js/issues/13267).
+- Fixed an issue that the download stream returned by `BlobClient.download` won't release underlying resources unless it's fully consumed. [Issue #11850](https://github.com/Azure/azure-sdk-for-js/issues/11850).
 - Fixed an error when listing blob with a metadata key of `_` [issue #9197](https://github.com/Azure/azure-sdk-for-js/issues/9171)
 - The `"Unclosed root tag"` XML parser error is now retriable. [PR #13076](https://github.com/Azure/azure-sdk-for-js/pull/13076).

sdk/storage/storage-blob/recordings/node/highlevel/recording_download_abort_should_work_when_still_fetching_body.js

Lines changed: 23 additions & 0 deletions
Some generated files are not rendered by default.

sdk/storage/storage-blob/src/Clients.ts

Lines changed: 0 additions & 1 deletion
@@ -1543,7 +1543,6 @@ export class BlobClient extends StorageClient {
       offset,
       res.contentLength!,
       {
-        abortSignal: options.abortSignal,
         maxRetryRequests: options.maxRetryRequests,
         onProgress: options.onProgress
       }

sdk/storage/storage-blob/src/utils/RetriableReadableStream.ts

Lines changed: 89 additions & 95 deletions
@@ -4,20 +4,9 @@
 import { TransferProgressEvent } from "@azure/core-http";
 import { Readable } from "stream";
 
-import { AbortSignal, AbortSignalLike, AbortError } from "@azure/abort-controller";
-
 export type ReadableStreamGetter = (offset: number) => Promise<NodeJS.ReadableStream>;
 
 export interface RetriableReadableStreamOptions {
-  /**
-   * An implementation of the `AbortSignalLike` interface to signal the request to cancel the operation.
-   * For example, use the &commat;azure/abort-controller to create an `AbortSignal`.
-   *
-   * @type {AbortSignalLike}
-   * @memberof RetriableReadableStreamOptions
-   */
-  abortSignal?: AbortSignalLike;
-
   /**
    * Max retry count (>=0), undefined or invalid value means no retry
    *
@@ -46,9 +35,15 @@ export interface RetriableReadableStreamOptions {
    * @memberof RetriableReadableStreamOptions
    */
   doInjectErrorOnce?: boolean;
-}
 
-const ABORT_ERROR = new AbortError("The operation was aborted.");
+  /**
+   * A threshold, not a limit. Dictates the amount of data that a stream buffers before it stops asking for more data.
+   *
+   * @type {number}
+   * @memberof RetriableReadableStreamOptions
+   */
+  highWaterMark?: number;
+}
 
 /**
  * ONLY AVAILABLE IN NODE.JS RUNTIME.
@@ -59,7 +54,6 @@ const ABORT_ERROR = new AbortError("The operation was aborted.");
  * @extends {Readable}
  */
 export class RetriableReadableStream extends Readable {
-  private aborter: AbortSignalLike;
   private start: number;
   private offset: number;
   private end: number;
@@ -69,10 +63,6 @@ export class RetriableReadableStream extends Readable {
   private maxRetryRequests: number;
   private onProgress?: (progress: TransferProgressEvent) => void;
   private options: RetriableReadableStreamOptions;
-  private abortHandler = () => {
-    this.source.pause();
-    this.emit("error", ABORT_ERROR);
-  };
 
   /**
    * Creates an instance of RetriableReadableStream.
@@ -92,8 +82,7 @@ export class RetriableReadableStream extends Readable {
     count: number,
     options: RetriableReadableStreamOptions = {}
   ) {
-    super();
-    this.aborter = options.abortSignal || AbortSignal.none;
+    super({ highWaterMark: options.highWaterMark });
     this.getter = getter;
     this.source = source;
     this.start = offset;
@@ -104,96 +93,101 @@
     this.onProgress = options.onProgress;
     this.options = options;
 
-    this.aborter.addEventListener("abort", this.abortHandler);
-
-    this.setSourceDataHandler();
-    this.setSourceEndHandler();
-    this.setSourceErrorHandler();
+    this.setSourceEventHandlers();
   }
 
   public _read() {
-    if (!this.aborter.aborted) {
-      this.source.resume();
-    }
+    this.source.resume();
   }
 
-  private setSourceDataHandler() {
-    this.source.on("data", (data: Buffer) => {
-      if (this.options.doInjectErrorOnce) {
-        this.options.doInjectErrorOnce = undefined;
-        this.source.pause();
-        this.source.removeAllListeners("data");
-        this.source.emit("end");
-        return;
-      }
+  private setSourceEventHandlers() {
+    this.source.on("data", this.sourceDataHandler);
+    this.source.on("end", this.sourceErrorOrEndHandler);
+    this.source.on("error", this.sourceErrorOrEndHandler);
+  }
 
-      // console.log(
-      //   `Offset: ${this.offset}, Received ${data.length} from internal stream`
-      // );
-      this.offset += data.length;
-      if (this.onProgress) {
-        this.onProgress({ loadedBytes: this.offset - this.start });
-      }
-      if (!this.push(data)) {
-        this.source.pause();
-      }
-    });
+  private removeSourceEventHandlers() {
+    this.source.removeListener("data", this.sourceDataHandler);
+    this.source.removeListener("end", this.sourceErrorOrEndHandler);
+    this.source.removeListener("error", this.sourceErrorOrEndHandler);
   }
 
-  private setSourceEndHandler() {
-    this.source.on("end", () => {
+  private sourceDataHandler = (data: Buffer) => {
+    if (this.options.doInjectErrorOnce) {
+      this.options.doInjectErrorOnce = undefined;
+      this.source.pause();
+      this.source.removeAllListeners("data");
+      this.source.emit("end");
+      return;
+    }
+
+    // console.log(
+    //   `Offset: ${this.offset}, Received ${data.length} from internal stream`
+    // );
+    this.offset += data.length;
+    if (this.onProgress) {
+      this.onProgress({ loadedBytes: this.offset - this.start });
+    }
+    if (!this.push(data)) {
+      this.source.pause();
+    }
+  };
+
+  private sourceErrorOrEndHandler = (err?: Error) => {
+    if (err && err.name === "AbortError") {
+      this.destroy(err);
+      return;
+    }
+
+    // console.log(
+    //   `Source stream emits end or error, offset: ${
+    //     this.offset
+    //   }, dest end : ${this.end}`
+    // );
+    this.removeSourceEventHandlers();
+    if (this.offset - 1 === this.end) {
+      this.push(null);
+    } else if (this.offset <= this.end) {
       // console.log(
-      //   `Source stream emits end, offset: ${
-      //     this.offset
-      //   }, dest end : ${this.end}`
+      //   `retries: ${this.retries}, max retries: ${this.maxRetries}`
      // );
-      if (this.offset - 1 === this.end) {
-        this.aborter.removeEventListener("abort", this.abortHandler);
-        this.push(null);
-      } else if (this.offset <= this.end) {
-        // console.log(
-        //   `retries: ${this.retries}, max retries: ${this.maxRetries}`
-        // );
-        if (this.retries < this.maxRetryRequests) {
-          this.retries += 1;
-          this.getter(this.offset)
-            .then((newSource) => {
-              this.source = newSource;
-              this.setSourceDataHandler();
-              this.setSourceEndHandler();
-              this.setSourceErrorHandler();
-            })
-            .catch((error) => {
-              this.emit("error", error);
-            });
-        } else {
-          this.emit(
-            "error",
-            new Error(
-              // tslint:disable-next-line:max-line-length
-              `Data corruption failure: received less data than required and reached maxRetires limitation. Received data offset: ${this
-                .offset - 1}, data needed offset: ${this.end}, retries: ${
-                this.retries
-              }, max retries: ${this.maxRetryRequests}`
-            )
-          );
-        }
+      if (this.retries < this.maxRetryRequests) {
+        this.retries += 1;
+        this.getter(this.offset)
+          .then((newSource) => {
+            this.source = newSource;
+            this.setSourceEventHandlers();
+          })
+          .catch((error) => {
+            this.destroy(error);
+          });
       } else {
-        this.emit(
-          "error",
+        this.destroy(
           new Error(
-            `Data corruption failure: Received more data than original request, data needed offset is ${
-              this.end
-            }, received offset: ${this.offset - 1}`
+            // tslint:disable-next-line:max-line-length
+            `Data corruption failure: received less data than required and reached maxRetires limitation. Received data offset: ${this
+              .offset - 1}, data needed offset: ${this.end}, retries: ${
+              this.retries
+            }, max retries: ${this.maxRetryRequests}`
          )
        );
      }
-    });
-  }
+    } else {
+      this.destroy(
+        new Error(
+          `Data corruption failure: Received more data than original request, data needed offset is ${
+            this.end
+          }, received offset: ${this.offset - 1}`
+        )
+      );
+    }
+  };
+
+  _destroy(error: Error | null, callback: (error?: Error) => void): void {
+    // remove listener from source and release source
+    this.removeSourceEventHandlers();
+    (this.source as Readable).destroy();
 
-  private setSourceErrorHandler() {
-    this.source.on("error", (error) => {
-      this.emit("error", error);
-    });
+    callback(error === null ? undefined : error);
   }
 }
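
As a side note, here is a self-contained sketch of the wrapping pattern this file now follows, using only Node's stream module: event handlers are kept as instance properties so they can be detached with removeListener, failures are routed through destroy(), and _destroy releases the wrapped source. It illustrates the technique under those assumptions and is not the SDK class itself.

```ts
import { Readable } from "stream";

class SourceWrapper extends Readable {
  constructor(private source: Readable, options?: { highWaterMark?: number }) {
    super({ highWaterMark: options?.highWaterMark });
    this.source.on("data", this.onData);
    this.source.on("end", this.onEndOrError);
    this.source.on("error", this.onEndOrError);
  }

  _read(): void {
    this.source.resume();
  }

  private onData = (chunk: Buffer): void => {
    if (!this.push(chunk)) {
      this.source.pause(); // honor backpressure from the consumer
    }
  };

  private onEndOrError = (err?: Error): void => {
    this.removeHandlers();
    if (err) {
      this.destroy(err); // surfaces through _destroy below
    } else {
      this.push(null);
    }
  };

  private removeHandlers(): void {
    this.source.removeListener("data", this.onData);
    this.source.removeListener("end", this.onEndOrError);
    this.source.removeListener("error", this.onEndOrError);
  }

  // Called once by Node when the wrapper is destroyed (explicitly or via an
  // error); cleaning up here guarantees the source is always released.
  _destroy(error: Error | null, callback: (error?: Error | null) => void): void {
    this.removeHandlers();
    this.source.destroy();
    callback(error);
  }
}
```

Routing failures through destroy() rather than emitting "error" directly funnels every failure path into _destroy, so listener removal and source cleanup happen exactly once.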

sdk/storage/storage-blob/test/node/blockblobclient.spec.ts

Lines changed: 7 additions & 4 deletions
@@ -376,23 +376,26 @@ describe("syncUploadFromURL", () => {
       undefined,
       "recording file too large, exceeds GitHub's file size limit of 100.00 MB"
     );
-    await sourceBlob.upload(largeContent, largeContent.byteLength);
+    await sourceBlob.uploadData(largeContent);
     await blockBlobClient.syncUploadFromURL(sourceBlobURLWithSAS);
-  });
+  }).timeout(10 * 60 * 1000);
 
   it("large content with timeout", async () => {
     recorder.skip(
       undefined,
       "recording file too large, exceeds GitHub's file size limit of 100.00 MB"
     );
-    await sourceBlob.upload(largeContent, largeContent.byteLength);
+    await sourceBlob.uploadData(largeContent);
 
+    let exceptionCaught = false;
     try {
       await blockBlobClient.syncUploadFromURL(sourceBlobURLWithSAS, {
         timeoutInSeconds: 1
       });
     } catch (err) {
       assert.deepStrictEqual(err.code, "OperationTimedOut");
+      exceptionCaught = true;
     }
-  });
+    assert.ok(exceptionCaught);
+  }).timeout(10 * 60 * 1000);
 });

sdk/storage/storage-blob/test/node/highlevel.node.spec.ts

Lines changed: 22 additions & 0 deletions
@@ -18,6 +18,7 @@ import { readStreamToLocalFileWithLogs } from "../utils/testutils.node";
 import { BLOCK_BLOB_MAX_STAGE_BLOCK_BYTES } from "../../src/utils/constants";
 import { Test_CPK_INFO } from "../utils/constants";
 import { streamToBuffer2 } from "../../src/utils/utils.node";
+import { delay } from "../../src/utils/utils.common";
 
 // tslint:disable:no-empty
 describe("Highlevel", () => {
@@ -706,6 +707,27 @@ describe("Highlevel", () => {
     fs.unlinkSync(downloadedFile);
   });
 
+  it("download abort should work when still fetching body", async () => {
+    recorder.skip("node", "Temp file - recorder doesn't support saving the file");
+    await blockBlobClient.uploadFile(tempFileSmall, {
+      blockSize: 4 * 1024 * 1024,
+      concurrency: 20
+    });
+
+    const aborter = new AbortController();
+    const res = await blobClient.download(0, undefined, { abortSignal: aborter.signal });
+
+    let exceptionCaught = false;
+    res.readableStreamBody!.on("error", (err) => {
+      assert.equal(err.name, "AbortError");
+      exceptionCaught = true;
+    });
+
+    aborter.abort();
+    await delay(10);
+    assert.ok(exceptionCaught);
+  });
+
   it("downloadToFile should success", async () => {
     recorder.skip("node", "Temp file - recorder doesn't support saving the file");
     const downloadedFilePath = recorder.getUniqueName("downloadedtofile.");
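
An application-level rendition of the abort flow this new test exercises is sketched below; it assumes `blobClient` already points at an existing blob and omits error handling beyond the stream listener.

```ts
import { AbortController } from "@azure/abort-controller";
import { BlobClient } from "@azure/storage-blob";

async function abortWhileFetchingBody(blobClient: BlobClient): Promise<void> {
  const aborter = new AbortController();
  const res = await blobClient.download(0, undefined, { abortSignal: aborter.signal });

  res.readableStreamBody!.on("error", (err) => {
    // After this commit the abort surfaces here as an AbortError and the
    // wrapper stream destroys its underlying source instead of leaking it.
    console.log(`Download stopped: ${err.name}`);
  });

  // Cancel while the body is still being fetched.
  aborter.abort();
}
```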
