Skip to content

Commit 74b8351

Browse files
feat: Add keepAliveTimeoutMs config for AppSync WebSocket link (#724)
* feat: Add keepAliveTimeoutMs config for AppSync WebSocket link * fix typo * validate keepAliveTimeoutMs
1 parent 715dc1c commit 74b8351

File tree

4 files changed

+200
-23
lines changed

4 files changed

+200
-23
lines changed

packages/aws-appsync-subscription-link/__tests__/link/realtime-subscription-handshake-link-test.ts

Lines changed: 163 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@ import { AUTH_TYPE } from "aws-appsync-auth-link";
22
import { execute } from "@apollo/client/core";
33
import gql from 'graphql-tag';
44
import { AppSyncRealTimeSubscriptionHandshakeLink } from '../../src/realtime-subscription-handshake-link';
5+
import { MESSAGE_TYPES } from "../../src/types";
6+
import { v4 as uuid } from "uuid";
7+
jest.mock('uuid', () => ({ v4: jest.fn() }));
58

69
const query = gql`subscription { someSubscription { aField } }`
710

811
class myWebSocket implements WebSocket {
9-
binaryType: BinaryType;
12+
binaryType: BinaryType;
1013
bufferedAmount: number;
1114
extensions: string;
1215
onclose: (this: WebSocket, ev: CloseEvent) => any;
@@ -359,6 +362,164 @@ describe("RealTime subscription link", () => {
359362
}
360363

361364
});
362-
})
365+
});
366+
367+
test("Can use a custom keepAliveTimeoutMs", (done) => {
368+
const id = "abcd-efgh-ijkl-mnop";
369+
uuid.mockImplementationOnce(() => id);
370+
371+
expect.assertions(5);
372+
jest.spyOn(Date.prototype, 'toISOString').mockImplementation(jest.fn(() => {
373+
return "2019-11-13T18:47:04.733Z";
374+
}));
375+
AppSyncRealTimeSubscriptionHandshakeLink.createWebSocket = jest.fn((url, protocol) => {
376+
expect(url).toBe('wss://apikeytest.testcustomdomain.com/graphql/realtime?header=eyJob3N0IjoiYXBpa2V5dGVzdC50ZXN0Y3VzdG9tZG9tYWluLmNvbSIsIngtYW16LWRhdGUiOiIyMDE5MTExM1QxODQ3MDRaIiwieC1hcGkta2V5IjoieHh4eHgifQ==&payload=e30=');
377+
expect(protocol).toBe('graphql-ws');
378+
const socket = new myWebSocket();
379+
380+
setTimeout(() => {
381+
socket.close = () => {};
382+
socket.onopen.call(socket, (undefined as unknown as Event));
383+
socket.send = (msg: string) => {
384+
const { type } = JSON.parse(msg);
385+
386+
switch (type) {
387+
case MESSAGE_TYPES.GQL_CONNECTION_INIT:
388+
socket.onmessage.call(socket, {
389+
data: JSON.stringify({
390+
type: MESSAGE_TYPES.GQL_CONNECTION_ACK,
391+
payload: {
392+
connectionTimeoutMs: 99999,
393+
},
394+
})
395+
} as MessageEvent);
396+
setTimeout(() => {
397+
socket.onmessage.call(socket, {
398+
data: JSON.stringify({
399+
id,
400+
type: MESSAGE_TYPES.GQL_DATA,
401+
payload: {
402+
data: { something: 123 },
403+
},
404+
})
405+
} as MessageEvent);
406+
407+
}, 100);
408+
break;
409+
}
410+
};
411+
}, 100);
412+
413+
return socket;
414+
});
415+
const link = new AppSyncRealTimeSubscriptionHandshakeLink({
416+
auth: {
417+
type: AUTH_TYPE.API_KEY,
418+
apiKey: 'xxxxx'
419+
},
420+
region: 'us-west-2',
421+
url: 'https://apikeytest.testcustomdomain.com/graphql',
422+
keepAliveTimeoutMs: 123456,
423+
});
424+
425+
expect(link).toBeInstanceOf(AppSyncRealTimeSubscriptionHandshakeLink);
426+
expect((link as any).keepAliveTimeout).toBe(123456);
427+
428+
const sub = execute(link, { query }).subscribe({
429+
error: (err) => {
430+
console.log(JSON.stringify(err));
431+
fail();
432+
},
433+
next: (data) => {
434+
expect((link as any).keepAliveTimeout).toBe(123456);
435+
done();
436+
sub.unsubscribe();
437+
},
438+
complete: () => {
439+
console.log('done with this');
440+
fail();
441+
}
442+
443+
});
444+
});
445+
446+
test("Uses service-provided timeout when no custom keepAliveTimeoutMs is configured", (done) => {
447+
const id = "abcd-efgh-ijkl-mnop";
448+
uuid.mockImplementationOnce(() => id);
449+
450+
expect.assertions(5);
451+
jest.spyOn(Date.prototype, 'toISOString').mockImplementation(jest.fn(() => {
452+
return "2019-11-13T18:47:04.733Z";
453+
}));
454+
AppSyncRealTimeSubscriptionHandshakeLink.createWebSocket = jest.fn((url, protocol) => {
455+
expect(url).toBe('wss://apikeytest.testcustomdomain.com/graphql/realtime?header=eyJob3N0IjoiYXBpa2V5dGVzdC50ZXN0Y3VzdG9tZG9tYWluLmNvbSIsIngtYW16LWRhdGUiOiIyMDE5MTExM1QxODQ3MDRaIiwieC1hcGkta2V5IjoieHh4eHgifQ==&payload=e30=');
456+
expect(protocol).toBe('graphql-ws');
457+
const socket = new myWebSocket();
458+
459+
setTimeout(() => {
460+
socket.close = () => {};
461+
socket.onopen.call(socket, (undefined as unknown as Event));
462+
socket.send = (msg: string) => {
463+
const { type } = JSON.parse(msg);
464+
465+
switch (type) {
466+
case MESSAGE_TYPES.GQL_CONNECTION_INIT:
467+
socket.onmessage.call(socket, {
468+
data: JSON.stringify({
469+
type: MESSAGE_TYPES.GQL_CONNECTION_ACK,
470+
payload: {
471+
connectionTimeoutMs: 99999,
472+
},
473+
})
474+
} as MessageEvent);
475+
setTimeout(() => {
476+
socket.onmessage.call(socket, {
477+
data: JSON.stringify({
478+
id,
479+
type: MESSAGE_TYPES.GQL_DATA,
480+
payload: {
481+
data: { something: 123 },
482+
},
483+
})
484+
} as MessageEvent);
485+
486+
}, 100);
487+
break;
488+
}
489+
};
490+
}, 100);
491+
492+
return socket;
493+
});
494+
const link = new AppSyncRealTimeSubscriptionHandshakeLink({
495+
auth: {
496+
type: AUTH_TYPE.API_KEY,
497+
apiKey: 'xxxxx'
498+
},
499+
region: 'us-west-2',
500+
url: 'https://apikeytest.testcustomdomain.com/graphql',
501+
});
502+
503+
expect(link).toBeInstanceOf(AppSyncRealTimeSubscriptionHandshakeLink);
504+
expect((link as any).keepAliveTimeout).toBeUndefined();
505+
506+
const sub = execute(link, { query }).subscribe({
507+
error: (err) => {
508+
console.log(JSON.stringify(err));
509+
fail();
510+
},
511+
next: (data) => {
512+
expect((link as any).keepAliveTimeout).toBe(99999);
513+
done();
514+
sub.unsubscribe();
515+
},
516+
complete: () => {
517+
console.log('done with this');
518+
fail();
519+
}
520+
521+
});
522+
});
523+
363524

