Skip to content

Commit 863a75a

Browse files
authored
Merge pull request #43 from koxu1996/feature/job-agglomeration
[k8s] Agglomerating jobs
2 parents 41a568d + 8c054d7 commit 863a75a

File tree

5 files changed

+458
-81
lines changed

5 files changed

+458
-81
lines changed

functions/kubernetes/buffer.js

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/**
2+
* Class used for buffering jobs - kind of agglomeration.
3+
*/
4+
class BufferCountWithTimeout {
5+
constructor(count, idleTimeoutMs, cb) {
6+
this.elements = []
7+
this.triggerCount = count;
8+
this.idleTimeoutMs = idleTimeoutMs;
9+
this.cb = cb;
10+
this._rescheduleTimeout();
11+
}
12+
13+
addItem(item) {
14+
this.elements.push(item);
15+
if (this.elements.length >= this.triggerCount) {
16+
console.log("Running callback [reached count]");
17+
let elementsCopy = this.elements;
18+
this.elements = [];
19+
this.cb(elementsCopy);
20+
}
21+
this._rescheduleTimeout();
22+
}
23+
24+
_rescheduleTimeout() {
25+
if (this.timeoutId) {
26+
clearInterval(this.timeoutId);
27+
}
28+
this.timeoutId = setTimeout(() => {
29+
console.log("Running callback [reached timeout]");
30+
let elementsCopy = this.elements;
31+
this.timeoutId = null;
32+
this.elements = [];
33+
this.cb(elementsCopy);
34+
}, this.idleTimeoutMs);
35+
}
36+
}
37+
38+
/**
39+
* Testing function.
40+
* TODO: REMOVE
41+
*/
42+
async function testBuffer() {
43+
let fn = (items) => {
44+
console.log("Got from buffer:", items);
45+
}
46+
let test = new BufferCountWithTimeout(3, 1000, fn);
47+
test.addItem(1);
48+
test.addItem(2);
49+
test.addItem(3);
50+
test.addItem(4);
51+
test.addItem(5);
52+
53+
await new Promise(resolve => setTimeout(resolve, 2000));
54+
55+
test.addItem(6);
56+
test.addItem(7);
57+
test.addItem(8);
58+
test.addItem(9);
59+
test.addItem(10);
60+
test.addItem(12);
61+
test.addItem(13);
62+
}
63+
64+
exports.BufferCountWithTimeout = BufferCountWithTimeout;
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
var Buffer = require('./buffer.js').BufferCountWithTimeout;
2+
3+
/**
4+
* Class used to store multiple buffers and add items to them
5+
* according to specified configuration.
6+
*/
7+
class BufferManager {
8+
constructor(cb) {
9+
this.buffers = []
10+
this.taskBufferMap = {}
11+
this.configured = false;
12+
this.cb = cb;
13+
}
14+
15+
setCallback(cb) {
16+
if (this.cb != undefined) {
17+
throw Error("Callback is already set");
18+
}
19+
this.cb = cb;
20+
return;
21+
}
22+
23+
isConfigured() {
24+
return this.configured;
25+
}
26+
27+
configure(buffersConf) {
28+
/** Configuration cannot be executed more than once. */
29+
if (this.configured == true) {
30+
throw Error("BufferManager can be configured only once");
31+
}
32+
this.configured = true;
33+
34+
/** Parse configuration. */
35+
if (Array.isArray(buffersConf) == false) {
36+
throw Error("Buffers configuration should be an array");
37+
}
38+
for (let i = 0; i < buffersConf.length; i++) {
39+
let matchTask = buffersConf[i]['matchTask'];
40+
let size = buffersConf[i]['size'];
41+
let timeoutMs = buffersConf[i]['timeoutMs'];
42+
if (matchTask == undefined || size === undefined || timeoutMs == undefined) {
43+
throw Error("Following keys are required: matchTask, size, timeoutMs");
44+
}
45+
let res = this.buffers.push(new Buffer(size, timeoutMs, this.cb));
46+
let buffIndex = res - 1;
47+
48+
/** Build map of taskName -> bufferId. */
49+
for (let j = 0; j < matchTask.length; j++) {
50+
let taskName = matchTask[j];
51+
if (this.taskBufferMap[taskName] != undefined) {
52+
console.log("WARNING: task", taskName, "is already matched in another buffer, ignoring");
53+
continue;
54+
}
55+
this.taskBufferMap[taskName] = buffIndex;
56+
}
57+
}
58+
59+
return;
60+
}
61+
62+
addItem(taskName, item) {
63+
let bufferId = this.taskBufferMap[taskName];
64+
/** If task is not buffered, then execute callback immediately. */
65+
if (bufferId == undefined) {
66+
this.cb([item]);
67+
return;
68+
}
69+
70+
/** Buffering item. */
71+
this.buffers[bufferId].addItem(item);
72+
73+
return;
74+
}
75+
}
76+
77+
async function testBufferManager() {
78+
let cb = (items) => {
79+
console.log("Got from buffer:", items);
80+
}
81+
buffersConf = [
82+
{
83+
matchTask: ['job_a', 'job_b'],
84+
size: 2,
85+
timeoutMs: 3000,
86+
},
87+
{
88+
matchTask: ['job_b', 'job_c'],
89+
size: 3,
90+
timeoutMs: 2500,
91+
},
92+
];
93+
let test = new BufferManager(cb)
94+
test.configure(buffersConf);
95+
test.addItem('job_a', 1);
96+
test.addItem('job_a', 2);
97+
test.addItem('job_b', 3);
98+
test.addItem('job_b', 4);
99+
test.addItem('job_b', 5);
100+
101+
await new Promise(resolve => setTimeout(resolve, 6000));
102+
103+
test.addItem('job_a', 6);
104+
test.addItem('job_c', 7);
105+
test.addItem('job_c', 8);
106+
}
107+
108+
exports.BufferManager = BufferManager;

