Skip to content

Commit a2aea74

Browse files
committed
Add watch stream structs/transforming
Generified so that the caller can supply their own MetaV1.Status implementation.
1 parent 4ea0d05 commit a2aea74

File tree

3 files changed

+98
-5
lines changed

3 files changed

+98
-5
lines changed

common.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,23 @@ export interface RestClient {
3232
defaultNamespace?: string;
3333
}
3434

35-
// Things that JSON can encode directly
35+
// Structures that JSON can encode directly
3636
export type JSONPrimitive = string | number | boolean | null | undefined;
3737
export type JSONValue = JSONPrimitive | JSONObject | JSONArray;
3838
export type JSONObject = {[key: string]: JSONValue};
3939
export type JSONArray = JSONValue[];
40+
41+
// Types seen in a resource watch stream
42+
export type WatchEvent<T,U> = WatchEventObject<T> | WatchEventError<U> | WatchEventBookmark;
43+
export type WatchEventObject<T> = {
44+
'type': "ADDED" | "MODIFIED" | "DELETED";
45+
'object': T;
46+
};
47+
export type WatchEventError<U> = {
48+
'type': "ERROR";
49+
'object': U;
50+
};
51+
export type WatchEventBookmark = {
52+
'type': "BOOKMARK";
53+
'object': { metadata: { resourceVersion: string }};
54+
};

mod.ts

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,16 @@ export { InClusterRestClient, KubectlRawRestClient };
1818

1919
import type { RestClient } from './common.ts';
2020

21-
// Feeble attempt at automatically deciding how to talk to Kubernetes
22-
// You'll still need to set the correct permissions for where you are running.
23-
// You can probably be more specific and secure with app-specific Deno.args flags
21+
/**
22+
* Trial-and-error approach for automatically deciding how to talk to Kubernetes.
23+
* You'll still need to set the correct permissions for where you are running.
24+
* You can probably be more specific and secure with app-specific Deno.args flags.
25+
*/
2426
export async function autoDetectClient(): Promise<RestClient> {
2527

2628
// try reading the incluster service account files
2729
try {
30+
// TODO: this should be async
2831
return new InClusterRestClient();
2932
} catch (err) {
3033
console.log('debug: InCluster client failed:', err.name);
@@ -33,5 +36,24 @@ export async function autoDetectClient(): Promise<RestClient> {
3336
// TODO: try hitting localhost:9001 (for KubectlProxyRestClient)
3437

3538
// fall back to execing kubectl
39+
// TODO: try execing first (probably to select default namespace)
3640
return new KubectlRawRestClient();
3741
}
42+
43+
44+
/** Paginates through an API request, yielding each successive page as a whole */
45+
export async function* readAllPages<T, U extends {continue?: string | null}>(pageFunc: (token?: string) => Promise<{metadata: U, items: T[]}>) {
46+
let pageToken: string | undefined;
47+
do {
48+
const page = await pageFunc(pageToken ?? undefined);
49+
yield page;
50+
pageToken = page.metadata.continue ?? undefined;
51+
} while (pageToken);
52+
}
53+
54+
/** Paginates through an API request, yielding every individual item returned */
55+
export async function* readAllItems<T>(pageFunc: (token?: string) => Promise<{metadata: {continue?: string | null}, items: T[]}>) {
56+
for await (const page of readAllPages(pageFunc)) {
57+
yield* page.items;
58+
}
59+
}

stream-transformers.ts

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { JSONObject } from "./common.ts";
1+
import { JSONObject, WatchEvent } from "./common.ts";
22

33
class TextLineReader {
44
decoder: TextDecoder;
@@ -105,3 +105,59 @@ export function readableStreamFromReaderCloser(
105105
() => source.close(),
106106
);
107107
}
108+
109+
110+
class WatchEventReader<T,U> {
111+
objValidator: (val: JSONObject) => T;
112+
errValidator: (val: JSONObject) => U;
113+
constructor(objValidator: (val: JSONObject) => T, errValidator: (val: JSONObject) => U) {
114+
this.objValidator = objValidator;
115+
this.errValidator = errValidator;
116+
}
117+
processObject(raw: JSONObject, controller: TransformStreamDefaultController<WatchEvent<T,U>>) {
118+
const {type, object} = raw;
119+
if (typeof type !== 'string') {
120+
throw new Error(`BUG: watch record 'type' field was ${typeof type}`);
121+
}
122+
if (object == null) {
123+
throw new Error(`BUG: watch record 'object' field was null`);
124+
}
125+
if (typeof object !== 'object') {
126+
throw new Error(`BUG: watch record 'object' field was ${typeof object}`);
127+
}
128+
if (Array.isArray(object)) {
129+
throw new Error(`BUG: watch record 'object' field was Array`);
130+
}
131+
132+
switch (type) {
133+
case 'ERROR':
134+
controller.enqueue({type, object: this.errValidator(object)});
135+
break;
136+
case 'ADDED':
137+
case 'MODIFIED':
138+
case 'DELETED':
139+
controller.enqueue({type, object: this.objValidator(object)});
140+
break;
141+
case 'BOOKMARK':
142+
if (object.metadata && typeof object.metadata === 'object' && !Array.isArray(object.metadata)) {
143+
if (typeof object.metadata.resourceVersion === 'string') {
144+
controller.enqueue({type, object: {
145+
metadata: { resourceVersion: object.metadata.resourceVersion },
146+
}});
147+
break;
148+
}
149+
}
150+
throw new Error(`BUG: BOOKMARK event wasn't recognizable: ${JSON.stringify(object)}`);
151+
default:
152+
throw new Error(`BUG: watch record got unknown event type ${type}`);
153+
}
154+
}
155+
}
156+
157+
/** Validates JSON objects belonging to a watch stream */
158+
export class WatchEventTransformer<T,U> extends TransformStream<JSONObject, WatchEvent<T,U>> {
159+
constructor(objValidator: (val: JSONObject) => T, errValidator: (val: JSONObject) => U) {
160+
const reader = new WatchEventReader(objValidator, errValidator);
161+
super({ transform: reader.processObject.bind(reader) });
162+
}
163+
}

0 commit comments

Comments
 (0)