From 54376ef479bc7414fd2345964f7361d3fe86dc18 Mon Sep 17 00:00:00 2001 From: Jorge Cortes Date: Thu, 27 Nov 2025 11:30:58 -0500 Subject: [PATCH 1/2] [ACTION] Google Sheets - Provide polling options as an alternative to current webhook based ones --- components/google_sheets/package.json | 2 +- .../sources/common/new-comment.mjs | 36 ++++++++++ .../new-comment-polling.mjs | 48 +++++++++++++ .../new-comment-polling/test-event.mjs | 21 ++++++ .../new-updates-polling.mjs | 54 ++++++++++++++ .../new-updates-polling/test-event.mjs | 71 +++++++++++++++++++ 6 files changed, 231 insertions(+), 1 deletion(-) create mode 100644 components/google_sheets/sources/common/new-comment.mjs create mode 100644 components/google_sheets/sources/new-comment-polling/new-comment-polling.mjs create mode 100644 components/google_sheets/sources/new-comment-polling/test-event.mjs create mode 100644 components/google_sheets/sources/new-updates-polling/new-updates-polling.mjs create mode 100644 components/google_sheets/sources/new-updates-polling/test-event.mjs diff --git a/components/google_sheets/package.json b/components/google_sheets/package.json index dc6c3074ffe3e..73ac86c14ce26 100644 --- a/components/google_sheets/package.json +++ b/components/google_sheets/package.json @@ -1,6 +1,6 @@ { "name": "@pipedream/google_sheets", - "version": "0.12.0", + "version": "0.13.0", "description": "Pipedream Google_sheets Components", "main": "google_sheets.app.mjs", "keywords": [ diff --git a/components/google_sheets/sources/common/new-comment.mjs b/components/google_sheets/sources/common/new-comment.mjs new file mode 100644 index 0000000000000..c7b9433db58b2 --- /dev/null +++ b/components/google_sheets/sources/common/new-comment.mjs @@ -0,0 +1,36 @@ +export default { + methods: { + _getLastTs() { + return this.db.get("lastTs"); + }, + _setLastTs(lastTs) { + this.db.set("lastTs", lastTs); + }, + generateMeta(comment) { + return { + id: comment.id, + summary: `New Comment: ${comment.content}`, + ts: Date.parse(comment.createdTime), + }; + }, + getSheetId() { + return this.sheetID.toString(); + }, + async processSpreadsheet() { + const comments = []; + const lastTs = this._getLastTs(); + const results = this.googleSheets.listComments(this.sheetID, lastTs); + for await (const comment of results) { + comments.push(comment); + } + if (!comments.length) { + return; + } + this._setLastTs(comments[0].createdTime); + comments.reverse().forEach((comment) => { + const meta = this.generateMeta(comment); + this.$emit(comment, meta); + }); + }, + }, +}; diff --git a/components/google_sheets/sources/new-comment-polling/new-comment-polling.mjs b/components/google_sheets/sources/new-comment-polling/new-comment-polling.mjs new file mode 100644 index 0000000000000..43f17b86b4e49 --- /dev/null +++ b/components/google_sheets/sources/new-comment-polling/new-comment-polling.mjs @@ -0,0 +1,48 @@ +import googleSheets from "../../google_sheets.app.mjs"; +import common from "../common/new-comment.mjs"; +import base from "../common/http-based/base.mjs"; +import { DEFAULT_POLLING_SOURCE_TIMER_INTERVAL } from "@pipedream/platform"; + +export default { + ...common, + key: "google_sheets-new-comment-polling", + name: "New Comment", + description: "Emit new event each time a comment is added to a spreadsheet.", + version: "0.0.1", + dedupe: "unique", + type: "source", + props: { + googleSheets, + db: "$.service.db", + timer: { + type: "$.interface.timer", + static: { + intervalSeconds: DEFAULT_POLLING_SOURCE_TIMER_INTERVAL, + }, + }, + watchedDrive: { + propDefinition: [ + googleSheets, + "watchedDrive", + ], + description: "Defaults to My Drive. To select a [Shared Drive](https://support.google.com/a/users/answer/9310351) instead, select it from this list.", + }, + sheetID: { + propDefinition: [ + googleSheets, + "sheetID", + (c) => ({ + driveId: googleSheets.methods.getDriveId(c.watchedDrive), + }), + ], + }, + ...common.props, + }, + methods: { + ...base.methods, + ...common.methods, + }, + async run() { + return this.processSpreadsheet(); + }, +}; diff --git a/components/google_sheets/sources/new-comment-polling/test-event.mjs b/components/google_sheets/sources/new-comment-polling/test-event.mjs new file mode 100644 index 0000000000000..0eeb905bff652 --- /dev/null +++ b/components/google_sheets/sources/new-comment-polling/test-event.mjs @@ -0,0 +1,21 @@ +export default { + "id": "AAABM3vICvg", + "kind": "drive#comment", + "createdTime": "2024-05-08T21:32:04.823Z", + "modifiedTime": "2024-05-08T21:32:04.823Z", + "anchor": "{\"type\":\"workbook-range\",\"uid\":0,\"range\":\"1600938329\"}", + "replies": [], + "author": { + "displayName": "Test User", + "kind": "drive#user", + "me": true, + "photoLink": "//lh3.googleusercontent.com/a/ACg8ocKv3FxHiUdLT981ghC9w01W50yqe5fi2XWOSA4TgnZf8pCxmg=s50-c-k-no" + }, + "deleted": false, + "htmlContent": "comment", + "content": "comment", + "quotedFileContent": { + "mimeType": "text/html", + "value": "1" + } +} diff --git a/components/google_sheets/sources/new-updates-polling/new-updates-polling.mjs b/components/google_sheets/sources/new-updates-polling/new-updates-polling.mjs new file mode 100644 index 0000000000000..4e4119ebd7b4f --- /dev/null +++ b/components/google_sheets/sources/new-updates-polling/new-updates-polling.mjs @@ -0,0 +1,54 @@ +import googleSheets from "../../google_sheets.app.mjs"; +import common from "../common/new-updates.mjs"; +import base from "../common/http-based/base.mjs"; +import { DEFAULT_POLLING_SOURCE_TIMER_INTERVAL } from "@pipedream/platform"; + +export default { + ...common, + key: "google_sheets-new-updates-polling", + name: "New Updates", + description: "Emit new event each time a row or cell is updated in a spreadsheet.", + version: "0.0.1", + dedupe: "unique", + type: "source", + props: { + googleSheets, + db: "$.service.db", + timer: { + type: "$.interface.timer", + static: { + intervalSeconds: DEFAULT_POLLING_SOURCE_TIMER_INTERVAL, + }, + }, + watchedDrive: { + propDefinition: [ + googleSheets, + "watchedDrive", + ], + description: "Defaults to My Drive. To select a [Shared Drive](https://support.google.com/a/users/answer/9310351) instead, select it from this list.", + }, + sheetID: { + propDefinition: [ + googleSheets, + "sheetID", + (c) => ({ + driveId: googleSheets.methods.getDriveId(c.watchedDrive), + }), + ], + }, + ...common.props, + }, + methods: { + ...base.methods, + ...common.methods, + }, + hooks: { + async deploy() { + await this.takeSheetSnapshot(); + }, + }, + async run() { + const spreadsheet = await this.googleSheets.getSpreadsheet(this.sheetID); + return this.processSpreadsheet(spreadsheet); + }, +}; diff --git a/components/google_sheets/sources/new-updates-polling/test-event.mjs b/components/google_sheets/sources/new-updates-polling/test-event.mjs new file mode 100644 index 0000000000000..248273a67e7de --- /dev/null +++ b/components/google_sheets/sources/new-updates-polling/test-event.mjs @@ -0,0 +1,71 @@ +export default { + "worksheet": { + "properties": { + "sheetId": 358595775, + "title": "Test", + "index": 0, + "sheetType": "GRID", + "gridProperties": { + "rowCount": 1029, + "columnCount": 50 + } + } + }, + "currentValues": { + "values": [ + [ + "1", + "Leo 1 Updated" + ], + [ + "2", + "Leo 2" + ], + [ + "3", + "Leo 3" + ], + [ + "4", + "Updated" + ], + [ + "5", + "Updated 0857" + ], + [ + "6", + "Update 0858" + ] + ], + "range": "Test!A1:AX1029", + "majorDimension": "ROWS" + }, + "changes": [ + { + "cell": "B:4", + "previous_value": "", + "new_value": "Updated" + }, + { + "cell": "A:5", + "previous_value": "", + "new_value": "5" + }, + { + "cell": "B:5", + "previous_value": "", + "new_value": "Updated 0857" + }, + { + "cell": "A:6", + "previous_value": "", + "new_value": "6" + }, + { + "cell": "B:6", + "previous_value": "", + "new_value": "Update 0858" + } + ] +} From 251e796d19fe737c1e22679b140c927c58c70ba5 Mon Sep 17 00:00:00 2001 From: Jorge Cortes Date: Tue, 2 Dec 2025 14:38:38 -0500 Subject: [PATCH 2/2] Fixing OOM error in new-updates source component --- .../sources/common/new-updates.mjs | 84 +++++++++++-- .../new-updates-polling.mjs | 117 +++++++++++++++++- .../sources/new-updates/new-updates.mjs | 2 +- 3 files changed, 190 insertions(+), 13 deletions(-) diff --git a/components/google_sheets/sources/common/new-updates.mjs b/components/google_sheets/sources/common/new-updates.mjs index a3834c9ab74fb..381dcf291587f 100644 --- a/components/google_sheets/sources/common/new-updates.mjs +++ b/components/google_sheets/sources/common/new-updates.mjs @@ -20,8 +20,21 @@ export default { label: "Worksheet ID(s)", description: "Select one or more worksheet(s), or provide an array of worksheet IDs.", }, + maxRows: { + type: "integer", + label: "Max Rows to Monitor", + description: "Maximum number of rows to monitor for changes. Defaults to 10000. Increase with caution as larger values may cause memory issues.", + optional: true, + default: 10000, + }, }, methods: { + getBatchSize() { + return 1000; // Process 1000 rows at a time + }, + getMaxRows() { + return this.maxRows || 10000; + }, getMeta(spreadsheet, worksheet) { const { sheetId: worksheetId, @@ -56,6 +69,9 @@ export default { getWorksheetIds() { return this.worksheetIDs.map((i) => i.toString()); }, + _getBatchKey(baseId, batchIndex) { + return `${baseId}_batch_${batchIndex}`; + }, _getSheetValues(id) { const stringBuffer = this.db.get(id); @@ -72,6 +88,51 @@ export default { const stringBuffer = compressed.toString("base64"); this.db.set(id, stringBuffer); }, + _getBatchedSheetValues(baseId) { + const allValues = []; + let batchIndex = 0; + let hasMore = true; + + while (hasMore) { + const batchKey = this._getBatchKey(baseId, batchIndex); + const batchValues = this._getSheetValues(batchKey); + + if (!batchValues) { + hasMore = false; + break; + } + + allValues.push(...batchValues); + batchIndex++; + } + + return allValues.length > 0 + ? allValues + : null; + }, + _setBatchedSheetValues(baseId, sheetValues) { + const batchSize = this.getBatchSize(); + const maxRows = this.getMaxRows(); + + // Limit to maxRows + const limitedValues = sheetValues.slice(0, maxRows); + + // Clear old batches first + let batchIndex = 0; + while (this.db.get(this._getBatchKey(baseId, batchIndex))) { + this.db.set(this._getBatchKey(baseId, batchIndex), undefined); + batchIndex++; + } + + // Store in batches + batchIndex = 0; + for (let i = 0; i < limitedValues.length; i += batchSize) { + const batch = limitedValues.slice(i, i + batchSize); + const batchKey = this._getBatchKey(baseId, batchIndex); + this._setSheetValues(batchKey, batch); + batchIndex++; + } + }, indexToColumnLabel(index) { let columnLabel = ""; while (index >= 0) { @@ -133,10 +194,8 @@ export default { }, async getContentDiff(spreadsheet, worksheet) { const sheetId = this.getSheetId(); - const oldValues = - this._getSheetValues( - `${spreadsheet.spreadsheetId}${worksheet.properties.sheetId}`, - ) || null; + const baseId = `${spreadsheet.spreadsheetId}${worksheet.properties.sheetId}`; + const oldValues = this._getBatchedSheetValues(baseId) || null; const currentValues = await this.googleSheets.getSpreadsheetValues( sheetId, worksheet.properties.title, @@ -169,9 +228,11 @@ export default { continue; } - const offsetLength = Math.max(values.length - offset, 0); + const maxRows = this.getMaxRows(); + const offsetLength = Math.max(Math.min(values.length, maxRows) - offset, 0); const offsetValues = values.slice(0, offsetLength); - this._setSheetValues(`${sheetId}${worksheetId}`, offsetValues); + const baseId = `${sheetId}${worksheetId}`; + this._setBatchedSheetValues(baseId, offsetValues); } }, async processSpreadsheet(spreadsheet) { @@ -191,8 +252,11 @@ export default { spreadsheet, worksheet, ); - const newValues = currentValues.values || []; + const maxRows = this.getMaxRows(); + const rawNewValues = currentValues.values || []; + const newValues = rawNewValues.slice(0, maxRows); let changes = []; + // check if there are differences in the spreadsheet values if (JSON.stringify(oldValues) !== JSON.stringify(newValues)) { let rowCount = this.getRowCount(newValues, oldValues); @@ -214,10 +278,8 @@ export default { this.getMeta(spreadsheet, worksheet), ); } - this._setSheetValues( - `${spreadsheet.spreadsheetId}${worksheet.properties.sheetId}`, - newValues || [], - ); + const baseId = `${spreadsheet.spreadsheetId}${worksheet.properties.sheetId}`; + this._setBatchedSheetValues(baseId, newValues || []); } }, }, diff --git a/components/google_sheets/sources/new-updates-polling/new-updates-polling.mjs b/components/google_sheets/sources/new-updates-polling/new-updates-polling.mjs index 4e4119ebd7b4f..21e94235df850 100644 --- a/components/google_sheets/sources/new-updates-polling/new-updates-polling.mjs +++ b/components/google_sheets/sources/new-updates-polling/new-updates-polling.mjs @@ -20,6 +20,12 @@ export default { intervalSeconds: DEFAULT_POLLING_SOURCE_TIMER_INTERVAL, }, }, + // eslint-disable-next-line pipedream/props-label, pipedream/props-description + alert: { + type: "alert", + alertType: "info", + content: "**Important**: If your sheet has more than 1000 rows, please set the **Monitoring Range** below to avoid performance issues and potential disruptions. Example: `A1:Z1000` to monitor the first 1000 rows with columns A through Z.", + }, watchedDrive: { propDefinition: [ googleSheets, @@ -36,11 +42,120 @@ export default { }), ], }, - ...common.props, + worksheetIDs: { + propDefinition: [ + googleSheets, + "worksheetIDs", + (c) => ({ + sheetId: c.sheetID, + }), + ], + type: "integer[]", + label: "Worksheet ID(s)", + description: "Select one or more worksheet(s), or provide an array of worksheet IDs.", + }, + monitoringRange: { + type: "string", + label: "Monitoring Range", + description: "The A1 notation range to monitor for changes (e.g., `A1:B100` or `Sheet1!A1:Z1000`). If not specified, the entire sheet will be monitored up to 10000 rows. **Recommended for sheets with more than 1000 rows**.", + optional: true, + }, }, methods: { ...base.methods, ...common.methods, + getMonitoringRange() { + return this.monitoringRange; + }, + getRowCount(newValues, oldValues) { + return Math.max(newValues.length, oldValues?.length || 0); + }, + getColCount(newValues, oldValues, i) { + let colCount = 0; + if ( + typeof newValues[i] === "undefined" && + typeof oldValues?.[i] !== "undefined" + ) + colCount = oldValues[i].length; + else if ( + typeof oldValues?.[i] === "undefined" && + typeof newValues[i] !== "undefined" + ) + colCount = newValues[i].length; + else if (newValues[i] && oldValues?.[i]) + colCount = + newValues[i].length > oldValues[i].length + ? newValues[i].length + : oldValues[i].length; + return colCount; + }, + getContentChanges(colCount, newValues, oldValues, changes, i) { + // loop through comparing the values of each cell + for (let j = 0; j < colCount; j++) { + let newValue = + typeof newValues[i] !== "undefined" && + typeof newValues[i][j] !== "undefined" + ? newValues[i][j] + : ""; + let oldValue = + typeof oldValues?.[i] !== "undefined" && + typeof oldValues[i][j] !== "undefined" + ? oldValues[i][j] + : ""; + if (newValue !== oldValue) { + changes.push({ + cell: `${this.indexToColumnLabel(j)}:${i + 1}`, + previous_value: oldValue, + new_value: newValue, + }); + } + } + return changes; + }, + async takeSheetSnapshot(offset = 0) { + const sheetId = this.getSheetId(); + + // If monitoring range is specified, use it to get values + if (this.monitoringRange) { + const currentValues = await this.googleSheets.getSpreadsheetValues( + sheetId, + this.monitoringRange, + ); + + // Get the worksheet ID from the spreadsheet + const spreadsheet = await this.googleSheets.getSpreadsheet(sheetId); + const worksheet = spreadsheet.sheets[0]; // Use first worksheet or extract from range + const worksheetId = worksheet.properties.sheetId; + + const values = currentValues.values || []; + const offsetLength = Math.max(values.length - offset, 0); + const offsetValues = values.slice(0, offsetLength); + const baseId = `${sheetId}${worksheetId}`; + this._setBatchedSheetValues(baseId, offsetValues); + } else { + // Fall back to default behavior + return common.methods.takeSheetSnapshot.call(this, offset); + } + }, + async getContentDiff(spreadsheet, worksheet) { + const sheetId = this.getSheetId(); + const baseId = `${spreadsheet.spreadsheetId}${worksheet.properties.sheetId}`; + const oldValues = this._getBatchedSheetValues(baseId) || null; + + // Use monitoring range if specified, otherwise use worksheet title + const range = this.monitoringRange + ? this.monitoringRange + : worksheet.properties.title; + + const currentValues = await this.googleSheets.getSpreadsheetValues( + sheetId, + range, + ); + return { + oldValues, + currentValues, + }; + }, }, hooks: { async deploy() { diff --git a/components/google_sheets/sources/new-updates/new-updates.mjs b/components/google_sheets/sources/new-updates/new-updates.mjs index 99583bda991cb..957154c7370a9 100644 --- a/components/google_sheets/sources/new-updates/new-updates.mjs +++ b/components/google_sheets/sources/new-updates/new-updates.mjs @@ -9,7 +9,7 @@ export default { type: "source", name: "New Updates (Instant)", description: "Emit new event each time a row or cell is updated in a spreadsheet.", - version: "0.3.3", + version: "0.3.4", dedupe: "unique", props: { ...httpBase.props,