Skip to content

Commit 6103d6a

Browse files
committed
Add notifications endpoint with streaming websocket updates
1 parent e3421bd commit 6103d6a

File tree

1 file changed

+70
-0
lines changed

1 file changed

+70
-0
lines changed

src/features/notifications/notificationsSlice.js

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,79 @@ import {
22
createSlice,
33
createAsyncThunk,
44
createEntityAdapter,
5+
createSelector,
56
} from '@reduxjs/toolkit'
67

78
import { client } from '../../api/client'
9+
import { forceGenerateNotifications } from '../../api/server'
10+
import { apiSlice } from '../api/apiSlice'
11+
12+
export const extendedApi = apiSlice.injectEndpoints({
13+
endpoints: (builder) => ({
14+
getNotifications: builder.query({
15+
query: () => '/notifications',
16+
transformResponse: (res) => res.notifications,
17+
async onCacheEntryAdded(
18+
arg,
19+
{ updateCachedData, cacheDataLoaded, cacheEntryRemoved }
20+
) {
21+
// create a websocket connection when the cache subscription starts
22+
const ws = new WebSocket('ws://localhost')
23+
try {
24+
// wait for the initial query to resolve before proceeding
25+
await cacheDataLoaded
26+
27+
// when data is received from the socket connection to the server,
28+
// update our query result with the received message
29+
const listener = (event) => {
30+
const message = JSON.parse(event.data)
31+
switch (message.type) {
32+
case 'notifications': {
33+
updateCachedData((draft) => {
34+
// Insert all received notifications from the websocket
35+
// into the existing RTKQ cache array
36+
draft.push(...message.payload)
37+
draft.sort((a, b) => b.date.localeCompare(a.date))
38+
})
39+
break
40+
}
41+
default:
42+
break
43+
}
44+
}
45+
46+
ws.addEventListener('message', listener)
47+
} catch {
48+
// no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
49+
// in which case `cacheDataLoaded` will throw
50+
}
51+
// cacheEntryRemoved will resolve when the cache subscription is no longer active
52+
await cacheEntryRemoved
53+
// perform cleanup steps once the `cacheEntryRemoved` promise resolves
54+
ws.close()
55+
},
56+
}),
57+
}),
58+
})
59+
60+
export const { useGetNotificationsQuery } = extendedApi
61+
62+
const emptyNotifications = []
63+
64+
export const selectNotificationsResult = extendedApi.endpoints.getNotifications.select()
65+
66+
const selectNotificationsData = createSelector(
67+
selectNotificationsResult,
68+
(notificationsResult) => notificationsResult.data ?? emptyNotifications
69+
)
70+
71+
export const fetchNotificationsWebsocket = () => (dispatch, getState) => {
72+
const allNotifications = selectNotificationsData(getState())
73+
const [latestNotification] = allNotifications
74+
const latestTimestamp = latestNotification?.date ?? ''
75+
// Hardcode a call to the mock server to simulate a server push scenario over websockets
76+
forceGenerateNotifications(latestTimestamp)
77+
}
878

979
const notificationsAdapter = createEntityAdapter({
1080
sortComparer: (a, b) => b.date.localeCompare(a.date),

0 commit comments

Comments
 (0)