Skip to content

Commit 6a46862

Browse files
committed
Updates required for persisting function inputs in the persistence log
1 parent 6256e0e commit 6a46862

File tree

3 files changed

+25
-9
lines changed

3 files changed

+25
-9
lines changed

engine2/index.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ Engine.prototype.emitSignals = function(sigs, cb) {
216216
}
217217

218218
// signals which don't have "source" are workflow inputs ==> need to be persisted
219-
// FIXME: add a flag to check if persistence is enabled
219+
// FIXME: add a flag to check if persistence is enabled. EDIT: no need for flag, this is checked in ONE PLACE in 'hflow'
220220
if (!s.source) {
221221
engine.eventServer.emit("persist", ["input", +engine.wfId, s]);
222222
}

engine2/process.js

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -316,25 +316,41 @@ var ProcLogic = function() {
316316
funcOuts, emul,
317317
proc.engine.eventServer,
318318
proc.engine.config,
319-
function(err, outs, options) {
319+
function(err, ins, outs, options) { // 'ins' is returned only for the purpose of persistence
320320
//onsole.log("FUNC INVOKED");
321321
//onsole.log("INS: ", JSON.stringify(proc.sigValues, null, 2));
322322
//onsole.log("OUTS: ", outs);
323-
var outsArray = [];
323+
324+
// convert ins and outs object-arrays to arrays for the purpose of persistence
325+
var outsArray = [], insArray = [];
324326
if (outs == null) {
325327
outsArray = null;
326328
} else {
327-
outs.forEach(function(out) {
329+
outs.forEach(function(out, i) {
328330
outsArray.push(out);
331+
// FIXME: the code below is repeated below in "postInvoke"
332+
// --> TODO: make a function for adding metadata to a signal
333+
outsArray[i]["_id"] = +funcOuts[i];
334+
outsArray[i]["source"] = +proc.procId;
335+
outsArray[i]["firingId"] = +proc.firingId;
336+
});
337+
}
338+
if (ins == null) {
339+
insArray = null;
340+
} else {
341+
ins.forEach(function(input) {
342+
insArray.push(input);
329343
});
330344
}
331345

346+
332347
// persist outputs (originally persistence was disabled DURING recovery,
333348
// but it's probably wrong: currently even when recovering from a previous log,
334-
// workflow execution is persisted normally to a new log
349+
// workflow execution is persisted normally to a new log)
335350
//if (!options || !options.recovered) {
336351
proc.engine.eventServer.emit("persist",
337-
["fired", proc.appId, proc.procId, proc.firingId, outsArray]);
352+
// FIXME: insArray only added for testing "pregel" algorithm
353+
["fired", proc.appId, proc.procId, proc.firingId, outsArray, insArray]);
338354
//}
339355
err ? cb(err): cb(null, outs, asyncInvocation, funcIns, funcOuts);
340356
}

wflib/index.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1479,14 +1479,14 @@ function public_invokeProcFunction(wfId, procId, firingId, insIds_, insValues, o
14791479
prepareFuncOutputs(function(outs, recovered) {
14801480
if (emulate) {
14811481
return setTimeout(function() {
1482-
cb(null, outs);
1482+
cb(null, ins, outs);
14831483
}, 100);
14841484
}
14851485

14861486
// when this is a recovered firing, unless the process has set flag "executeWhenRecovering",
14871487
// we don't invoke the function, just immediately return the recovered outputs!
14881488
if (recovered && !procInfo.executeWhenRecovering) {
1489-
return cb(null, outs, {"recovered": "true"});
1489+
return cb(null, ins, outs, {"recovered": "true"});
14901490
}
14911491

14921492
if ((procInfo.fun == "null") || (!procInfo.fun)) {
@@ -1566,7 +1566,7 @@ function public_invokeProcFunction(wfId, procId, firingId, insIds_, insValues, o
15661566
options.recovered = true;
15671567
}
15681568
}
1569-
cb(null, outs, options);
1569+
cb(null, ins, outs, options);
15701570
});
15711571
});
15721572
});

0 commit comments

Comments
 (0)