From 9870e8963214bf6d61bd57884b63bc1f6c57088c Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 27 Nov 2025 10:44:33 +0100 Subject: [PATCH] Streams: Support subqueries with multiple parameters --- .changeset/old-buckets-lay.md | 5 + packages/sync-rules/src/streams/from_sql.ts | 59 ++++++++-- packages/sync-rules/src/streams/variant.ts | 6 +- packages/sync-rules/test/src/streams.test.ts | 116 +++++++++++++++++++ 4 files changed, 177 insertions(+), 9 deletions(-) create mode 100644 .changeset/old-buckets-lay.md diff --git a/.changeset/old-buckets-lay.md b/.changeset/old-buckets-lay.md new file mode 100644 index 000000000..ef11cd25d --- /dev/null +++ b/.changeset/old-buckets-lay.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-sync-rules': patch +--- + +Fix compiling streams with multiple parameter matchers in a subquery. diff --git a/packages/sync-rules/src/streams/from_sql.ts b/packages/sync-rules/src/streams/from_sql.ts index ef7cfebac..ba325167a 100644 --- a/packages/sync-rules/src/streams/from_sql.ts +++ b/packages/sync-rules/src/streams/from_sql.ts @@ -407,18 +407,63 @@ class SyncStreamCompiler { tools.error('This subquery must return exactly one column', query); } - const column = tools.compileRowValueExtractor(query.columns?.[0]?.expr); - if (isClauseError(column)) { + const columnOrError = tools.compileRowValueExtractor(query.columns?.[0]?.expr); + if (isClauseError(columnOrError)) { return; } + const column = columnOrError; - const where = tools.compileClause(query.where); + const where = this.whereClauseToFilters(tools, query.where); + const filter = where.toDisjunctiveNormalForm(tools); + function checkValidSubqueryFilter( + operator: FilterOperator + ): CompareRowValueWithStreamParameter | EvaluateSimpleCondition | null { + if (operator instanceof CompareRowValueWithStreamParameter || operator instanceof EvaluateSimpleCondition) { + return operator; + } + + tools.error('Unsupported condition for stream subqueries', operator.location ?? undefined); + return null; + } + + function constructSubquery(filter: FilterOperator) { + if (filter instanceof Or) { + // Subqueries can't have variants, so the DNF must be a single conjunction. + if (filter.inner.length != 1) { + tools.error("Stream subqueries can't use OR filters", filter.location ?? undefined); + } + + return constructSubquery(filter.inner[0]); + } else if (filter instanceof And) { + const first = checkValidSubqueryFilter(filter.inner[0]); + if (!first) { + return; + } + const subquery = new Subquery(sourceTable, column, first); + for (const rest of filter.inner.slice(1)) { + const checked = checkValidSubqueryFilter(rest); + if (checked) { + subquery.addFilter(checked); + } + } + + return subquery; + } else { + const validated = checkValidSubqueryFilter(filter); + if (validated) { + return new Subquery(sourceTable, column, validated); + } + } + } + + const compiledSubquery = constructSubquery(filter); this.errors.push(...tools.errors); - return [ - new Subquery(sourceTable, column, this.compiledClauseToFilter(tools, query.where?._location, where)), - tools - ]; + + if (!compiledSubquery) { + return; + } + return [compiledSubquery, tools]; } private checkValidSelectStatement(stmt: Statement) { diff --git a/packages/sync-rules/src/streams/variant.ts b/packages/sync-rules/src/streams/variant.ts index 825d3a306..1e9c7097e 100644 --- a/packages/sync-rules/src/streams/variant.ts +++ b/packages/sync-rules/src/streams/variant.ts @@ -214,8 +214,10 @@ export class StreamVariant { return []; } - // This will be an array of values (i.e. a total evaluation) because there are no dynamic parameters. - return this.partiallyEvaluateParameters(params) as SqliteJsonValue[][]; + return this.cartesianProductOfParameterInstantiations( + // This will be an array of values (i.e. a total evaluation) because there are no dynamic parameters. + this.partiallyEvaluateParameters(params) as SqliteJsonValue[][] + ); } /** diff --git a/packages/sync-rules/test/src/streams.test.ts b/packages/sync-rules/test/src/streams.test.ts index 855f21f88..0be75a505 100644 --- a/packages/sync-rules/test/src/streams.test.ts +++ b/packages/sync-rules/test/src/streams.test.ts @@ -511,6 +511,33 @@ describe('streams', () => { ]); }); + test('OR in subquery', () => { + const [_, errors] = syncStreamFromSql( + 's', + `select * from comments where issue_id in (select id from issues where owner_id = auth.user_id() or name = 'test')`, + options + ); + + expect(errors).toMatchObject([ + expect.toBeSqlRuleError(`Stream subqueries can't use OR filters`, `owner_id = auth.user_id() or name = 'test'`) + ]); + }); + + test('nested subqueries', () => { + const [_, errors] = syncStreamFromSql( + 's', + `select * from comments where issue_id in (select id from issues where owner_id in (select id from users where is_admin))`, + options + ); + + expect(errors).toMatchObject([ + expect.toBeSqlRuleError( + `Unsupported condition for stream subqueries`, + `owner_id in (select id from users where is_admin` + ) + ]); + }); + test('subquery with two columns', () => { const [_, errors] = syncStreamFromSql( 's', @@ -719,6 +746,95 @@ describe('streams', () => { stream.resolveResultSets(schema, outputSchema); expect(Object.keys(outputSchema)).toStrictEqual(['outer']); }); + + test('multiple matchers in subquery', async () => { + // https://discord.com/channels/1138230179878154300/1422138173907144724/1443338137660031117 + const scene = new TestSourceTable('Scene'); + const projectInvitation = new TestSourceTable('ProjectInvitation'); + const schema = new StaticSchema([ + { + tag: DEFAULT_TAG, + schemas: [ + { + name: 'test_schema', + tables: [ + { + name: 'Scene', + columns: [ + { name: '_id', pg_type: 'uuid' }, + { name: 'project', pg_type: 'uuid' } + ] + }, + { + name: 'ProjectInvitation', + columns: [ + { name: 'project', pg_type: 'uuid' }, + { name: 'appliedTo', pg_type: 'text' }, + { name: 'appliedTo', pg_type: 'text' }, + { name: 'status', pg_type: 'text' } + ] + } + ] + } + ] + } + ]); + + const desc = parseStream( + `SELECT _id as id, * +FROM "Scene" +WHERE + project IN ( + SELECT project + FROM "ProjectInvitation" + WHERE "appliedTo" != '' + AND (auth.parameters() ->> 'haystack_id') IN "appliedTo" + AND project = subscription.parameter('project') + AND "status" = 'CLAIMED' + ) + `, + 'stream', + { ...options, schema } + ); + + expect(evaluateBucketIds(desc, scene, { _id: 'scene', project: 'foo' })).toStrictEqual(['1#stream|0["foo"]']); + + expect( + desc.evaluateParameterRow(projectInvitation, { + project: 'foo', + appliedTo: '[1,2]', + status: 'CLAIMED' + }) + ).toStrictEqual([ + { + lookup: ParameterLookup.normalized('stream', '0', [1n, 'foo']), + bucketParameters: [ + { + result: 'foo' + } + ] + }, + { + lookup: ParameterLookup.normalized('stream', '0', [2n, 'foo']), + bucketParameters: [ + { + result: 'foo' + } + ] + } + ]); + + expect( + await queryBucketIds(desc, { + token: { sub: 'user1', haystack_id: 1 }, + parameters: { project: 'foo' }, + getParameterSets(lookups) { + expect(lookups).toStrictEqual([ParameterLookup.normalized('stream', '0', [1n, 'foo'])]); + return [{ result: 'foo' }]; + } + }) + ).toStrictEqual(['1#stream|0["foo"]']); + }); }); });