Skip to content

Commit 18d2827

Browse files
committed
Extend job submission and notification via Redis
1 parent 509ed75 commit 18d2827

File tree

5 files changed

+109
-25
lines changed

5 files changed

+109
-25
lines changed
Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,26 @@
11
var spawn = require('child_process').spawn;
22

33
// Spawns a job "node handler.js" and waits for the notification of its
4-
// completion using the Redis task status notification mechanism
5-
async function task_status_redis_test(ins, outs, context, cb) {
4+
// completion using the Redis job status notification mechanism
5+
async function job_status_redis_test(ins, outs, context, cb) {
66
var n = Number(ins.number.data[0]);
77

88
//console.log("Spawning process...");
99

10-
var proc = spawn('node', ['handler.js', context.taskId, context.redis_url]);
10+
const executable = context.executor.executable;
11+
let jobMessage = JSON.stringify({
12+
"executable": executable,
13+
"args": context.executor.args,
14+
"env": (context.executor.env || {}),
15+
"inputs": ins.map(i => i),
16+
"outputs": outs.map(o => o),
17+
"stdout": context.executor.stdout, // if present, denotes file name to which stdout should be redirected
18+
"redis_url": context.redis_url,
19+
"taskId": context.taskId
20+
});
21+
22+
// "submit" job (start the handler process)
23+
var proc = spawn('npm', ['start', context.taskId, context.redis_url], {shell: true});
1124

1225
proc.stderr.on('data', function(data) {
1326
console.log(data.toString());
@@ -21,17 +34,25 @@ async function task_status_redis_test(ins, outs, context, cb) {
2134
//console.log('Process exited with code', code);
2235
});
2336

24-
// wait for the task to finish (timeout=0 means indefinite)
37+
// send message to the job (command to be executed)
38+
try {
39+
await context.sendMsgToJob(jobMessage);
40+
} catch(err) {
41+
console.error(err);
42+
throw err;
43+
}
44+
45+
// wait for the job to finish (timeout=0 means indefinite)
2546
try {
26-
var taskStatus = await context.taskStatus(0);
27-
console.log('Received task status:', taskStatus);
47+
var jobStatus = await context.jobStatus(0);
48+
console.log('Received job status:', jobStatus);
2849
setTimeout(function() {
2950
cb(null, outs);
3051
}, 5000);
3152
} catch(err) {
32-
console.err(err);
53+
console.error(err);
3354
throw err;
3455
}
3556
}
3657

37-
exports.task_status_redis_test = task_status_redis_test;
58+
exports.job_status_redis_test = job_status_redis_test;

examples/TaskStatusRedis/handler.js

Lines changed: 54 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
var redis = require('redis');
33

44
if (process.argv.length < 4) {
5-
console.err("Usage: node handler.js <taskId> <redis_url>");
5+
console.error("Usage: node handler.js <taskId> <redis_url>");
66
process.exit(1);
77
}
88

@@ -15,16 +15,58 @@ redis_url = process.argv[3];
1515

1616
var rcl = redis.createClient(redis_url);
1717

18-
// After some delay, do a RPUSH to the Redis list to notify job completion
19-
var delay=Math.random()*3000;
20-
// console.log("Delay:", delay);
21-
setTimeout(function() {
22-
rcl.rpush(taskId, "OK", function(err, reply) {
23-
if (err) {
24-
console.err("Redis notification failed in handler.");
18+
// get job message from Redis
19+
var getJobMessage = async function(timeout) {
20+
return new Promise(function(resolve, reject) {
21+
const jobMsgKey = taskId + "_msg";
22+
rcl.brpop(jobMsgKey, timeout, function(err, reply) {
23+
if (err) reject(err)
24+
else {
25+
resolve(reply);
26+
}
27+
});
28+
});
29+
}
30+
31+
// send notification about job completion to Redis
32+
var notifyJobCompletion = async function() {
33+
return new Promise(function(resolve, reject) {
34+
rcl.rpush(taskId, "OK", function(err, reply) {
35+
if (err) reject(err)
36+
else {
37+
resolve(reply);
38+
}
39+
});
40+
});
41+
}
42+
43+
44+
async function executeJob() {
45+
46+
// 1. get job message
47+
try {
48+
var jobMessage = await getJobMessage(10);
49+
} catch(err) {
50+
console.error(err);
2551
throw err;
26-
} else {
27-
process.exit(0);
2852
}
29-
});
30-
}, delay);
53+
console.log("Received job message:", jobMessage);
54+
55+
// 2. HERE the job would be executed
56+
57+
// 3. Notify job completion (delay simulates execution time)
58+
var delay=Math.random()*3000;
59+
// console.log("Delay:", delay);
60+
setTimeout(async function() {
61+
try {
62+
await notifyJobCompletion();
63+
} catch(err) {
64+
console.error("Redis notification failed", err);
65+
throw err;
66+
}
67+
68+
process.exit(0);
69+
}, delay);
70+
}
71+
72+
executeJob()

examples/TaskStatusRedis/workflow.json

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,18 @@
55
"type": "dataflow",
66
"parlevel": 0,
77
"firingLimit": 100,
8-
"function": "task_status_redis_test",
8+
"function": "job_status_redis_test",
9+
"config": {
10+
"executor": {
11+
"executable": "echo",
12+
"args": ["Hello", "world"]
13+
}
14+
},
915
"ins": [ "number" ],
1016
"outs": [ ]
1117
} ],
1218
"signals": [ {
13-
"name": "number",
19+
"name": "number",
1420
"data": [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100 ]
1521
} ],
1622
"ins": [ "number" ],

functions/awsFargateCommand.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,4 +203,4 @@ class TaskLimitError extends Error {
203203
}
204204
}
205205

206-
exports.awsFargateCommand = awsFargateCommand;
206+
exports.awsFargateCommand = awsFargateCommand;

wflib/index.js

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1575,7 +1575,7 @@ function public_invokeProcFunction(wfId, procId, firingId, insIds_, insValues, o
15751575
// should be set by the task's executor
15761576
// Create duplicate redis client for blocking blpop (one per task)
15771577
var redisCliBlocking = rcl.duplicate();
1578-
var getTaskStatus = async function(timeout) {
1578+
var getJobStatus = async function(timeout) {
15791579
return new Promise(function(resolve, reject) {
15801580
const taskId = conf.taskId;
15811581
const redis_cli = redisCliBlocking;
@@ -1589,9 +1589,24 @@ function public_invokeProcFunction(wfId, procId, firingId, insIds_, insValues, o
15891589
});
15901590
}
15911591

1592-
conf.taskStatus = getTaskStatus;
1592+
conf.jobStatus = getJobStatus;
15931593
conf.redis_url = "redis://" + rcl.address;
15941594

1595+
var sendMessageToJob = async function(message) {
1596+
return new Promise(function(resolve, reject) {
1597+
const taskMessageKey=conf.taskId+"_msg";
1598+
const redis_cli = redisCliBlocking;
1599+
redis_cli.lpush(taskMessageKey, message, function(err, reply) {
1600+
if (err) reject(err)
1601+
else {
1602+
resolve(reply);
1603+
}
1604+
});
1605+
});
1606+
}
1607+
1608+
conf.sendMsgToJob = sendMessageToJob;
1609+
15951610
// Pass the workflow working directory
15961611
if (appConfig.workdir) {
15971612
conf.workdir = appConfig.workdir;

0 commit comments

Comments
 (0)