Skip to content

Commit 5c2c5cc

Browse files
committed
Added processing of "<executable>" elements. Parsing corrections.
1 parent d6c0001 commit 5c2c5cc

File tree

1 file changed

+35
-15
lines changed

1 file changed

+35
-15
lines changed

converters/pegasus_dax.js

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/* Hypermedia workflow.
22
** Converts from pegasus dax file to hyperflow workflow representation (json)
3-
** Author: Bartosz Balis (2013)
3+
** Author: Bartosz Balis (2013-2018)
44
*/
55

66
/*
@@ -43,7 +43,6 @@ var sources = {}, sinks = {};
4343

4444
var nextTaskId = -1, nextDataId = -1, dataNames = {};
4545

46-
4746
function parseDax(filename, cb) {
4847
var self = this;
4948
var parser = new xml2js.Parser({normalize: true});
@@ -52,8 +51,10 @@ function parseDax(filename, cb) {
5251
cb(new Error("File read error. Doesn't exist?"));
5352
} else {
5453
var dag = data.toString();
55-
dag = dag.replace(/<file name="(.*)".*>/g, "$1");
56-
dag = dag.replace(/<filename file="(.*)".*>/g, "$1");
54+
// without the following replacements parsing of 'argument' elements is a hell
55+
// (because DAX uses 'mixed content', i.e. elements mix text and child elements)
56+
dag = dag.replace(/<file name="(.*?)".*?\/>/g, "$1"); // DAX 3.x
57+
dag = dag.replace(/<filename file="(.*?)".*?\/>/g, "$1"); // DAX 2.1
5758
parser.parseString(dag, function(err, result) {
5859
if (err) {
5960
cb(new Error("File parse error."));
@@ -76,18 +77,33 @@ function createWorkflow(dax, functionName, cb) {
7677
outs: []
7778
};
7879

80+
// in DAX 3.x there can be "executable" elements
81+
var execs={};
82+
if (dax.adag.executable) {
83+
dax.adag.executable.forEach(function(exec) {
84+
var name = exec['$'].name,
85+
execName = exec.pfn[0]['$'].url.replace(/^.*[\\\/]/, '');
86+
execs[name] = execName;
87+
//console.log(name, execName);
88+
});
89+
}
90+
7991
dax.adag.job.forEach(function(job) {
92+
8093
++nextTaskId;
81-
var args = parse(job.argument[0]);
94+
95+
var args = job.argument ? parse(job.argument[0]): [],
96+
jname = job['$'].name;
97+
8298
wfOut.tasks.push({
83-
"name": job['$'].name,
99+
"name": jname,
84100
"function": functionName,
85101
"type": "dataflow",
86102
"executor": "syscommand",
87103
"firingLimit": 1,
88104
"config": {
89105
"executor": {
90-
"executable": job['$'].name,
106+
"executable": execs[jname] ? execs[jname]: jname,
91107
"args": args
92108
}
93109
},
@@ -99,16 +115,13 @@ function createWorkflow(dax, functionName, cb) {
99115
wfOut.tasks[nextTaskId].runtime = job['$'].runtime;
100116
}
101117

102-
//var
103-
//if (config
104-
105118
var dataId, dataName;
106119
job.uses.forEach(function(job_data) {
107-
if (job_data['$'].name) {
108-
dataName = job_data['$'].name; // dax v3.3
109-
} else {
110-
dataName = job_data['$'].file; // dax v2.1
111-
}
120+
if (job_data['$'].name) {
121+
dataName = job_data['$'].name; // dax v3.3
122+
} else {
123+
dataName = job_data['$'].file; // dax v2.1
124+
}
112125
if (!dataNames[dataName]) {
113126
++nextDataId;
114127
wfOut.data.push({
@@ -133,9 +146,11 @@ function createWorkflow(dax, functionName, cb) {
133146
}
134147
});
135148
});
149+
136150
for (var i=0; i<wfOut.data.length; ++i) {
137151
if (wfOut.data[i].sources.length == 0) {
138152
wfOut.ins.push(i);
153+
wfOut.data[i].data = [{ }]; // uncomment to send initial signals to the workflow
139154
}
140155
if (wfOut.data[i].sinks.length == 0) {
141156
wfOut.outs.push(i);
@@ -150,6 +165,11 @@ function createWorkflow(dax, functionName, cb) {
150165
delete wfOut.data[i].sinks;
151166
}
152167

168+
wfOut.processes = wfOut.tasks;
169+
wfOut.signals = wfOut.data;
170+
delete wfOut.tasks;
171+
delete wfOut.data;
172+
153173
cb(null, wfOut);
154174
}
155175

0 commit comments

Comments
 (0)