diff --git a/codegen/smithy-aws-typescript-codegen/src/main/java/software/amazon/smithy/aws/typescript/codegen/AddDocumentClientPlugin.java b/codegen/smithy-aws-typescript-codegen/src/main/java/software/amazon/smithy/aws/typescript/codegen/AddDocumentClientPlugin.java index 4b910a0afa9ea..8668f53c87279 100644 --- a/codegen/smithy-aws-typescript-codegen/src/main/java/software/amazon/smithy/aws/typescript/codegen/AddDocumentClientPlugin.java +++ b/codegen/smithy-aws-typescript-codegen/src/main/java/software/amazon/smithy/aws/typescript/codegen/AddDocumentClientPlugin.java @@ -105,6 +105,7 @@ private void writeAdditionalFiles( writerFactory.accept(String.format("%s%s/index.ts", DocumentClientUtils.DOC_CLIENT_PREFIX, DocumentClientPaginationGenerator.PAGINATION_FOLDER), writer -> { writer.write("export * from './Interfaces';"); + writer.write("export * from './BatchGetPaginator';"); for (OperationShape operation : overridenOperationsList) { if (operation.hasTrait(PaginatedTrait.ID)) { String paginationFileName = diff --git a/lib/lib-dynamodb/src/pagination/BatchGetPaginator.ts b/lib/lib-dynamodb/src/pagination/BatchGetPaginator.ts new file mode 100644 index 0000000000000..ee1a5e177911b --- /dev/null +++ b/lib/lib-dynamodb/src/pagination/BatchGetPaginator.ts @@ -0,0 +1,48 @@ +import { type DynamoDBClient } from "@aws-sdk/client-dynamodb"; + +import { BatchGetCommand, BatchGetCommandInput, BatchGetCommandOutput } from "../commands/BatchGetCommand"; +import { DynamoDBDocumentClient } from "../DynamoDBDocumentClient"; + +/** + * Async generator that issues {@link BatchGetCommand}s repeatedly until all keys are processed or an error response is received. + * + * @public + * + * @see {@link paginatedBatchGetItem} (exported from util-dynamodb) for a variant that uses the {@link DynamoDBClient | low level DynamoDB client}. 
+ * @returns An async generator yielding one {@link BatchGetCommandOutput} per request sent; requests follow each other with no delay and no retry limit.
+ * @example
+ *
+ * ```typescript
+ * const client = new DynamoDBClient();
+ * const docClient = DynamoDBDocumentClient.from(client);
+ * const input: BatchGetCommandInput = {
+ *   RequestItems: {
+ *     table1: { Keys: [...] },
+ *     table2: { Keys: [...] },
+ *   }
+ * };
+ *
+ * let pageNumber = 1;
+ * for await (const page of paginatedBatchGet({ client: docClient }, input)) {
+ *   console.log("page:", pageNumber++);
+ *   console.log("items:", page.Responses);
+ *   console.log("unprocessed:", page.UnprocessedKeys); // will be requested again by the next page(s)
+ * }
+ * ```
+ */
+export async function* paginatedBatchGet(
+  config: {
+    client: DynamoDBDocumentClient;
+  },
+  input: BatchGetCommandInput
+): AsyncGenerator<BatchGetCommandOutput> {
+  let RequestItems = input.RequestItems;
+  // First pass sends the caller's keys; later passes send only what the previous response left unprocessed.
+  while (RequestItems && Object.keys(RequestItems).length > 0) {
+    const cmd = new BatchGetCommand({ ...input, RequestItems });
+    const response = await config.client.send(cmd);
+    RequestItems = { ...response.UnprocessedKeys };
+    // Yield after updating the cursor: an absent UnprocessedKeys spreads to {}, which stops the loop next time round.
+    yield response;
+  }
+}
diff --git a/lib/lib-dynamodb/src/pagination/index.ts b/lib/lib-dynamodb/src/pagination/index.ts
index 844e9cbb4adf4..803dd24fa8217 100644
--- a/lib/lib-dynamodb/src/pagination/index.ts
+++ b/lib/lib-dynamodb/src/pagination/index.ts
@@ -1,3 +1,4 @@
+export * from "./BatchGetPaginator";
 // smithy-typescript generated code
 export * from "./Interfaces";
 export * from "./QueryPaginator";
diff --git a/packages/core/integ/pagination.integ.spec.ts b/packages/core/integ/pagination.integ.spec.ts
index 7b23733d4d460..b50afaf92e321 100644
--- a/packages/core/integ/pagination.integ.spec.ts
+++ b/packages/core/integ/pagination.integ.spec.ts
@@ -1,5 +1,6 @@
 import { requireRequestsFrom } from "@aws-sdk/aws-util-test/src";
 import { DynamoDB, paginateScan, ScanCommandInput } from "@aws-sdk/client-dynamodb";
+import { BatchGetCommandInput, paginatedBatchGet } from "@aws-sdk/lib-dynamodb";
 import { HttpResponse } from "@smithy/protocol-http";
 import { describe, expect, test as it } from
"vitest"; @@ -94,4 +95,149 @@ describe("pagination", () => { }); expect.assertions(7); }); + + /** + * This test makes a DynamoDB paginated batch get request for 5 items, with keys 1-2-3-4-5, in this exact order. + * + * The first returned page contains items 2 and 1 (order switched to simulate the unpredictability of the order of the + * items returned by the DDB API BatchGetItem command), plus unprocessed keys 4 and 3, which the second request must + * send back in that same order. The second page contains the items 3 and 4, and no further unprocessed keys. + * + * Item 5 is asked for, but we consider that the table does not contain it, so it's not returned at all. That's a + * valid use case and does not generate an error. + * + * In the second part of the test, another paginated request is done for 2 items, with keys 1 and 1. So the same + * item is requested twice. As the mocked API answers with a 400 error, the generator must throw before yielding a page. + */ + it("processes batch items until all items are processed or an error is received", async () => { + const ddb = new DynamoDB({ + credentials: { + accessKeyId: "INTEG_TEST", + secretAccessKey: "INTEG_TEST", + }, + region: "us-west-2", + }); + + requireRequestsFrom(ddb) + .toMatch( + // first page request: all five keys, in the order given + { + hostname: /dynamodb/, + body(b) { + expect(b).toContain( + '"RequestItems":{"test":{"Keys":[{"id":{"S":"1"}},{"id":{"S":"2"}},{"id":{"S":"3"}},{"id":{"S":"4"}},{"id":{"S":"5"}}]}}' + ); + }, + }, + // second page request: only the keys the first response left unprocessed + { + hostname: /dynamodb/, + body(b) { + expect(b).toContain('"RequestItems":{"test":{"Keys":[{"id":{"S":"4"}},{"id":{"S":"3"}}]}}'); + }, + }, + // invalid request (duplicate key) + { + hostname: /dynamodb/, + body(b) { + expect(b).toContain('"RequestItems":{"test":{"Keys":[{"id":{"S":"1"}},{"id":{"S":"1"}}]}}'); + }, + } + ) + .respondWith( + // first page response: two items plus two unprocessed keys + new HttpResponse({ + statusCode: 200, + headers: {}, + body: Buffer.from( + JSON.stringify({ + Responses: { + test: [ + { id: { S: "2" }, name: { S: "Item 2" } }, + { id: { S: "1" }, 
name: { S: "Item 1" } }, + ], + }, + UnprocessedKeys: { + test: { + Keys: [{ id: { S: "4" } }, { id: { S: "3" } }], + }, + }, + }) + ), + }), + // second page response: empty UnprocessedKeys ends the pagination + new HttpResponse({ + statusCode: 200, + headers: {}, + body: Buffer.from( + JSON.stringify({ + Responses: { + test: [ + { id: { S: "3" }, name: { S: "Item 3" } }, + { id: { S: "4" }, name: { S: "Item 4" } }, + ], + }, + UnprocessedKeys: {}, + }) + ), + }), + // error response for the duplicate-key request + new HttpResponse({ + statusCode: 400, + headers: {}, + body: Buffer.from( + JSON.stringify({ + message: "Provided list of item keys contains duplicates", + }) + ), + }) + ); + + const requestParams: BatchGetCommandInput = { + RequestItems: { + test: { Keys: [{ id: "1" }, { id: "2" }, { id: "3" }, { id: "4" }, { id: "5" }] }, + }, + }; + + let pages = 0; + for await (const page of paginatedBatchGet({ client: ddb }, requestParams)) { + pages += 1; + if (pages === 1) { + expect(page.Responses?.test).toEqual([ + { id: "2", name: "Item 2" }, + { id: "1", name: "Item 1" }, + ]); + } else { + expect(page.Responses?.test).toEqual([ + { id: "3", name: "Item 3" }, + { id: "4", name: "Item 4" }, + ]); + } + } + + expect(pages).toEqual(2); + + let thrownError; + + try { + for await (const page of paginatedBatchGet( + { client: ddb }, + { + RequestItems: { + test: { Keys: [{ id: "1" }, { id: "1" }] }, + }, + } + )) { + void page; + throw new Error("Received unexpected page"); + } + } catch (error) { + thrownError = error; + } + + expect(thrownError).toBeInstanceOf(Error); + expect((thrownError as Error).message).toBe("Provided list of item keys contains duplicates"); + + expect.assertions(11); + }); }); diff --git a/packages/util-dynamodb/src/index.ts b/packages/util-dynamodb/src/index.ts index 60877561d3349..6e3fd8beb4f53 100644 --- a/packages/util-dynamodb/src/index.ts +++ b/packages/util-dynamodb/src/index.ts @@ -3,4 +3,5 @@ export * from "./convertToAttr"; export * from "./convertToNative"; export * from "./marshall"; export * from 
"./models";
+export * from "./paginatedBatchGetItem";
 export * from "./unmarshall";
