diff --git a/README.md b/README.md index 9e22637..445ce2b 100644 --- a/README.md +++ b/README.md @@ -1,51 +1,67 @@ -![go workers](https://raw.githubusercontent.com/catmullet/go-workers/assets/constworker_header_anim.gif) +# gorkers + + -[![Mentioned in Awesome Go](https://awesome.re/mentioned-badge-flat.svg)](https://github.com/avelino/awesome-go#goroutines) [![Maintainability](https://api.codeclimate.com/v1/badges/402fee86fbd1e24defb2/maintainability)](https://codeclimate.com/github/catmullet/go-workers/maintainability) [![GoCover](http://gocover.io/_badge/github.com/catmullet/go-workers)](http://gocover.io/github.com/catmullet/go-workers) [![Go Reference](https://pkg.go.dev/badge/github.com/catmullet/go-workers.svg)](https://pkg.go.dev/github.com/catmullet/go-workers) # Examples -* [Quickstart](https://github.com/catmullet/go-workers/blob/master/examples/quickstart/quickstart.go) -* [Multiple Go Workers](https://github.com/catmullet/go-workers/blob/master/examples/multiple_workers/multipleworkers.go) -* [Passing Fields](https://github.com/catmullet/go-workers/blob/master/examples/passing_fields/passingfields.go) + +- [Quickstart](https://github.com/catmullet/go-workers/blob/master/examples/quickstart/quickstart.go) +- [Multiple Go Workers](https://github.com/catmullet/go-workers/blob/master/examples/multiple_workers/multipleworkers.go) +- [Passing Fields](https://github.com/catmullet/go-workers/blob/master/examples/passing_fields/passingfields.go) + # Getting Started + ### Pull in the dependency -```zsh + +```sh go get github.com/catmullet/go-workers ``` ### Add the import to your project -giving an alias helps since go-workers doesn't exactly follow conventions. + +giving an alias helps since go-workers doesn't exactly follow conventions. _(If you're using a JetBrains IDE it should automatically give it an alias)_ + ```go import ( workers "github.com/catmullet/go-workers" ) ``` + ### Create a new worker worker -The NewWorker factory method returns a new worker. + +The NewWorker factory method returns a new worker. _(Method chaining can be performed on this method like calling .Work() immediately after.)_ + ```go type MyWorker struct {} func NewMyWorker() Worker { - return &MyWorker{} + return &MyWorker{} } func (my *MyWorker) Work(in interface{}, out chan<- interface{}) error { - // work iteration here + // work iteration here } -runner := workers.NewRunner(ctx, NewMyWorker(), numberOfWorkers) +runner := workers.NewRunner(ctx, NewMyWorker(), numberOfWorkers, numbersOfBuffers) ``` + ### Send work to worker -Send accepts an interface. So send it anything you want. + +Send accepts an interface. So send it anything you want. + ```go runner.Send("Hello World") ``` + ### Wait for the worker to finish and handle errors + Any error that bubbles up from your worker functions will return here. + ```go if err := runner.Wait(); err != nil { //Handle error @@ -53,72 +69,86 @@ if err := runner.Wait(); err != nil { ``` ## Working With Multiple Workers -### Passing work form one worker to the next + +### Passing work form one worker to the next By using the InFrom method you can tell `workerTwo` to accept output from `workerOne` + ```go -runnerOne := workers.NewRunner(ctx, NewMyWorker(), 100).Work() -runnerTwo := workers.NewRunner(ctx, NewMyWorkerTwo(), 100).InFrom(workerOne).Work() +runnerOne := workers.NewRunner(ctx, NewMyWorker(), 100, 100).Work() +runnerTwo := workers.NewRunner(ctx, NewMyWorkerTwo(), 100, 100).InFrom(workerOne).Work() ``` + ### Accepting output from multiple workers -It is possible to accept output from more than one worker but it is up to you to determine what is coming from which worker. (They will send on the same channel.) + +It is possible to accept output from more than one worker but it is up to you to determine what is coming from which worker. (They will send on the same channel.) + ```go -runnerOne := workers.NewRunner(ctx, NewMyWorker(), 100).Work() -runnerTwo := workers.NewRunner(ctx, NewMyWorkerTwo(), 100).Work() -runnerThree := workers.NewRunner(ctx, NewMyWorkerThree(), 100).InFrom(workerOne, workerTwo).Work() +runnerOne := workers.NewRunner(ctx, NewMyWorker(), 100, 100).Work() +runnerTwo := workers.NewRunner(ctx, NewMyWorkerTwo(), 100, 100).Work() +runnerThree := workers.NewRunner(ctx, NewMyWorkerThree(), 100, 100).InFrom(workerOne, workerTwo).Work() ``` ## Passing Fields To Workers + ### Adding Values -Fields can be passed via the workers object. Be sure as with any concurrency in Golang that your variables are concurrent safe. Most often the golang documentation will state the package or parts of it are concurrent safe. If it does not state so there is a good chance it isn't. Use the sync package to lock and unlock for writes on unsafe variables. (It is good practice NOT to defer in the work function.) + +Fields can be passed via the workers object. Be sure as with any concurrency in Golang that your variables are concurrent safe. Most often the golang documentation will state the package or parts of it are concurrent safe. If it does not state so there is a good chance it isn't. Use the sync package to lock and unlock for writes on unsafe variables. (It is good practice NOT to defer in the work function.) worker **ONLY** use the `Send()` method to get data into your worker. It is not shared memory unlike the worker objects values. ```go type MyWorker struct { - message string + message string } func NewMyWorker(message string) Worker { - return &MyWorker{message} + return &MyWorker{message} } func (my *MyWorker) Work(in interface{}, out chan<- interface{}) error { - fmt.Println(my.message) + fmt.Println(my.message) } -runner := workers.NewRunner(ctx, NewMyWorker(), 100).Work() +runner := workers.NewRunner(ctx, NewMyWorker(), 100, 100).Work() ``` ### Setting Timeouts or Deadlines + If your workers needs to stop at a deadline or you just need to have a timeout use the SetTimeout or SetDeadline methods. (These must be in place before setting the workers off to work.) + ```go // Setting a timeout of 2 seconds - timeoutRunner.SetTimeout(2 * time.Second) + timeoutRunner.SetWorkerTimeout(2 * time.Second) // Setting a deadline of 4 hours from now deadlineRunner.SetDeadline(time.Now().Add(4 * time.Hour)) func workerFunction(in interface{}, out chan<- interface{} error { - fmt.Println(in) - time.Sleep(1 * time.Second) + fmt.Println(in) + time.Sleep(1 * time.Second) } ``` - ## Performance Hints + ### Buffered Writer -If you want to write out to a file or just stdout you can use SetWriterOut(writer io.Writer). The worker will have the following methods available + +If you want to write out to a file or just stdout you can use SetWriterOut(writer io.Writer). The worker will have the following methods available + ```go runner.Println() runner.Printf() runner.Print() ``` -The workers use a buffered writer for output and can be up to 3 times faster than the fmt package. Just be mindful it won't write out to the console as quickly as an unbuffered writer. It will sync and eventually flush everything at the end, making it ideal for writing out to a file. + +The workers use a buffered writer for output and can be up to 3 times faster than the fmt package. Just be mindful it won't write out to the console as quickly as an unbuffered writer. It will sync and eventually flush everything at the end, making it ideal for writing out to a file. ### Using GOGC env variable + If your application is based solely around using workers, consider upping the percentage of when the scheduler will garbage collect. (ex. GOGC=200) 200% -> 300% is a good starting point. Make sure your machine has some good memory behind it. -By upping the percentage your application will interupt the workers less, meaning they get more work done. However, be aware of the rest of your applications needs when modifying this variable. +By upping the percentage your application will interupt the workers less, meaning they get more work done. However, be aware of the rest of your applications needs when modifying this variable. ### Using GOMAXPROCS env variable -For workers that run quick bursts of lots of simple data consider lowering the GOMAXPROCS. Be carfeful though, this can affect your entire applicaitons performance. Profile your application and benchmark it. See where your application runs best. + +For workers that run quick bursts of lots of simple data consider lowering the GOMAXPROCS. Be carfeful though, this can affect your entire applicaitons performance. Profile your application and benchmark it. See where your application runs best. diff --git a/examples/deadline_worker/deadlineworker.go b/examples/deadline_worker/deadlineworker.go index 832670c..f78f217 100644 --- a/examples/deadline_worker/deadlineworker.go +++ b/examples/deadline_worker/deadlineworker.go @@ -1,3 +1,4 @@ +//go:build ignore // +build ignore package main @@ -5,35 +6,37 @@ package main import ( "context" "fmt" - "github.com/catmullet/go-workers" "time" + + "github.com/guilhem/gorkers" ) func main() { ctx := context.Background() t := time.Now() - deadlineWorker := workers.NewRunner(ctx, NewDeadlineWorker(), 100). - SetDeadline(t.Add(200 * time.Millisecond)).Start() + deadlineWorker := gorkers.NewRunner(ctx, NewDeadlineWorker().Work, 100, 100). + SetDeadline(t.Add(200 * time.Millisecond)) + deadlineWorker.Start() + if err := deadlineWorker.Start(); err != nil { + fmt.Println(err) + } for i := 0; i < 1000000; i++ { deadlineWorker.Send("hello") } - err := deadlineWorker.Wait() - if err != nil { - fmt.Println(err) - } + deadlineWorker.Wait().Stop() fmt.Println("finished") } type DeadlineWorker struct{} -func NewDeadlineWorker() workers.Worker { +func NewDeadlineWorker() *DeadlineWorker { return &DeadlineWorker{} } -func (dlw *DeadlineWorker) Work(in interface{}, out chan<- interface{}) error { +func (dlw *DeadlineWorker) Work(_ context.Context, in interface{}, out chan<- interface{}) error { fmt.Println(in) time.Sleep(1 * time.Second) return nil diff --git a/examples/multiple_workers/multipleworkers.go b/examples/multiple_workers/multipleworkers.go index 86f76d7..466dc83 100644 --- a/examples/multiple_workers/multipleworkers.go +++ b/examples/multiple_workers/multipleworkers.go @@ -1,3 +1,4 @@ +//go:build ignore // +build ignore package main @@ -5,9 +6,10 @@ package main import ( "context" "fmt" - "github.com/catmullet/go-workers" "math/rand" "sync" + + "github.com/guilhem/gorkers" ) var ( @@ -18,19 +20,25 @@ var ( func main() { ctx := context.Background() - workerOne := workers.NewRunner(ctx, NewWorkerOne(), 1000).Start() - workerTwo := workers.NewRunner(ctx, NewWorkerTwo(), 1000).InFrom(workerOne).Start() + workerOne := gorkers.NewRunner(ctx, NewWorkerOne().Work, 1000, 1000) + workerTwo := gorkers.NewRunner(ctx, NewWorkerTwo().Work, 1000, 1000).InFrom(workerOne) + if err := workerOne.Start(); err != nil { + fmt.Println(err) + } + if err := workerTwo.Start(); err != nil { + fmt.Println(err) + } go func() { for i := 0; i < 100000; i++ { workerOne.Send(rand.Intn(100)) } - if err := workerOne.Wait(); err != nil { + if err := workerOne.Wait().Stop(); err != nil { fmt.Println(err) } }() - if err := workerTwo.Wait(); err != nil { + if err := workerTwo.Wait().Stop(); err != nil { fmt.Println(err) } @@ -44,15 +52,15 @@ type WorkerOne struct { type WorkerTwo struct { } -func NewWorkerOne() workers.Worker { +func NewWorkerOne() *WorkerOne { return &WorkerOne{} } -func NewWorkerTwo() workers.Worker { +func NewWorkerTwo() *WorkerTwo { return &WorkerTwo{} } -func (wo *WorkerOne) Work(in interface{}, out chan<- interface{}) error { +func (wo *WorkerOne) Work(_ context.Context, in interface{}, out chan<- interface{}) error { var workerOne = "worker_one" mut.Lock() if val, ok := count[workerOne]; ok { @@ -68,7 +76,7 @@ func (wo *WorkerOne) Work(in interface{}, out chan<- interface{}) error { return nil } -func (wt *WorkerTwo) Work(in interface{}, out chan<- interface{}) error { +func (wt *WorkerTwo) Work(_ context.Context, in interface{}, out chan<- interface{}) error { var workerTwo = "worker_two" mut.Lock() if val, ok := count[workerTwo]; ok { diff --git a/examples/passing_fields/passingfields.go b/examples/passing_fields/passingfields.go index b4cc768..b883284 100644 --- a/examples/passing_fields/passingfields.go +++ b/examples/passing_fields/passingfields.go @@ -1,3 +1,4 @@ +//go:build ignore // +build ignore package main @@ -5,26 +6,28 @@ package main import ( "context" "fmt" - "github.com/catmullet/go-workers" "math/rand" + + "github.com/guilhem/gorkers" ) func main() { ctx := context.Background() - workerOne := workers.NewRunner(ctx, NewWorkerOne(2), 100).Start() - workerTwo := workers.NewRunner(ctx, NewWorkerTwo(4), 100).InFrom(workerOne).Start() - - for i := 0; i < 15; i++ { - workerOne.Send(rand.Intn(100)) + workerOne := gorkers.NewRunner(ctx, NewWorkerOne(2).Work, 100, 100) + workerTwo := gorkers.NewRunner(ctx, NewWorkerTwo(4).Work, 100, 100).InFrom(workerOne) + if err := workerOne.Start(); err != nil { + fmt.Println(err) } - if err := workerOne.Wait(); err != nil { + if err := workerTwo.Start(); err != nil { fmt.Println(err) } - if err := workerTwo.Wait(); err != nil { - fmt.Println(err) + for i := 0; i < 15; i++ { + workerOne.Send(rand.Intn(100)) } + workerOne.Wait().Stop() + workerTwo.Wait().Stop() } type WorkerOne struct { @@ -34,26 +37,26 @@ type WorkerTwo struct { amountToMultiply int } -func NewWorkerOne(amountToMultiply int) workers.Worker { +func NewWorkerOne(amountToMultiply int) *WorkerOne { return &WorkerOne{ amountToMultiply: amountToMultiply, } } -func NewWorkerTwo(amountToMultiply int) workers.Worker { +func NewWorkerTwo(amountToMultiply int) *WorkerTwo { return &WorkerTwo{ amountToMultiply, } } -func (wo *WorkerOne) Work(in interface{}, out chan<- interface{}) error { +func (wo *WorkerOne) Work(ctx context.Context, in interface{}, out chan<- interface{}) error { total := in.(int) * wo.amountToMultiply fmt.Println("worker1", fmt.Sprintf("%d * %d = %d", in.(int), wo.amountToMultiply, total)) out <- total return nil } -func (wt *WorkerTwo) Work(in interface{}, out chan<- interface{}) error { +func (wt *WorkerTwo) Work(ctx context.Context, in interface{}, out chan<- interface{}) error { totalFromWorkerOne := in.(int) fmt.Println("worker2", fmt.Sprintf("%d * %d = %d", totalFromWorkerOne, wt.amountToMultiply, totalFromWorkerOne*wt.amountToMultiply)) return nil diff --git a/examples/quickstart/quickstart.go b/examples/quickstart/quickstart.go index 2829039..9b5caea 100644 --- a/examples/quickstart/quickstart.go +++ b/examples/quickstart/quickstart.go @@ -1,3 +1,4 @@ +//go:build ignore // +build ignore package main @@ -5,36 +6,32 @@ package main import ( "context" "fmt" - "github.com/catmullet/go-workers" "math/rand" "time" + + "github.com/guilhem/gorkers" ) func main() { ctx := context.Background() t := time.Now() - rnr := workers.NewRunner(ctx, NewWorker(), 100).Start() + rnr := gorkers.NewRunner(ctx, work, 100, 100) + + if err := rnr.Start(); err != nil { + fmt.Println(err) + } for i := 0; i < 1000000; i++ { rnr.Send(rand.Intn(100)) } - if err := rnr.Wait(); err != nil { - fmt.Println(err) - } + rnr.Wait().Stop() totalTime := time.Since(t).Milliseconds() fmt.Printf("total time %dms\n", totalTime) } -type WorkerOne struct { -} - -func NewWorker() workers.Worker { - return &WorkerOne{} -} - -func (wo *WorkerOne) Work(in interface{}, out chan<- interface{}) error { +func work(ctx context.Context, in interface{}, out chan<- interface{}) error { total := in.(int) * 2 fmt.Println(fmt.Sprintf("%d * 2 = %d", in.(int), total)) return nil diff --git a/examples/timeout_worker/timeoutworker.go b/examples/timeout_worker/timeoutworker.go index f5226a1..ae06548 100644 --- a/examples/timeout_worker/timeoutworker.go +++ b/examples/timeout_worker/timeoutworker.go @@ -1,3 +1,4 @@ +//go:build ignore // +build ignore package main @@ -5,32 +6,28 @@ package main import ( "context" "fmt" - "github.com/catmullet/go-workers" "time" + + "github.com/guilhem/gorkers" ) func main() { ctx := context.Background() - timeoutWorker := workers.NewRunner(ctx, NewTimeoutWorker(), 10).SetTimeout(100 * time.Millisecond).Start() - - for i := 0; i < 1000000; i++ { - timeoutWorker.Send("hello") - } - - err := timeoutWorker.Wait() + timeoutWorker := gorkers.NewRunner(ctx, work, 10, 10).SetWorkerTimeout(100 * time.Millisecond) + err := timeoutWorker.Start() if err != nil { fmt.Println(err) } -} -type TimeoutWorker struct{} + for i := 0; i < 1000000; i++ { + timeoutWorker.Send("hello") + } -func NewTimeoutWorker() workers.Worker { - return &TimeoutWorker{} + timeoutWorker.Wait().Stop() } -func (tw *TimeoutWorker) Work(in interface{}, out chan<- interface{}) error { +func work(ctx context.Context, in interface{}, out chan<- interface{}) error { fmt.Println(in) time.Sleep(1 * time.Second) return nil diff --git a/go.mod b/go.mod index 684b7a6..0868b8b 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ -module github.com/catmullet/go-workers +module github.com/guilhem/gorkers go 1.15 diff --git a/workers.go b/workers.go index dc36433..dfaf611 100644 --- a/workers.go +++ b/workers.go @@ -1,250 +1,202 @@ -package workers +package gorkers import ( "context" "errors" - "os" - "os/signal" "sync" - "syscall" "time" ) -var defaultWatchSignals = []os.Signal{syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL} +// WorkFunc get input and could put in outChan for followers. +// ⚠️ outChan could be closed if follower is stoped before producer. +// error returned can be process by afterFunc but will be ignored by default. +type WorkFunc func(ctx context.Context, in interface{}, out chan<- interface{}) error -// Worker Contains the work function. Allows an input and output to a channel or another worker for pipeline work. -// Return nil if you want the Runner to continue otherwise any error will cause the Runner to shutdown and return the -// error. -type Worker interface { - Work(in interface{}, out chan<- interface{}) error -} - -// Runner Handles the running the Worker logic. -type Runner interface { - BeforeFunc(func(ctx context.Context) error) Runner - AfterFunc(func(ctx context.Context, err error) error) Runner - SetDeadline(t time.Time) Runner - SetTimeout(duration time.Duration) Runner - SetFollower() - Send(in interface{}) - InFrom(w ...Runner) Runner - SetOut(chan interface{}) - Start() Runner - Stop() chan error - Wait() error -} +type BeforeFunc func(ctx context.Context) error +type AfterFunc func(ctx context.Context, in interface{}, err error) error -type runner struct { - ctx context.Context - cancel context.CancelFunc - inChan chan interface{} - outChan chan interface{} - errChan chan error - signalChan chan os.Signal - limiter chan struct{} +type Runner struct { + ctx context.Context + cancel context.CancelFunc + inputCtx context.Context + inputCancel context.CancelFunc + inChan chan interface{} + outChan chan interface{} + limiter chan struct{} - afterFunc func(ctx context.Context, err error) error - workFunc func(in interface{}, out chan<- interface{}) error - beforeFunc func(ctx context.Context) error + afterFunc AfterFunc + workFunc WorkFunc + beforeFunc BeforeFunc - timeout time.Duration - deadline time.Duration - - isLeader bool - stopCalled bool + timeout time.Duration numWorkers int64 - lock *sync.RWMutex - wg *sync.WaitGroup - done *sync.Once - once *sync.Once + started *sync.Once + done chan struct{} } // NewRunner Factory function for a new Runner. The Runner will handle running the workers logic. -func NewRunner(ctx context.Context, w Worker, numWorkers int64) Runner { - var runnerCtx, runnerCancel = context.WithCancel(ctx) - var runner = &runner{ - ctx: runnerCtx, - cancel: runnerCancel, - inChan: make(chan interface{}, numWorkers), - outChan: nil, - errChan: make(chan error, 1), - signalChan: make(chan os.Signal, 1), - limiter: make(chan struct{}, numWorkers), - afterFunc: func(ctx context.Context, err error) error { return err }, - workFunc: w.Work, - beforeFunc: func(ctx context.Context) error { return nil }, - numWorkers: numWorkers, - isLeader: true, - lock: new(sync.RWMutex), - wg: new(sync.WaitGroup), - once: new(sync.Once), - done: new(sync.Once), +func NewRunner(ctx context.Context, w WorkFunc, numWorkers int64, buffer int64) *Runner { + runnerCtx, runnerCancel := context.WithCancel(ctx) + inputCtx, inputCancel := context.WithCancel(runnerCtx) + + runner := &Runner{ + ctx: runnerCtx, + cancel: runnerCancel, + inputCtx: inputCtx, + inputCancel: inputCancel, + inChan: make(chan interface{}, buffer), + outChan: nil, + limiter: make(chan struct{}, numWorkers), + afterFunc: func(ctx context.Context, in interface{}, err error) error { return nil }, + workFunc: w, + beforeFunc: func(ctx context.Context) error { return nil }, + numWorkers: numWorkers, + started: new(sync.Once), + done: make(chan struct{}), } - runner.waitForSignal(defaultWatchSignals...) return runner } -// Send Send an object to the worker for processing. -func (r *runner) Send(in interface{}) { +var ErrInputClosed = errors.New("input closed") + +// Send Send an object to the worker for processing if context is not Done. +func (r *Runner) Send(in interface{}) error { select { - case <-r.ctx.Done(): - return + case <-r.inputCtx.Done(): + return ErrInputClosed case r.inChan <- in: } + return nil } // InFrom Set a worker to accept output from another worker(s). -func (r *runner) InFrom(w ...Runner) Runner { - r.SetFollower() +func (r *Runner) InFrom(w ...*Runner) *Runner { for _, wr := range w { - wr.SetOut(r.inChan) + // in := make(chan interface{}) + // go func(in chan interface{}) { + // for msg := range in { + // if err := r.Send(msg); err != nil { + // return + // } + // } + // }(in) + wr.SetOut(r.inChan) // nolint } return r } -// SetFollower Sets the worker as a follower and does not need to close it's in channel. -func (r *runner) SetFollower() { - r.lock.Lock() - r.isLeader = false - r.lock.Unlock() -} - -// Start Starts the worker on processing. -func (r *runner) Start() Runner { - r.startWork() - return r +// Start execute beforeFunc and launch worker processing. +func (r *Runner) Start() error { + r.started.Do(func() { + if err := r.beforeFunc(r.ctx); err == nil { + go r.work() + } + }) + return nil } // BeforeFunc Function to be run before worker starts processing. -func (r *runner) BeforeFunc(f func(ctx context.Context) error) Runner { +func (r *Runner) BeforeFunc(f BeforeFunc) *Runner { r.beforeFunc = f return r } // AfterFunc Function to be run after worker has stopped. -func (r *runner) AfterFunc(f func(ctx context.Context, err error) error) Runner { +// It can be used for logging and error management. +// input can be retreive with context value: +// ctx.Value(workers.InputKey{}) +// ⚠️ If an error is returned it stop Runner execution. +func (r *Runner) AfterFunc(f AfterFunc) *Runner { r.afterFunc = f return r } +var ErrOutAlready = errors.New("out already set") + // SetOut Allows the setting of a workers out channel, if not already set. -func (r *runner) SetOut(c chan interface{}) { +func (r *Runner) SetOut(c chan interface{}) error { if r.outChan != nil { - return + return ErrOutAlready } r.outChan = c + return nil } -// SetDeadline allows a time to be set when the workers should stop. -// Deadline needs to be handled by the IsDone method. -func (r *runner) SetDeadline(t time.Time) Runner { - r.lock.Lock() - defer r.lock.Unlock() +// SetDeadline allows a time to be set when the Runner should stop. +// ⚠️ Should only be called before Start +func (r *Runner) SetDeadline(t time.Time) *Runner { r.ctx, r.cancel = context.WithDeadline(r.ctx, t) return r } -// SetTimeout allows a time duration to be set when the workers should stop. -// Timeout needs to be handled by the IsDone method. -func (r *runner) SetTimeout(duration time.Duration) Runner { - r.lock.Lock() - defer r.lock.Unlock() +// SetWorkerTimeout allows a time duration to be set when the workers should stop. +// ⚠️ Should only be called before Start +func (r *Runner) SetWorkerTimeout(duration time.Duration) *Runner { r.timeout = duration return r } -// Wait calls stop on workers and waits for the channel to drain. -// !!Should only be called when certain nothing will send to worker. -func (r *runner) Wait() error { - r.waitForDrain() - if err := <-r.Stop(); err != nil && !errors.Is(err, context.Canceled) { - return err +// Wait close the input channel and waits it to drain and process. +func (r *Runner) Wait() *Runner { + if r.inputCtx.Err() == nil { + r.inputCancel() + close(r.inChan) } - return nil -} -// Stop Stops the processing of a worker and closes it's channel in. -// Returns a blocking channel with type error. -// !!Should only be called when certain nothing will send to worker. -func (r *runner) Stop() chan error { - r.done.Do(func() { - if r.inChan != nil && r.isLeader { - close(r.inChan) - } - }) - return r.errChan + <-r.done + + return r } -// IsDone returns a channel signaling the workers context has been canceled. -func (r *runner) IsDone() <-chan struct{} { - return r.ctx.Done() +// Stop Stops the processing of a worker and waits for workers to finish. +func (r *Runner) Stop() *Runner { + r.cancel() + r.Wait() + return r } -// waitForSignal make sure we wait for a term signal and shutdown correctly -func (r *runner) waitForSignal(signals ...os.Signal) { - go func() { - signal.Notify(r.signalChan, signals...) - <-r.signalChan - if r.cancel != nil { - r.cancel() - } +// work starts processing input and limits worker instance number. +func (r *Runner) work() { + var wg sync.WaitGroup + + defer func() { + wg.Wait() + r.cancel() + close(r.done) }() -} -// waitForDrain Waits for the limiter to be zeroed out and the in channel to be empty. -func (r *runner) waitForDrain() { - for len(r.limiter) > 0 || len(r.inChan) > 0 { - // Wait for the drain. - } -} + for { + select { + case <-r.ctx.Done(): + return + case r.limiter <- struct{}{}: + // slot available for worker + select { + case <-r.ctx.Done(): + return + case input, open := <-r.inChan: + if !open { + return + } + wg.Add(1) -// startWork Runs the before function and starts processing until one of three things happen. -// 1. A term signal is received or cancellation of context. -// 2. Stop function is called. -// 3. Worker returns an error. -func (r *runner) startWork() { - var err error - if err = r.beforeFunc(r.ctx); err != nil { - r.errChan <- err - return - } - if r.timeout > 0 { - r.ctx, r.cancel = context.WithTimeout(r.ctx, r.timeout) - } - r.wg.Add(1) - go func() { - var workerWG = new(sync.WaitGroup) - var closeOnce = new(sync.Once) - - // write out error if not nil on exit. - defer func() { - workerWG.Wait() - r.errChan <- err - closeOnce.Do(func() { - if r.outChan != nil { - close(r.outChan) + workCtx, workCancel := context.WithCancel(r.ctx) + if r.timeout > 0 { + workCtx, workCancel = context.WithTimeout(r.ctx, r.timeout) } - }) - r.wg.Done() - }() - for in := range r.inChan { - input := in - r.limiter <- struct{}{} - workerWG.Add(1) - go func() { - defer func() { - <-r.limiter - workerWG.Done() - }() - if err := r.workFunc(input, r.outChan); err != nil { - r.once.Do(func() { - r.errChan <- err + + go func() { + defer func() { + <-r.limiter + workCancel() + wg.Done() + }() + if err := r.afterFunc(workCtx, input, r.workFunc(workCtx, input, r.outChan)); err != nil { r.cancel() - return - }) - } - }() + } + }() + } } - }() + } } diff --git a/workers_test.go b/workers_test.go index 7401727..03f6e1f 100644 --- a/workers_test.go +++ b/workers_test.go @@ -1,16 +1,18 @@ -package workers +package gorkers_test import ( "context" "errors" - "fmt" "math/rand" "os" "runtime" "runtime/debug" "sync" + "sync/atomic" "testing" "time" + + "github.com/guilhem/gorkers" ) const ( @@ -20,11 +22,11 @@ const ( ) type WorkerOne struct { - Count int + Count int32 sync.Mutex } type WorkerTwo struct { - Count int + Count int32 sync.Mutex } @@ -37,15 +39,11 @@ func NewWorkerTwo() *WorkerTwo { } func (wo *WorkerOne) CurrentCount() int { - wo.Lock() - defer wo.Unlock() - return wo.Count + return int(wo.Count) } -func (wo *WorkerOne) Work(in interface{}, out chan<- interface{}) error { - mut.Lock() - wo.Count = wo.Count + 1 - mut.Unlock() +func (wo *WorkerOne) Work(ctx context.Context, in interface{}, out chan<- interface{}) error { + atomic.AddInt32(&wo.Count, 1) total := in.(int) * 2 out <- total @@ -53,15 +51,11 @@ func (wo *WorkerOne) Work(in interface{}, out chan<- interface{}) error { } func (wt *WorkerTwo) CurrentCount() int { - wt.Lock() - defer wt.Unlock() - return wt.Count + return int(wt.Count) } -func (wt *WorkerTwo) Work(in interface{}, out chan<- interface{}) error { - mut.Lock() - wt.Count = wt.Count + 1 - mut.Unlock() +func (wt *WorkerTwo) Work(ctx context.Context, in interface{}, out chan<- interface{}) error { + atomic.AddInt32(&wt.Count, 1) return nil } @@ -109,10 +103,10 @@ var ( }, } - getWorker = func(ctx context.Context, wt workerTest) Runner { - worker := NewRunner(ctx, wt.worker, wt.numWorkers) + getWorker = func(ctx context.Context, wt workerTest) *gorkers.Runner { + worker := gorkers.NewRunner(ctx, wt.worker, wt.numWorkers, wt.numWorkers) if wt.timeout > 0 { - worker.SetTimeout(wt.timeout) + worker.SetWorkerTimeout(wt.timeout) } if wt.deadline != nil { worker.SetDeadline(wt.deadline()) @@ -125,41 +119,41 @@ type workerTest struct { name string timeout time.Duration deadline func() time.Time - worker Worker + worker gorkers.WorkFunc numWorkers int64 testSignal bool errExpected bool } type TestWorkerObject struct { - workFunc func(in interface{}, out chan<- interface{}) error + workFunc func(ctx context.Context, in interface{}, out chan<- interface{}) error } -func NewTestWorkerObject(wf func(in interface{}, out chan<- interface{}) error) Worker { - return &TestWorkerObject{wf} +func NewTestWorkerObject(wf func(ctx context.Context, in interface{}, out chan<- interface{}) error) gorkers.WorkFunc { + return wf } -func (tw *TestWorkerObject) Work(in interface{}, out chan<- interface{}) error { - return tw.workFunc(in, out) +func (tw *TestWorkerObject) Work(ctx context.Context, in interface{}, out chan<- interface{}) error { + return tw.workFunc(ctx, in, out) } -func workBasicNoOut() func(in interface{}, out chan<- interface{}) error { - return func(in interface{}, out chan<- interface{}) error { +func workBasicNoOut() func(ctx context.Context, in interface{}, out chan<- interface{}) error { + return func(ctx context.Context, in interface{}, out chan<- interface{}) error { _ = in.(int) return nil } } -func workBasic() func(in interface{}, out chan<- interface{}) error { - return func(in interface{}, out chan<- interface{}) error { +func workBasic() func(ctx context.Context, in interface{}, out chan<- interface{}) error { + return func(ctx context.Context, in interface{}, out chan<- interface{}) error { i := in.(int) out <- i return nil } } -func workWithError(err error) func(in interface{}, out chan<- interface{}) error { - return func(in interface{}, out chan<- interface{}) error { +func workWithError(err error) func(ctx context.Context, in interface{}, out chan<- interface{}) error { + return func(ctx context.Context, in interface{}, out chan<- interface{}) error { i := in.(int) total := i * rand.Intn(1000) if i == 100 { @@ -181,20 +175,23 @@ func TestWorkers(t *testing.T) { for _, tt := range workerTestScenarios { t.Run(tt.name, func(t *testing.T) { ctx := context.Background() - workerOne := getWorker(ctx, tt).Start() + workerOne := getWorker(ctx, tt) // always need a consumer for the out tests so using basic here. - workerTwo := NewRunner(ctx, NewTestWorkerObject(workBasicNoOut()), workerCount).InFrom(workerOne).Start() - - for i := 0; i < runTimes; i++ { - workerOne.Send(i) - } + workerTwo := gorkers.NewRunner(ctx, NewTestWorkerObject(workBasicNoOut()), workerCount, workerCount).InFrom(workerOne) - if err := workerOne.Wait(); err != nil && (!tt.errExpected) { + if err := workerOne.Start(); err != nil && !tt.errExpected { t.Error(err) } - if err := workerTwo.Wait(); err != nil && !tt.errExpected { + if err := workerTwo.Start(); err != nil && !tt.errExpected { t.Error(err) } + + for i := 0; i < runTimes; i++ { + workerOne.Send(i) + } + + workerOne.Wait().Stop() + workerTwo.Wait().Stop() }) } } @@ -204,27 +201,25 @@ func TestWorkersFinish100(t *testing.T) { ctx := context.Background() w1 := NewWorkerOne() w2 := NewWorkerTwo() - workerOne := NewRunner(ctx, w1, 1000).Start() - workerTwo := NewRunner(ctx, w2, 1000).InFrom(workerOne).Start() + workerOne := gorkers.NewRunner(ctx, w1.Work, 1000, 10) + workerTwo := gorkers.NewRunner(ctx, w2.Work, 1000, 10000).InFrom(workerOne) + workerOne.Start() + workerTwo.Start() for i := 0; i < workCount; i++ { workerOne.Send(rand.Intn(100)) } - if err := workerOne.Wait(); err != nil { - fmt.Println(err) - } + workerOne.Wait().Stop() - if err := workerTwo.Wait(); err != nil { - fmt.Println(err) - } + workerTwo.Wait().Stop() if w1.CurrentCount() != workCount { - t.Log("worker one failed to finish,", "worker_one count", w1.CurrentCount(), "/ 100000") + t.Log("worker one failed to finish,", "worker_one count", w1.CurrentCount(), "/", workCount) t.Fail() } if w2.CurrentCount() != workCount { - t.Log("worker two failed to finish,", "worker_two count", w2.CurrentCount(), "/ 100000") + t.Log("worker two failed to finish,", "worker_two count", w2.CurrentCount(), "/", workCount) t.Fail() } @@ -236,27 +231,25 @@ func TestWorkersFinish100000(t *testing.T) { ctx := context.Background() w1 := NewWorkerOne() w2 := NewWorkerTwo() - workerOne := NewRunner(ctx, w1, 1000).Start() - workerTwo := NewRunner(ctx, w2, 1000).InFrom(workerOne).Start() + workerOne := gorkers.NewRunner(ctx, w1.Work, 1000, 2000) + workerTwo := gorkers.NewRunner(ctx, w2.Work, 1000, 1).InFrom(workerOne) + workerOne.Start() + workerTwo.Start() for i := 0; i < workCount; i++ { workerOne.Send(rand.Intn(100)) } - if err := workerOne.Wait(); err != nil { - fmt.Println(err) - } + workerOne.Wait().Stop() - if err := workerTwo.Wait(); err != nil { - fmt.Println(err) - } + workerTwo.Wait().Stop() if w1.CurrentCount() != workCount { - t.Log("worker one failed to finish,", "worker_one count", w1.CurrentCount(), "/ 100000") + t.Log("worker one failed to finish,", "worker_one count", w1.CurrentCount(), "/", workCount) t.Fail() } if w2.CurrentCount() != workCount { - t.Log("worker two failed to finish,", "worker_two count", w2.CurrentCount(), "/ 100000") + t.Log("worker two failed to finish,", "worker_two count", w2.CurrentCount(), "/", workCount) t.Fail() } @@ -268,27 +261,25 @@ func TestWorkersFinish1000000(t *testing.T) { ctx := context.Background() w1 := NewWorkerOne() w2 := NewWorkerTwo() - workerOne := NewRunner(ctx, w1, 1000).Start() - workerTwo := NewRunner(ctx, w2, 1000).InFrom(workerOne).Start() + workerOne := gorkers.NewRunner(ctx, w1.Work, 1000, 1000) + workerTwo := gorkers.NewRunner(ctx, w2.Work, 1000, 500).InFrom(workerOne) + workerOne.Start() + workerTwo.Start() for i := 0; i < workCount; i++ { workerOne.Send(rand.Intn(100)) } - if err := workerOne.Wait(); err != nil { - fmt.Println(err) - } + workerOne.Wait().Stop() - if err := workerTwo.Wait(); err != nil { - fmt.Println(err) - } + workerTwo.Wait().Stop() if w1.CurrentCount() != workCount { - t.Log("worker one failed to finish,", "worker_one count", w1.CurrentCount(), "/ 100000") + t.Log("worker one failed to finish,", "worker_one count", w1.CurrentCount(), "/", workCount) t.Fail() } if w2.CurrentCount() != workCount { - t.Log("worker two failed to finish,", "worker_two count", w2.CurrentCount(), "/ 100000") + t.Log("worker two failed to finish,", "worker_two count", w2.CurrentCount(), "/", workCount) t.Fail() } @@ -296,7 +287,8 @@ func TestWorkersFinish1000000(t *testing.T) { } func BenchmarkGoWorkers1to1(b *testing.B) { - worker := NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 1000).Start() + worker := gorkers.NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 1000, 2000) + worker.Start() b.ResetTimer() for i := 0; i < b.N; i++ { @@ -306,49 +298,44 @@ func BenchmarkGoWorkers1to1(b *testing.B) { } b.StopTimer() - if err := worker.Wait(); err != nil { - b.Error(err) - } + worker.Wait().Stop() } func Benchmark100GoWorkers(b *testing.B) { b.ReportAllocs() - worker := NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 100).Start() + worker := gorkers.NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 100, 200) + worker.Start() b.ResetTimer() for i := 0; i < b.N; i++ { worker.Send(i) } - if err := worker.Wait(); err != nil { - b.Error(err) - } + worker.Wait().Stop() } func Benchmark1000GoWorkers(b *testing.B) { b.ReportAllocs() - worker := NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 1000).Start() + worker := gorkers.NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 1000, 500) + worker.Start() b.ResetTimer() for i := 0; i < b.N; i++ { worker.Send(i) } - if err := worker.Wait(); err != nil { - b.Error(err) - } + worker.Wait().Stop() } func Benchmark10000GoWorkers(b *testing.B) { b.ReportAllocs() - worker := NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 10000).Start() + worker := gorkers.NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 10000, 5000) + worker.Start() b.ResetTimer() for i := 0; i < b.N; i++ { worker.Send(i) } - if err := worker.Wait(); err != nil { - b.Error(err) - } + worker.Wait().Stop() }