From e396721ebd1b71162e424fdd4f7670c657667a60 Mon Sep 17 00:00:00 2001 From: Guilhem Lettron Date: Tue, 21 Sep 2021 12:21:30 +0200 Subject: [PATCH 1/4] feat: add afterFunc behavior Useful to catch errors in a infinit worker pool --- workers.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/workers.go b/workers.go index dc36433..63050a9 100644 --- a/workers.go +++ b/workers.go @@ -237,11 +237,10 @@ func (r *runner) startWork() { <-r.limiter workerWG.Done() }() - if err := r.workFunc(input, r.outChan); err != nil { + if err := r.afterFunc(r.ctx, r.workFunc(input, r.outChan)); err != nil { r.once.Do(func() { r.errChan <- err r.cancel() - return }) } }() From 5d207673710c4ea7ccc0f38a54fc572fd3312ea9 Mon Sep 17 00:00:00 2001 From: Guilhem Lettron Date: Tue, 21 Sep 2021 18:20:33 +0200 Subject: [PATCH 2/4] refactor: change (almost) everything --- workers.go | 290 ++++++++++++++++++++---------------------------- workers_test.go | 159 ++++++++++++-------------- 2 files changed, 194 insertions(+), 255 deletions(-) diff --git a/workers.go b/workers.go index 63050a9..d06b3f8 100644 --- a/workers.go +++ b/workers.go @@ -3,247 +3,199 @@ package workers import ( "context" "errors" - "os" - "os/signal" "sync" - "syscall" "time" ) -var defaultWatchSignals = []os.Signal{syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL} - -// Worker Contains the work function. Allows an input and output to a channel or another worker for pipeline work. -// Return nil if you want the Runner to continue otherwise any error will cause the Runner to shutdown and return the -// error. +// Worker Contains the work function. +// Work() get input and could put in outChan for followers. +// ⚠️ outChan could be closed if follower is stoped before producer. +// error returned can be process by afterFunc but will be ignored by default. type Worker interface { - Work(in interface{}, out chan<- interface{}) error -} - -// Runner Handles the running the Worker logic. -type Runner interface { - BeforeFunc(func(ctx context.Context) error) Runner - AfterFunc(func(ctx context.Context, err error) error) Runner - SetDeadline(t time.Time) Runner - SetTimeout(duration time.Duration) Runner - SetFollower() - Send(in interface{}) - InFrom(w ...Runner) Runner - SetOut(chan interface{}) - Start() Runner - Stop() chan error - Wait() error + Work(ctx context.Context, in interface{}, out chan<- interface{}) error } -type runner struct { - ctx context.Context - cancel context.CancelFunc - inChan chan interface{} - outChan chan interface{} - errChan chan error - signalChan chan os.Signal - limiter chan struct{} +type Runner struct { + ctx context.Context + cancel context.CancelFunc + inputCtx context.Context + inputCancel context.CancelFunc + inChan chan interface{} + outChan chan interface{} + limiter chan struct{} afterFunc func(ctx context.Context, err error) error - workFunc func(in interface{}, out chan<- interface{}) error + workFunc func(ctx context.Context, in interface{}, out chan<- interface{}) error beforeFunc func(ctx context.Context) error - timeout time.Duration - deadline time.Duration - - isLeader bool - stopCalled bool + timeout time.Duration numWorkers int64 - lock *sync.RWMutex - wg *sync.WaitGroup - done *sync.Once - once *sync.Once + started *sync.Once + done chan struct{} } // NewRunner Factory function for a new Runner. The Runner will handle running the workers logic. -func NewRunner(ctx context.Context, w Worker, numWorkers int64) Runner { - var runnerCtx, runnerCancel = context.WithCancel(ctx) - var runner = &runner{ - ctx: runnerCtx, - cancel: runnerCancel, - inChan: make(chan interface{}, numWorkers), - outChan: nil, - errChan: make(chan error, 1), - signalChan: make(chan os.Signal, 1), - limiter: make(chan struct{}, numWorkers), - afterFunc: func(ctx context.Context, err error) error { return err }, - workFunc: w.Work, - beforeFunc: func(ctx context.Context) error { return nil }, - numWorkers: numWorkers, - isLeader: true, - lock: new(sync.RWMutex), - wg: new(sync.WaitGroup), - once: new(sync.Once), - done: new(sync.Once), +func NewRunner(ctx context.Context, w Worker, numWorkers int64, buffer int64) *Runner { + runnerCtx, runnerCancel := context.WithCancel(ctx) + inputCtx, inputCancel := context.WithCancel(runnerCtx) + + runner := &Runner{ + ctx: runnerCtx, + cancel: runnerCancel, + inputCtx: inputCtx, + inputCancel: inputCancel, + inChan: make(chan interface{}, buffer), + outChan: nil, + limiter: make(chan struct{}, numWorkers), + afterFunc: func(ctx context.Context, err error) error { return nil }, + workFunc: w.Work, + beforeFunc: func(ctx context.Context) error { return nil }, + numWorkers: numWorkers, + started: new(sync.Once), + done: make(chan struct{}), } - runner.waitForSignal(defaultWatchSignals...) return runner } -// Send Send an object to the worker for processing. -func (r *runner) Send(in interface{}) { +var ErrInputClosed = errors.New("input closed") + +// Send Send an object to the worker for processing if context is not Done. +func (r *Runner) Send(in interface{}) error { select { - case <-r.ctx.Done(): - return + case <-r.inputCtx.Done(): + return ErrInputClosed case r.inChan <- in: } + return nil } // InFrom Set a worker to accept output from another worker(s). -func (r *runner) InFrom(w ...Runner) Runner { - r.SetFollower() +func (r *Runner) InFrom(w ...*Runner) *Runner { for _, wr := range w { - wr.SetOut(r.inChan) + // in := make(chan interface{}) + // go func(in chan interface{}) { + // for msg := range in { + // if err := r.Send(msg); err != nil { + // return + // } + // } + // }(in) + wr.SetOut(r.inChan) // nolint } return r } -// SetFollower Sets the worker as a follower and does not need to close it's in channel. -func (r *runner) SetFollower() { - r.lock.Lock() - r.isLeader = false - r.lock.Unlock() -} - -// Start Starts the worker on processing. -func (r *runner) Start() Runner { - r.startWork() - return r +// Start execute beforeFunc and launch worker processing. +func (r *Runner) Start() error { + r.started.Do(func() { + if err := r.beforeFunc(r.ctx); err == nil { + go r.work() + } + }) + return nil } // BeforeFunc Function to be run before worker starts processing. -func (r *runner) BeforeFunc(f func(ctx context.Context) error) Runner { +func (r *Runner) BeforeFunc(f func(ctx context.Context) error) *Runner { r.beforeFunc = f return r } // AfterFunc Function to be run after worker has stopped. -func (r *runner) AfterFunc(f func(ctx context.Context, err error) error) Runner { +// It can be used for logging and error management. +// input can be retreive with context value: +// ctx.Value(workers.InputKey{}) +// ⚠️ If an error is returned it stop Runner execution. +func (r *Runner) AfterFunc(f func(ctx context.Context, err error) error) *Runner { r.afterFunc = f return r } +var ErrOutAlready = errors.New("out already set") + // SetOut Allows the setting of a workers out channel, if not already set. -func (r *runner) SetOut(c chan interface{}) { +func (r *Runner) SetOut(c chan interface{}) error { if r.outChan != nil { - return + return ErrOutAlready } r.outChan = c + return nil } -// SetDeadline allows a time to be set when the workers should stop. -// Deadline needs to be handled by the IsDone method. -func (r *runner) SetDeadline(t time.Time) Runner { - r.lock.Lock() - defer r.lock.Unlock() +// SetDeadline allows a time to be set when the Runner should stop. +// ⚠️ Should only be called before Start +func (r *Runner) SetDeadline(t time.Time) *Runner { r.ctx, r.cancel = context.WithDeadline(r.ctx, t) return r } -// SetTimeout allows a time duration to be set when the workers should stop. -// Timeout needs to be handled by the IsDone method. -func (r *runner) SetTimeout(duration time.Duration) Runner { - r.lock.Lock() - defer r.lock.Unlock() +// SetWorkerTimeout allows a time duration to be set when the workers should stop. +// ⚠️ Should only be called before Start +func (r *Runner) SetWorkerTimeout(duration time.Duration) *Runner { r.timeout = duration return r } -// Wait calls stop on workers and waits for the channel to drain. -// !!Should only be called when certain nothing will send to worker. -func (r *runner) Wait() error { - r.waitForDrain() - if err := <-r.Stop(); err != nil && !errors.Is(err, context.Canceled) { - return err +// Wait close the input channel and waits it to drain and process. +func (r *Runner) Wait() *Runner { + if r.inputCtx.Err() == nil { + r.inputCancel() + close(r.inChan) } - return nil -} -// Stop Stops the processing of a worker and closes it's channel in. -// Returns a blocking channel with type error. -// !!Should only be called when certain nothing will send to worker. -func (r *runner) Stop() chan error { - r.done.Do(func() { - if r.inChan != nil && r.isLeader { - close(r.inChan) - } - }) - return r.errChan + <-r.done + + return r } -// IsDone returns a channel signaling the workers context has been canceled. -func (r *runner) IsDone() <-chan struct{} { - return r.ctx.Done() +// Stop Stops the processing of a worker and waits for workers to finish. +func (r *Runner) Stop() *Runner { + r.cancel() + r.Wait() + return r } -// waitForSignal make sure we wait for a term signal and shutdown correctly -func (r *runner) waitForSignal(signals ...os.Signal) { - go func() { - signal.Notify(r.signalChan, signals...) - <-r.signalChan - if r.cancel != nil { - r.cancel() - } +type InputKey struct{} + +// work starts processing input and limits worker instance number. +func (r *Runner) work() { + var wg sync.WaitGroup + + defer func() { + wg.Wait() + r.cancel() + close(r.done) }() -} -// waitForDrain Waits for the limiter to be zeroed out and the in channel to be empty. -func (r *runner) waitForDrain() { - for len(r.limiter) > 0 || len(r.inChan) > 0 { - // Wait for the drain. - } -} + for { + select { + case <-r.ctx.Done(): + return + case input, open := <-r.inChan: + if !open { + return + } + wg.Add(1) -// startWork Runs the before function and starts processing until one of three things happen. -// 1. A term signal is received or cancellation of context. -// 2. Stop function is called. -// 3. Worker returns an error. -func (r *runner) startWork() { - var err error - if err = r.beforeFunc(r.ctx); err != nil { - r.errChan <- err - return - } - if r.timeout > 0 { - r.ctx, r.cancel = context.WithTimeout(r.ctx, r.timeout) - } - r.wg.Add(1) - go func() { - var workerWG = new(sync.WaitGroup) - var closeOnce = new(sync.Once) - - // write out error if not nil on exit. - defer func() { - workerWG.Wait() - r.errChan <- err - closeOnce.Do(func() { - if r.outChan != nil { - close(r.outChan) - } - }) - r.wg.Done() - }() - for in := range r.inChan { - input := in r.limiter <- struct{}{} - workerWG.Add(1) + + inputCtx := context.WithValue(r.ctx, InputKey{}, input) + workCtx, workCancel := context.WithCancel(inputCtx) + if r.timeout > 0 { + workCtx, workCancel = context.WithTimeout(inputCtx, r.timeout) + } + go func() { defer func() { <-r.limiter - workerWG.Done() + workCancel() + wg.Done() }() - if err := r.afterFunc(r.ctx, r.workFunc(input, r.outChan)); err != nil { - r.once.Do(func() { - r.errChan <- err - r.cancel() - }) + if err := r.afterFunc(inputCtx, r.workFunc(workCtx, input, r.outChan)); err != nil { + r.cancel() } }() } - }() + } } diff --git a/workers_test.go b/workers_test.go index 7401727..266d795 100644 --- a/workers_test.go +++ b/workers_test.go @@ -1,16 +1,18 @@ -package workers +package workers_test import ( "context" "errors" - "fmt" "math/rand" "os" "runtime" "runtime/debug" "sync" + "sync/atomic" "testing" "time" + + "github.com/catmullet/go-workers" ) const ( @@ -20,11 +22,11 @@ const ( ) type WorkerOne struct { - Count int + Count int32 sync.Mutex } type WorkerTwo struct { - Count int + Count int32 sync.Mutex } @@ -37,15 +39,11 @@ func NewWorkerTwo() *WorkerTwo { } func (wo *WorkerOne) CurrentCount() int { - wo.Lock() - defer wo.Unlock() - return wo.Count + return int(wo.Count) } -func (wo *WorkerOne) Work(in interface{}, out chan<- interface{}) error { - mut.Lock() - wo.Count = wo.Count + 1 - mut.Unlock() +func (wo *WorkerOne) Work(ctx context.Context, in interface{}, out chan<- interface{}) error { + atomic.AddInt32(&wo.Count, 1) total := in.(int) * 2 out <- total @@ -53,15 +51,11 @@ func (wo *WorkerOne) Work(in interface{}, out chan<- interface{}) error { } func (wt *WorkerTwo) CurrentCount() int { - wt.Lock() - defer wt.Unlock() - return wt.Count + return int(wt.Count) } -func (wt *WorkerTwo) Work(in interface{}, out chan<- interface{}) error { - mut.Lock() - wt.Count = wt.Count + 1 - mut.Unlock() +func (wt *WorkerTwo) Work(ctx context.Context, in interface{}, out chan<- interface{}) error { + atomic.AddInt32(&wt.Count, 1) return nil } @@ -109,10 +103,10 @@ var ( }, } - getWorker = func(ctx context.Context, wt workerTest) Runner { - worker := NewRunner(ctx, wt.worker, wt.numWorkers) + getWorker = func(ctx context.Context, wt workerTest) *workers.Runner { + worker := workers.NewRunner(ctx, wt.worker, wt.numWorkers, wt.numWorkers) if wt.timeout > 0 { - worker.SetTimeout(wt.timeout) + worker.SetWorkerTimeout(wt.timeout) } if wt.deadline != nil { worker.SetDeadline(wt.deadline()) @@ -125,41 +119,41 @@ type workerTest struct { name string timeout time.Duration deadline func() time.Time - worker Worker + worker workers.Worker numWorkers int64 testSignal bool errExpected bool } type TestWorkerObject struct { - workFunc func(in interface{}, out chan<- interface{}) error + workFunc func(ctx context.Context, in interface{}, out chan<- interface{}) error } -func NewTestWorkerObject(wf func(in interface{}, out chan<- interface{}) error) Worker { +func NewTestWorkerObject(wf func(ctx context.Context, in interface{}, out chan<- interface{}) error) workers.Worker { return &TestWorkerObject{wf} } -func (tw *TestWorkerObject) Work(in interface{}, out chan<- interface{}) error { - return tw.workFunc(in, out) +func (tw *TestWorkerObject) Work(ctx context.Context, in interface{}, out chan<- interface{}) error { + return tw.workFunc(ctx, in, out) } -func workBasicNoOut() func(in interface{}, out chan<- interface{}) error { - return func(in interface{}, out chan<- interface{}) error { +func workBasicNoOut() func(ctx context.Context, in interface{}, out chan<- interface{}) error { + return func(ctx context.Context, in interface{}, out chan<- interface{}) error { _ = in.(int) return nil } } -func workBasic() func(in interface{}, out chan<- interface{}) error { - return func(in interface{}, out chan<- interface{}) error { +func workBasic() func(ctx context.Context, in interface{}, out chan<- interface{}) error { + return func(ctx context.Context, in interface{}, out chan<- interface{}) error { i := in.(int) out <- i return nil } } -func workWithError(err error) func(in interface{}, out chan<- interface{}) error { - return func(in interface{}, out chan<- interface{}) error { +func workWithError(err error) func(ctx context.Context, in interface{}, out chan<- interface{}) error { + return func(ctx context.Context, in interface{}, out chan<- interface{}) error { i := in.(int) total := i * rand.Intn(1000) if i == 100 { @@ -181,20 +175,23 @@ func TestWorkers(t *testing.T) { for _, tt := range workerTestScenarios { t.Run(tt.name, func(t *testing.T) { ctx := context.Background() - workerOne := getWorker(ctx, tt).Start() + workerOne := getWorker(ctx, tt) // always need a consumer for the out tests so using basic here. - workerTwo := NewRunner(ctx, NewTestWorkerObject(workBasicNoOut()), workerCount).InFrom(workerOne).Start() - - for i := 0; i < runTimes; i++ { - workerOne.Send(i) - } + workerTwo := workers.NewRunner(ctx, NewTestWorkerObject(workBasicNoOut()), workerCount, workerCount).InFrom(workerOne) - if err := workerOne.Wait(); err != nil && (!tt.errExpected) { + if err := workerOne.Start(); err != nil && !tt.errExpected { t.Error(err) } - if err := workerTwo.Wait(); err != nil && !tt.errExpected { + if err := workerTwo.Start(); err != nil && !tt.errExpected { t.Error(err) } + + for i := 0; i < runTimes; i++ { + workerOne.Send(i) + } + + workerOne.Wait().Stop() + workerTwo.Wait().Stop() }) } } @@ -204,27 +201,25 @@ func TestWorkersFinish100(t *testing.T) { ctx := context.Background() w1 := NewWorkerOne() w2 := NewWorkerTwo() - workerOne := NewRunner(ctx, w1, 1000).Start() - workerTwo := NewRunner(ctx, w2, 1000).InFrom(workerOne).Start() + workerOne := workers.NewRunner(ctx, w1, 1000, 10) + workerTwo := workers.NewRunner(ctx, w2, 1000, 10000).InFrom(workerOne) + workerOne.Start() + workerTwo.Start() for i := 0; i < workCount; i++ { workerOne.Send(rand.Intn(100)) } - if err := workerOne.Wait(); err != nil { - fmt.Println(err) - } + workerOne.Wait().Stop() - if err := workerTwo.Wait(); err != nil { - fmt.Println(err) - } + workerTwo.Wait().Stop() if w1.CurrentCount() != workCount { - t.Log("worker one failed to finish,", "worker_one count", w1.CurrentCount(), "/ 100000") + t.Log("worker one failed to finish,", "worker_one count", w1.CurrentCount(), "/", workCount) t.Fail() } if w2.CurrentCount() != workCount { - t.Log("worker two failed to finish,", "worker_two count", w2.CurrentCount(), "/ 100000") + t.Log("worker two failed to finish,", "worker_two count", w2.CurrentCount(), "/", workCount) t.Fail() } @@ -236,27 +231,25 @@ func TestWorkersFinish100000(t *testing.T) { ctx := context.Background() w1 := NewWorkerOne() w2 := NewWorkerTwo() - workerOne := NewRunner(ctx, w1, 1000).Start() - workerTwo := NewRunner(ctx, w2, 1000).InFrom(workerOne).Start() + workerOne := workers.NewRunner(ctx, w1, 1000, 2000) + workerTwo := workers.NewRunner(ctx, w2, 1000, 1).InFrom(workerOne) + workerOne.Start() + workerTwo.Start() for i := 0; i < workCount; i++ { workerOne.Send(rand.Intn(100)) } - if err := workerOne.Wait(); err != nil { - fmt.Println(err) - } + workerOne.Wait().Stop() - if err := workerTwo.Wait(); err != nil { - fmt.Println(err) - } + workerTwo.Wait().Stop() if w1.CurrentCount() != workCount { - t.Log("worker one failed to finish,", "worker_one count", w1.CurrentCount(), "/ 100000") + t.Log("worker one failed to finish,", "worker_one count", w1.CurrentCount(), "/", workCount) t.Fail() } if w2.CurrentCount() != workCount { - t.Log("worker two failed to finish,", "worker_two count", w2.CurrentCount(), "/ 100000") + t.Log("worker two failed to finish,", "worker_two count", w2.CurrentCount(), "/", workCount) t.Fail() } @@ -268,27 +261,25 @@ func TestWorkersFinish1000000(t *testing.T) { ctx := context.Background() w1 := NewWorkerOne() w2 := NewWorkerTwo() - workerOne := NewRunner(ctx, w1, 1000).Start() - workerTwo := NewRunner(ctx, w2, 1000).InFrom(workerOne).Start() + workerOne := workers.NewRunner(ctx, w1, 1000, 1000) + workerTwo := workers.NewRunner(ctx, w2, 1000, 500).InFrom(workerOne) + workerOne.Start() + workerTwo.Start() for i := 0; i < workCount; i++ { workerOne.Send(rand.Intn(100)) } - if err := workerOne.Wait(); err != nil { - fmt.Println(err) - } + workerOne.Wait().Stop() - if err := workerTwo.Wait(); err != nil { - fmt.Println(err) - } + workerTwo.Wait().Stop() if w1.CurrentCount() != workCount { - t.Log("worker one failed to finish,", "worker_one count", w1.CurrentCount(), "/ 100000") + t.Log("worker one failed to finish,", "worker_one count", w1.CurrentCount(), "/", workCount) t.Fail() } if w2.CurrentCount() != workCount { - t.Log("worker two failed to finish,", "worker_two count", w2.CurrentCount(), "/ 100000") + t.Log("worker two failed to finish,", "worker_two count", w2.CurrentCount(), "/", workCount) t.Fail() } @@ -296,7 +287,8 @@ func TestWorkersFinish1000000(t *testing.T) { } func BenchmarkGoWorkers1to1(b *testing.B) { - worker := NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 1000).Start() + worker := workers.NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 1000, 2000) + worker.Start() b.ResetTimer() for i := 0; i < b.N; i++ { @@ -306,49 +298,44 @@ func BenchmarkGoWorkers1to1(b *testing.B) { } b.StopTimer() - if err := worker.Wait(); err != nil { - b.Error(err) - } + worker.Wait().Stop() } func Benchmark100GoWorkers(b *testing.B) { b.ReportAllocs() - worker := NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 100).Start() + worker := workers.NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 100, 200) + worker.Start() b.ResetTimer() for i := 0; i < b.N; i++ { worker.Send(i) } - if err := worker.Wait(); err != nil { - b.Error(err) - } + worker.Wait().Stop() } func Benchmark1000GoWorkers(b *testing.B) { b.ReportAllocs() - worker := NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 1000).Start() + worker := workers.NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 1000, 500) + worker.Start() b.ResetTimer() for i := 0; i < b.N; i++ { worker.Send(i) } - if err := worker.Wait(); err != nil { - b.Error(err) - } + worker.Wait().Stop() } func Benchmark10000GoWorkers(b *testing.B) { b.ReportAllocs() - worker := NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 10000).Start() + worker := workers.NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 10000, 5000) + worker.Start() b.ResetTimer() for i := 0; i < b.N; i++ { worker.Send(i) } - if err := worker.Wait(); err != nil { - b.Error(err) - } + worker.Wait().Stop() } From ab1d0e8836859079cf8373f515af7c7866853c85 Mon Sep 17 00:00:00 2001 From: Guilhem Lettron Date: Thu, 23 Sep 2021 19:23:02 +0200 Subject: [PATCH 3/4] fix: working examples --- examples/deadline_worker/deadlineworker.go | 21 ++++++++------ examples/multiple_workers/multipleworkers.go | 26 ++++++++++++------ examples/passing_fields/passingfields.go | 29 +++++++++++--------- examples/quickstart/quickstart.go | 23 +++++++--------- examples/timeout_worker/timeoutworker.go | 23 +++++++--------- 5 files changed, 65 insertions(+), 57 deletions(-) diff --git a/examples/deadline_worker/deadlineworker.go b/examples/deadline_worker/deadlineworker.go index 832670c..b8d527a 100644 --- a/examples/deadline_worker/deadlineworker.go +++ b/examples/deadline_worker/deadlineworker.go @@ -1,3 +1,4 @@ +//go:build ignore // +build ignore package main @@ -5,35 +6,37 @@ package main import ( "context" "fmt" - "github.com/catmullet/go-workers" "time" + + "github.com/catmullet/go-workers" ) func main() { ctx := context.Background() t := time.Now() - deadlineWorker := workers.NewRunner(ctx, NewDeadlineWorker(), 100). - SetDeadline(t.Add(200 * time.Millisecond)).Start() + deadlineWorker := workers.NewRunner(ctx, NewDeadlineWorker().Work, 100, 100). + SetDeadline(t.Add(200 * time.Millisecond)) + deadlineWorker.Start() + if err := deadlineWorker.Start(); err != nil { + fmt.Println(err) + } for i := 0; i < 1000000; i++ { deadlineWorker.Send("hello") } - err := deadlineWorker.Wait() - if err != nil { - fmt.Println(err) - } + deadlineWorker.Wait().Stop() fmt.Println("finished") } type DeadlineWorker struct{} -func NewDeadlineWorker() workers.Worker { +func NewDeadlineWorker() *DeadlineWorker { return &DeadlineWorker{} } -func (dlw *DeadlineWorker) Work(in interface{}, out chan<- interface{}) error { +func (dlw *DeadlineWorker) Work(_ context.Context, in interface{}, out chan<- interface{}) error { fmt.Println(in) time.Sleep(1 * time.Second) return nil diff --git a/examples/multiple_workers/multipleworkers.go b/examples/multiple_workers/multipleworkers.go index 86f76d7..513d1a0 100644 --- a/examples/multiple_workers/multipleworkers.go +++ b/examples/multiple_workers/multipleworkers.go @@ -1,3 +1,4 @@ +//go:build ignore // +build ignore package main @@ -5,9 +6,10 @@ package main import ( "context" "fmt" - "github.com/catmullet/go-workers" "math/rand" "sync" + + "github.com/catmullet/go-workers" ) var ( @@ -18,19 +20,25 @@ var ( func main() { ctx := context.Background() - workerOne := workers.NewRunner(ctx, NewWorkerOne(), 1000).Start() - workerTwo := workers.NewRunner(ctx, NewWorkerTwo(), 1000).InFrom(workerOne).Start() + workerOne := workers.NewRunner(ctx, NewWorkerOne().Work, 1000, 1000) + workerTwo := workers.NewRunner(ctx, NewWorkerTwo().Work, 1000, 1000).InFrom(workerOne) + if err := workerOne.Start(); err != nil { + fmt.Println(err) + } + if err := workerTwo.Start(); err != nil { + fmt.Println(err) + } go func() { for i := 0; i < 100000; i++ { workerOne.Send(rand.Intn(100)) } - if err := workerOne.Wait(); err != nil { + if err := workerOne.Wait().Stop(); err != nil { fmt.Println(err) } }() - if err := workerTwo.Wait(); err != nil { + if err := workerTwo.Wait().Stop(); err != nil { fmt.Println(err) } @@ -44,15 +52,15 @@ type WorkerOne struct { type WorkerTwo struct { } -func NewWorkerOne() workers.Worker { +func NewWorkerOne() *WorkerOne { return &WorkerOne{} } -func NewWorkerTwo() workers.Worker { +func NewWorkerTwo() *WorkerTwo { return &WorkerTwo{} } -func (wo *WorkerOne) Work(in interface{}, out chan<- interface{}) error { +func (wo *WorkerOne) Work(_ context.Context, in interface{}, out chan<- interface{}) error { var workerOne = "worker_one" mut.Lock() if val, ok := count[workerOne]; ok { @@ -68,7 +76,7 @@ func (wo *WorkerOne) Work(in interface{}, out chan<- interface{}) error { return nil } -func (wt *WorkerTwo) Work(in interface{}, out chan<- interface{}) error { +func (wt *WorkerTwo) Work(_ context.Context, in interface{}, out chan<- interface{}) error { var workerTwo = "worker_two" mut.Lock() if val, ok := count[workerTwo]; ok { diff --git a/examples/passing_fields/passingfields.go b/examples/passing_fields/passingfields.go index b4cc768..20ccef9 100644 --- a/examples/passing_fields/passingfields.go +++ b/examples/passing_fields/passingfields.go @@ -1,3 +1,4 @@ +//go:build ignore // +build ignore package main @@ -5,26 +6,28 @@ package main import ( "context" "fmt" - "github.com/catmullet/go-workers" "math/rand" + + "github.com/catmullet/go-workers" ) func main() { ctx := context.Background() - workerOne := workers.NewRunner(ctx, NewWorkerOne(2), 100).Start() - workerTwo := workers.NewRunner(ctx, NewWorkerTwo(4), 100).InFrom(workerOne).Start() - - for i := 0; i < 15; i++ { - workerOne.Send(rand.Intn(100)) + workerOne := workers.NewRunner(ctx, NewWorkerOne(2).Work, 100, 100) + workerTwo := workers.NewRunner(ctx, NewWorkerTwo(4).Work, 100, 100).InFrom(workerOne) + if err := workerOne.Start(); err != nil { + fmt.Println(err) } - if err := workerOne.Wait(); err != nil { + if err := workerTwo.Start(); err != nil { fmt.Println(err) } - if err := workerTwo.Wait(); err != nil { - fmt.Println(err) + for i := 0; i < 15; i++ { + workerOne.Send(rand.Intn(100)) } + workerOne.Wait().Stop() + workerTwo.Wait().Stop() } type WorkerOne struct { @@ -34,26 +37,26 @@ type WorkerTwo struct { amountToMultiply int } -func NewWorkerOne(amountToMultiply int) workers.Worker { +func NewWorkerOne(amountToMultiply int) *WorkerOne { return &WorkerOne{ amountToMultiply: amountToMultiply, } } -func NewWorkerTwo(amountToMultiply int) workers.Worker { +func NewWorkerTwo(amountToMultiply int) *WorkerTwo { return &WorkerTwo{ amountToMultiply, } } -func (wo *WorkerOne) Work(in interface{}, out chan<- interface{}) error { +func (wo *WorkerOne) Work(ctx context.Context, in interface{}, out chan<- interface{}) error { total := in.(int) * wo.amountToMultiply fmt.Println("worker1", fmt.Sprintf("%d * %d = %d", in.(int), wo.amountToMultiply, total)) out <- total return nil } -func (wt *WorkerTwo) Work(in interface{}, out chan<- interface{}) error { +func (wt *WorkerTwo) Work(ctx context.Context, in interface{}, out chan<- interface{}) error { totalFromWorkerOne := in.(int) fmt.Println("worker2", fmt.Sprintf("%d * %d = %d", totalFromWorkerOne, wt.amountToMultiply, totalFromWorkerOne*wt.amountToMultiply)) return nil diff --git a/examples/quickstart/quickstart.go b/examples/quickstart/quickstart.go index 2829039..43e2de1 100644 --- a/examples/quickstart/quickstart.go +++ b/examples/quickstart/quickstart.go @@ -1,3 +1,4 @@ +//go:build ignore // +build ignore package main @@ -5,36 +6,32 @@ package main import ( "context" "fmt" - "github.com/catmullet/go-workers" "math/rand" "time" + + "github.com/catmullet/go-workers" ) func main() { ctx := context.Background() t := time.Now() - rnr := workers.NewRunner(ctx, NewWorker(), 100).Start() + rnr := workers.NewRunner(ctx, work, 100, 100) + + if err := rnr.Start(); err != nil { + fmt.Println(err) + } for i := 0; i < 1000000; i++ { rnr.Send(rand.Intn(100)) } - if err := rnr.Wait(); err != nil { - fmt.Println(err) - } + rnr.Wait().Stop() totalTime := time.Since(t).Milliseconds() fmt.Printf("total time %dms\n", totalTime) } -type WorkerOne struct { -} - -func NewWorker() workers.Worker { - return &WorkerOne{} -} - -func (wo *WorkerOne) Work(in interface{}, out chan<- interface{}) error { +func work(ctx context.Context, in interface{}, out chan<- interface{}) error { total := in.(int) * 2 fmt.Println(fmt.Sprintf("%d * 2 = %d", in.(int), total)) return nil diff --git a/examples/timeout_worker/timeoutworker.go b/examples/timeout_worker/timeoutworker.go index f5226a1..5727aae 100644 --- a/examples/timeout_worker/timeoutworker.go +++ b/examples/timeout_worker/timeoutworker.go @@ -1,3 +1,4 @@ +//go:build ignore // +build ignore package main @@ -5,32 +6,28 @@ package main import ( "context" "fmt" - "github.com/catmullet/go-workers" "time" + + "github.com/catmullet/go-workers" ) func main() { ctx := context.Background() - timeoutWorker := workers.NewRunner(ctx, NewTimeoutWorker(), 10).SetTimeout(100 * time.Millisecond).Start() - - for i := 0; i < 1000000; i++ { - timeoutWorker.Send("hello") - } - - err := timeoutWorker.Wait() + timeoutWorker := workers.NewRunner(ctx, work, 10, 10).SetWorkerTimeout(100 * time.Millisecond) + err := timeoutWorker.Start() if err != nil { fmt.Println(err) } -} -type TimeoutWorker struct{} + for i := 0; i < 1000000; i++ { + timeoutWorker.Send("hello") + } -func NewTimeoutWorker() workers.Worker { - return &TimeoutWorker{} + timeoutWorker.Wait().Stop() } -func (tw *TimeoutWorker) Work(in interface{}, out chan<- interface{}) error { +func work(ctx context.Context, in interface{}, out chan<- interface{}) error { fmt.Println(in) time.Sleep(1 * time.Second) return nil From 4ce2b85750208abf61815c27edc42449064db71d Mon Sep 17 00:00:00 2001 From: Guilhem Lettron Date: Fri, 24 Sep 2021 12:04:03 +0200 Subject: [PATCH 4/4] refactor: change repo name + use function definition instead of interface send input to afterFunc instead of context value --- README.md | 94 +++++++++++++------- examples/deadline_worker/deadlineworker.go | 4 +- examples/multiple_workers/multipleworkers.go | 6 +- examples/passing_fields/passingfields.go | 6 +- examples/quickstart/quickstart.go | 4 +- examples/timeout_worker/timeoutworker.go | 4 +- go.mod | 2 +- workers.go | 73 +++++++-------- workers_test.go | 36 ++++---- 9 files changed, 130 insertions(+), 99 deletions(-) diff --git a/README.md b/README.md index 9e22637..445ce2b 100644 --- a/README.md +++ b/README.md @@ -1,51 +1,67 @@ -![go workers](https://raw.githubusercontent.com/catmullet/go-workers/assets/constworker_header_anim.gif) +# gorkers + + -[![Mentioned in Awesome Go](https://awesome.re/mentioned-badge-flat.svg)](https://github.com/avelino/awesome-go#goroutines) [![Maintainability](https://api.codeclimate.com/v1/badges/402fee86fbd1e24defb2/maintainability)](https://codeclimate.com/github/catmullet/go-workers/maintainability) [![GoCover](http://gocover.io/_badge/github.com/catmullet/go-workers)](http://gocover.io/github.com/catmullet/go-workers) [![Go Reference](https://pkg.go.dev/badge/github.com/catmullet/go-workers.svg)](https://pkg.go.dev/github.com/catmullet/go-workers) # Examples -* [Quickstart](https://github.com/catmullet/go-workers/blob/master/examples/quickstart/quickstart.go) -* [Multiple Go Workers](https://github.com/catmullet/go-workers/blob/master/examples/multiple_workers/multipleworkers.go) -* [Passing Fields](https://github.com/catmullet/go-workers/blob/master/examples/passing_fields/passingfields.go) + +- [Quickstart](https://github.com/catmullet/go-workers/blob/master/examples/quickstart/quickstart.go) +- [Multiple Go Workers](https://github.com/catmullet/go-workers/blob/master/examples/multiple_workers/multipleworkers.go) +- [Passing Fields](https://github.com/catmullet/go-workers/blob/master/examples/passing_fields/passingfields.go) + # Getting Started + ### Pull in the dependency -```zsh + +```sh go get github.com/catmullet/go-workers ``` ### Add the import to your project -giving an alias helps since go-workers doesn't exactly follow conventions. + +giving an alias helps since go-workers doesn't exactly follow conventions. _(If you're using a JetBrains IDE it should automatically give it an alias)_ + ```go import ( workers "github.com/catmullet/go-workers" ) ``` + ### Create a new worker worker -The NewWorker factory method returns a new worker. + +The NewWorker factory method returns a new worker. _(Method chaining can be performed on this method like calling .Work() immediately after.)_ + ```go type MyWorker struct {} func NewMyWorker() Worker { - return &MyWorker{} + return &MyWorker{} } func (my *MyWorker) Work(in interface{}, out chan<- interface{}) error { - // work iteration here + // work iteration here } -runner := workers.NewRunner(ctx, NewMyWorker(), numberOfWorkers) +runner := workers.NewRunner(ctx, NewMyWorker(), numberOfWorkers, numbersOfBuffers) ``` + ### Send work to worker -Send accepts an interface. So send it anything you want. + +Send accepts an interface. So send it anything you want. + ```go runner.Send("Hello World") ``` + ### Wait for the worker to finish and handle errors + Any error that bubbles up from your worker functions will return here. + ```go if err := runner.Wait(); err != nil { //Handle error @@ -53,72 +69,86 @@ if err := runner.Wait(); err != nil { ``` ## Working With Multiple Workers -### Passing work form one worker to the next + +### Passing work form one worker to the next By using the InFrom method you can tell `workerTwo` to accept output from `workerOne` + ```go -runnerOne := workers.NewRunner(ctx, NewMyWorker(), 100).Work() -runnerTwo := workers.NewRunner(ctx, NewMyWorkerTwo(), 100).InFrom(workerOne).Work() +runnerOne := workers.NewRunner(ctx, NewMyWorker(), 100, 100).Work() +runnerTwo := workers.NewRunner(ctx, NewMyWorkerTwo(), 100, 100).InFrom(workerOne).Work() ``` + ### Accepting output from multiple workers -It is possible to accept output from more than one worker but it is up to you to determine what is coming from which worker. (They will send on the same channel.) + +It is possible to accept output from more than one worker but it is up to you to determine what is coming from which worker. (They will send on the same channel.) + ```go -runnerOne := workers.NewRunner(ctx, NewMyWorker(), 100).Work() -runnerTwo := workers.NewRunner(ctx, NewMyWorkerTwo(), 100).Work() -runnerThree := workers.NewRunner(ctx, NewMyWorkerThree(), 100).InFrom(workerOne, workerTwo).Work() +runnerOne := workers.NewRunner(ctx, NewMyWorker(), 100, 100).Work() +runnerTwo := workers.NewRunner(ctx, NewMyWorkerTwo(), 100, 100).Work() +runnerThree := workers.NewRunner(ctx, NewMyWorkerThree(), 100, 100).InFrom(workerOne, workerTwo).Work() ``` ## Passing Fields To Workers + ### Adding Values -Fields can be passed via the workers object. Be sure as with any concurrency in Golang that your variables are concurrent safe. Most often the golang documentation will state the package or parts of it are concurrent safe. If it does not state so there is a good chance it isn't. Use the sync package to lock and unlock for writes on unsafe variables. (It is good practice NOT to defer in the work function.) + +Fields can be passed via the workers object. Be sure as with any concurrency in Golang that your variables are concurrent safe. Most often the golang documentation will state the package or parts of it are concurrent safe. If it does not state so there is a good chance it isn't. Use the sync package to lock and unlock for writes on unsafe variables. (It is good practice NOT to defer in the work function.) worker **ONLY** use the `Send()` method to get data into your worker. It is not shared memory unlike the worker objects values. ```go type MyWorker struct { - message string + message string } func NewMyWorker(message string) Worker { - return &MyWorker{message} + return &MyWorker{message} } func (my *MyWorker) Work(in interface{}, out chan<- interface{}) error { - fmt.Println(my.message) + fmt.Println(my.message) } -runner := workers.NewRunner(ctx, NewMyWorker(), 100).Work() +runner := workers.NewRunner(ctx, NewMyWorker(), 100, 100).Work() ``` ### Setting Timeouts or Deadlines + If your workers needs to stop at a deadline or you just need to have a timeout use the SetTimeout or SetDeadline methods. (These must be in place before setting the workers off to work.) + ```go // Setting a timeout of 2 seconds - timeoutRunner.SetTimeout(2 * time.Second) + timeoutRunner.SetWorkerTimeout(2 * time.Second) // Setting a deadline of 4 hours from now deadlineRunner.SetDeadline(time.Now().Add(4 * time.Hour)) func workerFunction(in interface{}, out chan<- interface{} error { - fmt.Println(in) - time.Sleep(1 * time.Second) + fmt.Println(in) + time.Sleep(1 * time.Second) } ``` - ## Performance Hints + ### Buffered Writer -If you want to write out to a file or just stdout you can use SetWriterOut(writer io.Writer). The worker will have the following methods available + +If you want to write out to a file or just stdout you can use SetWriterOut(writer io.Writer). The worker will have the following methods available + ```go runner.Println() runner.Printf() runner.Print() ``` -The workers use a buffered writer for output and can be up to 3 times faster than the fmt package. Just be mindful it won't write out to the console as quickly as an unbuffered writer. It will sync and eventually flush everything at the end, making it ideal for writing out to a file. + +The workers use a buffered writer for output and can be up to 3 times faster than the fmt package. Just be mindful it won't write out to the console as quickly as an unbuffered writer. It will sync and eventually flush everything at the end, making it ideal for writing out to a file. ### Using GOGC env variable + If your application is based solely around using workers, consider upping the percentage of when the scheduler will garbage collect. (ex. GOGC=200) 200% -> 300% is a good starting point. Make sure your machine has some good memory behind it. -By upping the percentage your application will interupt the workers less, meaning they get more work done. However, be aware of the rest of your applications needs when modifying this variable. +By upping the percentage your application will interupt the workers less, meaning they get more work done. However, be aware of the rest of your applications needs when modifying this variable. ### Using GOMAXPROCS env variable -For workers that run quick bursts of lots of simple data consider lowering the GOMAXPROCS. Be carfeful though, this can affect your entire applicaitons performance. Profile your application and benchmark it. See where your application runs best. + +For workers that run quick bursts of lots of simple data consider lowering the GOMAXPROCS. Be carfeful though, this can affect your entire applicaitons performance. Profile your application and benchmark it. See where your application runs best. diff --git a/examples/deadline_worker/deadlineworker.go b/examples/deadline_worker/deadlineworker.go index b8d527a..f78f217 100644 --- a/examples/deadline_worker/deadlineworker.go +++ b/examples/deadline_worker/deadlineworker.go @@ -8,14 +8,14 @@ import ( "fmt" "time" - "github.com/catmullet/go-workers" + "github.com/guilhem/gorkers" ) func main() { ctx := context.Background() t := time.Now() - deadlineWorker := workers.NewRunner(ctx, NewDeadlineWorker().Work, 100, 100). + deadlineWorker := gorkers.NewRunner(ctx, NewDeadlineWorker().Work, 100, 100). SetDeadline(t.Add(200 * time.Millisecond)) deadlineWorker.Start() if err := deadlineWorker.Start(); err != nil { diff --git a/examples/multiple_workers/multipleworkers.go b/examples/multiple_workers/multipleworkers.go index 513d1a0..466dc83 100644 --- a/examples/multiple_workers/multipleworkers.go +++ b/examples/multiple_workers/multipleworkers.go @@ -9,7 +9,7 @@ import ( "math/rand" "sync" - "github.com/catmullet/go-workers" + "github.com/guilhem/gorkers" ) var ( @@ -20,8 +20,8 @@ var ( func main() { ctx := context.Background() - workerOne := workers.NewRunner(ctx, NewWorkerOne().Work, 1000, 1000) - workerTwo := workers.NewRunner(ctx, NewWorkerTwo().Work, 1000, 1000).InFrom(workerOne) + workerOne := gorkers.NewRunner(ctx, NewWorkerOne().Work, 1000, 1000) + workerTwo := gorkers.NewRunner(ctx, NewWorkerTwo().Work, 1000, 1000).InFrom(workerOne) if err := workerOne.Start(); err != nil { fmt.Println(err) } diff --git a/examples/passing_fields/passingfields.go b/examples/passing_fields/passingfields.go index 20ccef9..b883284 100644 --- a/examples/passing_fields/passingfields.go +++ b/examples/passing_fields/passingfields.go @@ -8,13 +8,13 @@ import ( "fmt" "math/rand" - "github.com/catmullet/go-workers" + "github.com/guilhem/gorkers" ) func main() { ctx := context.Background() - workerOne := workers.NewRunner(ctx, NewWorkerOne(2).Work, 100, 100) - workerTwo := workers.NewRunner(ctx, NewWorkerTwo(4).Work, 100, 100).InFrom(workerOne) + workerOne := gorkers.NewRunner(ctx, NewWorkerOne(2).Work, 100, 100) + workerTwo := gorkers.NewRunner(ctx, NewWorkerTwo(4).Work, 100, 100).InFrom(workerOne) if err := workerOne.Start(); err != nil { fmt.Println(err) } diff --git a/examples/quickstart/quickstart.go b/examples/quickstart/quickstart.go index 43e2de1..9b5caea 100644 --- a/examples/quickstart/quickstart.go +++ b/examples/quickstart/quickstart.go @@ -9,13 +9,13 @@ import ( "math/rand" "time" - "github.com/catmullet/go-workers" + "github.com/guilhem/gorkers" ) func main() { ctx := context.Background() t := time.Now() - rnr := workers.NewRunner(ctx, work, 100, 100) + rnr := gorkers.NewRunner(ctx, work, 100, 100) if err := rnr.Start(); err != nil { fmt.Println(err) diff --git a/examples/timeout_worker/timeoutworker.go b/examples/timeout_worker/timeoutworker.go index 5727aae..ae06548 100644 --- a/examples/timeout_worker/timeoutworker.go +++ b/examples/timeout_worker/timeoutworker.go @@ -8,13 +8,13 @@ import ( "fmt" "time" - "github.com/catmullet/go-workers" + "github.com/guilhem/gorkers" ) func main() { ctx := context.Background() - timeoutWorker := workers.NewRunner(ctx, work, 10, 10).SetWorkerTimeout(100 * time.Millisecond) + timeoutWorker := gorkers.NewRunner(ctx, work, 10, 10).SetWorkerTimeout(100 * time.Millisecond) err := timeoutWorker.Start() if err != nil { fmt.Println(err) diff --git a/go.mod b/go.mod index 684b7a6..0868b8b 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ -module github.com/catmullet/go-workers +module github.com/guilhem/gorkers go 1.15 diff --git a/workers.go b/workers.go index d06b3f8..dfaf611 100644 --- a/workers.go +++ b/workers.go @@ -1,4 +1,4 @@ -package workers +package gorkers import ( "context" @@ -7,13 +7,13 @@ import ( "time" ) -// Worker Contains the work function. -// Work() get input and could put in outChan for followers. +// WorkFunc get input and could put in outChan for followers. // ⚠️ outChan could be closed if follower is stoped before producer. // error returned can be process by afterFunc but will be ignored by default. -type Worker interface { - Work(ctx context.Context, in interface{}, out chan<- interface{}) error -} +type WorkFunc func(ctx context.Context, in interface{}, out chan<- interface{}) error + +type BeforeFunc func(ctx context.Context) error +type AfterFunc func(ctx context.Context, in interface{}, err error) error type Runner struct { ctx context.Context @@ -24,9 +24,9 @@ type Runner struct { outChan chan interface{} limiter chan struct{} - afterFunc func(ctx context.Context, err error) error - workFunc func(ctx context.Context, in interface{}, out chan<- interface{}) error - beforeFunc func(ctx context.Context) error + afterFunc AfterFunc + workFunc WorkFunc + beforeFunc BeforeFunc timeout time.Duration @@ -36,7 +36,7 @@ type Runner struct { } // NewRunner Factory function for a new Runner. The Runner will handle running the workers logic. -func NewRunner(ctx context.Context, w Worker, numWorkers int64, buffer int64) *Runner { +func NewRunner(ctx context.Context, w WorkFunc, numWorkers int64, buffer int64) *Runner { runnerCtx, runnerCancel := context.WithCancel(ctx) inputCtx, inputCancel := context.WithCancel(runnerCtx) @@ -48,8 +48,8 @@ func NewRunner(ctx context.Context, w Worker, numWorkers int64, buffer int64) *R inChan: make(chan interface{}, buffer), outChan: nil, limiter: make(chan struct{}, numWorkers), - afterFunc: func(ctx context.Context, err error) error { return nil }, - workFunc: w.Work, + afterFunc: func(ctx context.Context, in interface{}, err error) error { return nil }, + workFunc: w, beforeFunc: func(ctx context.Context) error { return nil }, numWorkers: numWorkers, started: new(sync.Once), @@ -97,7 +97,7 @@ func (r *Runner) Start() error { } // BeforeFunc Function to be run before worker starts processing. -func (r *Runner) BeforeFunc(f func(ctx context.Context) error) *Runner { +func (r *Runner) BeforeFunc(f BeforeFunc) *Runner { r.beforeFunc = f return r } @@ -107,7 +107,7 @@ func (r *Runner) BeforeFunc(f func(ctx context.Context) error) *Runner { // input can be retreive with context value: // ctx.Value(workers.InputKey{}) // ⚠️ If an error is returned it stop Runner execution. -func (r *Runner) AfterFunc(f func(ctx context.Context, err error) error) *Runner { +func (r *Runner) AfterFunc(f AfterFunc) *Runner { r.afterFunc = f return r } @@ -156,8 +156,6 @@ func (r *Runner) Stop() *Runner { return r } -type InputKey struct{} - // work starts processing input and limits worker instance number. func (r *Runner) work() { var wg sync.WaitGroup @@ -172,30 +170,33 @@ func (r *Runner) work() { select { case <-r.ctx.Done(): return - case input, open := <-r.inChan: - if !open { + case r.limiter <- struct{}{}: + // slot available for worker + select { + case <-r.ctx.Done(): return - } - wg.Add(1) - - r.limiter <- struct{}{} + case input, open := <-r.inChan: + if !open { + return + } + wg.Add(1) - inputCtx := context.WithValue(r.ctx, InputKey{}, input) - workCtx, workCancel := context.WithCancel(inputCtx) - if r.timeout > 0 { - workCtx, workCancel = context.WithTimeout(inputCtx, r.timeout) - } + workCtx, workCancel := context.WithCancel(r.ctx) + if r.timeout > 0 { + workCtx, workCancel = context.WithTimeout(r.ctx, r.timeout) + } - go func() { - defer func() { - <-r.limiter - workCancel() - wg.Done() + go func() { + defer func() { + <-r.limiter + workCancel() + wg.Done() + }() + if err := r.afterFunc(workCtx, input, r.workFunc(workCtx, input, r.outChan)); err != nil { + r.cancel() + } }() - if err := r.afterFunc(inputCtx, r.workFunc(workCtx, input, r.outChan)); err != nil { - r.cancel() - } - }() + } } } } diff --git a/workers_test.go b/workers_test.go index 266d795..03f6e1f 100644 --- a/workers_test.go +++ b/workers_test.go @@ -1,4 +1,4 @@ -package workers_test +package gorkers_test import ( "context" @@ -12,7 +12,7 @@ import ( "testing" "time" - "github.com/catmullet/go-workers" + "github.com/guilhem/gorkers" ) const ( @@ -103,8 +103,8 @@ var ( }, } - getWorker = func(ctx context.Context, wt workerTest) *workers.Runner { - worker := workers.NewRunner(ctx, wt.worker, wt.numWorkers, wt.numWorkers) + getWorker = func(ctx context.Context, wt workerTest) *gorkers.Runner { + worker := gorkers.NewRunner(ctx, wt.worker, wt.numWorkers, wt.numWorkers) if wt.timeout > 0 { worker.SetWorkerTimeout(wt.timeout) } @@ -119,7 +119,7 @@ type workerTest struct { name string timeout time.Duration deadline func() time.Time - worker workers.Worker + worker gorkers.WorkFunc numWorkers int64 testSignal bool errExpected bool @@ -129,8 +129,8 @@ type TestWorkerObject struct { workFunc func(ctx context.Context, in interface{}, out chan<- interface{}) error } -func NewTestWorkerObject(wf func(ctx context.Context, in interface{}, out chan<- interface{}) error) workers.Worker { - return &TestWorkerObject{wf} +func NewTestWorkerObject(wf func(ctx context.Context, in interface{}, out chan<- interface{}) error) gorkers.WorkFunc { + return wf } func (tw *TestWorkerObject) Work(ctx context.Context, in interface{}, out chan<- interface{}) error { @@ -177,7 +177,7 @@ func TestWorkers(t *testing.T) { ctx := context.Background() workerOne := getWorker(ctx, tt) // always need a consumer for the out tests so using basic here. - workerTwo := workers.NewRunner(ctx, NewTestWorkerObject(workBasicNoOut()), workerCount, workerCount).InFrom(workerOne) + workerTwo := gorkers.NewRunner(ctx, NewTestWorkerObject(workBasicNoOut()), workerCount, workerCount).InFrom(workerOne) if err := workerOne.Start(); err != nil && !tt.errExpected { t.Error(err) @@ -201,8 +201,8 @@ func TestWorkersFinish100(t *testing.T) { ctx := context.Background() w1 := NewWorkerOne() w2 := NewWorkerTwo() - workerOne := workers.NewRunner(ctx, w1, 1000, 10) - workerTwo := workers.NewRunner(ctx, w2, 1000, 10000).InFrom(workerOne) + workerOne := gorkers.NewRunner(ctx, w1.Work, 1000, 10) + workerTwo := gorkers.NewRunner(ctx, w2.Work, 1000, 10000).InFrom(workerOne) workerOne.Start() workerTwo.Start() @@ -231,8 +231,8 @@ func TestWorkersFinish100000(t *testing.T) { ctx := context.Background() w1 := NewWorkerOne() w2 := NewWorkerTwo() - workerOne := workers.NewRunner(ctx, w1, 1000, 2000) - workerTwo := workers.NewRunner(ctx, w2, 1000, 1).InFrom(workerOne) + workerOne := gorkers.NewRunner(ctx, w1.Work, 1000, 2000) + workerTwo := gorkers.NewRunner(ctx, w2.Work, 1000, 1).InFrom(workerOne) workerOne.Start() workerTwo.Start() @@ -261,8 +261,8 @@ func TestWorkersFinish1000000(t *testing.T) { ctx := context.Background() w1 := NewWorkerOne() w2 := NewWorkerTwo() - workerOne := workers.NewRunner(ctx, w1, 1000, 1000) - workerTwo := workers.NewRunner(ctx, w2, 1000, 500).InFrom(workerOne) + workerOne := gorkers.NewRunner(ctx, w1.Work, 1000, 1000) + workerTwo := gorkers.NewRunner(ctx, w2.Work, 1000, 500).InFrom(workerOne) workerOne.Start() workerTwo.Start() @@ -287,7 +287,7 @@ func TestWorkersFinish1000000(t *testing.T) { } func BenchmarkGoWorkers1to1(b *testing.B) { - worker := workers.NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 1000, 2000) + worker := gorkers.NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 1000, 2000) worker.Start() b.ResetTimer() @@ -303,7 +303,7 @@ func BenchmarkGoWorkers1to1(b *testing.B) { func Benchmark100GoWorkers(b *testing.B) { b.ReportAllocs() - worker := workers.NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 100, 200) + worker := gorkers.NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 100, 200) worker.Start() b.ResetTimer() @@ -316,7 +316,7 @@ func Benchmark100GoWorkers(b *testing.B) { func Benchmark1000GoWorkers(b *testing.B) { b.ReportAllocs() - worker := workers.NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 1000, 500) + worker := gorkers.NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 1000, 500) worker.Start() b.ResetTimer() @@ -329,7 +329,7 @@ func Benchmark1000GoWorkers(b *testing.B) { func Benchmark10000GoWorkers(b *testing.B) { b.ReportAllocs() - worker := workers.NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 10000, 5000) + worker := gorkers.NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 10000, 5000) worker.Start() b.ResetTimer()