diff --git a/packages/util-dynamodb/src/paginatedBatchGetItem.ts b/packages/util-dynamodb/src/paginatedBatchGetItem.ts
new file mode 100644
index 0000000000000..53d5b8ee602dd
--- /dev/null
+++ b/packages/util-dynamodb/src/paginatedBatchGetItem.ts
@@ -0,0 +1,40 @@
+import { BatchGetItemCommand, BatchGetItemCommandInput, DynamoDBClient } from "@aws-sdk/client-dynamodb";
+import type { DynamoDBDocumentClient } from "@aws-sdk/lib-dynamodb";
+
+/**
+ * Async generator that issues {@link BatchGetItemCommand}s repeatedly until all keys are processed or an error response is received.
+ *
+ * @public
+ *
+ * @see {@link paginatedBatchGet} (exported from lib-dynamodb) for a variant that uses the {@link DynamoDBDocumentClient | DynamoDB document client}.
+ * @returns An async generator yielding one response per request sent; requests follow each other with no delay and no retry limit.
+ * @example
+ *
+ * ```typescript
+ * const client = new DynamoDBClient();
+ * const input: BatchGetItemCommandInput = {
+ *   RequestItems: {
+ *     table1: { Keys: [...] },
+ *     table2: { Keys: [...] },
+ *   }
+ * };
+ *
+ * let pageNumber = 1;
+ * for await (const page of paginatedBatchGetItem({ client }, input)) {
+ *   console.log("page:", pageNumber++);
+ *   console.log("items:", page.Responses);
+ *   console.log("unprocessed:", page.UnprocessedKeys); // will be requested again by the next page(s)
+ * }
+ * ```
+ */
+export async function* paginatedBatchGetItem(config: { client: DynamoDBClient }, input: BatchGetItemCommandInput) {
+  let RequestItems = input.RequestItems;
+  // First pass sends the caller's keys; later passes send only what the previous response left unprocessed.
+  while (RequestItems && Object.keys(RequestItems).length > 0) {
+    const cmd = new BatchGetItemCommand({ ...input, RequestItems });
+    const response = await config.client.send(cmd);
+    RequestItems = { ...response.UnprocessedKeys };
+    // Yield after updating the cursor: an absent UnprocessedKeys spreads to {}, which stops the loop next time round.
+    yield response;
+  }
+}