Skip to content

Commit 7872fe9

Browse files
authored
Merge pull request #42 from koxu1996/master
RemoteJobConnector - avoidance of duplicated redis clients
2 parents df418cd + 285e9dd commit 7872fe9

File tree

2 files changed

+132
-15
lines changed

2 files changed

+132
-15
lines changed

wflib/connector.js

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
2+
/**
3+
* Class for getting notifications about tasks' results.
4+
*/
5+
class RemoteJobConnector {
6+
/**
7+
* Constructor.
8+
* @param {RedisClient} redisClient redis client
9+
* @param {string} wfId workflow ID
10+
* @param {number} checkInterval loop interval in ms.
11+
*/
12+
constructor(redisClient, wfId, checkInterval) {
13+
this.jobPromiseResolves = {};
14+
this.rcl = redisClient;
15+
this.running = false;
16+
this.completedNotificationQueueKey = "wf:" + wfId + ":tasksPendingCompletionHandling";
17+
this.checkInterval = checkInterval;
18+
}
19+
20+
/**
21+
* Gives promise, that will be resolved on remote
22+
* job completion.
23+
* @param {*} taskId task ID
24+
*/
25+
waitForTask(taskId) {
26+
if (this.jobPromiseResolves[taskId] !== undefined) {
27+
console.error("[RemoteJobConnector] Task", taskId, "is already observed");
28+
return;
29+
}
30+
console.log("[RemoteJobConnector] Waiting for task", taskId);
31+
let promise = new Promise((resolve, reject) => {
32+
this.jobPromiseResolves[taskId] = resolve;
33+
});
34+
35+
return promise;
36+
}
37+
38+
/**
39+
* Runs connector, that fetches notifications about
40+
* task completions, then makes relevant waiting promises
41+
* resolved.
42+
*/
43+
async run() {
44+
this.running = true;
45+
while (true) {
46+
if (this.running == false) {
47+
console.log("[RemoteJobConnector] Stopping");
48+
break;
49+
}
50+
51+
let taskId = null;
52+
try {
53+
taskId = await new Promise((resolve, reject) => {
54+
this.rcl.srandmember(this.completedNotificationQueueKey, function(err, reply) {
55+
err ? reject(err): resolve(reply);
56+
});
57+
});
58+
} catch (error) {
59+
console.error("[RemoteJobConnector] Unable to fetch new complated jobs", error);
60+
}
61+
62+
if (taskId == null) {
63+
await new Promise((resolve) => setTimeout(resolve, this.checkInterval));
64+
continue;
65+
}
66+
67+
console.log("[RemoteJobConnector] Got completed job:", taskId);
68+
69+
let taskResult = null;
70+
try {
71+
taskResult = await new Promise((resolve, reject) => {
72+
this.rcl.spop(taskId, function(err, reply) {
73+
/* Wrap results into array to preserve
74+
* compatibility with blpop format. */
75+
let replyArr = [null, reply];
76+
err ? reject(err): resolve(replyArr);
77+
});
78+
});
79+
} catch (error) {
80+
console.error("[RemoteJobConnector] Unable to get result of job", taskId);
81+
continue;
82+
}
83+
84+
if (this.jobPromiseResolves[taskId] === undefined) {
85+
console.error("[RemoteJobConnector] Observer for task", taskId, "not found");
86+
continue;
87+
}
88+
let promiseResolve = this.jobPromiseResolves[taskId];
89+
delete this.jobPromiseResolves[taskId];
90+
91+
try {
92+
await new Promise((resolve, reject) => {
93+
this.rcl.srem(this.completedNotificationQueueKey, taskId, function(err, reply) {
94+
err ? reject(err): resolve(reply);
95+
});
96+
});
97+
} catch (error) {
98+
console.error("[RemoteJobConnector] Unable to delete job from completed queue", error);
99+
}
100+
101+
console.log("[RemoteJobConnector] Resolving promise for task", taskId, "| result =", taskResult);
102+
promiseResolve(taskResult);
103+
}
104+
105+
return;
106+
}
107+
108+
/**
109+
* Stops connector.
110+
*/
111+
async stop() {
112+
console.log("[RemoteJobConnector] Requesting stop");
113+
this.running = false;
114+
return;
115+
}
116+
}
117+
118+
module.exports = RemoteJobConnector

wflib/index.js

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ var fs = require('fs'),
1313
//toobusy = require('toobusy'),
1414
shortid = require('shortid'),
1515
Mustache = require('mustache'),
16+
RemoteJobConnector = require('./connector'),
1617
rcl;
1718

1819

@@ -24,6 +25,8 @@ var sendSignalTime = 0;
2425
var global_hfid = 0; // global UUID of this HF engine instance (used for logging)
2526
var globalInfo = {}; // object holding global information for a given HF engine instance
2627

28+
let jobConnectors = {}; // object holding remote jobs' connectors
29+
2730
function p0() {
2831
return (new Date()).getTime();
2932
}
@@ -209,6 +212,9 @@ exports.init = function(redisClient) {
209212
"status", "waiting",
210213
function(err, ret) { });
211214

215+
jobConnectors[wfId] = new RemoteJobConnector(rcl, wfId, 3000);
216+
jobConnectors[wfId].run();
217+
212218
var multi = rcl.multi(); // FIXME: change this to async.parallel
213219

214220
var addSigInfo = function(sigId) {
@@ -1417,22 +1423,15 @@ function public_invokeProcFunction(wfId, procId, firingId, insIds_, insValues, o
14171423
conf.wfname = procInfo.wfname;
14181424

14191425
// This function is passed to the Process' Function (through 'context')
1420-
// and can be used to wait for task completion. It reads a key from redis
1421-
// that should be set by the task's executor.
1422-
// 'taskId' to be waited for is read from the process context, but
1423-
// optionally it can be set by the caller via parameter 'taskIdentifier'
1426+
// and can be used to wait for task completion. It reads a key from redis
1427+
// that should be set by the task's executor.
1428+
// 'taskId' to be waited for is read from the process context, but
1429+
// optionally it can be set by the caller via parameter 'taskIdentifier'
14241430
var getJobResult = async function(timeout, taskIdentifier) {
1425-
return new Promise(function(resolve, reject) {
1426-
// Create duplicate Redis client for blocking blpop (one per task)
1427-
var redisCliBlocking = rcl.duplicate();
1428-
const taskId = taskIdentifier || conf.taskId;
1429-
1430-
redisCliBlocking.blpop(taskId, timeout, function(err, reply) {
1431-
err ? reject(err): resolve(reply);
1432-
// Close connection of the duplicate Redis client
1433-
redisCliBlocking.quit();
1434-
});
1435-
});
1431+
const taskId = taskIdentifier || conf.taskId;
1432+
let wfId = taskId.split(':')[1];
1433+
let connector = jobConnectors[wfId];
1434+
return connector.waitForTask(taskId);
14361435
}
14371436

14381437
conf.jobResult = getJobResult;

0 commit comments

Comments
 (0)