Skip to content

Commit 17da72e

Browse files
committed
Add a quite generic Reflector implementation
1 parent fecd8ed commit 17da72e

File tree

1 file changed

+287
-0
lines changed

1 file changed

+287
-0
lines changed

reflector.ts

Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
import { WatchEvent, WatchEventError, WatchEventBookmark } from "./common.ts";
2+
3+
/**
4+
* Implementation of a Kubernetes List/Watch client which furnishes a view of the watched data.
5+
* This is commonly called a 'Reflector' among the client libraries, though this one is less standard.
6+
7+
* The core class here is a bit generic and tries to depend mostly on things within this file.
8+
* This is because a Reflector works pretty directly with Kubernetes API objects,
9+
* while this overall kubernetes_client module only serves as an API access path.
10+
* So you'll be furnishing your exact API structures by instiantiating the Reflector class.
11+
* (Usually the API will be from /x/kubernetes_apis/builtin/meta@v1/struct.ts but it can vary)
12+
*
13+
* This class/file work fine, but the types and APIs are still *unstable* while I figure out
14+
* what's necesary for various usecases.
15+
* Please file issues or start discussions if you have any input!
16+
*/
17+
18+
// some of this should be centralized...
19+
export type KindIds = { metadata?: {
20+
name?: string | null;
21+
namespace?: string | null;
22+
uid?: string | null;
23+
resourceVersion?: string | null;
24+
} | null };
25+
// inside our database, make sure we have the IDs
26+
export type KindIdsReq = { metadata: {
27+
name: string;
28+
namespace?: string | null;
29+
uid: string;
30+
resourceVersion: string;
31+
} };
32+
type ListOf<T> = { metadata: { resourceVersion?: string | null }; items: Array<T> };
33+
type ListOpts = { abortSignal?: AbortSignal, resourceVersion?: string; limit?: number; continue?: string };
34+
type WatchOpts = { abortSignal?: AbortSignal, resourceVersion?: string; timeoutSeconds?: number; allowWatchBookmarks?: boolean };
35+
36+
type VersionRef = { metadata: { resourceVersion: string | null }}; // WeakMap key
37+
38+
// synthetic event to indicate that we have emitted a complete picture
39+
type WatchEventSynced = {
40+
'type': "SYNCED";
41+
'object': VersionRef;
42+
} | {
43+
'type': "DESYNCED";
44+
'object': {metadata: {}};
45+
};
46+
type WatchEventObject<T> = {
47+
'type': "ADDED" | "DELETED";
48+
'object': T;
49+
};
50+
type WatchEventModified<T> = {
51+
'type': "MODIFIED";
52+
'object': T;
53+
'previous': T;
54+
};
55+
56+
57+
export type ReflectorEvent<T,S=ApiStatus> =
58+
| WatchEventObject<T & KindIdsReq>
59+
| WatchEventModified<T & KindIdsReq>
60+
| WatchEventError<S>
61+
| WatchEventBookmark
62+
| WatchEventSynced;
63+
type NextEvent<T,S> = {evt: ReflectorEvent<T,S>, ref: VersionRef};
64+
65+
export interface ApiStatus {
66+
metadata?: { resourceVersion?: string | null } | null;
67+
code?: number | null;
68+
message?: string | null;
69+
reason?: string | null;
70+
status?: string | null;
71+
}
72+
73+
export class Reflector<T extends KindIds, S extends ApiStatus> {
74+
#lister: (opts: ListOpts) => Promise<ListOf<T>>;
75+
#watcher: (opts: WatchOpts) => Promise<ReadableStream<WatchEvent<T,S>>>;
76+
constructor(lister: (opts: ListOpts) => Promise<ListOf<T>>, watcher: (opts: WatchOpts) => Promise<ReadableStream<WatchEvent<T,S>>>) {
77+
this.#lister = lister;
78+
this.#watcher = watcher;
79+
}
80+
81+
#resources = new Map<string, T & KindIdsReq>();
82+
#latestVersion?: string;
83+
84+
getCached(namespace: string, name: string): (T & KindIdsReq) | undefined {
85+
return this.#resources.get(`${namespace||''}/${name}`);
86+
}
87+
listCached(): Iterable<T & KindIdsReq> {
88+
return this.#resources.values();
89+
}
90+
isSynced(): boolean {
91+
return !!this.#latestVersion;
92+
}
93+
94+
#cancelled = false;
95+
96+
#nextEvents = new WeakMap<VersionRef, NextEvent<T,S>>(); // linked list i guess...
97+
#latestRef: VersionRef = {metadata: {resourceVersion: null}};
98+
99+
#waitingCbs = new Array<() => void>();
100+
101+
stop() {
102+
if (this.#cancelled) throw new Error(`BUG: double-cancel`);
103+
this.#cancelled = true;
104+
// if (this.#cancel) this.#cancel();
105+
106+
this._emit({
107+
type: 'ERROR',
108+
object: {reason: "ReflectorStopped"} as S,
109+
}, null);
110+
111+
throw new Error(`TODO: a ton of cleanup`);
112+
}
113+
114+
_emit(evt: ReflectorEvent<T,S>, refVersion: string | null): void {
115+
// console.log('emitting', evt.type);
116+
const ref: VersionRef = {metadata: {resourceVersion: refVersion}};
117+
118+
this.#nextEvents.set(this.#latestRef, {
119+
evt: evt,
120+
ref: ref,
121+
});
122+
this.#latestRef = ref;
123+
if (refVersion != null) {
124+
this.#latestVersion = refVersion;
125+
}
126+
127+
// TODO: is this safe enough?
128+
for (const cb of this.#waitingCbs) {
129+
cb();
130+
}
131+
this.#waitingCbs.length = 0;
132+
// throw new Error(`TODO`);
133+
// console.log('emitted', evt.type, refVersion);
134+
}
135+
136+
// main thread running the whole Reflector show
137+
// tries to return cleanly if reflector is cancelled
138+
// TODO: if this crashes, crash every consumer too
139+
async run(abortSignal?: AbortSignal) {
140+
if (abortSignal) {
141+
if (abortSignal.aborted) return;
142+
abortSignal.addEventListener('abort', () => this.stop());
143+
}
144+
145+
while (!this.#cancelled) {
146+
const listFrom = this.#latestVersion || "0";
147+
const missingItems = new Map(this.#resources);
148+
149+
const list = await this.#lister({resourceVersion: listFrom, abortSignal});
150+
if (!list.metadata.resourceVersion) throw new Error(`BUG: list had no version`);
151+
const listVer = list.metadata.resourceVersion;
152+
153+
for (const item of list.items) {
154+
if (!isKeyed(item)) throw new Error(`BUG: got unkeyed item from list ${listVer}`);
155+
156+
const key = `${item.metadata.namespace}/${item.metadata.name}`;
157+
const known = this.#resources.get(key);
158+
if (known) {
159+
missingItems.delete(key);
160+
if (known.metadata.resourceVersion !== item.metadata.resourceVersion) {
161+
this.#resources.set(key, item);
162+
this._emit({type: "MODIFIED", object: item, previous: known}, null);
163+
}
164+
} else {
165+
this.#resources.set(key, item);
166+
this._emit({type: "ADDED", object: item}, null);
167+
}
168+
}
169+
170+
for (const [key, item] of missingItems) {
171+
this.#resources.delete(key);
172+
this._emit({type: "DELETED", object: item}, null);
173+
}
174+
175+
this._emit({type: "SYNCED", object: {
176+
metadata: {resourceVersion: listVer}},
177+
}, listVer); // finally set this.#latestVersion
178+
179+
// loop watching as long as our resourceVersion is valid
180+
loop: while (!this.#cancelled) {
181+
const watch = await this.#watcher({
182+
resourceVersion: this.#latestVersion,
183+
allowWatchBookmarks: true,
184+
abortSignal,
185+
});
186+
for await (const evt of watch) {
187+
if (this.#cancelled) return;
188+
189+
if (evt.type === 'ERROR') {
190+
// the only expected error is our resourceVersion being too old (might be others later)
191+
if (evt.object.reason === 'Expired') {
192+
console.log('Reflector watch expired, starting from clean resync');
193+
// don't even tell downstreams about the expiration
194+
// they'll be able to know via the DESYNCED event
195+
break loop;
196+
}
197+
198+
this._emit(evt, null);
199+
console.log('TODO: reflector got error:', evt.object);
200+
// throw new Error(`TODO: handle reflector error`);
201+
this.stop(); // TODO: maybe stop WITH a status, always
202+
203+
} else if (evt.type === 'BOOKMARK') {
204+
this._emit(evt, evt.object.metadata.resourceVersion);
205+
206+
} else {
207+
if (!isKeyed(evt.object)) throw new Error(`BUG: got unkeyed item from watch ${evt.type}`);
208+
const {namespace, name, resourceVersion} = evt.object.metadata;
209+
const key = `${namespace}/${name}`;
210+
if (evt.type === 'DELETED') {
211+
this.#resources.delete(key);
212+
this._emit({type: evt.type, object: evt.object}, resourceVersion);
213+
} else if (evt.type === 'MODIFIED') {
214+
const previous = this.#resources.get(key);
215+
this.#resources.set(key, evt.object);
216+
if (!previous) {
217+
console.log(`WARN: Reflector got MODIFIED for ${key} but didn't have existing item`);
218+
this._emit({type: 'ADDED', object: evt.object}, resourceVersion);
219+
} else {
220+
this._emit({type: evt.type, object: evt.object, previous}, resourceVersion);
221+
}
222+
} else {
223+
this.#resources.set(key, evt.object);
224+
this._emit({type: evt.type, object: evt.object}, resourceVersion);
225+
}
226+
}
227+
228+
}
229+
}
230+
if (this.#cancelled) return;
231+
232+
// indicate that we're no longer contiguous
233+
this._emit({type: "DESYNCED", object: {
234+
metadata: {}},
235+
}, ""); // clear latest version
236+
}
237+
}
238+
239+
async *observeAll(): AsyncIterableIterator<ReflectorEvent<T,S>> {
240+
// take snapshots for consistent state
241+
const knownVer = this.#latestVersion;
242+
const knownRef = this.#latestRef;
243+
const startupBurst = Array.from(this.#resources.values());
244+
245+
// send what we already knew
246+
// console.log('refobs startup burst:', startupBurst.length)
247+
for (const item of startupBurst) {
248+
yield {type: "ADDED", object: item};
249+
}
250+
251+
if (knownVer) {
252+
yield {type: "SYNCED", object: {metadata: {resourceVersion: knownVer }}};
253+
}
254+
255+
// cycle between catching up and being up-to-date/waiting forever
256+
257+
let ref = knownRef;
258+
while (true) {
259+
// send all pending events
260+
// console.log('refobs flushing from', ref);
261+
let next: NextEvent<T,S> | undefined;
262+
while (next = this.#nextEvents.get(ref)) {
263+
// console.log('refobs got', next.evt.type, next.ref)
264+
yield next.evt;
265+
if (next.evt.type === 'ERROR') {
266+
throw new Error(`Reflector gave us an ERROR`);
267+
}
268+
ref = next.ref;
269+
}
270+
271+
// wait for new events
272+
// console.log('refobs waiting from', ref);
273+
await new Promise<void>(ok => this.#waitingCbs.push(ok));
274+
}
275+
}
276+
277+
goObserveAll<U>(cb: (iter: AsyncIterableIterator<ReflectorEvent<T,S>>) => Promise<U>): Promise<U> {
278+
return cb(this.observeAll());
279+
}
280+
}
281+
282+
function isKeyed<T>(item: T & KindIds): item is T & KindIdsReq {
283+
return !!item.metadata
284+
&& item.metadata.name != null
285+
&& item.metadata.resourceVersion != null
286+
&& item.metadata.uid != null;
287+
}

0 commit comments

Comments
 (0)