functions/kubernetes/k8sCommand.js

Lines changed: 128 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,80 @@
11
// Runs a job as a Pod (Kubernetes Job) in a Kubernetes cluster
22

33
const k8s = require('@kubernetes/client-node');
4+
var BufferManager = require('./buffer_manager.js').BufferManager;
5+
var RestartCounter = require('./restart_counter.js').RestartCounter;
46
var submitK8sJob = require('./k8sJobSubmit.js').submitK8sJob;
57
var fs = require('fs');
68

7-
async function k8sCommand(ins, outs, context, cb) {
9+
let bufferManager = new BufferManager();
10+
11+
let backoffLimit = process.env.HF_VAR_BACKOFF_LIMIT || 0;
12+
let restartCounter = new RestartCounter(backoffLimit);
13+
14+
// Function k8sCommandGroup
15+
//
16+
// Inputs:
17+
// - bufferItems - array containing objects with following properties:
18+
// * ins
19+
// * outs
20+
// * context
21+
// * cb
22+
async function k8sCommandGroup(bufferItems) {
23+
24+
// No action needed when buffer is empty
25+
if (bufferItems.length == 0) {
26+
return;
27+
}
828

929
let startTime = Date.now();
10-
console.log("k8sCommand started, time:", startTime);
30+
console.log("k8sCommandGroup started, time:", startTime);
31+
32+
// Function for rebuffering items
33+
let restartFn = (bufferIndex) => {
34+
let bufferItem = bufferItems[bufferIndex];
35+
let taskId = bufferItem.context.taskId;
36+
if (restartCounter.isRestartPossible(taskId)) {
37+
let restartVal = restartCounter.increase(taskId);
38+
console.log("Readding task", taskId, "to buffer (restartCount:", restartVal + ") ...");
39+
let itemName = bufferItem.context.name;
40+
bufferManager.addItem(itemName, bufferItem);
41+
}
42+
return;
43+
}
44+
45+
// Extract particular arrays from buffer items
46+
let jobArr = [];
47+
let taskIdArr = [];
48+
let contextArr = [];
49+
let cbArr = [];
50+
for (let i=0; i<bufferItems.length; i++) {
51+
let bufferItem = bufferItems[i];
52+
let ins = bufferItem.ins;
53+
let outs = bufferItem.outs;
54+
let context = bufferItem.context;
55+
let cb = bufferItem.cb;
56+
57+
var job = context.executor; // object containing 'executable', 'args' and others
58+
job.name = context.name;
59+
job.ins = ins;
60+
job.outs = outs;
61+
62+
jobArr.push(job);
63+
taskIdArr.push(context.taskId);
64+
contextArr.push(context);
65+
cbArr.push(cb);
66+
}
67+
68+
let context = contextArr[0];
69+
1170
// let cluster = await getCluster();
1271
// const token = await getGCPToken();
1372

14-
// support for two (or more) clusters (for cloud bursting)
73+
// support for two (or more) clusters (for cloud bursting)
1574
// if 'partition' is defined, check if there is a custom config file
1675
// for that partition. This config file may override parameters of the job,
1776
// possibly even define a path to a different kube_config to be loaded
18-
let partition = context.executor.partition;
77+
let partition = context.executor.partition;
1978
let partitionConfigDir = process.env.HF_VAR_PARTITION_CONFIG_DIR || "/opt/hyperflow/partitions";
2079
let partitionConfigFile = partitionConfigDir + "/" + "part." + partition + ".config.json";
2180

@@ -30,7 +89,7 @@ async function k8sCommand(ins, outs, context, cb) {
3089
console.log(partitionConfigFile);
3190
console.log("CUSTOM...", customParams);
3291

33-
// Set kube_config path if overridden
92+
// Set kube_config path if overridden
3493
if (customParams.kubeConfigPath) {
3594
process.env.KUBECONFIG = customParams.kubeConfigPath;
3695
console.log(process.env.KUBECONFIG);
@@ -39,31 +98,81 @@ async function k8sCommand(ins, outs, context, cb) {
3998
const kubeconfig = new k8s.KubeConfig();
4099
kubeconfig.loadFromDefault(); // loadFromString(JSON.stringify(kconfig))
41100

42-
var job = context.executor; // object containing 'executable', 'args' and others
43-
job.name = context.name;
44-
job.ins = ins;
45-
job.outs = outs;
46-
47-
let jobExitCode = await submitK8sJob(kubeconfig, job, context.taskId, context, customParams);
101+
let jobExitCodes = [];
102+
try {
103+
jobExitCodes = await submitK8sJob(kubeconfig, jobArr, taskIdArr, contextArr, customParams, restartFn);
104+
} catch (err) {
105+
console.log("Error when submitting job:", err);
106+
throw err;
107+
}
48108

49109
let endTime = Date.now();
50-
console.log("Ending k8sCommand function, time:", endTime);
110+
console.log("Ending k8sCommandGroup function, time:", endTime, "exit codes:", jobExitCodes);
51111

52112
// Stop the entire workflow if a job fails (controlled by an environment variable)
53-
if (jobExitCode != 0 && process.env.HF_VAR_STOP_WORKFLOW_WHEN_JOB_FAILED=="1") {
54-
console.log('Error: job exited with error code, stopping workflow.');
55-
console.log('Error details: job.name: ' + job.name + ', job.args: ' + job.args.join(' '));
56-
process.exit(1);
113+
for (var i=0; i<jobExitCodes.length; i++) {
114+
let jobExitCode = jobExitCodes[i];
115+
if (jobExitCode != 0 && process.env.HF_VAR_STOP_WORKFLOW_WHEN_JOB_FAILED=="1") {
116+
let taskId = taskIdArr[i];
117+
let job = jobArr[i];
118+
console.log('Error: job', taskId, 'exited with error code', jobExitCode, ', stopping workflow.');
119+
console.log('Error details: job.name: ' + job.name + ', job.args: ' + job.args.join(' '));
120+
process.exit(1);
121+
}
57122
}
58123

59124
// if we're here, the job should have succesfully completed -- we write this
60125
// information to Redis (job executor may make use of it).
126+
let markPromises = [];
127+
for (var i=0; i<contextArr.length; i++) {
128+
// skip failed jobs
129+
if (jobExitCodes[i] != 0) {
130+
continue;
131+
}
132+
133+
let context = contextArr[i];
134+
markPromises.push(context.markTaskCompleted());
135+
}
61136
try {
62-
await context.markTaskCompleted();
137+
await Promise.all(markPromises);
63138
} catch {
64-
console.error("Marking job", context.taskId, "as completed failed.")
139+
console.error("Marking jobs", taskIdArr, "as completed failed.")
140+
}
141+
142+
for (var i=0; i<cbArr.length; i++) {
143+
// skip failed jobs
144+
if (jobExitCodes[i] != 0) {
145+
continue;
146+
}
147+
148+
let cb = cbArr[i];
149+
let outs = jobArr[i].outs;
150+
cb(null, outs);
151+
}
152+
153+
return;
154+
}
155+
156+
bufferManager.setCallback((items) => k8sCommandGroup(items));
157+
158+
async function k8sCommand(ins, outs, context, cb) {
159+
/** Buffer Manager configuration. */
160+
buffersConf = context.appConfig.jobAgglomerations;
161+
let alreadyConfigured = bufferManager.isConfigured();
162+
if (alreadyConfigured == false && buffersConf != undefined) {
163+
bufferManager.configure(buffersConf);
65164
}
66-
cb(null, outs);
165+
166+
/** Buffer item. */
167+
let item = {
168+
"ins": ins,
169+
"outs": outs,
170+
"context": context,
171+
"cb": cb
172+
};
173+
bufferManager.addItem(context.name, item);
174+
175+
return;
67176
}
68177

69178
exports.k8sCommand = k8sCommand;

0 commit comments

Comments
 (0)