Skip to content

Commit 1c006ea

Browse files
committed
Add functionality of AMQP job executor
1 parent a81836b commit 1c006ea

File tree

5 files changed

+306
-40
lines changed

5 files changed

+306
-40
lines changed
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
// Runs a job as a Pod (Kubernetes Job) in a Kubernetes cluster
2+
3+
const k8s = require('@kubernetes/client-node');
4+
var BufferManager = require('./buffer_manager.js').BufferManager;
5+
var RestartCounter = require('./restart_counter.js').RestartCounter;
6+
var submitK8sJob = require('./k8sJobSubmit.js').submitK8sJob;
7+
var fs = require('fs');
8+
9+
// Module-wide buffer manager; k8sCommand adds items to it and k8sCommandGroup
// is registered as its callback further down in this file.
let bufferManager = new BufferManager();
10+
11+
// Restart limit read from HF_VAR_BACKOFF_LIMIT (0 when unset). NOTE(review): when the
// variable is set this is a string, not a number -- confirm RestartCounter copes.
let backoffLimit = process.env.HF_VAR_BACKOFF_LIMIT || 0;
12+
// Tracks restarts per task id; consulted by the restart function in k8sCommandGroup.
let restartCounter = new RestartCounter(backoffLimit);
13+
14+
// Function k8sCommandGroup
//
// Submits a group of buffered tasks through submitK8sJob, waits for the exit
// codes it returns, marks the successful tasks as completed and invokes their
// callbacks. Callbacks of failed tasks are NOT invoked here; a restart function
// is passed to submitK8sJob so that failed tasks can be put back into the buffer.
//
// Inputs:
// - bufferItems - array containing objects with following properties:
//   * ins
//   * outs
//   * context
//   * cb
//
// Returns: a promise resolving to undefined. It rejects when submitK8sJob
// rejects. The whole process exits with code 1 when a job fails and
// HF_VAR_STOP_WORKFLOW_WHEN_JOB_FAILED is "1".
async function k8sCommandGroup(bufferItems) {

    // No action needed when buffer is empty
    if (bufferItems.length == 0) {
        return;
    }

    let startTime = Date.now();
    console.log("k8sCommandGroup started, time:", startTime);

    // Function for rebuffering items
    let restartFn = (bufferIndex) => {
        let bufferItem = bufferItems[bufferIndex];
        let taskId = bufferItem.context.taskId;
        // 'executor' may not exist, in which case there is no partition
        let executor = bufferItem.context.executor;
        let partition = executor ? executor.partition : undefined;
        if (restartCounter.isRestartPossible(taskId)) {
            let restartVal = restartCounter.increase(taskId);
            console.log("Readding task", taskId, "to buffer (restartCount:", restartVal + ") ...");
            let itemName = bufferItem.context.name;
            bufferManager.addItem(itemName, bufferItem, partition);
        }
        return;
    }

    // Extract particular arrays from buffer items
    let jobArr = [];
    let taskIdArr = [];
    let contextArr = [];
    let cbArr = [];
    for (let i = 0; i < bufferItems.length; i++) {
        let bufferItem = bufferItems[i];
        let itemContext = bufferItem.context;

        // object containing 'executable', 'args' and others; note that this is
        // the context's own 'executor' object, extended in place
        let job = itemContext.executor;
        job.name = itemContext.name;
        job.ins = bufferItem.ins;
        job.outs = bufferItem.outs;

        jobArr.push(job);
        taskIdArr.push(itemContext.taskId);
        contextArr.push(itemContext);
        cbArr.push(bufferItem.cb);
    }

    // All jobs in the group must have a similar context!
    // Here we retrieve the context of the first job in the group.
    // It is used below to read configuration for ALL jobs in the group.
    let context = contextArr[0];

    // Read custom parameters for job template '${var}' variables. These can be
    // provided in 'workflow.config.jobvars.json' file.
    //
    // In addition, to support two (or more) clusters (for cloud bursting), if
    // 'partition' is defined, check if there is a custom configuration for that
    // partition -- it can be provided in file 'workflow.config.jobvars{$partNum}.json'.
    // This partition-specific config may override parameters of the job, possibly even
    // define a path to a different kubeconfig to be loaded.

    let partition = context.executor.partition; // could be 'undefined'

    // Custom parameters for the job YAML template (will overwrite default values);
    // partition-specific configuration, if it exists, overrides general configuration.
    // The merge is done into a FRESH object: writing the overrides into
    // 'appConfig.jobvars' itself would leak one partition's settings (e.g. its
    // kubeconfig path) into every later group that shares the same appConfig.
    let customParamsPartition = partition ? context.appConfig['jobvars' + partition] : null;
    let customParams = { ...context.appConfig.jobvars, ...customParamsPartition };

    // Set kubeconfig path if overridden (could point to a remote cluster).
    // The variable is always cleared first so that an override from a previous
    // group does not stick.
    delete process.env.KUBECONFIG;
    if (customParams.kubeConfigPath) {
        process.env.KUBECONFIG = customParams.kubeConfigPath;
    }

    const kubeconfig = new k8s.KubeConfig();
    kubeconfig.loadFromDefault();

    let jobExitCodes = [];
    try {
        jobExitCodes = await submitK8sJob(kubeconfig, jobArr, taskIdArr, contextArr, customParams, restartFn);
    } catch (err) {
        console.log("Error when submitting job:", err);
        throw err;
    }

    let endTime = Date.now();
    console.log("Ending k8sCommandGroup function, time:", endTime, "exit codes:", jobExitCodes);

    // Stop the entire workflow if a job fails (controlled by an environment variable)
    for (let i = 0; i < jobExitCodes.length; i++) {
        let jobExitCode = jobExitCodes[i];
        if (jobExitCode != 0 && process.env.HF_VAR_STOP_WORKFLOW_WHEN_JOB_FAILED == "1") {
            let taskId = taskIdArr[i];
            let job = jobArr[i];
            console.log('Error: job', taskId, 'exited with error code', jobExitCode, ', stopping workflow.');
            // 'args' may be absent; do not let the error report itself throw
            console.log('Error details: job.name: ' + job.name + ', job.args: ' + (job.args || []).join(' '));
            process.exit(1);
        }
    }

    // If we're here, the jobs with exit code 0 have successfully completed -- we
    // write this information to Redis (job executor may make use of it).
    let markPromises = [];
    for (let i = 0; i < contextArr.length; i++) {
        // skip failed jobs
        if (jobExitCodes[i] != 0) {
            continue;
        }
        markPromises.push(contextArr[i].markTaskCompleted());
    }
    try {
        await Promise.all(markPromises);
    } catch (err) {
        // Best effort: callbacks below are still invoked, but the cause is reported
        console.error("Marking jobs", taskIdArr, "as completed failed.", err);
    }

    for (let i = 0; i < cbArr.length; i++) {
        // skip failed jobs
        if (jobExitCodes[i] != 0) {
            continue;
        }
        cbArr[i](null, jobArr[i].outs);
    }

    return;
}
167+
168+
// Register k8sCommandGroup as the buffer manager's callback; it is called with an
// array of buffered items (presumably when a buffer is flushed -- confirm in buffer_manager.js).
bufferManager.setCallback((items) => k8sCommandGroup(items));
169+
170+
/**
 * Entry point for a single task: wraps the task in a buffer item and hands it to
 * the module-wide buffer manager. The actual submission happens later in the
 * callback registered on the buffer manager.
 *
 * @param {*} ins - task inputs, stored unchanged in the buffer item
 * @param {*} outs - task outputs, stored unchanged in the buffer item
 * @param {object} context - task context; reads 'name', 'appConfig.jobAgglomerations'
 *                           and, when present, 'executor.partition'
 * @param {Function} cb - completion callback, stored unchanged in the buffer item
 * @returns {Promise<undefined>}
 */
