Skip to content

Commit 1f3bc46

Browse files
committed
Refactor getWfMap
1 parent 415d582 commit 1f3bc46

File tree

1 file changed

+178
-174
lines changed

1 file changed

+178
-174
lines changed

wflib/index.js

Lines changed: 178 additions & 174 deletions
Original file line numberDiff line numberDiff line change
@@ -924,9 +924,9 @@ exports.init = function(redisClient) {
924924
}
925925

926926
// Returns a 'map' of a workflow. Should be passed a callback:
927-
// function(nTasks, nData, err, ins, outs, sources, sinks, types, cPortsInfo), where:
928-
// - nTasks = number of tasks (also length of ins and outs arrays)
929-
// - nData = number of data elements (also length of sources and sinks arrays)
927+
// function(nProcs, nData, err, ins, outs, sources, sinks, types, cPortsInfo), where:
928+
// - nProcs = number of processes (also length of ins and outs arrays)
929+
// - nSigs = number of data elements (also length of sources and sinks arrays)
930930
// - ins[i][j] = data id mapped to j-th input port of i-th task
931931
// - outs[i][j] = data id mapped to j-th output port of i-th task
932932
// - sources[i][1] = task id which produces data element with id=i (if none, sources[i]=[])
@@ -936,192 +936,194 @@ exports.init = function(redisClient) {
936936
// - types = ids of tasks with type other than default "task"; format:
937937
// { "foreach": [1,2,5], "choice": [3,4] }
938938
// - cPortsInfo = information about all control ports of all tasks; format:
939-
// { procId: { "ins": { portName: dataId } ... }, "outs": { ... } } }
939+
// { procId: { "ins": { portName: sigId } ... }, "outs": { ... } } }
940940
// e.g.: { '1': { ins: { next: '2' }, outs: { next: '2', done: '4' } } }
941941
// - fullInfo[i] = all additional attributes of i-th task (e.g. firingInterval etc.)
942942
function public_getWfMap(wfId, cb) {
943943
var asyncTasks = [];
944944
var wfKey = "wf:"+wfId;
945-
rcl.zcard(wfKey+":tasks", function(err, ret) {
946-
if (err || ret == -1) { throw(new Error("Redis error")); }
947-
var nTasks = ret;
948-
rcl.zcard(wfKey+":data", function(err, ret) {
949-
if (err || ret == -1) { throw(new Error("Redis error")); }
950-
var nData = ret;
951-
var types = {}, ins = [], outs = [], sources = [], sinks = [], cPortsInfo = {}, fullInfo = [];
952-
//var multi = rcl.multi();
953-
for (var i=1; i<=nTasks; ++i) {
954-
(function(procId) {
955-
var procKey = wfKey+":task:"+procId;
956-
asyncTasks.push(function(callback) {
957-
rcl.hgetall(procKey, function(err, taskInfo) {
958-
//onsole.log("FULL TASK INFO:", taskInfo);
959-
fullInfo[procId] = taskInfo;
960-
// add additional info to fullInfo
961-
async.parallel([
962-
function(cb) {
963-
if (taskInfo.sticky) {
964-
var stickyKey = procKey+":sticky";
965-
rcl.smembers(stickyKey, function(err, stickySigs) {
966-
if (!stickySigs) stickySigs = [];
967-
fullInfo[procId].stickySigs = {};
968-
stickySigs.forEach(function(s) {
969-
fullInfo[procId].stickySigs[+s] = true;
970-
});
971-
cb(err);
972-
});
973-
} else {
974-
cb(null);
975-
}
976-
},
977-
function(cb) {
978-
rcl.smembers(procKey+":cinset", function(err, cins) {
979-
if (!cins) cins = [];
980-
fullInfo[procId].cinset = {};
981-
cins.forEach(function(c) {
982-
fullInfo[procId].cinset[+c] = true;
983-
});
984-
cb(err);
985-
});
986-
},
987-
function(cb) {
988-
rcl.smembers(procKey+":coutset", function(err, couts) {
989-
if (!couts) couts = [];
990-
//onsole.log("COUTS", couts);
991-
fullInfo[procId].coutset = {};
992-
couts.forEach(function(c) {
993-
fullInfo[procId].coutset[+c] = true;
994-
});
995-
cb(err);
996-
});
997-
},
998-
function(cb) {
999-
rcl.hgetall(procKey+":incounts", function(err, incounts) {
1000-
if (incounts && incounts.rev) {
1001-
incounts.rev = JSON.parse(incounts.rev);
1002-
}
1003-
fullInfo[procId].incounts = incounts;
1004-
cb(err);
1005-
});
1006-
},
1007-
function(cb) {
1008-
rcl.hgetall(procKey+":outcounts", function(err, outcounts) {
1009-
fullInfo[procId].outcounts = outcounts;
1010-
cb(err);
945+
946+
var getNumProcsAndSignals = function(cb) {
947+
rcl.zcard(wfKey+":tasks", function(err, nProcs) {
948+
if (err || nProcs == -1) { throw(new Error("Redis error")); }
949+
rcl.zcard(wfKey+":data", function(err, nSigs) {
950+
if (err || nSigs == -1) { throw(new Error("Redis error")); }
951+
cb(nProcs, nSigs);
952+
});
953+
});
954+
}
955+
956+
getNumProcsAndSignals(function(nProcs, nSigs) {
957+
var types = {}, ins = [], outs = [], sources = [], sinks = [], cPortsInfo = {}, fullInfo = [];
958+
for (i=1; i<=nProcs; ++i) {
959+
let procId = i;
960+
let procKey = wfKey+":task:"+procId;
961+
asyncTasks.push(function(callback) {
962+
rcl.hgetall(procKey, function(err, taskInfo) {
963+
//onsole.log("FULL TASK INFO:", taskInfo);
964+
fullInfo[procId] = taskInfo;
965+
// add additional info to fullInfo
966+
async.parallel([
967+
function(cb) {
968+
if (taskInfo.sticky) {
969+
var stickyKey = procKey+":sticky";
970+
rcl.smembers(stickyKey, function(err, stickySigs) {
971+
if (!stickySigs) stickySigs = [];
972+
fullInfo[procId].stickySigs = {};
973+
stickySigs.forEach(function(s) {
974+
fullInfo[procId].stickySigs[+s] = true;
1011975
});
976+
cb(err);
977+
});
978+
} else {
979+
cb(null);
980+
}
981+
},
982+
function(cb) {
983+
rcl.smembers(procKey+":cinset", function(err, cins) {
984+
if (!cins) cins = [];
985+
fullInfo[procId].cinset = {};
986+
cins.forEach(function(c) {
987+
fullInfo[procId].cinset[+c] = true;
988+
});
989+
cb(err);
990+
});
991+
},
992+
function(cb) {
993+
rcl.smembers(procKey+":coutset", function(err, couts) {
994+
if (!couts) couts = [];
995+
//onsole.log("COUTS", couts);
996+
fullInfo[procId].coutset = {};
997+
couts.forEach(function(c) {
998+
fullInfo[procId].coutset[+c] = true;
999+
});
1000+
cb(err);
1001+
});
1002+
},
1003+
function(cb) {
1004+
rcl.hgetall(procKey+":incounts", function(err, incounts) {
1005+
if (incounts && incounts.rev) {
1006+
incounts.rev = JSON.parse(incounts.rev);
10121007
}
1013-
],
1014-
function done(err) {
1015-
callback(err, taskInfo);
1008+
fullInfo[procId].incounts = incounts;
1009+
cb(err);
10161010
});
1017-
});
1018-
});
1019-
asyncTasks.push(function(callback) {
1020-
rcl.zrangebyscore(procKey+":ins", 0, "+inf", function(err, ret) {
1021-
if (err || ret == -1) { throw(new Error("Redis error")); }
1022-
ins[procId] = ret;
1023-
callback(null, ret);
1024-
//ins[procId].unshift(null); // inputs will be indexed from 1 instead of 0
1025-
});
1026-
});
1027-
asyncTasks.push(function(callback) {
1028-
rcl.zrangebyscore(procKey+":outs", 0, "+inf", function(err, ret) {
1029-
if (err || ret == -1) { throw(new Error("Redis error")); }
1030-
outs[procId] = ret;
1031-
//outs[procId].unshift(null);
1032-
callback(null, ret);
1033-
});
1011+
},
1012+
function(cb) {
1013+
rcl.hgetall(procKey+":outcounts", function(err, outcounts) {
1014+
fullInfo[procId].outcounts = outcounts;
1015+
cb(err);
1016+
});
1017+
}
1018+
],
1019+
function done(err) {
1020+
callback(err, taskInfo);
10341021
});
1035-
asyncTasks.push(function(callback) {
1036-
rcl.hgetall(procKey+":cins", function(err, csigs) {
1037-
//onsole.log("CSIGS INS", JSON.stringify(csigs));
1038-
if (err || csigs == -1) { throw(new Error("Redis error")); }
1039-
if (csigs != null) {
1040-
var tmp = {};
1041-
for (var s in csigs) {
1042-
if (tmp[csigs[s]]) {
1043-
tmp[csigs[s]].push(s);
1044-
} else {
1045-
tmp[csigs[s]] = [s];
1046-
}
1047-
}
1048-
for (var i in tmp) {
1049-
if (tmp[i].length == 1) {
1050-
tmp[i] = tmp[i][0];
1051-
}
1052-
}
1053-
if (!(procId in cPortsInfo)) {
1054-
cPortsInfo[procId] = {};
1055-
}
1056-
cPortsInfo[procId].ins = tmp;
1057-
//onsole.log("C PORTS INFO=", JSON.stringify(cPortsInfo));
1022+
});
1023+
});
1024+
asyncTasks.push(function(callback) {
1025+
rcl.zrangebyscore(procKey+":ins", 0, "+inf", function(err, ret) {
1026+
if (err || ret == -1) { throw(new Error("Redis error")); }
1027+
ins[procId] = ret;
1028+
callback(null, ret);
1029+
//ins[procId].unshift(null); // inputs will be indexed from 1 instead of 0
1030+
});
1031+
});
1032+
asyncTasks.push(function(callback) {
1033+
rcl.zrangebyscore(procKey+":outs", 0, "+inf", function(err, ret) {
1034+
if (err || ret == -1) { throw(new Error("Redis error")); }
1035+
outs[procId] = ret;
1036+
//outs[procId].unshift(null);
1037+
callback(null, ret);
1038+
});
1039+
});
1040+
asyncTasks.push(function(callback) {
1041+
rcl.hgetall(procKey+":cins", function(err, csigs) {
1042+
//onsole.log("CSIGS INS", JSON.stringify(csigs));
1043+
if (err || csigs == -1) { throw(new Error("Redis error")); }
1044+
if (csigs != null) {
1045+
var tmp = {};
1046+
for (var s in csigs) {
1047+
if (tmp[csigs[s]]) {
1048+
tmp[csigs[s]].push(s);
1049+
} else {
1050+
tmp[csigs[s]] = [s];
10581051
}
1059-
callback(null, csigs);
1060-
});
1061-
});
1062-
asyncTasks.push(function(callback) {
1063-
rcl.hgetall(procKey+":couts", function(err, ret) {
1064-
//onsole.log("Proc COUTS WFLIB", ret);
1065-
if (err || ret == -1) { throw(new Error("Redis error")); }
1066-
if (ret != null) {
1067-
if (!(procId in cPortsInfo)) {
1068-
cPortsInfo[procId] = {};
1069-
}
1070-
cPortsInfo[procId].outs = ret;
1052+
}
1053+
for (var i in tmp) {
1054+
if (tmp[i].length == 1) {
1055+
tmp[i] = tmp[i][0];
10711056
}
1072-
callback(null, ret);
1073-
});
1074-
});
1075-
})(i);
1076-
}
1077-
for (var i=1; i<=nData; ++i) {
1078-
(function(dataId) {
1079-
var dataKey = wfKey+":data:"+dataId;
1080-
// info about all signal sources
1081-
asyncTasks.push(function(callback) {
1082-
rcl.zrangebyscore(dataKey+":sources", 0, "+inf", "withscores", function(err, ret) {
1083-
if (err || ret == -1) { throw(new Error("Redis error")); }
1084-
sources[dataId] = ret;
1085-
//onsole.log(dataId+";"+ret);
1086-
//sources[dataId].unshift(null);
1087-
callback(null, ret);
1088-
});
1089-
});
1090-
// info about signal sinks
1091-
/*asyncTasks.push(function(callback) {
1092-
rcl.zrange(dataKey+":sinks", 0, -1, function(err, ret) {
1093-
if (err || ret == -1) { throw(new Error("Redis error")); }
1094-
sinks[dataId] = ret;
1095-
//sinks[dataId].unshift(null);
1096-
callback(null, ret);
1097-
});
1098-
});*/
1099-
})(i);
1100-
}
1101-
// Create info about task types (all remaining tasks have the default type "task")
1102-
// TODO: pull the list of types dynamically from redis
1057+
}
1058+
if (!(procId in cPortsInfo)) {
1059+
cPortsInfo[procId] = {};
1060+
}
1061+
cPortsInfo[procId].ins = tmp;
1062+
//onsole.log("C PORTS INFO=", JSON.stringify(cPortsInfo));
1063+
}
1064+
callback(null, csigs);
1065+
});
1066+
});
11031067
asyncTasks.push(function(callback) {
1104-
async.each(["foreach", "splitter", "csplitter", "choice", "cchoice", "dataflow", "join"],
1105-
function iterator(type, next) {
1106-
rcl.smembers(wfKey+":tasktype:"+type, function(err, rep) {
1107-
if (err || rep == -1) { throw(new Error("Redis error")); }
1108-
if (rep) {
1109-
//onsole.log(type, rep); // DEBUG
1110-
types[type] = rep;
1111-
}
1112-
next();
1113-
});
1114-
},
1115-
function done(err) {
1116-
callback(null, types);
1068+
rcl.hgetall(procKey+":couts", function(err, ret) {
1069+
//onsole.log("Proc COUTS WFLIB", ret);
1070+
if (err || ret == -1) { throw(new Error("Redis error")); }
1071+
if (ret != null) {
1072+
if (!(procId in cPortsInfo)) {
1073+
cPortsInfo[procId] = {};
1074+
}
1075+
cPortsInfo[procId].outs = ret;
11171076
}
1118-
);
1077+
callback(null, ret);
1078+
});
11191079
});
1120-
1121-
//onsole.log("async tasks: "+asyncTasks.length);
1122-
async.parallel(asyncTasks, function done(err, result) {
1123-
cb(null, nTasks, nData, ins, outs, sources, sinks, types, cPortsInfo, fullInfo);
1080+
}
1081+
for (i=1; i<=nSigs; ++i) {
1082+
let sigId = i;
1083+
let dataKey = wfKey+":data:"+sigId;
1084+
// info about all signal sources
1085+
asyncTasks.push(function(callback) {
1086+
rcl.zrangebyscore(dataKey+":sources", 0, "+inf", "withscores", function(err, ret) {
1087+
if (err || ret == -1) { throw(new Error("Redis error")); }
1088+
sources[sigId] = ret;
1089+
//onsole.log(sigId+";"+ret);
1090+
//sources[sigId].unshift(null);
1091+
callback(null, ret);
1092+
});
11241093
});
1094+
// info about signal sinks
1095+
/*asyncTasks.push(function(callback) {
1096+
rcl.zrange(dataKey+":sinks", 0, -1, function(err, ret) {
1097+
if (err || ret == -1) { throw(new Error("Redis error")); }
1098+
sinks[sigId] = ret;
1099+
//sinks[sigId].unshift(null);
1100+
callback(null, ret);
1101+
});
1102+
});*/
1103+
}
1104+
// Create info about task types (all remaining tasks have the default type "task")
1105+
// TODO: pull the list of types dynamically from redis
1106+
asyncTasks.push(function(callback) {
1107+
async.each(["foreach", "splitter", "csplitter", "choice", "cchoice", "dataflow", "join"],
1108+
function iterator(type, next) {
1109+
rcl.smembers(wfKey+":tasktype:"+type, function(err, rep) {
1110+
if (err || rep == -1) { throw(new Error("Redis error")); }
1111+
if (rep) {
1112+
//onsole.log(type, rep); // DEBUG
1113+
types[type] = rep;
1114+
}
1115+
next();
1116+
});
1117+
},
1118+
function done(err) {
1119+
callback(null, types);
1120+
}
1121+
);
1122+
});
1123+
1124+
//onsole.log("async tasks: "+asyncTasks.length);
1125+
async.parallel(asyncTasks, function done(err, result) {
1126+
cb(null, nProcs, nSigs, ins, outs, sources, sinks, types, cPortsInfo, fullInfo);
11251127
});
11261128
});
11271129
}
@@ -1609,6 +1611,7 @@ function sendSignalLua(wfId, sigValue, cb) {
16091611
}
16101612

16111613

1614+
/*
16121615
// Part of NEW API for continuous processes with FIFO queues
16131616
// @sig format:
16141617
// ... TODO
@@ -1704,6 +1707,7 @@ function public_sendSignal(wfId, sig, cb) {
17041707
}
17051708
);
17061709
}
1710+
*/
17071711

17081712
function getSigRemoteSinks(wfId, sigId, cb) {
17091713
var rsKey = "wf:"+wfId+":data:"+sigId+":remotesinks";

0 commit comments

Comments
 (0)