Skip to content

Commit b408e68

Browse files
committed
Duplicate redis client for every invocation of getJobResult
1 parent 93de2d4 commit b408e68

File tree

2 files changed

+11
-15
lines changed

2 files changed

+11
-15
lines changed

examples/RemoteJobs/package-lock.json

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

wflib/index.js

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1421,15 +1421,16 @@ function public_invokeProcFunction(wfId, procId, firingId, insIds_, insValues, o
14211421
// that should be set by the task's executor.
14221422
// 'taskId' to be waited for is read from the process context, but
14231423
// optionally it can be set by the caller via parameter 'taskIdentifier'
1424-
// Create duplicate redis client for blocking blpop (one per task)
1425-
var redisCliBlocking = rcl.duplicate();
14261424
var getJobResult = async function(timeout, taskIdentifier) {
14271425
return new Promise(function(resolve, reject) {
1426+
// Create duplicate Redis client for blocking blpop (one per task)
1427+
var redisCliBlocking = rcl.duplicate();
14281428
const taskId = taskIdentifier || conf.taskId;
1429-
const redis_cli = redisCliBlocking;
14301429

1431-
redis_cli.blpop(taskId, timeout, function(err, reply) {
1430+
redisCliBlocking.blpop(taskId, timeout, function(err, reply) {
14321431
err ? reject(err): resolve(reply);
1432+
// Close connection of the duplicate Redis client
1433+
redisCliBlocking.quit();
14331434
});
14341435
});
14351436
}
@@ -1445,8 +1446,7 @@ function public_invokeProcFunction(wfId, procId, firingId, insIds_, insValues, o
14451446
return new Promise(function(resolve, reject) {
14461447
const completedTasksSetKey = "wf:" + wfId + ":completedTasks";
14471448
const taskId = taskIdentifier || conf.taskId;
1448-
const redis_cli = redisCliBlocking;
1449-
redis_cli.sadd(completedTasksSetKey, taskId, function(err, reply) {
1449+
rcl.sadd(completedTasksSetKey, taskId, function(err, reply) {
14501450
err ? reject(err): resolve(reply);
14511451
});
14521452
});
@@ -1455,8 +1455,7 @@ function public_invokeProcFunction(wfId, procId, firingId, insIds_, insValues, o
14551455
return new Promise(function(resolve, reject) {
14561456
const completedTasksSetKey = "wf:" + wfId + ":completedTasks";
14571457
const taskId = taskIdentifier || conf.taskId;
1458-
const redis_cli = redisCliBlocking;
1459-
redis_cli.sismember(completedTasksSetKey, taskId, function(err, hasCompleted) {
1458+
rcl.sismember(completedTasksSetKey, taskId, function(err, hasCompleted) {
14601459
err ? reject(err): resolve(hasCompleted);
14611460
});
14621461
});
@@ -1472,8 +1471,7 @@ function public_invokeProcFunction(wfId, procId, firingId, insIds_, insValues, o
14721471
return new Promise(function(resolve, reject) {
14731472
const taskId = taskIdentifier || conf.taskId;
14741473
const taskMessageKey=taskId+"_msg";
1475-
const redis_cli = redisCliBlocking;
1476-
redis_cli.lpush(taskMessageKey, message, function(err, reply) {
1474+
rcl.lpush(taskMessageKey, message, function(err, reply) {
14771475
err ? reject(err): resolve(reply);
14781476
});
14791477
});
@@ -1490,8 +1488,6 @@ function public_invokeProcFunction(wfId, procId, firingId, insIds_, insValues, o
14901488
if (recovered) { conf.recovered = true; }
14911489
f(ins, outs, conf, function(err, outs, options) {
14921490
//if (outs) { onsole.log("VALUE="+outs[0].value); } // DEBUG
1493-
// Close connection of the duplicate Redis client
1494-
redisCliBlocking.quit();
14951491
if (recovered) {
14961492
if (!options) {
14971493
options = { recovered: true }

0 commit comments

Comments
 (0)