async function k8sCommand(ins, outs, context, cb) {
    /** Buffer Manager configuration (applied once, on the first task that carries it). */
    // Declared locally: the bare assignment used before created an implicit global.
    const buffersConf = context.appConfig.jobAgglomerations;
    const alreadyConfigured = bufferManager.isConfigured();
    if (alreadyConfigured == false && buffersConf != undefined) {
        bufferManager.configure(buffersConf);
    }

    /** Buffer item. */
    const item = {
        "ins": ins,
        "outs": outs,
        "context": context,
        "cb": cb
    };

    // 'executor' may not exist, in which case the partition stays undefined
    const partition = context.executor ? context.executor.partition : undefined;
    bufferManager.addItem(context.name, item, partition);

    return;
}
193+
194+
exports.k8sCommand = k8sCommand;
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
const amqplib = require('amqplib'),
2+
createJobMessage = require('../../common/jobMessage').createJobMessage;
3+
// AMQP channels keyed by queue name; initialize() stores one channel per queue here.
let channels = {};
4+
// Shared AMQP connection; stays null until initialize() opens it.
let conn = null;
5+
6+
/**
 * Opens the shared AMQP connection if it is not open yet, then creates a channel,
 * asserts the given queue on it and stores the channel under the queue name.
 *
 * @param {string} queue_name - name of the queue to assert
 * @returns {Promise<undefined>} rejects when connecting, creating the channel or
 *                               asserting the queue fails
 */
