From 06847c65a616278732f8564bc4298714ff7d4ac6 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Thu, 20 Nov 2025 16:56:52 +0200 Subject: [PATCH 01/10] add app metadata --- .changeset/seven-chefs-promise.md | 5 ++ .../src/routes/endpoints/socket-route.ts | 5 +- .../src/routes/endpoints/sync-stream.ts | 11 ++- packages/service-core/src/sync/sync.ts | 20 ++--- .../service-core/src/util/param-logging.ts | 40 +++++++++ .../service-core/src/util/protocol-types.ts | 9 +- .../test/src/routes/stream.test.ts | 86 ++++++++++++++++++- 7 files changed, 159 insertions(+), 17 deletions(-) create mode 100644 .changeset/seven-chefs-promise.md create mode 100644 packages/service-core/src/util/param-logging.ts diff --git a/.changeset/seven-chefs-promise.md b/.changeset/seven-chefs-promise.md new file mode 100644 index 000000000..8583b0f33 --- /dev/null +++ b/.changeset/seven-chefs-promise.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-core': minor +--- + +Added ability to specify app_metada for sync/stream requests diff --git a/packages/service-core/src/routes/endpoints/socket-route.ts b/packages/service-core/src/routes/endpoints/socket-route.ts index 35e51e027..625df514e 100644 --- a/packages/service-core/src/routes/endpoints/socket-route.ts +++ b/packages/service-core/src/routes/endpoints/socket-route.ts @@ -6,6 +6,7 @@ import { SocketRouteGenerator } from '../router-socket.js'; import { SyncRoutes } from './sync-stream.js'; import { APIMetric, event_types } from '@powersync/service-types'; +import { formatParamsForLogging } from '../../util/param-logging.js'; export const syncStreamReactive: SocketRouteGenerator = (router) => router.reactiveStream(SyncRoutes.STREAM, { @@ -19,7 +20,8 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => ...logger.defaultMeta, user_id: context.token_payload?.sub, client_id: params.client_id, - user_agent: context.user_agent + user_agent: context.user_agent, + app_metadata: params.applicationMetadata ? formatParamsForLogging(params.applicationMetadata ?? {}) : undefined }; const sdkData: event_types.ConnectedUserData & event_types.ClientConnectionEventData = { @@ -97,6 +99,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => // Must be set before we start the stream tracker.setCompressed(connection.tracker.encoding); } + try { for await (const data of sync.streamResponse({ syncContext: syncContext, diff --git a/packages/service-core/src/routes/endpoints/sync-stream.ts b/packages/service-core/src/routes/endpoints/sync-stream.ts index d9cb3e406..ec63af2bc 100644 --- a/packages/service-core/src/routes/endpoints/sync-stream.ts +++ b/packages/service-core/src/routes/endpoints/sync-stream.ts @@ -5,10 +5,11 @@ import { Readable } from 'stream'; import * as sync from '../../sync/sync-index.js'; import * as util from '../../util/util-index.js'; +import { APIMetric, event_types } from '@powersync/service-types'; import { authUser } from '../auth.js'; import { routeDefinition } from '../router.js'; -import { APIMetric, event_types } from '@powersync/service-types'; +import { formatParamsForLogging } from '../../util/param-logging.js'; import { maybeCompressResponseStream } from '../compression.js'; export enum SyncRoutes { @@ -42,7 +43,10 @@ export const syncStreamed = routeDefinition({ user_agent: userAgent, client_id: clientId, user_id: payload.context.user_id, - bson: useBson + bson: useBson, + app_metadata: payload.params.applicationMetadata + ? formatParamsForLogging(payload.params.applicationMetadata) + : undefined }; const sdkData: event_types.ConnectedUserData & event_types.ClientConnectionEventData = { client_id: clientId ?? '', @@ -75,6 +79,7 @@ export const syncStreamed = routeDefinition({ const controller = new AbortController(); const tracker = new sync.RequestTracker(metricsEngine); + try { metricsEngine.getUpDownCounter(APIMetric.CONCURRENT_CONNECTIONS).add(1); service_context.eventsEngine.emit(event_types.EventsEngineEventType.SDK_CONNECT_EVENT, sdkData); @@ -133,7 +138,7 @@ export const syncStreamed = routeDefinition({ return new router.RouterResponse({ status: 200, headers: { - 'Content-Type': useBson ? concatenatedBsonContentType : ndJsonContentType, + 'Content-Type': 'text/event-stream', // useBson ? concatenatedBsonContentType : ndJsonContentType, ...encodingHeaders }, data: stream, diff --git a/packages/service-core/src/sync/sync.ts b/packages/service-core/src/sync/sync.ts index 32211f044..cdb9e9284 100644 --- a/packages/service-core/src/sync/sync.ts +++ b/packages/service-core/src/sync/sync.ts @@ -1,11 +1,5 @@ import { JSONBig, JsonContainer } from '@powersync/service-jsonbig'; -import { - BucketDescription, - BucketPriority, - RequestJwtPayload, - RequestParameters, - SqlSyncRules -} from '@powersync/service-sync-rules'; +import { BucketDescription, BucketPriority, RequestJwtPayload } from '@powersync/service-sync-rules'; import { AbortError } from 'ix/aborterror.js'; @@ -14,11 +8,12 @@ import * as storage from '../storage/storage-index.js'; import * as util from '../util/util-index.js'; import { Logger, logger as defaultLogger } from '@powersync/lib-services-framework'; -import { BucketChecksumState, CheckpointLine, VersionedSyncRules } from './BucketChecksumState.js'; import { mergeAsyncIterables } from '../streams/streams-index.js'; -import { acquireSemaphoreAbortable, settledPromise, tokenStream, TokenStreamOptions } from './util.js'; -import { SyncContext } from './SyncContext.js'; +import { formatParamsForLogging } from '../util/param-logging.js'; +import { BucketChecksumState, CheckpointLine, VersionedSyncRules } from './BucketChecksumState.js'; import { OperationsSentStats, RequestTracker, statsForBatch } from './RequestTracker.js'; +import { SyncContext } from './SyncContext.js'; +import { TokenStreamOptions, acquireSemaphoreAbortable, settledPromise, tokenStream } from './util.js'; export interface SyncStreamParameters { syncContext: SyncContext; @@ -53,6 +48,11 @@ export async function* streamResponse( } = options; const logger = options.logger ?? defaultLogger; + logger.info('Sync stream started', { + client_params: params.parameters ? formatParamsForLogging(params.parameters) : undefined, + streams: params.streams?.subscriptions.map((subscription) => subscription.stream) + }); + // We also need to be able to abort, so we create our own controller. const controller = new AbortController(); if (signal) { diff --git a/packages/service-core/src/util/param-logging.ts b/packages/service-core/src/util/param-logging.ts new file mode 100644 index 000000000..51a67aa4a --- /dev/null +++ b/packages/service-core/src/util/param-logging.ts @@ -0,0 +1,40 @@ +export type ParamLoggingFormatOptions = { + maxKeyCount: number; + maxStringLength: number; +}; + +export const DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS: ParamLoggingFormatOptions = { + maxKeyCount: 10, + maxStringLength: 20 +}; + +export function formatParamsForLogging(params: Record, options: Partial = {}) { + const { + maxStringLength = DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS.maxStringLength, + maxKeyCount = DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS.maxKeyCount + } = options; + + function trimString(value: string): string { + if (value.length > maxStringLength) { + return value.slice(0, maxStringLength - 3) + '...'; + } + return value; + } + + return Object.fromEntries( + Object.entries(params).map(([key, value], index) => { + if (index == maxKeyCount) { + return ['[...]', '[...]']; + } + + if (index > maxKeyCount) { + return []; + } + + if (typeof value === 'string') { + return [key, trimString(value)]; + } + return [key, trimString(JSON.stringify(value))]; + }) + ); +} diff --git a/packages/service-core/src/util/protocol-types.ts b/packages/service-core/src/util/protocol-types.ts index 4a6410b4c..270646703 100644 --- a/packages/service-core/src/util/protocol-types.ts +++ b/packages/service-core/src/util/protocol-types.ts @@ -1,6 +1,6 @@ -import * as t from 'ts-codec'; -import { BucketPriority, SqliteJsonRow } from '@powersync/service-sync-rules'; import { JsonContainer } from '@powersync/service-jsonbig'; +import { BucketPriority, SqliteJsonRow } from '@powersync/service-sync-rules'; +import * as t from 'ts-codec'; export const BucketRequest = t.object({ name: t.string, @@ -81,6 +81,11 @@ export const StreamingSyncRequest = t.object({ */ parameters: t.record(t.any).optional(), + /** + * Application metadata to be used in logging. + */ + applicationMetadata: t.record(t.string).optional(), + /** * Unique client id. */ diff --git a/packages/service-core/test/src/routes/stream.test.ts b/packages/service-core/test/src/routes/stream.test.ts index 724437b11..daf6885cc 100644 --- a/packages/service-core/test/src/routes/stream.test.ts +++ b/packages/service-core/test/src/routes/stream.test.ts @@ -1,10 +1,12 @@ import { BasicRouterRequest, Context, SyncRulesBucketStorage } from '@/index.js'; -import { logger, RouterResponse, ServiceError } from '@powersync/lib-services-framework'; +import { RouterResponse, ServiceError, logger } from '@powersync/lib-services-framework'; import { SqlSyncRules } from '@powersync/service-sync-rules'; import { Readable, Writable } from 'stream'; import { pipeline } from 'stream/promises'; import { describe, expect, it } from 'vitest'; +import winston from 'winston'; import { syncStreamed } from '../../../src/routes/endpoints/sync-stream.js'; +import { DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS, formatParamsForLogging } from '../../../src/util/param-logging.js'; import { mockServiceContext } from './mocks.js'; describe('Stream Route', () => { @@ -77,6 +79,88 @@ describe('Stream Route', () => { const r = await drainWithTimeout(stream).catch((error) => error); expect(r.message).toContain('Simulated storage error'); }); + + it('logs the application metadata', async () => { + const storage = { + getParsedSyncRules() { + return new SqlSyncRules('bucket_definitions: {}'); + }, + watchCheckpointChanges: async function* (options) { + throw new Error('Simulated storage error'); + } + } as Partial; + const serviceContext = mockServiceContext(storage); + + // Create a custom format to capture log info objects (which include defaultMeta) + // Winston merges defaultMeta into the info object during formatting + const capturedLogs: any[] = []; + const captureFormat = winston.format((info) => { + // Capture the info object which includes defaultMeta merged in + capturedLogs.push({ ...info }); + return info; + }); + + // Create a test logger with the capture format + const testLogger = winston.createLogger({ + format: winston.format.combine(captureFormat(), winston.format.json()), + transports: [new winston.transports.Console()] + }); + + const context: Context = { + logger: testLogger, + service_context: serviceContext, + token_payload: { + exp: new Date().getTime() / 1000 + 10000, + iat: new Date().getTime() / 1000 - 10000, + sub: 'test-user' + } + }; + + const request: BasicRouterRequest = { + headers: { + 'accept-encoding': 'gzip' + }, + hostname: '', + protocol: 'http' + }; + + const inputMeta = { + test: 'test', + long_meta: 'a'.repeat(1000) + }; + + const response = await (syncStreamed.handler({ + context, + params: { + applicationMetadata: inputMeta, + parameters: { + user_name: 'bob' + } + }, + request + }) as Promise); + expect(response.status).toEqual(200); + const stream = response.data as Readable; + const r = await drainWithTimeout(stream).catch((error) => error); + expect(r.message).toContain('Simulated storage error'); + + // Find the "Sync stream started" log entry + const syncStartedLog = capturedLogs.find((log) => log.message === 'Sync stream started'); + expect(syncStartedLog).toBeDefined(); + + // Verify that app_metadata from defaultMeta is present in the log + expect(syncStartedLog?.app_metadata).toBeDefined(); + expect(syncStartedLog?.app_metadata).toEqual(formatParamsForLogging(inputMeta)); + // Should trim long metadata + expect(syncStartedLog?.app_metadata.long_meta.length).toEqual( + DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS.maxStringLength + ); + + // Verify the explicit log parameters + expect(syncStartedLog?.client_params).toEqual({ + user_name: 'bob' + }); + }); }); }); From 1f4ca9421f1c8c45bfc82b32d5940d5dac819b38 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Thu, 20 Nov 2025 17:44:15 +0200 Subject: [PATCH 02/10] rename variable --- packages/service-core/src/routes/endpoints/socket-route.ts | 2 +- packages/service-core/src/routes/endpoints/sync-stream.ts | 4 +--- packages/service-core/src/util/protocol-types.ts | 2 +- packages/service-core/test/src/routes/stream.test.ts | 2 +- 4 files changed, 4 insertions(+), 6 deletions(-) diff --git a/packages/service-core/src/routes/endpoints/socket-route.ts b/packages/service-core/src/routes/endpoints/socket-route.ts index 625df514e..f8699a3f6 100644 --- a/packages/service-core/src/routes/endpoints/socket-route.ts +++ b/packages/service-core/src/routes/endpoints/socket-route.ts @@ -21,7 +21,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => user_id: context.token_payload?.sub, client_id: params.client_id, user_agent: context.user_agent, - app_metadata: params.applicationMetadata ? formatParamsForLogging(params.applicationMetadata ?? {}) : undefined + app_metadata: params.app_metadata ? formatParamsForLogging(params.app_metadata ?? {}) : undefined }; const sdkData: event_types.ConnectedUserData & event_types.ClientConnectionEventData = { diff --git a/packages/service-core/src/routes/endpoints/sync-stream.ts b/packages/service-core/src/routes/endpoints/sync-stream.ts index ec63af2bc..42992d739 100644 --- a/packages/service-core/src/routes/endpoints/sync-stream.ts +++ b/packages/service-core/src/routes/endpoints/sync-stream.ts @@ -44,9 +44,7 @@ export const syncStreamed = routeDefinition({ client_id: clientId, user_id: payload.context.user_id, bson: useBson, - app_metadata: payload.params.applicationMetadata - ? formatParamsForLogging(payload.params.applicationMetadata) - : undefined + app_metadata: payload.params.app_metadata ? formatParamsForLogging(payload.params.app_metadata) : undefined }; const sdkData: event_types.ConnectedUserData & event_types.ClientConnectionEventData = { client_id: clientId ?? '', diff --git a/packages/service-core/src/util/protocol-types.ts b/packages/service-core/src/util/protocol-types.ts index 270646703..82baa89e5 100644 --- a/packages/service-core/src/util/protocol-types.ts +++ b/packages/service-core/src/util/protocol-types.ts @@ -84,7 +84,7 @@ export const StreamingSyncRequest = t.object({ /** * Application metadata to be used in logging. */ - applicationMetadata: t.record(t.string).optional(), + app_metadata: t.record(t.string).optional(), /** * Unique client id. diff --git a/packages/service-core/test/src/routes/stream.test.ts b/packages/service-core/test/src/routes/stream.test.ts index daf6885cc..bf1220955 100644 --- a/packages/service-core/test/src/routes/stream.test.ts +++ b/packages/service-core/test/src/routes/stream.test.ts @@ -132,7 +132,7 @@ describe('Stream Route', () => { const response = await (syncStreamed.handler({ context, params: { - applicationMetadata: inputMeta, + app_metadata: inputMeta, parameters: { user_name: 'bob' } From f9e6e7f924aa9eab86b961beb6190fa9ab48e814 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Mon, 24 Nov 2025 15:05:16 +0200 Subject: [PATCH 03/10] Improve truncation. Increase limits. Log at sync stream start and end. --- .../src/routes/endpoints/socket-route.ts | 8 ++++++++ .../src/routes/endpoints/sync-stream.ts | 14 ++++++++++++-- packages/service-core/src/sync/sync.ts | 6 ------ packages/service-core/src/util/param-logging.ts | 4 ++-- 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/packages/service-core/src/routes/endpoints/socket-route.ts b/packages/service-core/src/routes/endpoints/socket-route.ts index f8699a3f6..00078b5f5 100644 --- a/packages/service-core/src/routes/endpoints/socket-route.ts +++ b/packages/service-core/src/routes/endpoints/socket-route.ts @@ -100,6 +100,13 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => tracker.setCompressed(connection.tracker.encoding); } + const formattedAppMetadata = params.app_metadata ? formatParamsForLogging(params.app_metadata) : undefined; + logger.info('Sync stream started', { + app_metadata: formattedAppMetadata, + client_params: params.parameters ? formatParamsForLogging(params.parameters) : undefined, + streams: params.streams?.subscriptions.map((subscription) => subscription.stream) + }); + try { for await (const data of sync.streamResponse({ syncContext: syncContext, @@ -182,6 +189,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => } logger.info(`Sync stream complete`, { ...tracker.getLogMeta(), + app_metadata: formattedAppMetadata, stream_ms: Date.now() - streamStart, close_reason: closeReason ?? 'unknown' }); diff --git a/packages/service-core/src/routes/endpoints/sync-stream.ts b/packages/service-core/src/routes/endpoints/sync-stream.ts index 42992d739..65558b369 100644 --- a/packages/service-core/src/routes/endpoints/sync-stream.ts +++ b/packages/service-core/src/routes/endpoints/sync-stream.ts @@ -43,8 +43,7 @@ export const syncStreamed = routeDefinition({ user_agent: userAgent, client_id: clientId, user_id: payload.context.user_id, - bson: useBson, - app_metadata: payload.params.app_metadata ? formatParamsForLogging(payload.params.app_metadata) : undefined + bson: useBson }; const sdkData: event_types.ConnectedUserData & event_types.ClientConnectionEventData = { client_id: clientId ?? '', @@ -78,6 +77,16 @@ export const syncStreamed = routeDefinition({ const controller = new AbortController(); const tracker = new sync.RequestTracker(metricsEngine); + const formattedAppMetadata = payload.params.app_metadata + ? formatParamsForLogging(payload.params.app_metadata) + : undefined; + + logger.info('Sync stream started', { + app_metadata: formattedAppMetadata, + client_params: payload.params.parameters ? formatParamsForLogging(payload.params.parameters) : undefined, + streams: payload.params.streams?.subscriptions.map((subscription) => subscription.stream) + }); + try { metricsEngine.getUpDownCounter(APIMetric.CONCURRENT_CONNECTIONS).add(1); service_context.eventsEngine.emit(event_types.EventsEngineEventType.SDK_CONNECT_EVENT, sdkData); @@ -152,6 +161,7 @@ export const syncStreamed = routeDefinition({ }); logger.info(`Sync stream complete`, { ...tracker.getLogMeta(), + app_metadata: formattedAppMetadata, stream_ms: Date.now() - streamStart, close_reason: closeReason ?? 'unknown' }); diff --git a/packages/service-core/src/sync/sync.ts b/packages/service-core/src/sync/sync.ts index cdb9e9284..01b848da0 100644 --- a/packages/service-core/src/sync/sync.ts +++ b/packages/service-core/src/sync/sync.ts @@ -9,7 +9,6 @@ import * as util from '../util/util-index.js'; import { Logger, logger as defaultLogger } from '@powersync/lib-services-framework'; import { mergeAsyncIterables } from '../streams/streams-index.js'; -import { formatParamsForLogging } from '../util/param-logging.js'; import { BucketChecksumState, CheckpointLine, VersionedSyncRules } from './BucketChecksumState.js'; import { OperationsSentStats, RequestTracker, statsForBatch } from './RequestTracker.js'; import { SyncContext } from './SyncContext.js'; @@ -48,11 +47,6 @@ export async function* streamResponse( } = options; const logger = options.logger ?? defaultLogger; - logger.info('Sync stream started', { - client_params: params.parameters ? formatParamsForLogging(params.parameters) : undefined, - streams: params.streams?.subscriptions.map((subscription) => subscription.stream) - }); - // We also need to be able to abort, so we create our own controller. const controller = new AbortController(); if (signal) { diff --git a/packages/service-core/src/util/param-logging.ts b/packages/service-core/src/util/param-logging.ts index 51a67aa4a..96fcec3b6 100644 --- a/packages/service-core/src/util/param-logging.ts +++ b/packages/service-core/src/util/param-logging.ts @@ -5,7 +5,7 @@ export type ParamLoggingFormatOptions = { export const DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS: ParamLoggingFormatOptions = { maxKeyCount: 10, - maxStringLength: 20 + maxStringLength: 50 }; export function formatParamsForLogging(params: Record, options: Partial = {}) { @@ -24,7 +24,7 @@ export function formatParamsForLogging(params: Record, options: Par return Object.fromEntries( Object.entries(params).map(([key, value], index) => { if (index == maxKeyCount) { - return ['[...]', '[...]']; + return ['⚠️', 'Additional parameters omitted']; } if (index > maxKeyCount) { From a9d6212f11f82f8482725c57ed57a6e0fe2e9240 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Mon, 24 Nov 2025 15:47:40 +0200 Subject: [PATCH 04/10] remove from websocket general meta --- packages/service-core/src/routes/endpoints/socket-route.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/service-core/src/routes/endpoints/socket-route.ts b/packages/service-core/src/routes/endpoints/socket-route.ts index 00078b5f5..e816290c9 100644 --- a/packages/service-core/src/routes/endpoints/socket-route.ts +++ b/packages/service-core/src/routes/endpoints/socket-route.ts @@ -20,8 +20,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => ...logger.defaultMeta, user_id: context.token_payload?.sub, client_id: params.client_id, - user_agent: context.user_agent, - app_metadata: params.app_metadata ? formatParamsForLogging(params.app_metadata ?? {}) : undefined + user_agent: context.user_agent }; const sdkData: event_types.ConnectedUserData & event_types.ClientConnectionEventData = { From 4e4db6e0fa0693879326668df2244ed0af58700c Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Mon, 24 Nov 2025 15:57:07 +0200 Subject: [PATCH 05/10] cleanup --- packages/service-core/src/routes/endpoints/sync-stream.ts | 2 +- packages/service-core/test/src/routes/stream.test.ts | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/service-core/src/routes/endpoints/sync-stream.ts b/packages/service-core/src/routes/endpoints/sync-stream.ts index 65558b369..b6162e5ca 100644 --- a/packages/service-core/src/routes/endpoints/sync-stream.ts +++ b/packages/service-core/src/routes/endpoints/sync-stream.ts @@ -145,7 +145,7 @@ export const syncStreamed = routeDefinition({ return new router.RouterResponse({ status: 200, headers: { - 'Content-Type': 'text/event-stream', // useBson ? concatenatedBsonContentType : ndJsonContentType, + 'Content-Type': useBson ? concatenatedBsonContentType : ndJsonContentType, ...encodingHeaders }, data: stream, diff --git a/packages/service-core/test/src/routes/stream.test.ts b/packages/service-core/test/src/routes/stream.test.ts index bf1220955..432d077d2 100644 --- a/packages/service-core/test/src/routes/stream.test.ts +++ b/packages/service-core/test/src/routes/stream.test.ts @@ -92,7 +92,6 @@ describe('Stream Route', () => { const serviceContext = mockServiceContext(storage); // Create a custom format to capture log info objects (which include defaultMeta) - // Winston merges defaultMeta into the info object during formatting const capturedLogs: any[] = []; const captureFormat = winston.format((info) => { // Capture the info object which includes defaultMeta merged in From 5de8c2791197bd4a8534f96de5b825b82bdcef30 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Mon, 24 Nov 2025 16:06:46 +0200 Subject: [PATCH 06/10] remove streams for now --- packages/service-core/src/routes/endpoints/socket-route.ts | 3 +-- packages/service-core/src/routes/endpoints/sync-stream.ts | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/packages/service-core/src/routes/endpoints/socket-route.ts b/packages/service-core/src/routes/endpoints/socket-route.ts index e816290c9..7b551c6d4 100644 --- a/packages/service-core/src/routes/endpoints/socket-route.ts +++ b/packages/service-core/src/routes/endpoints/socket-route.ts @@ -102,8 +102,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => const formattedAppMetadata = params.app_metadata ? formatParamsForLogging(params.app_metadata) : undefined; logger.info('Sync stream started', { app_metadata: formattedAppMetadata, - client_params: params.parameters ? formatParamsForLogging(params.parameters) : undefined, - streams: params.streams?.subscriptions.map((subscription) => subscription.stream) + client_params: params.parameters ? formatParamsForLogging(params.parameters) : undefined }); try { diff --git a/packages/service-core/src/routes/endpoints/sync-stream.ts b/packages/service-core/src/routes/endpoints/sync-stream.ts index b6162e5ca..10027d554 100644 --- a/packages/service-core/src/routes/endpoints/sync-stream.ts +++ b/packages/service-core/src/routes/endpoints/sync-stream.ts @@ -83,8 +83,7 @@ export const syncStreamed = routeDefinition({ logger.info('Sync stream started', { app_metadata: formattedAppMetadata, - client_params: payload.params.parameters ? formatParamsForLogging(payload.params.parameters) : undefined, - streams: payload.params.streams?.subscriptions.map((subscription) => subscription.stream) + client_params: payload.params.parameters ? formatParamsForLogging(payload.params.parameters) : undefined }); try { From 55568709ba1087a63bf9d892b75b00c8672ea28f Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Mon, 24 Nov 2025 16:07:27 +0200 Subject: [PATCH 07/10] changeset typo --- .changeset/seven-chefs-promise.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changeset/seven-chefs-promise.md b/.changeset/seven-chefs-promise.md index 8583b0f33..86a63b7b6 100644 --- a/.changeset/seven-chefs-promise.md +++ b/.changeset/seven-chefs-promise.md @@ -2,4 +2,4 @@ '@powersync/service-core': minor --- -Added ability to specify app_metada for sync/stream requests +Added ability to specify app_metadata for sync/stream requests From d51cd460765f4aa4e0e40fc50056676fd78ce441 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Tue, 25 Nov 2025 10:45:55 +0200 Subject: [PATCH 08/10] Code cleanup --- .../src/routes/endpoints/socket-route.ts | 6 ++--- .../src/routes/endpoints/sync-stream.ts | 6 ++--- .../service-core/src/util/param-logging.ts | 24 +++++++++++++++++-- .../test/src/routes/stream.test.ts | 4 ++-- 4 files changed, 30 insertions(+), 10 deletions(-) diff --git a/packages/service-core/src/routes/endpoints/socket-route.ts b/packages/service-core/src/routes/endpoints/socket-route.ts index 7b551c6d4..81ac9d15b 100644 --- a/packages/service-core/src/routes/endpoints/socket-route.ts +++ b/packages/service-core/src/routes/endpoints/socket-route.ts @@ -6,7 +6,7 @@ import { SocketRouteGenerator } from '../router-socket.js'; import { SyncRoutes } from './sync-stream.js'; import { APIMetric, event_types } from '@powersync/service-types'; -import { formatParamsForLogging } from '../../util/param-logging.js'; +import { limitParamsForLogging } from '../../util/param-logging.js'; export const syncStreamReactive: SocketRouteGenerator = (router) => router.reactiveStream(SyncRoutes.STREAM, { @@ -99,10 +99,10 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => tracker.setCompressed(connection.tracker.encoding); } - const formattedAppMetadata = params.app_metadata ? formatParamsForLogging(params.app_metadata) : undefined; + const formattedAppMetadata = params.app_metadata ? limitParamsForLogging(params.app_metadata) : undefined; logger.info('Sync stream started', { app_metadata: formattedAppMetadata, - client_params: params.parameters ? formatParamsForLogging(params.parameters) : undefined + client_params: params.parameters ? limitParamsForLogging(params.parameters) : undefined }); try { diff --git a/packages/service-core/src/routes/endpoints/sync-stream.ts b/packages/service-core/src/routes/endpoints/sync-stream.ts index 10027d554..46d02d1e4 100644 --- a/packages/service-core/src/routes/endpoints/sync-stream.ts +++ b/packages/service-core/src/routes/endpoints/sync-stream.ts @@ -9,7 +9,7 @@ import { APIMetric, event_types } from '@powersync/service-types'; import { authUser } from '../auth.js'; import { routeDefinition } from '../router.js'; -import { formatParamsForLogging } from '../../util/param-logging.js'; +import { limitParamsForLogging } from '../../util/param-logging.js'; import { maybeCompressResponseStream } from '../compression.js'; export enum SyncRoutes { @@ -78,12 +78,12 @@ export const syncStreamed = routeDefinition({ const tracker = new sync.RequestTracker(metricsEngine); const formattedAppMetadata = payload.params.app_metadata - ? formatParamsForLogging(payload.params.app_metadata) + ? limitParamsForLogging(payload.params.app_metadata) : undefined; logger.info('Sync stream started', { app_metadata: formattedAppMetadata, - client_params: payload.params.parameters ? formatParamsForLogging(payload.params.parameters) : undefined + client_params: payload.params.parameters ? limitParamsForLogging(payload.params.parameters) : undefined }); try { diff --git a/packages/service-core/src/util/param-logging.ts b/packages/service-core/src/util/param-logging.ts index 96fcec3b6..d81b15aa9 100644 --- a/packages/service-core/src/util/param-logging.ts +++ b/packages/service-core/src/util/param-logging.ts @@ -1,14 +1,34 @@ +/** + * Options for {@link limitParamsForLogging}. + */ export type ParamLoggingFormatOptions = { maxKeyCount: number; maxStringLength: number; }; +/** + * Default options for {@link limitParamsForLogging}. + */ export const DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS: ParamLoggingFormatOptions = { maxKeyCount: 10, maxStringLength: 50 }; -export function formatParamsForLogging(params: Record, options: Partial = {}) { +/** + * Formats potentially arbitrary parameters for logging. + * This limits the number of keys and strings to a maximum length. + * A warning key-value is added if the number of keys exceeds the maximum. + * String values exceeding the maximum length are truncated. + * Non-String values are stringified, the maximum length is then applied. + * @param params - The parameters to format. + * @param options - The options to use. + * @default DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS + * @returns The formatted parameters. + */ +export function limitParamsForLogging( + params: Record, + options: Partial = DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS +) { const { maxStringLength = DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS.maxStringLength, maxKeyCount = DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS.maxKeyCount @@ -31,7 +51,7 @@ export function formatParamsForLogging(params: Record, options: Par return []; } - if (typeof value === 'string') { + if (typeof value == 'string') { return [key, trimString(value)]; } return [key, trimString(JSON.stringify(value))]; diff --git a/packages/service-core/test/src/routes/stream.test.ts b/packages/service-core/test/src/routes/stream.test.ts index 432d077d2..0e1666fa9 100644 --- a/packages/service-core/test/src/routes/stream.test.ts +++ b/packages/service-core/test/src/routes/stream.test.ts @@ -6,7 +6,7 @@ import { pipeline } from 'stream/promises'; import { describe, expect, it } from 'vitest'; import winston from 'winston'; import { syncStreamed } from '../../../src/routes/endpoints/sync-stream.js'; -import { DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS, formatParamsForLogging } from '../../../src/util/param-logging.js'; +import { DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS, limitParamsForLogging } from '../../../src/util/param-logging.js'; import { mockServiceContext } from './mocks.js'; describe('Stream Route', () => { @@ -149,7 +149,7 @@ describe('Stream Route', () => { // Verify that app_metadata from defaultMeta is present in the log expect(syncStartedLog?.app_metadata).toBeDefined(); - expect(syncStartedLog?.app_metadata).toEqual(formatParamsForLogging(inputMeta)); + expect(syncStartedLog?.app_metadata).toEqual(limitParamsForLogging(inputMeta)); // Should trim long metadata expect(syncStartedLog?.app_metadata.long_meta.length).toEqual( DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS.maxStringLength From 94ea401ef6b0e94919d7823b6c7b7cf6e2b1e4b9 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Tue, 25 Nov 2025 11:47:54 +0200 Subject: [PATCH 09/10] explicit client params in test --- .../test/src/routes/stream.test.ts | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/packages/service-core/test/src/routes/stream.test.ts b/packages/service-core/test/src/routes/stream.test.ts index 0e1666fa9..baa74fc82 100644 --- a/packages/service-core/test/src/routes/stream.test.ts +++ b/packages/service-core/test/src/routes/stream.test.ts @@ -133,7 +133,10 @@ describe('Stream Route', () => { params: { app_metadata: inputMeta, parameters: { - user_name: 'bob' + user_name: 'bob', + nested_object: { + nested_key: 'b'.repeat(1000) + } } }, request @@ -156,9 +159,16 @@ describe('Stream Route', () => { ); // Verify the explicit log parameters - expect(syncStartedLog?.client_params).toEqual({ - user_name: 'bob' - }); + expect(syncStartedLog?.client_params).toEqual( + expect.objectContaining({ + user_name: 'bob' + }) + ); + + expect(typeof syncStartedLog?.client_params.nested_object).toEqual('string'); + expect(syncStartedLog?.client_params.nested_object.length).toEqual( + DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS.maxStringLength + ); }); }); }); From 139c380c8895fd0e3b5b89170cdaa5ea5d86612b Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Fri, 28 Nov 2025 14:58:30 +0200 Subject: [PATCH 10/10] update logging limits --- packages/service-core/src/util/param-logging.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/service-core/src/util/param-logging.ts b/packages/service-core/src/util/param-logging.ts index d81b15aa9..221fad968 100644 --- a/packages/service-core/src/util/param-logging.ts +++ b/packages/service-core/src/util/param-logging.ts @@ -10,8 +10,8 @@ export type ParamLoggingFormatOptions = { * Default options for {@link limitParamsForLogging}. */ export const DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS: ParamLoggingFormatOptions = { - maxKeyCount: 10, - maxStringLength: 50 + maxKeyCount: 20, + maxStringLength: 100 }; /**