@@ -3,247 +3,199 @@ package workers
33import (
44 "context"
55 "errors"
6- "os"
7- "os/signal"
86 "sync"
9- "syscall"
107 "time"
118)
129
13- var defaultWatchSignals = []os.Signal {syscall .SIGINT , syscall .SIGTERM , syscall .SIGKILL }
14-
15- // Worker Contains the work function. Allows an input and output to a channel or another worker for pipeline work.
16- // Return nil if you want the Runner to continue otherwise any error will cause the Runner to shutdown and return the
17- // error.
10+ // Worker Contains the work function.
11+ // Work() get input and could put in outChan for followers.
12+ // ⚠️ outChan could be closed if follower is stoped before producer.
13+ // error returned can be process by afterFunc but will be ignored by default.
1814type Worker interface {
19- Work (in interface {}, out chan <- interface {}) error
20- }
21-
22- // Runner Handles the running the Worker logic.
23- type Runner interface {
24- BeforeFunc (func (ctx context.Context ) error ) Runner
25- AfterFunc (func (ctx context.Context , err error ) error ) Runner
26- SetDeadline (t time.Time ) Runner
27- SetTimeout (duration time.Duration ) Runner
28- SetFollower ()
29- Send (in interface {})
30- InFrom (w ... Runner ) Runner
31- SetOut (chan interface {})
32- Start () Runner
33- Stop () chan error
34- Wait () error
15+ Work (ctx context.Context , in interface {}, out chan <- interface {}) error
3516}
3617
37- type runner struct {
38- ctx context.Context
39- cancel context.CancelFunc
40- inChan chan interface {}
41- outChan chan interface {}
42- errChan chan error
43- signalChan chan os. Signal
44- limiter chan struct {}
18+ type Runner struct {
19+ ctx context.Context
20+ cancel context.CancelFunc
21+ inputCtx context. Context
22+ inputCancel context. CancelFunc
23+ inChan chan interface {}
24+ outChan chan interface {}
25+ limiter chan struct {}
4526
4627 afterFunc func (ctx context.Context , err error ) error
47- workFunc func (in interface {}, out chan <- interface {}) error
28+ workFunc func (ctx context. Context , in interface {}, out chan <- interface {}) error
4829 beforeFunc func (ctx context.Context ) error
4930
50- timeout time.Duration
51- deadline time.Duration
52-
53- isLeader bool
54- stopCalled bool
31+ timeout time.Duration
5532
5633 numWorkers int64
57- lock * sync.RWMutex
58- wg * sync.WaitGroup
59- done * sync.Once
60- once * sync.Once
34+ started * sync.Once
35+ done chan struct {}
6136}
6237
6338// NewRunner Factory function for a new Runner. The Runner will handle running the workers logic.
64- func NewRunner (ctx context.Context , w Worker , numWorkers int64 ) Runner {
65- var runnerCtx , runnerCancel = context .WithCancel (ctx )
66- var runner = & runner {
67- ctx : runnerCtx ,
68- cancel : runnerCancel ,
69- inChan : make (chan interface {}, numWorkers ),
70- outChan : nil ,
71- errChan : make (chan error , 1 ),
72- signalChan : make (chan os.Signal , 1 ),
73- limiter : make (chan struct {}, numWorkers ),
74- afterFunc : func (ctx context.Context , err error ) error { return err },
75- workFunc : w .Work ,
76- beforeFunc : func (ctx context.Context ) error { return nil },
77- numWorkers : numWorkers ,
78- isLeader : true ,
79- lock : new (sync.RWMutex ),
80- wg : new (sync.WaitGroup ),
81- once : new (sync.Once ),
82- done : new (sync.Once ),
39+ func NewRunner (ctx context.Context , w Worker , numWorkers int64 , buffer int64 ) * Runner {
40+ runnerCtx , runnerCancel := context .WithCancel (ctx )
41+ inputCtx , inputCancel := context .WithCancel (runnerCtx )
42+
43+ runner := & Runner {
44+ ctx : runnerCtx ,
45+ cancel : runnerCancel ,
46+ inputCtx : inputCtx ,
47+ inputCancel : inputCancel ,
48+ inChan : make (chan interface {}, buffer ),
49+ outChan : nil ,
50+ limiter : make (chan struct {}, numWorkers ),
51+ afterFunc : func (ctx context.Context , err error ) error { return nil },
52+ workFunc : w .Work ,
53+ beforeFunc : func (ctx context.Context ) error { return nil },
54+ numWorkers : numWorkers ,
55+ started : new (sync.Once ),
56+ done : make (chan struct {}),
8357 }
84- runner .waitForSignal (defaultWatchSignals ... )
8558 return runner
8659}
8760
88- // Send Send an object to the worker for processing.
89- func (r * runner ) Send (in interface {}) {
61+ var ErrInputClosed = errors .New ("input closed" )
62+
63+ // Send Send an object to the worker for processing if context is not Done.
64+ func (r * Runner ) Send (in interface {}) error {
9065 select {
91- case <- r .ctx .Done ():
92- return
66+ case <- r .inputCtx .Done ():
67+ return ErrInputClosed
9368 case r .inChan <- in :
9469 }
70+ return nil
9571}
9672
9773// InFrom Set a worker to accept output from another worker(s).
98- func (r * runner ) InFrom (w ... Runner ) Runner {
99- r .SetFollower ()
74+ func (r * Runner ) InFrom (w ... * Runner ) * Runner {
10075 for _ , wr := range w {
101- wr .SetOut (r .inChan )
76+ // in := make(chan interface{})
77+ // go func(in chan interface{}) {
78+ // for msg := range in {
79+ // if err := r.Send(msg); err != nil {
80+ // return
81+ // }
82+ // }
83+ // }(in)
84+ wr .SetOut (r .inChan ) // nolint
10285 }
10386 return r
10487}
10588
106- // SetFollower Sets the worker as a follower and does not need to close it's in channel.
107- func (r * runner ) SetFollower () {
108- r .lock .Lock ()
109- r .isLeader = false
110- r .lock .Unlock ()
111- }
112-
113- // Start Starts the worker on processing.
114- func (r * runner ) Start () Runner {
115- r .startWork ()
116- return r
89+ // Start execute beforeFunc and launch worker processing.
90+ func (r * Runner ) Start () error {
91+ r .started .Do (func () {
92+ if err := r .beforeFunc (r .ctx ); err == nil {
93+ go r .work ()
94+ }
95+ })
96+ return nil
11797}
11898
11999// BeforeFunc Function to be run before worker starts processing.
120- func (r * runner ) BeforeFunc (f func (ctx context.Context ) error ) Runner {
100+ func (r * Runner ) BeforeFunc (f func (ctx context.Context ) error ) * Runner {
121101 r .beforeFunc = f
122102 return r
123103}
124104
125105// AfterFunc Function to be run after worker has stopped.
126- func (r * runner ) AfterFunc (f func (ctx context.Context , err error ) error ) Runner {
106+ // It can be used for logging and error management.
107+ // input can be retreive with context value:
108+ // ctx.Value(workers.InputKey{})
109+ // ⚠️ If an error is returned it stop Runner execution.
110+ func (r * Runner ) AfterFunc (f func (ctx context.Context , err error ) error ) * Runner {
127111 r .afterFunc = f
128112 return r
129113}
130114
115+ var ErrOutAlready = errors .New ("out already set" )
116+
131117// SetOut Allows the setting of a workers out channel, if not already set.
132- func (r * runner ) SetOut (c chan interface {}) {
118+ func (r * Runner ) SetOut (c chan interface {}) error {
133119 if r .outChan != nil {
134- return
120+ return ErrOutAlready
135121 }
136122 r .outChan = c
123+ return nil
137124}
138125
139- // SetDeadline allows a time to be set when the workers should stop.
140- // Deadline needs to be handled by the IsDone method.
141- func (r * runner ) SetDeadline (t time.Time ) Runner {
142- r .lock .Lock ()
143- defer r .lock .Unlock ()
126+ // SetDeadline allows a time to be set when the Runner should stop.
127+ // ⚠️ Should only be called before Start
128+ func (r * Runner ) SetDeadline (t time.Time ) * Runner {
144129 r .ctx , r .cancel = context .WithDeadline (r .ctx , t )
145130 return r
146131}
147132
148- // SetTimeout allows a time duration to be set when the workers should stop.
149- // Timeout needs to be handled by the IsDone method.
150- func (r * runner ) SetTimeout (duration time.Duration ) Runner {
151- r .lock .Lock ()
152- defer r .lock .Unlock ()
133+ // SetWorkerTimeout allows a time duration to be set when the workers should stop.
134+ // ⚠️ Should only be called before Start
135+ func (r * Runner ) SetWorkerTimeout (duration time.Duration ) * Runner {
153136 r .timeout = duration
154137 return r
155138}
156139
157- // Wait calls stop on workers and waits for the channel to drain.
158- // !!Should only be called when certain nothing will send to worker.
159- func (r * runner ) Wait () error {
160- r .waitForDrain ()
161- if err := <- r .Stop (); err != nil && ! errors .Is (err , context .Canceled ) {
162- return err
140+ // Wait close the input channel and waits it to drain and process.
141+ func (r * Runner ) Wait () * Runner {
142+ if r .inputCtx .Err () == nil {
143+ r .inputCancel ()
144+ close (r .inChan )
163145 }
164- return nil
165- }
166146
167- // Stop Stops the processing of a worker and closes it's channel in.
168- // Returns a blocking channel with type error.
169- // !!Should only be called when certain nothing will send to worker.
170- func (r * runner ) Stop () chan error {
171- r .done .Do (func () {
172- if r .inChan != nil && r .isLeader {
173- close (r .inChan )
174- }
175- })
176- return r .errChan
147+ <- r .done
148+
149+ return r
177150}
178151
179- // IsDone returns a channel signaling the workers context has been canceled.
180- func (r * runner ) IsDone () <- chan struct {} {
181- return r .ctx .Done ()
152+ // Stop Stops the processing of a worker and waits for workers to finish.
153+ func (r * Runner ) Stop () * Runner {
154+ r .cancel ()
155+ r .Wait ()
156+ return r
182157}
183158
184- // waitForSignal make sure we wait for a term signal and shutdown correctly
185- func (r * runner ) waitForSignal (signals ... os.Signal ) {
186- go func () {
187- signal .Notify (r .signalChan , signals ... )
188- <- r .signalChan
189- if r .cancel != nil {
190- r .cancel ()
191- }
159+ type InputKey struct {}
160+
161+ // work starts processing input and limits worker instance number.
162+ func (r * Runner ) work () {
163+ var wg sync.WaitGroup
164+
165+ defer func () {
166+ wg .Wait ()
167+ r .cancel ()
168+ close (r .done )
192169 }()
193- }
194170
195- // waitForDrain Waits for the limiter to be zeroed out and the in channel to be empty.
196- func (r * runner ) waitForDrain () {
197- for len (r .limiter ) > 0 || len (r .inChan ) > 0 {
198- // Wait for the drain.
199- }
200- }
171+ for {
172+ select {
173+ case <- r .ctx .Done ():
174+ return
175+ case input , open := <- r .inChan :
176+ if ! open {
177+ return
178+ }
179+ wg .Add (1 )
201180
202- // startWork Runs the before function and starts processing until one of three things happen.
203- // 1. A term signal is received or cancellation of context.
204- // 2. Stop function is called.
205- // 3. Worker returns an error.
206- func (r * runner ) startWork () {
207- var err error
208- if err = r .beforeFunc (r .ctx ); err != nil {
209- r .errChan <- err
210- return
211- }
212- if r .timeout > 0 {
213- r .ctx , r .cancel = context .WithTimeout (r .ctx , r .timeout )
214- }
215- r .wg .Add (1 )
216- go func () {
217- var workerWG = new (sync.WaitGroup )
218- var closeOnce = new (sync.Once )
219-
220- // write out error if not nil on exit.
221- defer func () {
222- workerWG .Wait ()
223- r .errChan <- err
224- closeOnce .Do (func () {
225- if r .outChan != nil {
226- close (r .outChan )
227- }
228- })
229- r .wg .Done ()
230- }()
231- for in := range r .inChan {
232- input := in
233181 r .limiter <- struct {}{}
234- workerWG .Add (1 )
182+
183+ inputCtx := context .WithValue (r .ctx , InputKey {}, input )
184+ workCtx , workCancel := context .WithCancel (inputCtx )
185+ if r .timeout > 0 {
186+ workCtx , workCancel = context .WithTimeout (inputCtx , r .timeout )
187+ }
188+
235189 go func () {
236190 defer func () {
237191 <- r .limiter
238- workerWG .Done ()
192+ workCancel ()
193+ wg .Done ()
239194 }()
240- if err := r .afterFunc (r .ctx , r .workFunc (input , r .outChan )); err != nil {
241- r .once .Do (func () {
242- r .errChan <- err
243- r .cancel ()
244- })
195+ if err := r .afterFunc (inputCtx , r .workFunc (workCtx , input , r .outChan )); err != nil {
196+ r .cancel ()
245197 }
246198 }()
247199 }
248- }()
200+ }
249201}
0 commit comments