364525
});

packages/aws-appsync-subscription-link/src/index.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,20 @@ import { NonTerminatingLink } from "./non-terminating-link";
99
import type { OperationDefinitionNode } from "graphql";
1010

1111
import {
12-
AppSyncRealTimeSubscriptionHandshakeLink
12+
AppSyncRealTimeSubscriptionHandshakeLink,
1313
} from "./realtime-subscription-handshake-link";
14-
import { UrlInfo } from "./types";
14+
import { AppSyncRealTimeSubscriptionConfig } from "./types";
1515

1616
function createSubscriptionHandshakeLink(
17-
args: UrlInfo,
17+
args: AppSyncRealTimeSubscriptionConfig,
1818
resultsFetcherLink?: ApolloLink
1919
): ApolloLink;
2020
function createSubscriptionHandshakeLink(
2121
url: string,
2222
resultsFetcherLink?: ApolloLink
2323
): ApolloLink;
2424
function createSubscriptionHandshakeLink(
25-
infoOrUrl: UrlInfo | string,
25+
infoOrUrl: AppSyncRealTimeSubscriptionConfig | string,
2626
theResultsFetcherLink?: ApolloLink
2727
) {
2828
let resultsFetcherLink: ApolloLink, subscriptionLinks: ApolloLink;
@@ -45,7 +45,7 @@ function createSubscriptionHandshakeLink(
4545

4646
observer.next({ [CONTROL_EVENTS_KEY]: controlEvents });
4747

48-
return () => {};
48+
return () => { };
4949
})
5050
)
5151
}),

