@@ -5,6 +5,7 @@ const fs = require('fs');
55const { Logger } = require ( './Logger' ) ;
66const { Helper } = require ( './Helper' ) ;
77const Crypt = require ( './Crypt' ) ;
8+ const Queue = require ( './Queue' ) ;
89
910const defaultExecAssets = [
1011 {
@@ -26,7 +27,7 @@ class Master {
2627 transferEncryptToken = null ,
2728 } = { } ) {
2829 this . availableWorkers = [ ] ;
29- this . jobs = [ ] ;
30+ this . jobs = new Queue ( ) ;
3031
3132 this . event = new events . EventEmitter ( ) ;
3233 this . log = new Logger ( { level : loglevel || process . env . LOG_LEVEL } ) ;
@@ -53,6 +54,7 @@ class Master {
5354 // this.peer.on('left', (address)=> );
5455
5556 this . event . addListener ( 'init' , this . init ) ;
57+ this . event . addListener ( 'resultsShared' , this . onResults ) ;
5658 this . event . emit ( 'init' , transferEncryptToken ) ;
5759 }
5860
@@ -102,33 +104,47 @@ class Master {
102104 } ) ;
103105 this . peer . register ( 'requestWork' , ( pk , args , cb ) => {
104106 switch ( true ) {
105- case this . jobs . length > 1 : {
106- if ( args . getBatch && this . jobs . length > args . batchSize ) {
107- args . batchTasks = this . jobs . splice ( 0 , args . batchSize ) ; // splice mutates original array which slices it
107+ case this . jobs . size > 1 : {
108+ if ( args . getBatch ) {
109+ args . batchTasks = [ ] ;
110+ let totalJobsSend = args . batchSize ;
111+ if ( this . jobs . size <= args . batchSize ) totalJobsSend = 1 ; // get only available
112+ for ( let i = 0 ; i < totalJobsSend ; i += 1 ) {
113+ const queuedJob = this . jobs . dequeue ( ) ;
114+ if ( ! queuedJob ) break ;
115+ args . batchTasks . push ( queuedJob . value ) ;
116+ }
117+ // args.batchTasks = this.jobs.splice(0, args.batchSize); // splice mutates original array which slices it
108118 this . log . debug (
109- `task queue reduced:${ this . jobs . length } - ${ args . batchSize } `
119+ `task queue reduced:${ this . jobs . size } - ${ args . batchSize } `
110120 ) ;
111121 break ;
112122 }
113- args . task = this . jobs . shift ( ) ;
114- this . log . debug ( `task queue reduced:${ this . jobs . length } ` ) ;
123+ const queuedJob = this . jobs . dequeue ( ) ;
124+ args . task = null ;
125+ if ( queuedJob ) {
126+ args . task = queuedJob . value ;
127+ this . log . debug ( `task queue reduced:${ this . jobs . size } ` ) ;
128+ }
115129 break ;
116130 }
117- case this . jobs . length === 1 : {
118- // shift leaves array undefined on last element
119- [ args . task ] = this . jobs ; // this case is to avoid that
120- this . jobs = [ ] ;
121- this . log . debug ( `task queue finished:${ this . jobs . length } ` ) ;
131+ case this . jobs . size === 1 : {
132+ const queuedJob = this . jobs . dequeue ( ) ;
133+ args . task = null ;
134+ if ( queuedJob ) {
135+ args . task = queuedJob . value ;
136+ this . log . debug ( `task queue reduced:${ this . jobs . size } ` ) ;
137+ }
122138 break ;
123139 }
124- case this . jobs . length === 0 : {
140+ case this . jobs . size === 0 : {
125141 args . task = null ;
126- this . log . debug ( `task queue is empty:${ this . jobs . length } ` ) ;
142+ this . log . debug ( `task queue is empty:${ this . jobs . size } ` ) ;
127143 break ;
128144 }
129145 default : {
130146 args . task = null ;
131- this . log . warning ( `task queue is empty:${ this . jobs . length } ` ) ;
147+ this . log . warning ( `task queue is empty:${ this . jobs . size } ` ) ;
132148
133149 break ;
134150 }
@@ -137,7 +153,7 @@ class Master {
137153 } ) ;
138154 this . peer . register ( 'shareResults' , ( pk , args ) => {
139155 const results = JSON . parse ( this . crypt . decrypt ( JSON . parse ( args ) ) ) ;
140- this . onResults ( results ) ;
156+ this . event . emit ( 'resultsShared' , results ) ;
141157 } ) ;
142158 this . peer . register ( 'requestExecAssets' , ( pk , args , cb ) => {
143159 const currentHash = args ?. currentHash ; // hash of current assets array
@@ -179,14 +195,13 @@ class Master {
179195
180196 const encryptedPayload = this . crypt . encrypt ( payloadJson ) ;
181197 this . log . debug ( 'pushNewJob payload: ' , payload ) ;
182- if ( this . jobs . length > 20 ) {
183- // resolve();
184- setTimeout ( ( ) => {
185- this . jobs . push ( JSON . stringify ( encryptedPayload ) ) ;
198+ if ( this . jobs . size >= 1000 ) {
199+ Helper . sleep ( this . jobs . size * 0.3 ) . then ( ( ) => {
200+ this . jobs . enqueue ( JSON . stringify ( encryptedPayload ) ) ;
186201 resolve ( ) ;
187- } , this . jobs . length * 3 ) ;
202+ } ) ;
188203 } else {
189- this . jobs . push ( JSON . stringify ( encryptedPayload ) ) ;
204+ this . jobs . enqueue ( JSON . stringify ( encryptedPayload ) ) ;
190205 resolve ( ) ;
191206 }
192207 } ) ;
0 commit comments