diff --git a/.changeset/seven-chefs-promise.md b/.changeset/seven-chefs-promise.md new file mode 100644 index 000000000..86a63b7b6 --- /dev/null +++ b/.changeset/seven-chefs-promise.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-core': minor +--- + +Added ability to specify app_metadata for sync/stream requests diff --git a/packages/service-core/src/routes/endpoints/socket-route.ts b/packages/service-core/src/routes/endpoints/socket-route.ts index 35e51e027..81ac9d15b 100644 --- a/packages/service-core/src/routes/endpoints/socket-route.ts +++ b/packages/service-core/src/routes/endpoints/socket-route.ts @@ -6,6 +6,7 @@ import { SocketRouteGenerator } from '../router-socket.js'; import { SyncRoutes } from './sync-stream.js'; import { APIMetric, event_types } from '@powersync/service-types'; +import { limitParamsForLogging } from '../../util/param-logging.js'; export const syncStreamReactive: SocketRouteGenerator = (router) => router.reactiveStream(SyncRoutes.STREAM, { @@ -97,6 +98,13 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => // Must be set before we start the stream tracker.setCompressed(connection.tracker.encoding); } + + const formattedAppMetadata = params.app_metadata ? limitParamsForLogging(params.app_metadata) : undefined; + logger.info('Sync stream started', { + app_metadata: formattedAppMetadata, + client_params: params.parameters ? limitParamsForLogging(params.parameters) : undefined + }); + try { for await (const data of sync.streamResponse({ syncContext: syncContext, @@ -179,6 +187,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => } logger.info(`Sync stream complete`, { ...tracker.getLogMeta(), + app_metadata: formattedAppMetadata, stream_ms: Date.now() - streamStart, close_reason: closeReason ?? 
'unknown' }); diff --git a/packages/service-core/src/routes/endpoints/sync-stream.ts b/packages/service-core/src/routes/endpoints/sync-stream.ts index d9cb3e406..46d02d1e4 100644 --- a/packages/service-core/src/routes/endpoints/sync-stream.ts +++ b/packages/service-core/src/routes/endpoints/sync-stream.ts @@ -5,10 +5,11 @@ import { Readable } from 'stream'; import * as sync from '../../sync/sync-index.js'; import * as util from '../../util/util-index.js'; +import { APIMetric, event_types } from '@powersync/service-types'; import { authUser } from '../auth.js'; import { routeDefinition } from '../router.js'; -import { APIMetric, event_types } from '@powersync/service-types'; +import { limitParamsForLogging } from '../../util/param-logging.js'; import { maybeCompressResponseStream } from '../compression.js'; export enum SyncRoutes { @@ -75,6 +76,16 @@ export const syncStreamed = routeDefinition({ const controller = new AbortController(); const tracker = new sync.RequestTracker(metricsEngine); + + const formattedAppMetadata = payload.params.app_metadata + ? limitParamsForLogging(payload.params.app_metadata) + : undefined; + + logger.info('Sync stream started', { + app_metadata: formattedAppMetadata, + client_params: payload.params.parameters ? limitParamsForLogging(payload.params.parameters) : undefined + }); + try { metricsEngine.getUpDownCounter(APIMetric.CONCURRENT_CONNECTIONS).add(1); service_context.eventsEngine.emit(event_types.EventsEngineEventType.SDK_CONNECT_EVENT, sdkData); @@ -149,6 +160,7 @@ export const syncStreamed = routeDefinition({ }); logger.info(`Sync stream complete`, { ...tracker.getLogMeta(), + app_metadata: formattedAppMetadata, stream_ms: Date.now() - streamStart, close_reason: closeReason ?? 
'unknown' }); diff --git a/packages/service-core/src/sync/sync.ts b/packages/service-core/src/sync/sync.ts index 32211f044..01b848da0 100644 --- a/packages/service-core/src/sync/sync.ts +++ b/packages/service-core/src/sync/sync.ts @@ -1,11 +1,5 @@ import { JSONBig, JsonContainer } from '@powersync/service-jsonbig'; -import { - BucketDescription, - BucketPriority, - RequestJwtPayload, - RequestParameters, - SqlSyncRules -} from '@powersync/service-sync-rules'; +import { BucketDescription, BucketPriority, RequestJwtPayload } from '@powersync/service-sync-rules'; import { AbortError } from 'ix/aborterror.js'; @@ -14,11 +8,11 @@ import * as storage from '../storage/storage-index.js'; import * as util from '../util/util-index.js'; import { Logger, logger as defaultLogger } from '@powersync/lib-services-framework'; -import { BucketChecksumState, CheckpointLine, VersionedSyncRules } from './BucketChecksumState.js'; import { mergeAsyncIterables } from '../streams/streams-index.js'; -import { acquireSemaphoreAbortable, settledPromise, tokenStream, TokenStreamOptions } from './util.js'; -import { SyncContext } from './SyncContext.js'; +import { BucketChecksumState, CheckpointLine, VersionedSyncRules } from './BucketChecksumState.js'; import { OperationsSentStats, RequestTracker, statsForBatch } from './RequestTracker.js'; +import { SyncContext } from './SyncContext.js'; +import { TokenStreamOptions, acquireSemaphoreAbortable, settledPromise, tokenStream } from './util.js'; export interface SyncStreamParameters { syncContext: SyncContext; diff --git a/packages/service-core/src/util/param-logging.ts b/packages/service-core/src/util/param-logging.ts new file mode 100644 index 000000000..221fad968 --- /dev/null +++ b/packages/service-core/src/util/param-logging.ts @@ -0,0 +1,60 @@ +/** + * Options for {@link limitParamsForLogging}. 
+ */ +export type ParamLoggingFormatOptions = { + maxKeyCount: number; + maxStringLength: number; +}; + +/** + * Default options for {@link limitParamsForLogging}. + */ +export const DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS: ParamLoggingFormatOptions = { + maxKeyCount: 20, + maxStringLength: 100 +}; + +/** + * Formats potentially arbitrary parameters for logging. + * This limits the number of keys and strings to a maximum length. + * A warning key-value is added if the number of keys exceeds the maximum. + * String values exceeding the maximum length are truncated. + * Non-String values are stringified, the maximum length is then applied. + * @param params - The parameters to format. + * @param options - The options to use. + * @default DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS + * @returns The formatted parameters. + */ +export function limitParamsForLogging( + params: Record, + options: Partial = DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS +) { + const { + maxStringLength = DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS.maxStringLength, + maxKeyCount = DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS.maxKeyCount + } = options; + + function trimString(value: string): string { + if (value.length > maxStringLength) { + return value.slice(0, maxStringLength - 3) + '...'; + } + return value; + } + + return Object.fromEntries( + Object.entries(params).map(([key, value], index) => { + if (index == maxKeyCount) { + return ['⚠️', 'Additional parameters omitted']; + } + + if (index > maxKeyCount) { + return []; + } + + if (typeof value == 'string') { + return [key, trimString(value)]; + } + return [key, trimString(JSON.stringify(value))]; + }) + ); +} diff --git a/packages/service-core/src/util/protocol-types.ts b/packages/service-core/src/util/protocol-types.ts index 4a6410b4c..82baa89e5 100644 --- a/packages/service-core/src/util/protocol-types.ts +++ b/packages/service-core/src/util/protocol-types.ts @@ -1,6 +1,6 @@ -import * as t from 'ts-codec'; -import { BucketPriority, SqliteJsonRow } from 
'@powersync/service-sync-rules'; import { JsonContainer } from '@powersync/service-jsonbig'; +import { BucketPriority, SqliteJsonRow } from '@powersync/service-sync-rules'; +import * as t from 'ts-codec'; export const BucketRequest = t.object({ name: t.string, @@ -81,6 +81,11 @@ export const StreamingSyncRequest = t.object({ */ parameters: t.record(t.any).optional(), + /** + * Application metadata to be used in logging. + */ + app_metadata: t.record(t.string).optional(), + /** * Unique client id. */ diff --git a/packages/service-core/test/src/routes/stream.test.ts b/packages/service-core/test/src/routes/stream.test.ts index 724437b11..baa74fc82 100644 --- a/packages/service-core/test/src/routes/stream.test.ts +++ b/packages/service-core/test/src/routes/stream.test.ts @@ -1,10 +1,12 @@ import { BasicRouterRequest, Context, SyncRulesBucketStorage } from '@/index.js'; -import { logger, RouterResponse, ServiceError } from '@powersync/lib-services-framework'; +import { RouterResponse, ServiceError, logger } from '@powersync/lib-services-framework'; import { SqlSyncRules } from '@powersync/service-sync-rules'; import { Readable, Writable } from 'stream'; import { pipeline } from 'stream/promises'; import { describe, expect, it } from 'vitest'; +import winston from 'winston'; import { syncStreamed } from '../../../src/routes/endpoints/sync-stream.js'; +import { DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS, limitParamsForLogging } from '../../../src/util/param-logging.js'; import { mockServiceContext } from './mocks.js'; describe('Stream Route', () => { @@ -77,6 +79,97 @@ describe('Stream Route', () => { const r = await drainWithTimeout(stream).catch((error) => error); expect(r.message).toContain('Simulated storage error'); }); + + it('logs the application metadata', async () => { + const storage = { + getParsedSyncRules() { + return new SqlSyncRules('bucket_definitions: {}'); + }, + watchCheckpointChanges: async function* (options) { + throw new Error('Simulated storage error'); 
} as Partial<SyncRulesBucketStorage>;
+ // Verify the explicit log parameters + expect(syncStartedLog?.client_params).toEqual( + expect.objectContaining({ + user_name: 'bob' + }) + ); + + expect(typeof syncStartedLog?.client_params.nested_object).toEqual('string'); + expect(syncStartedLog?.client_params.nested_object.length).toEqual( + DEFAULT_PARAM_LOGGING_FORMAT_OPTIONS.maxStringLength + ); + }); }); });