Skip to content

Commit c704b2a

Browse files
committed
refactor(kubectl): Switch to Deno.Command
This cleans up the code thanks for the streams interface of Deno.Command. Now supports AbortSignals.
1 parent 36ca783 commit c704b2a

File tree

1 file changed

+22
-99
lines changed

1 file changed

+22
-99
lines changed

transports/via-kubectl-raw.ts

Lines changed: 22 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -41,48 +41,31 @@ export class KubectlRawRestClient implements RestClient {
4141
'--context', this.contextName,
4242
] : [];
4343

44-
const p = Deno.run({
45-
cmd: ["kubectl", ...ctxArgs, ...args],
46-
stdin: hasReqBody ? 'piped' : undefined,
47-
stdout: "piped",
48-
stderr: "inherit",
44+
const kubectl = new Deno.Command('kubectl', {
45+
args: [...ctxArgs, ...args],
46+
stdin: hasReqBody ? 'piped' : 'null',
47+
stdout: 'piped',
48+
stderr: 'inherit',
49+
signal: opts.abortSignal,
4950
});
50-
const status = p.status();
51-
52-
if (opts.abortSignal) {
53-
const abortHandler = () => {
54-
isVerbose && console.error('processing kubectl abort');
55-
p.stdout.close();
56-
};
57-
opts.abortSignal.addEventListener("abort", abortHandler);
58-
status.finally(() => {
59-
isVerbose && console.error('cleaning up abort handler');
60-
opts.abortSignal?.removeEventListener("abort", abortHandler);
61-
});
62-
}
51+
const p = kubectl.spawn();
6352

64-
if (p.stdin) {
53+
if (hasReqBody) {
6554
if (opts.bodyStream) {
66-
const {stdin} = p;
67-
opts.bodyStream.pipeTo(new WritableStream({
68-
async write(chunk, controller) {
69-
await stdin.write(chunk).catch(err => controller.error(err));
70-
},
71-
close() {
72-
stdin.close();
73-
},
74-
}));
55+
await opts.bodyStream.pipeTo(p.stdin);
7556
} else if (opts.bodyRaw) {
76-
await p.stdin.write(opts.bodyRaw);
77-
p.stdin.close();
57+
const writer = p.stdin.getWriter();
58+
await writer.write(opts.bodyRaw);
59+
await writer.close();
7860
} else {
7961
isVerbose && console.error(JSON.stringify(opts.bodyJson))
80-
await p.stdin.write(new TextEncoder().encode(JSON.stringify(opts.bodyJson)));
81-
p.stdin.close();
62+
const writer = p.stdin.getWriter();
63+
await writer.write(new TextEncoder().encode(JSON.stringify(opts.bodyJson)));
64+
await writer.close();
8265
}
8366
}
8467

85-
return [p, status] as const;
68+
return [p, p.status] as const;
8669
}
8770

8871
async performRequest(opts: RequestOptions): Promise<any> {
@@ -131,18 +114,13 @@ export class KubectlRawRestClient implements RestClient {
131114
}
132115
});
133116

134-
// with kubectl --raw, we don't really know if the stream is working or not
135-
// until it exits (maybe bad) or prints to stdout (always good)
136-
// so we await the stream creation in order to throw obvious errors properly
137-
const stream = await readableStreamFromProcess(p, status);
138-
139117
if (opts.expectJson) {
140-
return stream
118+
return p.stdout
141119
.pipeThrough(new TextDecoderStream('utf-8'))
142120
.pipeThrough(new TextLineStream())
143121
.pipeThrough(new JsonParsingTransformer());
144122
} else {
145-
return stream;
123+
return p.stdout;
146124
}
147125
}
148126

@@ -154,18 +132,17 @@ export class KubectlRawRestClient implements RestClient {
154132
}
155133

