From 12970c08920ab6e771727134ab0063b18bbee35f Mon Sep 17 00:00:00 2001 From: George Fu Date: Thu, 19 Sep 2024 17:02:45 +0000 Subject: [PATCH 1/9] feat(util-stream): create checksum stream adapters --- .changeset/red-cameras-repair.md | 5 + .../checksum/createChecksumStream.browser.ts | 98 +++++++++++++ .../src/checksum/createChecksumStream.spec.ts | 133 ++++++++++++++++++ .../src/checksum/createChecksumStream.ts | 133 ++++++++++++++++++ 4 files changed, 369 insertions(+) create mode 100644 .changeset/red-cameras-repair.md create mode 100644 packages/util-stream/src/checksum/createChecksumStream.browser.ts create mode 100644 packages/util-stream/src/checksum/createChecksumStream.spec.ts create mode 100644 packages/util-stream/src/checksum/createChecksumStream.ts diff --git a/.changeset/red-cameras-repair.md b/.changeset/red-cameras-repair.md new file mode 100644 index 00000000000..00f6218e7cf --- /dev/null +++ b/.changeset/red-cameras-repair.md @@ -0,0 +1,5 @@ +--- +"@smithy/util-stream": minor +--- + +create checksum stream adapter diff --git a/packages/util-stream/src/checksum/createChecksumStream.browser.ts b/packages/util-stream/src/checksum/createChecksumStream.browser.ts new file mode 100644 index 00000000000..81ca3cc98b8 --- /dev/null +++ b/packages/util-stream/src/checksum/createChecksumStream.browser.ts @@ -0,0 +1,98 @@ +import { Checksum, Encoder } from "@smithy/types"; +import { toBase64 } from "@smithy/util-base64"; + +/** + * @internal + */ +export interface ChecksumStreamInit { + /** + * Base64 value of the expected checksum. + */ + expectedChecksum: string; + /** + * For error messaging, the location from which the checksum value was read. + */ + checksumSourceLocation: string; + /** + * The checksum calculator. + */ + checksum: Checksum; + /** + * The stream to be checked. + */ + source: ReadableStream; + + /** + * Optional base 64 encoder if calling from a request context. + */ + base64Encoder?: Encoder; +} + +/** + * Alias prevents compiler from turning + * ReadableStream into ReadableStream, which is incompatible + * with the NodeJS.ReadableStream global type. + */ +export type ReadableStreamType = ReadableStream; + +/** + * @internal + * + * Creates a stream adapter for throwing checksum errors for streams without + * buffering the stream. + */ +export const createChecksumStream = ({ + expectedChecksum, + checksum, + source, + checksumSourceLocation, + base64Encoder, +}: ChecksumStreamInit): ReadableStreamType => { + if (!(source instanceof ReadableStream)) { + throw new Error( + `@smithy/util-stream: unsupported source type ${(source as any)?.constructor?.name ?? source} in ChecksumStream.` + ); + } + + const encoder = base64Encoder ?? toBase64; + + if (typeof TransformStream !== "function") { + throw new Error( + "@smithy/util-stream: unable to instantiate ChecksumStream because API unavailable: ReadableStream/TransformStream." + ); + } + + const transform = new TransformStream({ + start() {}, + transform: async (chunk: any, controller: TransformStreamDefaultController) => { + /** + * When the upstream source finishes, perform the checksum comparison. + */ + if (null === chunk) { + const digest: Uint8Array = await checksum.digest(); + const received = encoder(digest); + + if (expectedChecksum !== received) { + const error = new Error( + `Checksum mismatch: expected "${expectedChecksum}" but received "${received}"` + + ` in response header "${checksumSourceLocation}".` + ); + controller.error(error); + throw error; + } + controller.terminate(); + return; + } + /** + * When the upstream source flows data to this stream, + * calculate a step update of the checksum. + */ + checksum.update(chunk); + controller.enqueue(chunk); + }, + flush() {}, + }); + + source.pipeThrough(transform); + return transform.readable; +}; diff --git a/packages/util-stream/src/checksum/createChecksumStream.spec.ts b/packages/util-stream/src/checksum/createChecksumStream.spec.ts new file mode 100644 index 00000000000..ddb7563785e --- /dev/null +++ b/packages/util-stream/src/checksum/createChecksumStream.spec.ts @@ -0,0 +1,133 @@ +import { Checksum } from "@smithy/types"; +import { toBase64 } from "@smithy/util-base64"; +import { toUtf8 } from "@smithy/util-utf8"; +import { Readable } from "stream"; + +import { headStream } from "../headStream"; +import { createChecksumStream } from "./createChecksumStream"; + +describe("Checksum streams", () => { + /** + * Hash "algorithm" that appends all data together. + */ + class Appender implements Checksum { + public hash = ""; + async digest(): Promise { + return Buffer.from(this.hash); + } + reset(): void { + throw new Error("Function not implemented."); + } + update(chunk: Uint8Array): void { + this.hash += toUtf8(chunk); + } + } + + const canonicalData = new Uint8Array("abcdefghijklmnopqrstuvwxyz".split("").map((_) => _.charCodeAt(0))); + + const canonicalUtf8 = toUtf8(canonicalData); + const canonicalBase64 = toBase64(canonicalUtf8); + + describe(createChecksumStream.name, () => { + const makeStream = () => { + return Readable.from(Buffer.from(canonicalData.buffer, 0, 26)); + }; + + it("should extend a Readable stream", async () => { + const stream = makeStream(); + const checksumStream = createChecksumStream({ + expectedChecksum: canonicalBase64, + checksum: new Appender(), + checksumSourceLocation: "my-header", + source: stream, + }); + + expect(checksumStream.constructor.name).toEqual("ChecksumStream"); + + const collected = toUtf8(await headStream(checksumStream, Infinity)); + expect(collected).toEqual(canonicalUtf8); + expect(stream.readableEnded).toEqual(true); + expect(checksumStream.readableEnded).toEqual(true); + }); + + it("should throw during stream read if the checksum does not match", async () => { + const stream = makeStream(); + const checksumStream = createChecksumStream({ + expectedChecksum: "different-expected-checksum", + checksum: new Appender(), + checksumSourceLocation: "my-header", + source: stream, + }); + + try { + toUtf8(await headStream(checksumStream, Infinity)); + throw new Error("stream was read successfully"); + } catch (e: unknown) { + expect(String(e)).toEqual( + `Error: Checksum mismatch: expected "different-expected-checksum" but` + + ` received "${canonicalBase64}"` + + ` in response header "my-header".` + ); + } + }); + }); + + describe(createChecksumStream.name + " webstreams API", () => { + if (typeof ReadableStream !== "function") { + // test not applicable to Node.js 16. + return; + } + + const makeStream = () => { + return new ReadableStream({ + start(controller) { + canonicalData.forEach((byte) => { + controller.enqueue(new Uint8Array([byte])); + }); + controller.enqueue(null); + controller.close(); + }, + }); + }; + + it("should extend a ReadableStream", async () => { + const stream = makeStream(); + const checksumStream = createChecksumStream({ + expectedChecksum: canonicalBase64, + checksum: new Appender(), + checksumSourceLocation: "my-header", + source: stream, + }); + + expect(checksumStream).toBeInstanceOf(ReadableStream); + + const collected = toUtf8(await headStream(checksumStream, Infinity)); + expect(collected).toEqual(canonicalUtf8); + expect(stream.locked).toEqual(true); + + // expectation is that it is resolved. + expect(await checksumStream.getReader().closed); + }); + + it("should throw during stream read if the checksum does not match", async () => { + const stream = makeStream(); + const checksumStream = createChecksumStream({ + expectedChecksum: "different-expected-checksum", + checksum: new Appender(), + checksumSourceLocation: "my-header", + source: stream, + }); + + try { + toUtf8(await headStream(checksumStream, Infinity)); + throw new Error("stream was read successfully"); + } catch (e: unknown) { + expect(String(e)).toEqual( + `Error: Checksum mismatch: expected "different-expected-checksum" but` + + ` received "${canonicalBase64}"` + + ` in response header "my-header".` + ); + } + }); + }); +}); diff --git a/packages/util-stream/src/checksum/createChecksumStream.ts b/packages/util-stream/src/checksum/createChecksumStream.ts new file mode 100644 index 00000000000..caee7636267 --- /dev/null +++ b/packages/util-stream/src/checksum/createChecksumStream.ts @@ -0,0 +1,133 @@ +import { Checksum, Encoder } from "@smithy/types"; +import { toBase64 } from "@smithy/util-base64"; +import { Duplex, Readable } from "stream"; + +import { isReadableStream } from "../stream-type-check"; +import { createChecksumStream as createChecksumStreamWeb, ReadableStreamType } from "./createChecksumStream.browser"; + +/** + * @internal + */ +export interface ChecksumStreamInit { + /** + * Base64 value of the expected checksum. + */ + expectedChecksum: string; + /** + * For error messaging, the location from which the checksum value was read. + */ + checksumSourceLocation: string; + /** + * The checksum calculator. + */ + checksum: Checksum; + /** + * The stream to be checked. + */ + source: T; + + /** + * Optional base 64 encoder if calling from a request context. + */ + base64Encoder?: Encoder; +} + +/** + * @internal + * + * Creates a stream mirroring the input stream's interface, but + * performs checksumming when reading to the end of the stream. + */ +export function createChecksumStream(init: ChecksumStreamInit): ReadableStreamType; +export function createChecksumStream(init: ChecksumStreamInit): Readable; +export function createChecksumStream( + init: ChecksumStreamInit +): Readable | ReadableStreamType { + if (typeof ReadableStream === "function" && isReadableStream(init.source)) { + return createChecksumStreamWeb(init as ChecksumStreamInit); + } + return new ChecksumStream(init as ChecksumStreamInit); +} + +/** + * @internal + * + * Wrapper for throwing checksum errors for streams without + * buffering the stream. + * + */ +class ChecksumStream extends Duplex { + private expectedChecksum: string; + private checksumSourceLocation: string; + private checksum: Checksum; + private source?: Readable; + private base64Encoder: Encoder; + + public constructor({ + expectedChecksum, + checksum, + source, + checksumSourceLocation, + base64Encoder, + }: ChecksumStreamInit) { + super(); + if (typeof (source as Readable).pipe === "function") { + this.source = source as Readable; + } else { + throw new Error( + `@smithy/util-stream: unsupported source type ${source?.constructor?.name ?? source} in ChecksumStream.` + ); + } + + this.base64Encoder = base64Encoder ?? toBase64; + this.expectedChecksum = expectedChecksum; + this.checksum = checksum; + this.checksumSourceLocation = checksumSourceLocation; + + // connect this stream to the end of the source stream. + this.source.pipe(this); + } + + /** + * @internal do not call this directly. + */ + public _read( + // eslint-disable-next-line @typescript-eslint/no-unused-vars + size: number + ): void {} + + /** + * @internal do not call this directly. + * + * When the upstream source flows data to this stream, + * calculate a step update of the checksum. + */ + public _write(chunk: Buffer, encoding: string, callback: (err?: Error) => void): void { + this.checksum.update(chunk); + this.push(chunk); + callback(); + } + + /** + * @internal do not call this directly. + * + * When the upstream source finishes, perform the checksum comparison. + */ + public async _final(callback: (err?: Error) => void) { + try { + const digest: Uint8Array = await this.checksum.digest(); + const received = this.base64Encoder(digest); + if (this.expectedChecksum !== received) { + callback( + new Error( + `Checksum mismatch: expected "${this.expectedChecksum}" but received "${received}"` + + ` in response header "${this.checksumSourceLocation}".` + ) + ); + } + } catch (e: unknown) { + callback(e as Error); + } + this.push(null); + } +} From 1da7cc5d8c85e9a0616f37d0e0c1731a8f652c32 Mon Sep 17 00:00:00 2001 From: George Fu Date: Fri, 20 Sep 2024 14:44:55 +0000 Subject: [PATCH 2/9] add bundler metadata --- packages/util-stream/package.json | 3 +++ packages/util-stream/src/checksum/createChecksumStream.ts | 1 + 2 files changed, 4 insertions(+) diff --git a/packages/util-stream/package.json b/packages/util-stream/package.json index f0f62d154c2..139251428f6 100644 --- a/packages/util-stream/package.json +++ b/packages/util-stream/package.json @@ -56,16 +56,19 @@ ], "browser": { "./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser", + "./dist-es/checksum/createChecksumStream": "./dist-es/checksum/createChecksumStream.browser", "./dist-es/headStream": "./dist-es/headStream.browser", "./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser", "./dist-es/splitStream": "./dist-es/splitStream.browser" }, "react-native": { "./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser", + "./dist-es/checksum/createChecksumStream": "./dist-es/checksum/createChecksumStream.browser", "./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser", "./dist-es/headStream": "./dist-es/headStream.browser", "./dist-es/splitStream": "./dist-es/splitStream.browser", "./dist-cjs/getAwsChunkedEncodingStream": "./dist-cjs/getAwsChunkedEncodingStream.browser", + "./dist-cjs/checksum/createChecksumStream": "./dist-cjs/checksum/createChecksumStream.browser", "./dist-cjs/sdk-stream-mixin": "./dist-cjs/sdk-stream-mixin.browser", "./dist-cjs/headStream": "./dist-cjs/headStream.browser", "./dist-cjs/splitStream": "./dist-cjs/splitStream.browser" diff --git a/packages/util-stream/src/checksum/createChecksumStream.ts b/packages/util-stream/src/checksum/createChecksumStream.ts index caee7636267..606967f5a58 100644 --- a/packages/util-stream/src/checksum/createChecksumStream.ts +++ b/packages/util-stream/src/checksum/createChecksumStream.ts @@ -129,5 +129,6 @@ class ChecksumStream extends Duplex { callback(e as Error); } this.push(null); + callback(); } } From 05beb9511eb60cb675e17e314b17ae7db7b3db29 Mon Sep 17 00:00:00 2001 From: George Fu Date: Fri, 20 Sep 2024 15:27:43 +0000 Subject: [PATCH 3/9] move TransformStream checksum to flush event --- .../checksum/createChecksumStream.browser.ts | 46 +++++++++++-------- .../src/checksum/createChecksumStream.spec.ts | 2 +- .../src/checksum/createChecksumStream.ts | 18 +++++--- 3 files changed, 38 insertions(+), 28 deletions(-) diff --git a/packages/util-stream/src/checksum/createChecksumStream.browser.ts b/packages/util-stream/src/checksum/createChecksumStream.browser.ts index 81ca3cc98b8..c3a5c56691a 100644 --- a/packages/util-stream/src/checksum/createChecksumStream.browser.ts +++ b/packages/util-stream/src/checksum/createChecksumStream.browser.ts @@ -35,6 +35,17 @@ export interface ChecksumStreamInit { */ export type ReadableStreamType = ReadableStream; +/** + * This is a local copy of + * https://developer.mozilla.org/en-US/docs/Web/API/TransformStreamDefaultController + * in case users do not have this type. + */ +interface TransformStreamDefaultController { + enqueue(chunk: any): void; + error(error: unknown): void; + terminate(): void; +} + /** * @internal * @@ -64,25 +75,7 @@ export const createChecksumStream = ({ const transform = new TransformStream({ start() {}, - transform: async (chunk: any, controller: TransformStreamDefaultController) => { - /** - * When the upstream source finishes, perform the checksum comparison. - */ - if (null === chunk) { - const digest: Uint8Array = await checksum.digest(); - const received = encoder(digest); - - if (expectedChecksum !== received) { - const error = new Error( - `Checksum mismatch: expected "${expectedChecksum}" but received "${received}"` + - ` in response header "${checksumSourceLocation}".` - ); - controller.error(error); - throw error; - } - controller.terminate(); - return; - } + async transform(chunk: any, controller: TransformStreamDefaultController) { /** * When the upstream source flows data to this stream, * calculate a step update of the checksum. @@ -90,7 +83,20 @@ export const createChecksumStream = ({ checksum.update(chunk); controller.enqueue(chunk); }, - flush() {}, + async flush(controller: TransformStreamDefaultController) { + const digest: Uint8Array = await checksum.digest(); + const received = encoder(digest); + + if (expectedChecksum !== received) { + const error = new Error( + `Checksum mismatch: expected "${expectedChecksum}" but received "${received}"` + + ` in response header "${checksumSourceLocation}".` + ); + controller.error(error); + } else { + controller.terminate(); + } + }, }); source.pipeThrough(transform); diff --git a/packages/util-stream/src/checksum/createChecksumStream.spec.ts b/packages/util-stream/src/checksum/createChecksumStream.spec.ts index ddb7563785e..7b48e74a9db 100644 --- a/packages/util-stream/src/checksum/createChecksumStream.spec.ts +++ b/packages/util-stream/src/checksum/createChecksumStream.spec.ts @@ -43,6 +43,7 @@ describe("Checksum streams", () => { }); expect(checksumStream.constructor.name).toEqual("ChecksumStream"); + expect(checksumStream).toBeInstanceOf(Readable); const collected = toUtf8(await headStream(checksumStream, Infinity)); expect(collected).toEqual(canonicalUtf8); @@ -84,7 +85,6 @@ describe("Checksum streams", () => { canonicalData.forEach((byte) => { controller.enqueue(new Uint8Array([byte])); }); - controller.enqueue(null); controller.close(); }, }); diff --git a/packages/util-stream/src/checksum/createChecksumStream.ts b/packages/util-stream/src/checksum/createChecksumStream.ts index 606967f5a58..961b833b565 100644 --- a/packages/util-stream/src/checksum/createChecksumStream.ts +++ b/packages/util-stream/src/checksum/createChecksumStream.ts @@ -103,9 +103,13 @@ class ChecksumStream extends Duplex { * calculate a step update of the checksum. */ public _write(chunk: Buffer, encoding: string, callback: (err?: Error) => void): void { - this.checksum.update(chunk); - this.push(chunk); - callback(); + try { + this.checksum.update(chunk); + this.push(chunk); + } catch (e: unknown) { + return callback(e as Error); + } + return callback(); } /** @@ -113,12 +117,12 @@ class ChecksumStream extends Duplex { * * When the upstream source finishes, perform the checksum comparison. */ - public async _final(callback: (err?: Error) => void) { + public async _final(callback: (err?: Error) => void): Promise { try { const digest: Uint8Array = await this.checksum.digest(); const received = this.base64Encoder(digest); if (this.expectedChecksum !== received) { - callback( + return callback( new Error( `Checksum mismatch: expected "${this.expectedChecksum}" but received "${received}"` + ` in response header "${this.checksumSourceLocation}".` @@ -126,9 +130,9 @@ class ChecksumStream extends Duplex { ); } } catch (e: unknown) { - callback(e as Error); + return callback(e as Error); } this.push(null); - callback(); + return callback(); } } From 1cf6e3166783bb3561c2355c12c3defee3971f8a Mon Sep 17 00:00:00 2001 From: George Fu Date: Fri, 20 Sep 2024 15:43:20 +0000 Subject: [PATCH 4/9] improve uniformity of node/web checksumstream api --- .../checksum/createChecksumStream.browser.ts | 22 +++++++++++++++++-- .../src/checksum/createChecksumStream.spec.ts | 6 ++++- .../src/checksum/createChecksumStream.ts | 2 +- packages/util-stream/src/index.ts | 1 + 4 files changed, 27 insertions(+), 4 deletions(-) diff --git a/packages/util-stream/src/checksum/createChecksumStream.browser.ts b/packages/util-stream/src/checksum/createChecksumStream.browser.ts index c3a5c56691a..711c8a2f01b 100644 --- a/packages/util-stream/src/checksum/createChecksumStream.browser.ts +++ b/packages/util-stream/src/checksum/createChecksumStream.browser.ts @@ -1,6 +1,8 @@ import { Checksum, Encoder } from "@smithy/types"; import { toBase64 } from "@smithy/util-base64"; +import { isReadableStream } from "../stream-type-check"; + /** * @internal */ @@ -59,7 +61,7 @@ export const createChecksumStream = ({ checksumSourceLocation, base64Encoder, }: ChecksumStreamInit): ReadableStreamType => { - if (!(source instanceof ReadableStream)) { + if (!isReadableStream(source)) { throw new Error( `@smithy/util-stream: unsupported source type ${(source as any)?.constructor?.name ?? source} in ChecksumStream.` ); @@ -100,5 +102,21 @@ export const createChecksumStream = ({ }); source.pipeThrough(transform); - return transform.readable; + const readable = transform.readable; + Object.setPrototypeOf(readable, ChecksumStream.prototype); + return readable; }; + +/** + * This stub exists so that the readable returned by createChecksumStream + * identifies as "ChecksumStream" in alignment with the Node.js + * implementation. + * + * @extends ReadableStream + */ +export class ChecksumStream {} + +if (typeof ReadableStream === "function") { + ChecksumStream.prototype = Object.create(ReadableStream.prototype); + ChecksumStream.prototype.constructor = ChecksumStream; +} diff --git a/packages/util-stream/src/checksum/createChecksumStream.spec.ts b/packages/util-stream/src/checksum/createChecksumStream.spec.ts index 7b48e74a9db..33513ca58f6 100644 --- a/packages/util-stream/src/checksum/createChecksumStream.spec.ts +++ b/packages/util-stream/src/checksum/createChecksumStream.spec.ts @@ -4,7 +4,8 @@ import { toUtf8 } from "@smithy/util-utf8"; import { Readable } from "stream"; import { headStream } from "../headStream"; -import { createChecksumStream } from "./createChecksumStream"; +import { ChecksumStream, createChecksumStream } from "./createChecksumStream"; +import { ChecksumStream as ChecksumStreamWeb } from "./createChecksumStream.browser"; describe("Checksum streams", () => { /** @@ -44,6 +45,7 @@ describe("Checksum streams", () => { expect(checksumStream.constructor.name).toEqual("ChecksumStream"); expect(checksumStream).toBeInstanceOf(Readable); + expect(checksumStream).toBeInstanceOf(ChecksumStream); const collected = toUtf8(await headStream(checksumStream, Infinity)); expect(collected).toEqual(canonicalUtf8); @@ -99,7 +101,9 @@ describe("Checksum streams", () => { source: stream, }); + expect(checksumStream.constructor.name).toEqual("ChecksumStream"); expect(checksumStream).toBeInstanceOf(ReadableStream); + expect(checksumStream).toBeInstanceOf(ChecksumStreamWeb); const collected = toUtf8(await headStream(checksumStream, Infinity)); expect(collected).toEqual(canonicalUtf8); diff --git a/packages/util-stream/src/checksum/createChecksumStream.ts b/packages/util-stream/src/checksum/createChecksumStream.ts index 961b833b565..5f41f19ab41 100644 --- a/packages/util-stream/src/checksum/createChecksumStream.ts +++ b/packages/util-stream/src/checksum/createChecksumStream.ts @@ -56,7 +56,7 @@ export function createChecksumStream( * buffering the stream. * */ -class ChecksumStream extends Duplex { +export class ChecksumStream extends Duplex { private expectedChecksum: string; private checksumSourceLocation: string; private checksum: Checksum; diff --git a/packages/util-stream/src/index.ts b/packages/util-stream/src/index.ts index 305896d2fa0..d10ba11a984 100644 --- a/packages/util-stream/src/index.ts +++ b/packages/util-stream/src/index.ts @@ -4,3 +4,4 @@ export * from "./sdk-stream-mixin"; export * from "./splitStream"; export * from "./headStream"; export * from "./stream-type-check"; +export * from "./checksum/createChecksumStream"; From 6b36ebc8b2231467371b6148e3bf30546586c359 Mon Sep 17 00:00:00 2001 From: George Fu Date: Fri, 20 Sep 2024 15:45:18 +0000 Subject: [PATCH 5/9] alphabetization --- packages/util-stream/package.json | 6 +++--- .../src/checksum/createChecksumStream.browser.ts | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/util-stream/package.json b/packages/util-stream/package.json index 139251428f6..e4ecd15c130 100644 --- a/packages/util-stream/package.json +++ b/packages/util-stream/package.json @@ -55,20 +55,20 @@ "dist-*/**" ], "browser": { - "./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser", "./dist-es/checksum/createChecksumStream": "./dist-es/checksum/createChecksumStream.browser", + "./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser", "./dist-es/headStream": "./dist-es/headStream.browser", "./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser", "./dist-es/splitStream": "./dist-es/splitStream.browser" }, "react-native": { - "./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser", "./dist-es/checksum/createChecksumStream": "./dist-es/checksum/createChecksumStream.browser", + "./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser", "./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser", "./dist-es/headStream": "./dist-es/headStream.browser", "./dist-es/splitStream": "./dist-es/splitStream.browser", - "./dist-cjs/getAwsChunkedEncodingStream": "./dist-cjs/getAwsChunkedEncodingStream.browser", "./dist-cjs/checksum/createChecksumStream": "./dist-cjs/checksum/createChecksumStream.browser", + "./dist-cjs/getAwsChunkedEncodingStream": "./dist-cjs/getAwsChunkedEncodingStream.browser", "./dist-cjs/sdk-stream-mixin": "./dist-cjs/sdk-stream-mixin.browser", "./dist-cjs/headStream": "./dist-cjs/headStream.browser", "./dist-cjs/splitStream": "./dist-cjs/splitStream.browser" diff --git a/packages/util-stream/src/checksum/createChecksumStream.browser.ts b/packages/util-stream/src/checksum/createChecksumStream.browser.ts index 711c8a2f01b..7dc69867552 100644 --- a/packages/util-stream/src/checksum/createChecksumStream.browser.ts +++ b/packages/util-stream/src/checksum/createChecksumStream.browser.ts @@ -114,7 +114,7 @@ export const createChecksumStream = ({ * * @extends ReadableStream */ -export class ChecksumStream {} +export function ChecksumStream(): void {} if (typeof ReadableStream === "function") { ChecksumStream.prototype = Object.create(ReadableStream.prototype); From 672fd32911468f3538e1093b28986661190e8344 Mon Sep 17 00:00:00 2001 From: George Fu Date: Fri, 20 Sep 2024 16:57:36 +0000 Subject: [PATCH 6/9] use class inheritance --- .../src/checksum/createChecksumStream.browser.ts | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/packages/util-stream/src/checksum/createChecksumStream.browser.ts b/packages/util-stream/src/checksum/createChecksumStream.browser.ts index 7dc69867552..d354ec16c70 100644 --- a/packages/util-stream/src/checksum/createChecksumStream.browser.ts +++ b/packages/util-stream/src/checksum/createChecksumStream.browser.ts @@ -107,6 +107,8 @@ export const createChecksumStream = ({ return readable; }; +const ReadableStreamRef = typeof ReadableStream === "function" ? ReadableStream : function (): void {}; + /** * This stub exists so that the readable returned by createChecksumStream * identifies as "ChecksumStream" in alignment with the Node.js @@ -114,9 +116,4 @@ export const createChecksumStream = ({ * * @extends ReadableStream */ -export function ChecksumStream(): void {} - -if (typeof ReadableStream === "function") { - ChecksumStream.prototype = Object.create(ReadableStream.prototype); - ChecksumStream.prototype.constructor = ChecksumStream; -} +export class ChecksumStream extends (ReadableStreamRef as any) {} From 33ed844d939199bd5e26bfab9b5e7ce776294da8 Mon Sep 17 00:00:00 2001 From: George Fu Date: Fri, 20 Sep 2024 17:14:11 +0000 Subject: [PATCH 7/9] inheritance issue in jest --- packages/util-stream/src/checksum/createChecksumStream.spec.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/util-stream/src/checksum/createChecksumStream.spec.ts b/packages/util-stream/src/checksum/createChecksumStream.spec.ts index 33513ca58f6..e7152e7656a 100644 --- a/packages/util-stream/src/checksum/createChecksumStream.spec.ts +++ b/packages/util-stream/src/checksum/createChecksumStream.spec.ts @@ -43,7 +43,6 @@ describe("Checksum streams", () => { source: stream, }); - expect(checksumStream.constructor.name).toEqual("ChecksumStream"); expect(checksumStream).toBeInstanceOf(Readable); expect(checksumStream).toBeInstanceOf(ChecksumStream); @@ -101,7 +100,6 @@ describe("Checksum streams", () => { source: stream, }); - expect(checksumStream.constructor.name).toEqual("ChecksumStream"); expect(checksumStream).toBeInstanceOf(ReadableStream); expect(checksumStream).toBeInstanceOf(ChecksumStreamWeb); From 8865ccf94a1b13334a9f5ed7f62092f24e10fe00 Mon Sep 17 00:00:00 2001 From: George Fu Date: Fri, 20 Sep 2024 18:14:56 +0000 Subject: [PATCH 8/9] add karma test for checksum stream --- packages/util-stream/karma.conf.js | 9 +- .../createChecksumStream.browser.spec.ts | 89 +++++++++++++++++++ 2 files changed, 97 insertions(+), 1 deletion(-) create mode 100644 packages/util-stream/src/checksum/createChecksumStream.browser.spec.ts diff --git a/packages/util-stream/karma.conf.js b/packages/util-stream/karma.conf.js index f652e2493e6..320b9143470 100644 --- a/packages/util-stream/karma.conf.js +++ b/packages/util-stream/karma.conf.js @@ -3,7 +3,14 @@ module.exports = function (config) { config.set({ frameworks: ["jasmine", "karma-typescript"], - files: ["src/getAwsChunkedEncodingStream.browser.ts", "src/getAwsChunkedEncodingStream.browser.spec.ts"], + files: [ + "src/checksum/createChecksumStream.browser.spec.ts", + "src/checksum/createChecksumStream.browser.ts", + "src/getAwsChunkedEncodingStream.browser.spec.ts", + "src/getAwsChunkedEncodingStream.browser.ts", + "src/headStream.browser.ts", + "src/stream-type-check.ts", + ], exclude: ["**/*.d.ts"], preprocessors: { "**/*.ts": "karma-typescript", diff --git a/packages/util-stream/src/checksum/createChecksumStream.browser.spec.ts b/packages/util-stream/src/checksum/createChecksumStream.browser.spec.ts new file mode 100644 index 00000000000..4e722eefbd8 --- /dev/null +++ b/packages/util-stream/src/checksum/createChecksumStream.browser.spec.ts @@ -0,0 +1,89 @@ +import { Checksum } from "@smithy/types"; +import { toBase64 } from "@smithy/util-base64"; +import { toUtf8 } from "@smithy/util-utf8"; + +import { headStream } from "../headStream.browser"; +import { createChecksumStream } from "./createChecksumStream.browser"; +import { ChecksumStream as ChecksumStreamWeb } from "./createChecksumStream.browser"; + +describe("Checksum streams", () => { + /** + * Hash "algorithm" that appends all data together. + */ + class Appender implements Checksum { + public hash = ""; + async digest(): Promise { + return Buffer.from(this.hash); + } + reset(): void { + throw new Error("Function not implemented."); + } + update(chunk: Uint8Array): void { + this.hash += toUtf8(chunk); + } + } + + const canonicalData = new Uint8Array("abcdefghijklmnopqrstuvwxyz".split("").map((_) => _.charCodeAt(0))); + + const canonicalUtf8 = toUtf8(canonicalData); + const canonicalBase64 = toBase64(canonicalUtf8); + + describe(createChecksumStream.name + " webstreams API", () => { + if (typeof ReadableStream !== "function") { + // test not applicable to Node.js 16. + return; + } + + const makeStream = () => { + return new ReadableStream({ + start(controller) { + canonicalData.forEach((byte) => { + controller.enqueue(new Uint8Array([byte])); + }); + controller.close(); + }, + }); + }; + + it("should extend a ReadableStream", async () => { + const stream = makeStream(); + const checksumStream = createChecksumStream({ + expectedChecksum: canonicalBase64, + checksum: new Appender(), + checksumSourceLocation: "my-header", + source: stream, + }); + + expect(checksumStream).toBeInstanceOf(ReadableStream); + expect(checksumStream).toBeInstanceOf(ChecksumStreamWeb); + + const collected = toUtf8(await headStream(checksumStream, Infinity)); + expect(collected).toEqual(canonicalUtf8); + expect(stream.locked).toEqual(true); + + // expectation is that it is resolved. + expect(await checksumStream.getReader().closed); + }); + + it("should throw during stream read if the checksum does not match", async () => { + const stream = makeStream(); + const checksumStream = createChecksumStream({ + expectedChecksum: "different-expected-checksum", + checksum: new Appender(), + checksumSourceLocation: "my-header", + source: stream, + }); + + try { + toUtf8(await headStream(checksumStream, Infinity)); + throw new Error("stream was read successfully"); + } catch (e: unknown) { + expect(String(e)).toEqual( + `Error: Checksum mismatch: expected "different-expected-checksum" but` + + ` received "${canonicalBase64}"` + + ` in response header "my-header".` + ); + } + }); + }); +}); From dd4b5fff3e2a2faa6d0b6e1c600981e70eb0fbb8 Mon Sep 17 00:00:00 2001 From: George Fu Date: Fri, 20 Sep 2024 18:24:36 +0000 Subject: [PATCH 9/9] separate files --- packages/util-stream/karma.conf.js | 1 + .../src/checksum/ChecksumStream.browser.ts | 39 ++++++ .../src/checksum/ChecksumStream.ts | 118 +++++++++++++++++ .../createChecksumStream.browser.spec.ts | 2 +- .../checksum/createChecksumStream.browser.ts | 39 +----- .../src/checksum/createChecksumStream.spec.ts | 5 +- .../src/checksum/createChecksumStream.ts | 120 +----------------- packages/util-stream/src/index.ts | 1 + 8 files changed, 166 insertions(+), 159 deletions(-) create mode 100644 packages/util-stream/src/checksum/ChecksumStream.browser.ts create mode 100644 packages/util-stream/src/checksum/ChecksumStream.ts diff --git a/packages/util-stream/karma.conf.js b/packages/util-stream/karma.conf.js index 320b9143470..b2ae368b568 100644 --- a/packages/util-stream/karma.conf.js +++ b/packages/util-stream/karma.conf.js @@ -6,6 +6,7 @@ module.exports = function (config) { files: [ "src/checksum/createChecksumStream.browser.spec.ts", "src/checksum/createChecksumStream.browser.ts", + "src/checksum/ChecksumStream.browser.ts", "src/getAwsChunkedEncodingStream.browser.spec.ts", "src/getAwsChunkedEncodingStream.browser.ts", "src/headStream.browser.ts", diff --git a/packages/util-stream/src/checksum/ChecksumStream.browser.ts b/packages/util-stream/src/checksum/ChecksumStream.browser.ts new file mode 100644 index 00000000000..18628dbc42c --- /dev/null +++ b/packages/util-stream/src/checksum/ChecksumStream.browser.ts @@ -0,0 +1,39 @@ +import { Checksum, Encoder } from "@smithy/types"; + +/** + * @internal + */ +export interface ChecksumStreamInit { + /** + * Base64 value of the expected checksum. + */ + expectedChecksum: string; + /** + * For error messaging, the location from which the checksum value was read. + */ + checksumSourceLocation: string; + /** + * The checksum calculator. + */ + checksum: Checksum; + /** + * The stream to be checked. + */ + source: ReadableStream; + + /** + * Optional base 64 encoder if calling from a request context. + */ + base64Encoder?: Encoder; +} + +const ReadableStreamRef = typeof ReadableStream === "function" ? ReadableStream : function (): void {}; + +/** + * This stub exists so that the readable returned by createChecksumStream + * identifies as "ChecksumStream" in alignment with the Node.js + * implementation. + * + * @extends ReadableStream + */ +export class ChecksumStream extends (ReadableStreamRef as any) {} diff --git a/packages/util-stream/src/checksum/ChecksumStream.ts b/packages/util-stream/src/checksum/ChecksumStream.ts new file mode 100644 index 00000000000..769d0076d5e --- /dev/null +++ b/packages/util-stream/src/checksum/ChecksumStream.ts @@ -0,0 +1,118 @@ +import { Checksum, Encoder } from "@smithy/types"; +import { toBase64 } from "@smithy/util-base64"; +import { Duplex, Readable } from "stream"; + +/** + * @internal + */ +export interface ChecksumStreamInit { + /** + * Base64 value of the expected checksum. + */ + expectedChecksum: string; + /** + * For error messaging, the location from which the checksum value was read. + */ + checksumSourceLocation: string; + /** + * The checksum calculator. + */ + checksum: Checksum; + /** + * The stream to be checked. + */ + source: T; + + /** + * Optional base 64 encoder if calling from a request context. + */ + base64Encoder?: Encoder; +} + +/** + * @internal + * + * Wrapper for throwing checksum errors for streams without + * buffering the stream. + * + */ +export class ChecksumStream extends Duplex { + private expectedChecksum: string; + private checksumSourceLocation: string; + private checksum: Checksum; + private source?: Readable; + private base64Encoder: Encoder; + + public constructor({ + expectedChecksum, + checksum, + source, + checksumSourceLocation, + base64Encoder, + }: ChecksumStreamInit) { + super(); + if (typeof (source as Readable).pipe === "function") { + this.source = source as Readable; + } else { + throw new Error( + `@smithy/util-stream: unsupported source type ${source?.constructor?.name ?? source} in ChecksumStream.` + ); + } + + this.base64Encoder = base64Encoder ?? toBase64; + this.expectedChecksum = expectedChecksum; + this.checksum = checksum; + this.checksumSourceLocation = checksumSourceLocation; + + // connect this stream to the end of the source stream. + this.source.pipe(this); + } + + /** + * @internal do not call this directly. + */ + public _read( + // eslint-disable-next-line @typescript-eslint/no-unused-vars + size: number + ): void {} + + /** + * @internal do not call this directly. + * + * When the upstream source flows data to this stream, + * calculate a step update of the checksum. + */ + public _write(chunk: Buffer, encoding: string, callback: (err?: Error) => void): void { + try { + this.checksum.update(chunk); + this.push(chunk); + } catch (e: unknown) { + return callback(e as Error); + } + return callback(); + } + + /** + * @internal do not call this directly. + * + * When the upstream source finishes, perform the checksum comparison. + */ + public async _final(callback: (err?: Error) => void): Promise { + try { + const digest: Uint8Array = await this.checksum.digest(); + const received = this.base64Encoder(digest); + if (this.expectedChecksum !== received) { + return callback( + new Error( + `Checksum mismatch: expected "${this.expectedChecksum}" but received "${received}"` + + ` in response header "${this.checksumSourceLocation}".` + ) + ); + } + } catch (e: unknown) { + return callback(e as Error); + } + this.push(null); + return callback(); + } +} diff --git a/packages/util-stream/src/checksum/createChecksumStream.browser.spec.ts b/packages/util-stream/src/checksum/createChecksumStream.browser.spec.ts index 4e722eefbd8..0f90a5eb2f0 100644 --- a/packages/util-stream/src/checksum/createChecksumStream.browser.spec.ts +++ b/packages/util-stream/src/checksum/createChecksumStream.browser.spec.ts @@ -3,8 +3,8 @@ import { toBase64 } from "@smithy/util-base64"; import { toUtf8 } from "@smithy/util-utf8"; import { headStream } from "../headStream.browser"; +import { ChecksumStream as ChecksumStreamWeb } from "./ChecksumStream.browser"; import { createChecksumStream } from "./createChecksumStream.browser"; -import { ChecksumStream as ChecksumStreamWeb } from "./createChecksumStream.browser"; describe("Checksum streams", () => { /** diff --git a/packages/util-stream/src/checksum/createChecksumStream.browser.ts b/packages/util-stream/src/checksum/createChecksumStream.browser.ts index d354ec16c70..6ca56bab956 100644 --- a/packages/util-stream/src/checksum/createChecksumStream.browser.ts +++ b/packages/util-stream/src/checksum/createChecksumStream.browser.ts @@ -1,36 +1,10 @@ -import { Checksum, Encoder } from "@smithy/types"; import { toBase64 } from "@smithy/util-base64"; import { isReadableStream } from "../stream-type-check"; +import { ChecksumStream, ChecksumStreamInit } from "./ChecksumStream.browser"; /** * @internal - */ -export interface ChecksumStreamInit { - /** - * Base64 value of the expected checksum. - */ - expectedChecksum: string; - /** - * For error messaging, the location from which the checksum value was read. - */ - checksumSourceLocation: string; - /** - * The checksum calculator. - */ - checksum: Checksum; - /** - * The stream to be checked. - */ - source: ReadableStream; - - /** - * Optional base 64 encoder if calling from a request context. - */ - base64Encoder?: Encoder; -} - -/** * Alias prevents compiler from turning * ReadableStream into ReadableStream, which is incompatible * with the NodeJS.ReadableStream global type. @@ -106,14 +80,3 @@ export const createChecksumStream = ({ Object.setPrototypeOf(readable, ChecksumStream.prototype); return readable; }; - -const ReadableStreamRef = typeof ReadableStream === "function" ? ReadableStream : function (): void {}; - -/** - * This stub exists so that the readable returned by createChecksumStream - * identifies as "ChecksumStream" in alignment with the Node.js - * implementation. - * - * @extends ReadableStream - */ -export class ChecksumStream extends (ReadableStreamRef as any) {} diff --git a/packages/util-stream/src/checksum/createChecksumStream.spec.ts b/packages/util-stream/src/checksum/createChecksumStream.spec.ts index e7152e7656a..8abf019b592 100644 --- a/packages/util-stream/src/checksum/createChecksumStream.spec.ts +++ b/packages/util-stream/src/checksum/createChecksumStream.spec.ts @@ -4,8 +4,9 @@ import { toUtf8 } from "@smithy/util-utf8"; import { Readable } from "stream"; import { headStream } from "../headStream"; -import { ChecksumStream, createChecksumStream } from "./createChecksumStream"; -import { ChecksumStream as ChecksumStreamWeb } from "./createChecksumStream.browser"; +import { ChecksumStream } from "./ChecksumStream"; +import { ChecksumStream as ChecksumStreamWeb } from "./ChecksumStream.browser"; +import { createChecksumStream } from "./createChecksumStream"; describe("Checksum streams", () => { /** diff --git a/packages/util-stream/src/checksum/createChecksumStream.ts b/packages/util-stream/src/checksum/createChecksumStream.ts index 5f41f19ab41..348c8e8b860 100644 --- a/packages/util-stream/src/checksum/createChecksumStream.ts +++ b/packages/util-stream/src/checksum/createChecksumStream.ts @@ -1,37 +1,9 @@ -import { Checksum, Encoder } from "@smithy/types"; -import { toBase64 } from "@smithy/util-base64"; -import { Duplex, Readable } from "stream"; +import { Readable } from "stream"; import { isReadableStream } from "../stream-type-check"; +import { ChecksumStream, ChecksumStreamInit } from "./ChecksumStream"; import { createChecksumStream as createChecksumStreamWeb, ReadableStreamType } from "./createChecksumStream.browser"; -/** - * @internal - */ -export interface ChecksumStreamInit { - /** - * Base64 value of the expected checksum. - */ - expectedChecksum: string; - /** - * For error messaging, the location from which the checksum value was read. - */ - checksumSourceLocation: string; - /** - * The checksum calculator. - */ - checksum: Checksum; - /** - * The stream to be checked. - */ - source: T; - - /** - * Optional base 64 encoder if calling from a request context. - */ - base64Encoder?: Encoder; -} - /** * @internal * @@ -48,91 +20,3 @@ export function createChecksumStream( } return new ChecksumStream(init as ChecksumStreamInit); } - -/** - * @internal - * - * Wrapper for throwing checksum errors for streams without - * buffering the stream. - * - */ -export class ChecksumStream extends Duplex { - private expectedChecksum: string; - private checksumSourceLocation: string; - private checksum: Checksum; - private source?: Readable; - private base64Encoder: Encoder; - - public constructor({ - expectedChecksum, - checksum, - source, - checksumSourceLocation, - base64Encoder, - }: ChecksumStreamInit) { - super(); - if (typeof (source as Readable).pipe === "function") { - this.source = source as Readable; - } else { - throw new Error( - `@smithy/util-stream: unsupported source type ${source?.constructor?.name ?? source} in ChecksumStream.` - ); - } - - this.base64Encoder = base64Encoder ?? toBase64; - this.expectedChecksum = expectedChecksum; - this.checksum = checksum; - this.checksumSourceLocation = checksumSourceLocation; - - // connect this stream to the end of the source stream. - this.source.pipe(this); - } - - /** - * @internal do not call this directly. - */ - public _read( - // eslint-disable-next-line @typescript-eslint/no-unused-vars - size: number - ): void {} - - /** - * @internal do not call this directly. - * - * When the upstream source flows data to this stream, - * calculate a step update of the checksum. - */ - public _write(chunk: Buffer, encoding: string, callback: (err?: Error) => void): void { - try { - this.checksum.update(chunk); - this.push(chunk); - } catch (e: unknown) { - return callback(e as Error); - } - return callback(); - } - - /** - * @internal do not call this directly. - * - * When the upstream source finishes, perform the checksum comparison. - */ - public async _final(callback: (err?: Error) => void): Promise { - try { - const digest: Uint8Array = await this.checksum.digest(); - const received = this.base64Encoder(digest); - if (this.expectedChecksum !== received) { - return callback( - new Error( - `Checksum mismatch: expected "${this.expectedChecksum}" but received "${received}"` + - ` in response header "${this.checksumSourceLocation}".` - ) - ); - } - } catch (e: unknown) { - return callback(e as Error); - } - this.push(null); - return callback(); - } -} diff --git a/packages/util-stream/src/index.ts b/packages/util-stream/src/index.ts index d10ba11a984..035d92f1e7e 100644 --- a/packages/util-stream/src/index.ts +++ b/packages/util-stream/src/index.ts @@ -5,3 +5,4 @@ export * from "./splitStream"; export * from "./headStream"; export * from "./stream-type-check"; export * from "./checksum/createChecksumStream"; +export * from "./checksum/ChecksumStream";