async function initialize(queue_name) {
    if (conn === null) {
        // The heartbeat interval is a connection URL parameter. The second argument
        // of amqplib's connect() is a socket options OBJECT, so the string
        // "heartbeat=60" passed there before did not configure a heartbeat.
        conn = await amqplib.connect(`amqp://${process.env.RABBIT_HOSTNAME}?heartbeat=60`);
    }
    const ch = await conn.createChannel();
    // TODO: implement dynamic queue creation & cleanup
    await ch.assertQueue(queue_name, {durable: false, expires: 600000});
    channels[queue_name] = ch;
}
16+
17+
/**
 * Sends a group of jobs to the AMQP queue named '<namespace>.<task name>'.
 *
 * The queue name is derived from HF_VAR_NAMESPACE ('default' when unset) and the
 * name in the FIRST context of the group. For every job a job message is created
 * and sent via the first context's sendMsgToJob; afterwards all messages are
 * published to the queue as one JSON document of the form {"tasks": [...]}.
 *
 * @param {object[]} jobArr - jobs; 'ins' and 'outs' of each are read
 * @param {Array} taskIdArr - task ids, parallel to jobArr
 * @param {object[]} contextArr - contexts, parallel to jobArr
 * @param {object} customParams - only logged here
 * @returns {Promise<undefined>} rejects when initialization, sending or publishing fails
 */
async function enqueueJobs(jobArr, taskIdArr, contextArr, customParams) {
    const context = contextArr[0];
    const namespace = process.env.HF_VAR_NAMESPACE || 'default';
    const queue_name = namespace + "." + context['name'];
    if (conn === null || !(queue_name in channels)) {
        await initialize(queue_name);
    }
    const ch = channels[queue_name];
    try {
        console.log(`jobArr: ${JSON.stringify(jobArr)}, taskIdArr: ${JSON.stringify(taskIdArr)}, contextArr: ${JSON.stringify(contextArr)}, customParams: ${JSON.stringify(customParams)}`);
        const tasks = [];

        for (let i = 0; i < jobArr.length; i++) {
            const job = jobArr[i];
            const taskId = taskIdArr[i];
            const jobMessage = createJobMessage(job.ins, job.outs, contextArr[i], taskId);
            await context.sendMsgToJob(JSON.stringify(jobMessage), taskId); // TODO remove
            tasks.push({"id": taskId, "message": jobMessage});
        }

        await ch.publish('', queue_name, Buffer.from(JSON.stringify({'tasks': tasks})));
    } catch (error) {
        console.log(error);
        // Propagate the failure: if it were swallowed here, the caller would go on
        // to wait for results of jobs that were never enqueued.
        throw error;
    }
}
43+
44+
exports.enqueueJobs = enqueueJobs
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
2+
/**
 * Waits for all tasks of a group to report a result and returns their exit codes.
 *
 * Results are awaited concurrently through the FIRST context's jobResult(), with
 * timeout 0 (wait indefinitely). For every task whose exit code is not 0 the
 * restart function, when one was supplied, is called with the task's index.
 *
 * @param {object[]} jobArr - not used here
 * @param {Array} taskIdArr - ids of the tasks to wait for
 * @param {object[]} contextArr - contexts; only the first one is used
 * @param {object} customParams - not used here
 * @param {Function} [restartFn] - called with the index (into taskIdArr) of each failed task
 * @returns {Promise<number[]>} exit codes in the order of taskIdArr; rejects when
 *                              jobResult() rejects for any task
 */
async function synchronizeJobs(jobArr, taskIdArr, contextArr, customParams, restartFn) {
    const context = contextArr[0];

    // 'awaitJob' -- wait for the job to finish and return its exit code
    const awaitJob = async (taskId) => {
        let jobResult;
        try {
            jobResult = await context.jobResult(0, taskId); // timeout=0 means indefinite
        } catch (err) {
            console.error(err);
            throw err;
        }
        const taskEnd = new Date().toISOString();
        console.log('Job ended with result:', jobResult, 'time:', taskEnd);
        // job exit code
        return parseInt(jobResult[1], 10);
    };

    const jobExitCodes = await Promise.all(taskIdArr.map((taskId) => awaitJob(taskId)));

    jobExitCodes.forEach((jobExitCode, i) => {
        if (jobExitCode !== 0) {
            console.log("Job", taskIdArr[i], "failed");
            // Callers may omit the restart function; calling it blindly would turn
            // a failed job into a TypeError.
            if (typeof restartFn === 'function') {
                restartFn(i);
            }
            // NOTE: job message is preserved, so we don't have to send it again.
        }
    });

    return jobExitCodes;
}
45+
46+
exports.synchronizeJobs = synchronizeJobs

functions/kubernetes/k8sCommand.js

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ const k8s = require('@kubernetes/client-node');
44
var BufferManager = require('./buffer_manager.js').BufferManager;
55
var RestartCounter = require('./restart_counter.js').RestartCounter;
66
var submitK8sJob = require('./k8sJobSubmit.js').submitK8sJob;
7+
var amqpEnqueueJobs = require('./amqpConnector.js').enqueueJobs;
8+
var synchronizeJobs = require('./jobSynchronization').synchronizeJobs
79
var fs = require('fs');
810

