Skip to content

Commit 880b2db

Browse files
committed
added sleep example
1 parent 672c520 commit 880b2db

File tree

3 files changed

+132
-0
lines changed

3 files changed

+132
-0
lines changed

examples/ParallelSleep/README.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
2+
3+
This example creates a split-parallel-join workflow of DAG type.
4+
5+
```
6+
7+
fork
8+
9+
/ | \
10+
11+
sleep sleep sleep ...
12+
13+
\ | /
14+
15+
fork
16+
17+
```
18+
19+
20+
Usage:
21+
22+
```
23+
node sleep_generator.js 3 > sleep3.json
24+
hflow run gzip3.json
25+
```

examples/ParallelSleep/sleep3.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Usage: node sleep_generator.js steps
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
////////////////////////////////////////////////////////////////////////////////////
2+
//
3+
// Creates example fork-sleep-join workflow of DAG type.
4+
//
5+
// fork
6+
//
7+
// / | \
8+
//
9+
// sleep sleep sleep ...
10+
//
11+
// \ | /
12+
//
13+
// fork
14+
//
15+
//
16+
//
17+
//////////////////////////////////////////////////////////////////////////////////////
18+
19+
20+
var
21+
argv = require('optimist').argv;
22+
23+
24+
// convert number to string with leading zeros, e.g. 0001, 0002, etc.
25+
function pad (str, max) {
26+
str = str.toString();
27+
return str.length < max ? pad("0" + str, max) : str;
28+
}
29+
30+
// create task object
31+
function task(name, functionName, executable, args, ins, outs) {
32+
return {
33+
"name": name,
34+
"function": functionName,
35+
"type": "dataflow",
36+
"firingLimit": 1,
37+
"config": {
38+
"executor": {
39+
// "queue_name": "test1",
40+
"executable": executable,
41+
"args": args
42+
}
43+
},
44+
"ins": ins,
45+
"outs": outs
46+
}
47+
}
48+
49+
function createWf(functionName, steps) {
50+
51+
var wfOut = {
52+
processes: [],
53+
signals: [],
54+
ins: [0],
55+
outs: [2*steps+1]
56+
};
57+
58+
// create fork task
59+
var outs = [];
60+
for (i=1; i<=steps; i++) { outs.push(i); }
61+
62+
wfOut.processes.push(
63+
task("fork", functionName, "echo", ["Starting parallel sleeps"], [0], outs)
64+
);
65+
66+
67+
//create sleep tasks
68+
for (i=0; i<steps; i++) {
69+
wfOut.processes.push(
70+
task("sleep" + i, functionName, "sleep", [i+1], [i+1], [i+steps+1])
71+
);
72+
}
73+
74+
//create join task
75+
var ins = [];
76+
for (i=1; i<=steps; i++) { ins.push(steps+i); }
77+
78+
wfOut.processes.push(
79+
task("join", functionName, "echo", ["join complete"], ins, [2*steps+1])
80+
);
81+
82+
// create data array with file names
83+
var signals = []
84+
signals.push("0");
85+
signals = signals.concat(outs);
86+
signals = signals.concat(ins);
87+
signals.push(2*steps+1)
88+
89+
wfOut.signals.push({name: signals[0], data: [signals[0]]});
90+
for (i=1; i<signals.length; i++) {
91+
wfOut.signals.push({name: signals[i]});
92+
}
93+
94+
95+
// output workflow json to stdout
96+
console.log(JSON.stringify(wfOut, null, 2));
97+
98+
}
99+
100+
if (!argv._[0]) {
101+
console.log("Usage: node sleep_generator.js steps");
102+
process.exit();
103+
}
104+
105+
//createWf("amqpCommand", argv._[0]);
106+
createWf("command", argv._[0]);

0 commit comments

Comments
 (0)