@@ -357,27 +357,28 @@ exports.init = function(redisClient) {
357357
358358 // add process inputs and outputs + signals sources and sinks
359359 for ( var i = 0 ; i < task . ins . length ; ++ i ) {
360- ( function ( inId , dataId ) {
361- var dataKey = wfKey + ":data:" + dataId ;
362- multi . zadd ( procKey + ":ins" , inId , dataId , function ( err , rep ) { } ) ;
363- multi . zadd ( dataKey + ":sinks" , inId /* score: port id */ , procId , function ( err , ret ) { } ) ;
364- if ( sigs [ dataId - 1 ] . control ) { // add all control inputs to a separate hash
365- //multi.hmset(procKey+":cins", sigs[dataId-1].control, dataId);
366- multi . hmset ( procKey + ":cins" , dataId , sigs [ dataId - 1 ] . control ) ;
367- multi . sadd ( procKey + ":cinset" , dataId ) ;
368- }
369- } ) ( i + 1 , task . ins [ i ] + 1 ) ;
360+ let inId = i + 1 ;
361+ let dataId = task . ins [ i ] + 1 ;
362+ var dataKey = wfKey + ":data:" + dataId ;
363+ //console.log("inId", inId, "dataId", dataId)
364+ multi . zadd ( procKey + ":ins" , inId , dataId , function ( err , rep ) { } ) ;
365+ multi . zadd ( dataKey + ":sinks" , inId /* score: port id */ , procId , function ( err , ret ) { } ) ;
366+ if ( sigs [ dataId - 1 ] . control ) { // add all control inputs to a separate hash
367+ //multi.hmset(procKey+":cins", sigs[dataId-1].control, dataId);
368+ multi . hmset ( procKey + ":cins" , dataId , sigs [ dataId - 1 ] . control ) ;
369+ multi . sadd ( procKey + ":cinset" , dataId ) ;
370+ }
370371 }
371372 for ( var i = 0 ; i < task . outs . length ; ++ i ) {
372- ( function ( outId , dataId ) {
373- var dataKey = wfKey + ":data:" + dataId ;
374- multi . zadd ( procKey + ":outs" , outId , dataId , function ( err , rep ) { } ) ;
375- multi . zadd ( dataKey + ":sources " , outId /* score: port Id */ , procId , function ( err , ret ) { } ) ;
376- if ( sigs [ dataId - 1 ] . control ) { // add all control outputs to a separate hash
377- multi . hmset ( procKey + ":couts" , sigs [ dataId - 1 ] . control , dataId ) ;
378- multi . sadd ( procKey + ":coutset" , dataId ) ;
379- }
380- } ) ( i + 1 , task . outs [ i ] + 1 ) ;
373+ let outId = i + 1 ;
374+ let dataId = task . outs [ i ] + 1 ;
375+ var dataKey = wfKey + ":data:" + dataId ;
376+ multi . zadd ( procKey + ":outs " , outId , dataId , function ( err , rep ) { } ) ;
377+ multi . zadd ( dataKey + ":sources" , outId /* score: port Id */ , procId , function ( err , ret ) { } ) ;
378+ if ( sigs [ dataId - 1 ] . control ) { // add all control outputs to a separate hash
379+ multi . hmset ( procKey + ":couts" , sigs [ dataId - 1 ] . control , dataId ) ;
380+ multi . sadd ( procKey + ":coutset" , dataId ) ;
381+ }
381382 }
382383
383384 // add info about input and output counts
@@ -576,171 +577,6 @@ exports.init = function(redisClient) {
576577 }
577578 }
578579
579- // Returns full task info. Format:
580- // TODO ......
581- function public_getTaskInfoFull ( wfId , procId , cb ) {
582- var procKey = "wf:" + wfId + ":task:" + procId ;
583- var task , ins , outs , data_ins = { } , data_outs = { } , asyncTasks = [ ] ;
584-
585- // Retrieve task info
586- asyncTasks . push ( function ( callback ) {
587- rcl . hgetall ( procKey , function ( err , reply ) {
588- task = err ? err : reply ;
589- callback ( null , task ) ;
590- } ) ;
591- } ) ;
592-
593- // Retrieve all ids of inputs of the task
594- asyncTasks . push ( function ( callback ) {
595- rcl . zrangebyscore ( procKey + ":ins" , 0 , "+inf" , function ( err , ret ) {
596- ins = err ? err : ret ;
597- callback ( null , ins ) ;
598- } ) ;
599- } ) ;
600-
601- // Retrieve all ids of outputs of the task
602- asyncTasks . push ( function ( callback ) {
603- rcl . zrangebyscore ( procKey + ":outs" , 0 , "+inf" , function ( err , ret ) {
604- outs = err ? err : ret ;
605- callback ( null , outs ) ;
606- } ) ;
607- } ) ;
608-
609- async . parallel ( asyncTasks , function done ( err , result ) {
610- if ( err ) {
611- cb ( err ) ;
612- } else {
613- asyncTasks = [ ] ;
614- for ( var i = 0 ; i < ins . length ; ++ i ) {
615- ( function ( i ) {
616- var dataKey = "wf:" + wfId + ":data:" + ins [ i ] ;
617- asyncTasks . push ( function ( callback ) {
618- rcl . hgetall ( dataKey , function ( err , reply ) {
619- if ( err ) {
620- data_ins [ ins [ i ] ] = err ;
621- } else {
622- data_ins [ ins [ i ] ] = reply ;
623- data_ins [ ins [ i ] ] . id = ins [ i ] ; // FIXME: redundant (key is the id)
624- // but WARNING: invoke currently may rely on it
625- }
626- callback ( null , reply ) ;
627- } ) ;
628- } ) ;
629- } ) ( i ) ;
630- }
631- for ( var i = 0 ; i < outs . length ; ++ i ) {
632- ( function ( i ) {
633- var dataKey = "wf:" + wfId + ":data:" + outs [ i ] ;
634- asyncTasks . push ( function ( callback ) {
635- rcl . hgetall ( dataKey , function ( err , reply ) {
636- if ( err ) {
637- data_outs [ outs [ i ] ] = err ;
638- } else {
639- data_outs [ outs [ i ] ] = reply ;
640- data_outs [ outs [ i ] ] . id = outs [ i ] ; // FIXME: redundant
641- }
642- callback ( null , reply ) ;
643- } ) ;
644- } ) ;
645- } ) ( i ) ;
646- }
647-
648- async . parallel ( asyncTasks , function done ( err , result ) {
649- if ( err ) {
650- cb ( err ) ;
651- } else {
652- cb ( null , task , data_ins , data_outs ) ;
653- }
654- } ) ;
655- }
656- } ) ;
657- }
658-
659- // Part of NEW API for continuous processes with FIFO queues
660- function public_getTaskInfoFull1 ( wfId , procId , insIds , outsIds , cb ) {
661- var procKey = "wf:" + wfId + ":task:" + procId ;
662- var task , ins , outs , data_ins = { } , data_outs = { } , asyncTasks = [ ] ;
663-
664- // Retrieve task info
665- asyncTasks . push ( function ( callback ) {
666- rcl . hgetall ( procKey , function ( err , reply ) {
667- task = err ? err : reply ;
668- callback ( null , task ) ;
669- } ) ;
670- } ) ;
671-
672- // Retrieve all inputs of the task given in 'insIds'
673- for ( var i = 0 ; i < insIds . length ; ++ i ) {
674- ( function ( inIdx ) {
675- var sigQueueKey = "wf:" + wfId + ":task:" + procId + ":ins:" + insIds [ inIdx ] ;
676- var sigInstanceKey = "wf:" + wfId + ":sigs:" + sigId + ":" + idx ;
677- asyncTasks . push ( function ( callback ) {
678- rcl . zrangebyscore ( procKey + ":ins" , 0 , "+inf" , function ( err , ret ) {
679- ins = err ? err : ret ;
680- callback ( null , ins ) ;
681- } ) ;
682- } ) ;
683- } ) ( i ) ;
684- }
685-
686- // Retrieve all outputs of the task given in 'outsIds'
687- asyncTasks . push ( function ( callback ) {
688- rcl . zrangebyscore ( procKey + ":outs" , 0 , "+inf" , function ( err , ret ) {
689- outs = err ? err : ret ;
690- callback ( null , outs ) ;
691- } ) ;
692- } ) ;
693-
694- async . parallel ( asyncTasks , function done ( err , result ) {
695- if ( err ) {
696- cb ( err ) ;
697- } else {
698- asyncTasks = [ ] ;
699- for ( var i = 0 ; i < ins . length ; ++ i ) {
700- ( function ( i ) {
701- var dataKey = "wf:" + wfId + ":data:" + ins [ i ] ;
702- asyncTasks . push ( function ( callback ) {
703- rcl . hgetall ( dataKey , function ( err , reply ) {
704- if ( err ) {
705- data_ins [ ins [ i ] ] = err ;
706- } else {
707- data_ins [ ins [ i ] ] = reply ;
708- data_ins [ ins [ i ] ] . id = ins [ i ] ; // TODO: redundant (key is the id)
709- // but WARNING: invoke currently may rely on it
710- }
711- callback ( null , reply ) ;
712- } ) ;
713- } ) ;
714- } ) ( i ) ;
715- }
716- for ( var i = 0 ; i < outs . length ; ++ i ) {
717- ( function ( i ) {
718- var dataKey = "wf:" + wfId + ":data:" + outs [ i ] ;
719- asyncTasks . push ( function ( callback ) {
720- rcl . hgetall ( dataKey , function ( err , reply ) {
721- if ( err ) {
722- data_outs [ outs [ i ] ] = err ;
723- } else {
724- data_outs [ outs [ i ] ] = reply ;
725- data_outs [ outs [ i ] ] . id = outs [ i ] ; // TODO: redundant
726- }
727- callback ( null , reply ) ;
728- } ) ;
729- } ) ;
730- } ) ( i ) ;
731- }
732-
733- async . parallel ( asyncTasks , function done ( err , result ) {
734- if ( err ) {
735- cb ( err ) ;
736- } else {
737- cb ( null , task , data_ins , data_outs ) ;
738- }
739- } ) ;
740- }
741- } ) ;
742- }
743-
744580 function pushInput ( wfId , procId , sigId , sigIdx , cb ) {
745581 var isStickyKey = "wf:" + wfId + ":task:" + procId + ":sticky" ; // KEYS[1]
746582 var queueKey = "wf:" + wfId + ":task:" + procId + ":ins:" + sigId ; // KEYS[2]
@@ -1928,36 +1764,6 @@ function getStickySigs(wfId, procId, cb) {
19281764}
19291765
19301766
1931- // Part of NEW API for continuous processes with FIFO queues
1932- // Creates a new signal group to wait for.
1933- // @spec = specification of the group: "{ sigGroupName: [ sigId, sigId, sigId, ... ], ... }", where
1934- // sigGroup - a unique name for the signal group to wait for.
1935- // sigId - signal Id; can be repeated multiple times which denotes wait for multiple
1936- // occurrences of this signal.
1937- // @cb = function(err) - callback
1938- //
1939- // Example: waitForSignals(1, 44, { "data": [1,2,3,3,4,4,5] }, function(err) { })
1940- //
1941- // TODO DEPRECATED: delete this function, deprecated by fetchSignals and no longer used
1942- function public_waitForSignals ( wfId , procId , spec , cb ) {
1943- for ( group in spec ) {
1944- // add the group name to the set of all waiting groups
1945- rcl . sadd ( "wf:" + wfId + ":task:" + procId + ":waiting" , group , function ( err , reply ) {
1946- if ( err ) return cb ( err ) ;
1947- // For each group, add the sigIds to their respective waiting set, increasing its score by 1 for each
1948- // occurrence of the sigId
1949- async . each ( spec . group , function iterator ( sigId , doneIter ) {
1950- var waitSetKey = "wf:" + wfId + ":task:" + procId + ":waiting:" + group ;
1951- rcl . zincrby ( waitSetKey , 1 , sigId , function ( err , rep ) {
1952- doneIter ( err ) ;
1953- } ) ;
1954- } , function doneAll ( err ) {
1955- cb ( err ) ;
1956- } ) ;
1957- } ) ;
1958- }
1959- }
1960-
19611767// checks if all signals with specified ids are ready for a given task; if so, returns their values
19621768// @spec - array of elements: [ { "id": id, "count": count }, { "id": id, "count": count }, ... ] where
19631769// id - input signal identifier for task procId
@@ -2103,7 +1909,6 @@ return {
21031909 getWfOuts : public_getWfOuts ,
21041910 //getWfInsAndOutsInfoFull: public_getWfInsAndOutsInfoFull,
21051911 getTaskInfo : public_getTaskInfo ,
2106- //getTaskInfoFull: public_getTaskInfoFull,
21071912 //getTaskIns: public_getTaskIns,
21081913 //getTaskOuts: public_getTaskOuts,
21091914 setTaskState : public_setTaskState ,
@@ -2135,6 +1940,6 @@ return {
21351940
21361941
21371942process . on ( 'exit' , function ( ) {
2138- console . log ( "fetchInputs total time:" , fetchInputsTime / 1000 ) ;
2139- console . log ( "sendSignal total time:" , sendSignalTime / 1000 ) ;
1943+ // console.log("fetchInputs total time:", fetchInputsTime/1000);
1944+ // console.log("sendSignal total time:", sendSignalTime/1000);
21401945} ) ;
0 commit comments