@@ -15,6 +15,10 @@ function connect() {
1515
1616 connection . then ( function ( conn ) {
1717 console . log ( "[AMQP] Connected!" ) ;
18+
19+ return when ( conn . createChannel ( ) . then ( function ( ch ) {
20+ var ok = ch . assertQueue ( 'hyperflow.jobs' , { durable : true } ) . then ( function ( qok ) { return qok . queue ; } ) ;
21+ } ) ) ;
1822 } , function ( err ) {
1923 console . error ( '[AMQP] Connect failed: %s' , err ) ;
2024 } )
@@ -23,17 +27,27 @@ var taskCount = 0;
2327
2428function amqpCommand ( ins , outs , config , cb ) {
2529 if ( ! connection ) connect ( ) ;
26-
30+
2731 connection . then ( function ( connection ) {
2832 return when ( connection . createChannel ( ) . then ( function ( ch ) {
33+ var options = executor_config . options ;
34+ if ( config . executor . hasOwnProperty ( 'options' ) ) {
35+ var executorOptions = config . executor . options ;
36+ for ( var opt in executorOptions ) {
37+ if ( executorOptions . hasOwnProperty ( opt ) ) {
38+ options [ opt ] = executorOptions [ opt ] ;
39+ }
40+ }
41+ }
2942 var jobMessage = {
3043 "executable" : config . executor . executable ,
31- "args" : config . executor . args ,
32- "inputs" : ins . map ( identity ) ,
33- "outputs" : outs . map ( identity ) ,
34- "options" : executor_config . options
44+ "args" : config . executor . args ,
45+ "env" : ( config . executor . env || { } ) ,
46+ "inputs" : ins . map ( identity ) ,
47+ "outputs" : outs . map ( identity ) ,
48+ "options" : options
3549 } ;
36-
50+
3751 var answer = defer ( ) ;
3852 var corrId = uuid . v4 ( ) ;
3953 function maybeAnswer ( msg ) {
@@ -55,7 +69,7 @@ function amqpCommand(ins, outs, config, cb) {
5569 // console.log("[AMQP][" + corrId + "][" + taskCount + "] Publishing job " + JSON.stringify(jobMessage));
5670 ch . sendToQueue ( 'hyperflow.jobs' , new Buffer ( JSON . stringify ( jobMessage ) ) , { replyTo : queue , contentType : 'application/json' , correlationId : corrId } ) ;
5771 return answer . promise ;
58- } ) ;
72+ } ) ;
5973
6074 return ok . then ( function ( message ) {
6175 var parsed = JSON . parse ( message ) ;
@@ -71,7 +85,7 @@ function amqpCommand(ins, outs, config, cb) {
7185 } ) ;
7286 } ) )
7387 } ) . then ( null , function ( err ) { console . trace ( err . stack ) ; } ) ;
74- }
88+ }
7589
7690
7791exports . amqpCommand = amqpCommand ;
0 commit comments