Skip to content

Commit 33c0b1e

Browse files
authored
feat(realtime): add metadata to realtime user broadcast push (#1894)
1 parent 68d3b2c commit 33c0b1e

File tree

2 files changed

+311
-47
lines changed

2 files changed

+311
-47
lines changed

packages/core/realtime-js/src/lib/serializer.ts

Lines changed: 56 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,18 @@ export type Msg<T> = {
1010

1111
export default class Serializer {
1212
HEADER_LENGTH = 1
13-
META_LENGTH = 4
14-
USER_BROADCAST_PUSH_META_LENGTH = 5
13+
USER_BROADCAST_PUSH_META_LENGTH = 6
1514
KINDS = { userBroadcastPush: 3, userBroadcast: 4 }
1615
BINARY_ENCODING = 0
1716
JSON_ENCODING = 1
1817
BROADCAST_EVENT = 'broadcast'
1918

19+
allowedMetadataKeys: string[] = []
20+
21+
constructor(allowedMetadataKeys?: string[] | null) {
22+
this.allowedMetadataKeys = allowedMetadataKeys ?? []
23+
}
24+
2025
encode(msg: Msg<{ [key: string]: any }>, callback: (result: ArrayBuffer | string) => any) {
2126
if (
2227
msg.event === this.BROADCAST_EVENT &&
@@ -41,57 +46,58 @@ export default class Serializer {
4146
}
4247

4348
private _encodeBinaryUserBroadcastPush(message: Msg<{ event: string } & { [key: string]: any }>) {
44-
const topic = message.topic
45-
const ref = message.ref ?? ''
46-
const joinRef = message.join_ref ?? ''
47-
const userEvent = message.payload.event
4849
const userPayload = message.payload?.payload ?? new ArrayBuffer(0)
49-
50-
const metaLength =
51-
this.USER_BROADCAST_PUSH_META_LENGTH +
52-
joinRef.length +
53-
ref.length +
54-
topic.length +
55-
userEvent.length
56-
57-
const header = new ArrayBuffer(this.HEADER_LENGTH + metaLength)
58-
let view = new DataView(header)
59-
let offset = 0
60-
61-
view.setUint8(offset++, this.KINDS.userBroadcastPush) // kind
62-
view.setUint8(offset++, joinRef.length)
63-
view.setUint8(offset++, ref.length)
64-
view.setUint8(offset++, topic.length)
65-
view.setUint8(offset++, userEvent.length)
66-
view.setUint8(offset++, this.BINARY_ENCODING)
67-
Array.from(joinRef, (char) => view.setUint8(offset++, char.charCodeAt(0)))
68-
Array.from(ref, (char) => view.setUint8(offset++, char.charCodeAt(0)))
69-
Array.from(topic, (char) => view.setUint8(offset++, char.charCodeAt(0)))
70-
Array.from(userEvent, (char) => view.setUint8(offset++, char.charCodeAt(0)))
71-
72-
var combined = new Uint8Array(header.byteLength + userPayload.byteLength)
73-
combined.set(new Uint8Array(header), 0)
74-
combined.set(new Uint8Array(userPayload), header.byteLength)
75-
76-
return combined.buffer
50+
return this._encodeUserBroadcastPush(message, this.BINARY_ENCODING, userPayload)
7751
}
7852

7953
private _encodeJsonUserBroadcastPush(message: Msg<{ event: string } & { [key: string]: any }>) {
54+
const userPayload = message.payload?.payload ?? {}
55+
const encoder = new TextEncoder()
56+
const encodedUserPayload = encoder.encode(JSON.stringify(userPayload)).buffer
57+
return this._encodeUserBroadcastPush(message, this.JSON_ENCODING, encodedUserPayload)
58+
}
59+
60+
private _encodeUserBroadcastPush(
61+
message: Msg<{ event: string } & { [key: string]: any }>,
62+
encodingType: number,
63+
encodedPayload: ArrayBuffer
64+
) {
8065
const topic = message.topic
8166
const ref = message.ref ?? ''
8267
const joinRef = message.join_ref ?? ''
8368
const userEvent = message.payload.event
84-
const userPayload = message.payload?.payload ?? {}
8569

86-
const encoder = new TextEncoder() // Encodes to UTF-8
87-
const encodedUserPayload = encoder.encode(JSON.stringify(userPayload)).buffer
70+
// Filter metadata based on allowed keys
71+
const rest = this.allowedMetadataKeys
72+
? this._pick(message.payload, this.allowedMetadataKeys)
73+
: {}
74+
75+
const metadata = Object.keys(rest).length === 0 ? '' : JSON.stringify(rest)
76+
77+
// Validate lengths don't exceed uint8 max value (255)
78+
if (joinRef.length > 255) {
79+
throw new Error(`joinRef length ${joinRef.length} exceeds maximum of 255`)
80+
}
81+
if (ref.length > 255) {
82+
throw new Error(`ref length ${ref.length} exceeds maximum of 255`)
83+
}
84+
if (topic.length > 255) {
85+
throw new Error(`topic length ${topic.length} exceeds maximum of 255`)
86+
}
87+
if (userEvent.length > 255) {
88+
throw new Error(`userEvent length ${userEvent.length} exceeds maximum of 255`)
89+
}
90+
if (metadata.length > 255) {
91+
throw new Error(`metadata length ${metadata.length} exceeds maximum of 255`)
92+
}
8893

8994
const metaLength =
9095
this.USER_BROADCAST_PUSH_META_LENGTH +
9196
joinRef.length +
9297
ref.length +
9398
topic.length +
94-
userEvent.length
99+
userEvent.length +
100+
metadata.length
95101

96102
const header = new ArrayBuffer(this.HEADER_LENGTH + metaLength)
97103
let view = new DataView(header)
@@ -102,15 +108,17 @@ export default class Serializer {
102108
view.setUint8(offset++, ref.length)
103109
view.setUint8(offset++, topic.length)
104110
view.setUint8(offset++, userEvent.length)
105-
view.setUint8(offset++, this.JSON_ENCODING)
111+
view.setUint8(offset++, metadata.length)
112+
view.setUint8(offset++, encodingType)
106113
Array.from(joinRef, (char) => view.setUint8(offset++, char.charCodeAt(0)))
107114
Array.from(ref, (char) => view.setUint8(offset++, char.charCodeAt(0)))
108115
Array.from(topic, (char) => view.setUint8(offset++, char.charCodeAt(0)))
109116
Array.from(userEvent, (char) => view.setUint8(offset++, char.charCodeAt(0)))
117+
Array.from(metadata, (char) => view.setUint8(offset++, char.charCodeAt(0)))
110118

111-
var combined = new Uint8Array(header.byteLength + encodedUserPayload.byteLength)
119+
var combined = new Uint8Array(header.byteLength + encodedPayload.byteLength)
112120
combined.set(new Uint8Array(header), 0)
113-
combined.set(new Uint8Array(encodedUserPayload), header.byteLength)
121+
combined.set(new Uint8Array(encodedPayload), header.byteLength)
114122

115123
return combined.buffer
116124
}
@@ -185,4 +193,11 @@ export default class Serializer {
185193
private _isArrayBuffer(buffer: any): boolean {
186194
return buffer instanceof ArrayBuffer || buffer?.constructor?.name === 'ArrayBuffer'
187195
}
196+
197+
private _pick(obj: Record<string, any> | null | undefined, keys: string[]): Record<string, any> {
198+
if (!obj || typeof obj !== 'object') {
199+
return {}
200+
}
201+
return Object.fromEntries(Object.entries(obj).filter(([key]) => keys.includes(key)))
202+
}
188203
}

0 commit comments

Comments
 (0)