packages/aws-appsync-subscription-link/src/realtime-subscription-handshake-link.ts

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import { GraphQLError, print } from "graphql";
1717
import * as url from "url";
1818
import { v4 as uuid } from "uuid";
1919
import {
20-
UrlInfo,
20+
AppSyncRealTimeSubscriptionConfig,
2121
SOCKET_STATUS,
2222
ObserverQuery,
2323
SUBSCRIPTION_STATUS,
@@ -50,6 +50,11 @@ const CONNECTION_INIT_TIMEOUT = 15000;
5050
*/
5151
const START_ACK_TIMEOUT = 15000;
5252

53+
/**
54+
* Frequency in milliseconds in which the server sends GQL_CONNECTION_KEEP_ALIVE messages
55+
*/
56+
const SERVER_KEEP_ALIVE_TIMEOUT = 1 * 60 * 1000;
57+
5358
/**
5459
* Default Time in milliseconds to wait for GQL_CONNECTION_KEEP_ALIVE message
5560
*/
@@ -66,21 +71,28 @@ export class AppSyncRealTimeSubscriptionHandshakeLink extends ApolloLink {
6671
private awsRealTimeSocket: WebSocket;
6772
private socketStatus: SOCKET_STATUS = SOCKET_STATUS.CLOSED;
6873
private keepAliveTimeoutId;
69-
private keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT;
74+
private keepAliveTimeout?: number = undefined;
7075
private subscriptionObserverMap: Map<string, ObserverQuery> = new Map();
7176
private promiseArray: Array<{ res: Function; rej: Function }> = [];
7277

73-
constructor({ url: theUrl, region: theRegion, auth: theAuth }: UrlInfo) {
78+
constructor({ url: theUrl, region: theRegion, auth: theAuth, keepAliveTimeoutMs }: AppSyncRealTimeSubscriptionConfig) {
7479
super();
7580
this.url = theUrl;
7681
this.region = theRegion;
7782
this.auth = theAuth;
83+
this.keepAliveTimeout = keepAliveTimeoutMs;
84+
85+
if (this.keepAliveTimeout < SERVER_KEEP_ALIVE_TIMEOUT) {
86+
let configName: keyof AppSyncRealTimeSubscriptionConfig = 'keepAliveTimeoutMs';
87+
88+
throw new Error(`${configName} must be greater than or equal to ${SERVER_KEEP_ALIVE_TIMEOUT} (${this.keepAliveTimeout} used).`);
89+
}
7890
}
7991

8092
// Check if url matches standard domain pattern
81-
private isCustomDomain(url: string): boolean {
82-
return url.match(standardDomainPattern) === null;
83-
}
93+
private isCustomDomain(url: string): boolean {
94+
return url.match(standardDomainPattern) === null;
95+
}
8496

8597
request(operation: Operation) {
8698
const { query, variables } = operation;
@@ -384,7 +396,7 @@ export class AppSyncRealTimeSubscriptionHandshakeLink extends ApolloLink {
384396
region,
385397
credentials,
386398
token,
387-
graphql_headers: () => {}
399+
graphql_headers: () => { }
388400
})
389401
);
390402
const headerQs = Buffer.from(headerString).toString("base64");
@@ -394,16 +406,16 @@ export class AppSyncRealTimeSubscriptionHandshakeLink extends ApolloLink {
394406
let discoverableEndpoint = appSyncGraphqlEndpoint;
395407

396408
if (this.isCustomDomain(discoverableEndpoint)) {
397-
discoverableEndpoint = discoverableEndpoint.concat(
398-
customDomainPath
399-
);
400-
} else {
401-
discoverableEndpoint = discoverableEndpoint.replace('appsync-api', 'appsync-realtime-api').replace('gogi-beta', 'grt-beta');
402-
}
409+
discoverableEndpoint = discoverableEndpoint.concat(
410+
customDomainPath
411+
);
412+
} else {
413+
discoverableEndpoint = discoverableEndpoint.replace('appsync-api', 'appsync-realtime-api').replace('gogi-beta', 'grt-beta');
414+
}
403415

404416
discoverableEndpoint = discoverableEndpoint
405-
.replace("https://", "wss://")
406-
.replace('http://', 'ws://')
417+
.replace("https://", "wss://")
418+
.replace('http://', 'ws://')
407419

408420
const awsRealTimeUrl = `${discoverableEndpoint}?header=${headerQs}&payload=${payloadQs}`;
409421

@@ -607,7 +619,7 @@ export class AppSyncRealTimeSubscriptionHandshakeLink extends ApolloLink {
607619
} = data;
608620
if (type === MESSAGE_TYPES.GQL_CONNECTION_ACK) {
609621
ackOk = true;
610-
this.keepAliveTimeout = connectionTimeoutMs;
622+
this.keepAliveTimeout = this.keepAliveTimeout ?? connectionTimeoutMs;
611623
this.awsRealTimeSocket.onmessage = this._handleIncomingSubscriptionMessage.bind(
612624
this
613625
);

packages/aws-appsync-subscription-link/src/types/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ export type UrlInfo = {
8383
region: string;
8484
};
8585

86+
export type AppSyncRealTimeSubscriptionConfig = UrlInfo & {
87+
keepAliveTimeoutMs?: number;
88+
};
89+
8690
export type ObserverQuery = {
8791
observer: ZenObservable.SubscriptionObserver<any>;
8892
query: string;

0 commit comments

Comments
 (0)