@@ -10,15 +10,51 @@ import (
1010 "time"
1111)
1212
13+ // Options represent parameters for batch.Processor. They should be passed to StartProcessor function. All options
14+ // (as the name suggest) are optional and have default values.
1315type Options [Resource any ] struct {
14- MinDuration time.Duration
15- MaxDuration time.Duration
16- LoadResource func (_ context.Context , key string ) (Resource , error )
17- SaveResource func (_ context.Context , key string , _ Resource ) error
18- GoRoutines int
19- GoRoutineNumberForKey func (_ string , goroutines int ) int
16+ // All batches will be run for at least MinDuration.
17+ //
18+ // By default, 100ms.
19+ MinDuration time.Duration
20+ // Batch will have timeout with MaxDuration. Context with this timeout will be passed to
21+ // LoadResource and SaveResource functions, which can abort the batch by returning an error.
22+ //
23+ // By default, 2*MinDuration.
24+ MaxDuration time.Duration
25+ // LoadResource loads resource with given key from a database. Returning an error aborts the batch.
26+ // This function is called in the beginning of each new batch. Context passed as a first parameter
27+ // has a timeout calculated using batch MaxDuration. You can use this information to abort loading resource
28+ // if it takes too long.
29+ //
30+ // By default, returns zero-value Resource.
31+ LoadResource func (_ context.Context , key string ) (Resource , error )
32+ // SaveResource saves resource with given key to a database. Returning an error aborts the batch.
33+ // This function is called at the end of each batch. Context passed as a first parameter
34+ // has a timeout calculated using batch MaxDuration. You can use this information to abort saving resource
35+ // if it takes too long.
36+ //
37+ // By default, does nothing.
38+ SaveResource func (_ context.Context , key string , _ Resource ) error
39+ // GoRoutines specifies how many goroutines should be used to run batch operations.
40+ //
41+ // By default, 16 * number of CPUs.
42+ GoRoutines int
43+ // GoRoutineNumberForKey returns go-routine number which will be used to run operation on
44+ // a given resource key. This function is crucial to properly serialize requests.
45+ //
46+ // This function must be deterministic - it should always return the same go-routine number
47+ // for given combination of key and goroutines parameters.
48+ //
49+ // By default, GoroutineNumberForKey function is used. This implementation calculates hash
50+ // on a given key and use modulo to calculate go-routine number.
51+ GoRoutineNumberForKey func (key string , goroutines int ) int
2052}
2153
54+ // StartProcessor starts batch processor which will run operations in batches.
55+ //
56+ // Please note that Processor is a go-routine pool internally and should be stopped when no longer needed.
57+ // Please use Processor.Stop method to stop it.
2258func StartProcessor [Resource any ](options Options [Resource ]) * Processor [Resource ] {
2359 options = options .withDefaults ()
2460
@@ -52,6 +88,7 @@ func StartProcessor[Resource any](options Options[Resource]) *Processor[Resource
5288 }
5389}
5490
91+ // Processor represents instance of batch processor which can be used to issue operations which run in a batch manner.
5592type Processor [Resource any ] struct {
5693 options Options [Resource ]
5794 stopped chan struct {}
@@ -92,11 +129,17 @@ func (s Options[Resource]) withDefaults() Options[Resource] {
92129 return s
93130}
94131
95- // Run lets you run an operation which will be run along other operations in a single batch (as a single atomic transaction).
96- // If there is no pending batch then the batch will be started. Operations are run sequentially.
132+ // Run lets you run an operation on a resource with given key. Operation will run along other operations in batches.
133+ // If there is no pending batch then the new batch will be started and will run for at least MinDuration. After the
134+ // MinDuration no new operations will be accepted and SaveResource function will be called.
135+ //
136+ // Operations are run sequentially. No manual locking is required inside operation. Operation should be fast, which
137+ // basically means that any I/O should be avoided at all cost.
97138//
98- // Run ends when the entire batch has ended.
99- func (p * Processor [Resource ]) Run (key string , op func (Resource )) error {
139+ // Run ends when the entire batch has ended. It returns error when batch is aborted or processor is stopped.
140+ // Only LoadResource and SaveResource functions can abort the batch by returning an error. If error was reported
141+ // for a batch all Run calls assigned to this batch will get this error.
142+ func (p * Processor [Resource ]) Run (key string , _operation func (Resource )) error {
100143 select {
101144 case <- p .stopped :
102145 return ProcessorStopped
@@ -110,7 +153,7 @@ func (p *Processor[Resource]) Run(key string, op func(Resource)) error {
110153
111154 p .workerChannels [goRoutineNumber ] <- operation [Resource ]{
112155 resourceKey : key ,
113- run : op ,
156+ run : _operation ,
114157 result : result ,
115158 }
116159
0 commit comments