156134
if (opts.expectJson) {
157-
const data = new TextDecoder("utf-8").decode(rawOutput);
135+
const data = new TextDecoder("utf-8").decode(rawOutput.stdout);
158136
return JSON.parse(data);
159137
} else {
160-
return rawOutput;
138+
return rawOutput.stdout;
161139
}
162140
}
163141

164142
}
165-
// export default KubectlRawRestClient;
166143

167144
// `kubectl patch` doesn't have --raw so we convert the HTTP request into a non-raw `kubectl patch` command
168-
// The resulting command is quite verbose but works for all main resources
145+
// The resulting command is quite verbose but works for virtually all resources
169146
function buildPatchCommand(path: string, contentType?: string) {
170147
if (path.includes('?')) throw new Error(
171148
`TODO: KubectlRawRestClient doesn't know how to PATCH with a querystring yet. ${JSON.stringify(path)}`);
@@ -174,7 +151,7 @@ function buildPatchCommand(path: string, contentType?: string) {
174151
if (patchMode === 'apply') throw new Error(
175152
`TODO: Server-Side Apply is not yet implemented (and also not enabled in vanilla Kubernetes yet)`);
176153
if (!['json', 'merge', 'strategic'].includes(patchMode)) throw new Error(
177-
`Unrecognized Content-Type ${contentType} for PATCH, unable to translate to 'kubectl patch'`);
154+
`Unrecognized Content-Type "${contentType}" for PATCH, unable to translate to 'kubectl patch'`);
178155

179156
const pathParts = path.slice(1).split('/');
180157

@@ -215,57 +192,3 @@ function buildPatchCommand(path: string, contentType?: string) {
215192
`--patch-file`, `/dev/stdin`, // we'll pipe the patch, instead of giving it inline
216193
...resourceArgs];
217194
}
218-
219-
function readableStreamFromProcess(p: Deno.Process<{cmd: any, stdout: 'piped'}>, status: Promise<Deno.ProcessStatus>) {
220-
// with kubectl --raw, we don't really know if the stream is working or not
221-
// until it exits (maybe bad) or prints to stdout (always good)
222-
// so let's wait to return the stream until the first read returns
223-
return new Promise<ReadableStream<Uint8Array>>((ok, err) => {
224-
let isFirstRead = true;
225-
const startTime = new Date;
226-
227-
// Convert Deno.Reader|Deno.Closer into a ReadableStream (like 'fetch' gives)
228-
let ended = false;
229-
const stream = readableStreamFromReader({
230-
close: () => {
231-
p.stdout.close();
232-
// is this the most reliable way??
233-
if (!ended) Deno.run({cmd: ['kill', `${p.pid}`]});
234-
},
235-
// Intercept reads to try doing some error handling/mgmt
236-
read: async buf => {
237-
// do the read
238-
const num = await p.stdout.read(buf);
239-
240-
// if we EOFd, check the process status
241-
if (num === null) {
242-
ended = true;
243-
const stat = await status;
244-
// if it took multiple minutes to fail, probably just failed unrelated
245-
const delayMillis = new Date().valueOf() - startTime.valueOf();
246-
// TODO: some way of passing an error through the ReadableStream?
247-
if (stat.code !== 0 && delayMillis < 3*60*1000) {
248-
// might not be too late to fail the call more properly
249-
err(new Error(`kubectl stream ended with code ${stat.code}`));
250-
return num;
251-
}
252-
// if exit code was 0, let the EOF happen normally, below
253-
}
254-
255-
// if we got our first data, resolve the original promise
256-
if (isFirstRead) {
257-
isFirstRead = false;
258-
ok(stream);
259-
}
260-
261-
return num;
262-
},
263-
}, {
264-
chunkSize: 8*1024, // watch events tend to be pretty small I guess?
265-
strategy: {
266-
highWaterMark: 1, // must be >0 to pre-warm the stream
267-
},
268-
});
269-
// 'stream' gets passed to ok() to be returned
270-
});
271-
}

0 commit comments

Comments
 (0)