911
let bufferManager = new BufferManager();
@@ -19,6 +21,18 @@ let restartCounter = new RestartCounter(backoffLimit);
1921
// * outs
2022
// * context
2123
// * cb
24+
25+
// Decides how a task is executed: "WORKER_POOL" when the application config lists a
// worker pool whose name equals the task name (context.name), "JOB" otherwise.
function getExecutorType(context) {
    const appConfig = context.appConfig;
    if (!("workerpools" in appConfig)) {
        return "JOB"
    }
    const hasMatchingPool = [...appConfig.workerpools].some(
        (pool) => pool.name === context['name']
    );
    return hasMatchingPool ? "WORKER_POOL" : "JOB"
}
35+
2236
async function k8sCommandGroup(bufferItems) {
2337

2438
// No action needed when buffer is empty
@@ -112,7 +126,12 @@ async function k8sCommandGroup(bufferItems) {
112126

113127
let jobExitCodes = [];
114128
try {
115-
jobExitCodes = await submitK8sJob(kubeconfig, jobArr, taskIdArr, contextArr, customParams, restartFn);
129+
if (getExecutorType(context) === "WORKER_POOL") {
130+
await amqpEnqueueJobs(jobArr, taskIdArr, contextArr, customParams)
131+
} else {
132+
await submitK8sJob(kubeconfig, jobArr, taskIdArr, contextArr, customParams, restartFn)
133+
}
134+
jobExitCodes = await synchronizeJobs(jobArr, taskIdArr, contextArr, customParams);
116135
} catch (err) {
117136
console.log("Error when submitting job:", err);
118137
throw err;

functions/kubernetes/k8sJobSubmit.js

Lines changed: 2 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ var createK8sJobYaml = (job, taskIds, context, jobYamlTemplate, customParams) =>
5858
var volumePath = '/work_dir';
5959
var jobName = Math.random().toString(36).substring(7) + '-' +
6060
job.name.replace(/_/g, '-') + "-" + context.procId + '-' + context.firingId;
61+
var workingDirPath = context.workdir;
6162

6263
// remove chars not allowed in Pod names
6364
jobName = jobName.replace(/[^0-9a-z-]/gi, '').toLowerCase();
@@ -80,7 +81,7 @@ var createK8sJobYaml = (job, taskIds, context, jobYamlTemplate, customParams) =>
8081
restartPolicy: restartPolicy, backoffLimit: backoffLimit,
8182
experimentId: context.hfId + ":" + context.appId,
8283
workflowName: context.wfname, taskName: job.name,
83-
appId: context.appId
84+
appId: context.appId, workingDirPath: workingDirPath
8485
}
8586

8687
// Add/override custom parameters for the job
@@ -193,44 +194,6 @@ var submitK8sJob = async(kubeconfig, jobArr, taskIdArr, contextArr, customParams
193194
throw err;
194195
}
195196

196-
// 'awaitJob' -- wait for the job to finish, possibly restarting it
197-
// Restart policy -- enable if "HF_VAR_BACKOFF_LIMIT" (number of retries) is defined
198-
var backoffLimit = process.env.HF_VAR_BACKOFF_LIMIT || 0;
199-
var restartPolicy = backoffLimit > 0 ? "OnFailure": "Never";
200-
var restartCount = 0;
201-
var awaitJob = async(taskId) => {
202-
try {
203-
var jobResult = await context.jobResult(0, taskId); // timeout=0 means indefinite
204-
} catch (err) {
205-
console.error(err);
206-
throw err;
207-
}
208-
let taskEnd = new Date().toISOString();
209-
console.log('Job ended with result:', jobResult, 'time:', taskEnd);
210-
var code = parseInt(jobResult[1]); // job exit code
211-
return code;
212-
}
213-
214-
var awaitJobs = async(taskIdArr) => {
215-
let awaitPromises = []
216-
for (var i=0; i<taskIdArr.length; i++) {
217-
awaitPromises.push(awaitJob(taskIdArr[i]));
218-
}
219-
return Promise.all(awaitPromises);
220-
}
221-
222-
let jobExitCodes = await awaitJobs(taskIdArr);
223-
for (let i = 0; i < jobExitCodes.length; i++) {
224-
let jobExitCode = jobExitCodes[i];
225-
let taskId = taskIdArr[i];
226-
if (jobExitCode != 0) {
227-
console.log("Job", taskId, "failed");
228-
restartFn(i);
229-
// NOTE: job message is preserved, so we don't have to send it again.
230-
}
231-
}
232-
233-
return jobExitCodes;
234197
}
235198

236199
exports.submitK8sJob = submitK8sJob;

0 commit comments

Comments
 (0)