Skip to content

Commit 041ce08

Browse files
committed
Processor.Stop will wait until all batches are finished
Processor.Stop() should block until all batches have finished (and their resources are saved). This is needed to implement graceful shutdown in applications.
1 parent e46a4cd commit 041ce08

File tree

4 files changed

+97
-43
lines changed

4 files changed

+97
-43
lines changed

_example/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ func main() {
2323
SaveResource: db.SaveTrain,
2424
},
2525
)
26+
defer processor.Stop()
2627

2728
trainService := train.Service{
2829
BatchProcessor: processor,

batch.go

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package batch
66
import (
77
"context"
88
"runtime"
9+
"sync"
910
"time"
1011
)
1112

@@ -21,31 +22,41 @@ type Options[Resource any] struct {
2122
func StartProcessor[Resource any](options Options[Resource]) *Processor[Resource] {
2223
options = options.withDefaults()
2324

24-
operations := make([]chan operation[Resource], options.GoRoutines)
25+
workerChannels := make([]chan operation[Resource], options.GoRoutines)
26+
27+
var workersFinished sync.WaitGroup
28+
workersFinished.Add(options.GoRoutines)
29+
2530
for i := 0; i < options.GoRoutines; i++ {
26-
operations[i] = make(chan operation[Resource])
27-
w := worker[Resource]{
28-
goRoutineNumber: i,
29-
operations: operations[i],
30-
loadResource: options.LoadResource,
31-
saveResource: options.SaveResource,
32-
minDuration: options.MinDuration,
33-
maxDuration: options.MaxDuration,
31+
workerChannels[i] = make(chan operation[Resource])
32+
_worker := worker[Resource]{
33+
goRoutineNumber: i,
34+
incomingOperations: workerChannels[i],
35+
loadResource: options.LoadResource,
36+
saveResource: options.SaveResource,
37+
minDuration: options.MinDuration,
38+
maxDuration: options.MaxDuration,
3439
}
35-
go w.run()
40+
41+
go func() {
42+
_worker.run()
43+
workersFinished.Done()
44+
}()
3645
}
3746

3847
return &Processor[Resource]{
39-
options: options,
40-
stopped: make(chan struct{}),
41-
operations: operations,
48+
options: options,
49+
stopped: make(chan struct{}),
50+
workerChannels: workerChannels,
51+
workersFinished: &workersFinished,
4252
}
4353
}
4454

4555
type Processor[Resource any] struct {
46-
options Options[Resource]
47-
stopped chan struct{}
48-
operations []chan operation[Resource]
56+
options Options[Resource]
57+
stopped chan struct{}
58+
workerChannels []chan operation[Resource]
59+
workersFinished *sync.WaitGroup
4960
}
5061

5162
func (s Options[Resource]) withDefaults() Options[Resource] {
@@ -97,7 +108,7 @@ func (p *Processor[Resource]) Run(key string, op func(Resource)) error {
97108

98109
goRoutineNumber := p.options.GoRoutineNumberForKey(key, p.options.GoRoutines)
99110

100-
p.operations[goRoutineNumber] <- operation[Resource]{
111+
p.workerChannels[goRoutineNumber] <- operation[Resource]{
101112
resourceKey: key,
102113
run: op,
103114
result: result,
@@ -107,9 +118,13 @@ func (p *Processor[Resource]) Run(key string, op func(Resource)) error {
107118
}
108119

109120
// Stop ends all running batches. No new operations will be accepted.
121+
// Stop blocks until all pending batches are ended and resources saved.
110122
func (p *Processor[Resource]) Stop() {
111123
close(p.stopped)
112-
for _, op := range p.operations {
113-
close(op)
124+
125+
for _, channel := range p.workerChannels {
126+
close(channel)
114127
}
128+
129+
p.workersFinished.Wait()
115130
}

batch_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ func TestProcessor_Run(t *testing.T) {
2323
futureValue := FutureValue[*resource]()
2424

2525
processor := batch.StartProcessor(batch.Options[*resource]{})
26+
defer processor.Stop()
2627
// when
2728
err := processor.Run("key", func(c *resource) {
2829
futureValue.Set(c)
@@ -44,6 +45,7 @@ func TestProcessor_Run(t *testing.T) {
4445
return res, nil
4546
},
4647
})
48+
defer processor.Stop()
4749
// when
4850
err := processor.Run(key, func(r *resource) {
4951
futureValue.Set(r)
@@ -64,6 +66,7 @@ func TestProcessor_Run(t *testing.T) {
6466
LoadResource: db.Load,
6567
SaveResource: db.Save,
6668
})
69+
defer processor.Stop()
6770
// when
6871
err := processor.Run(key, func(r *resource) {
6972
r.value = 2
@@ -83,6 +86,7 @@ func TestProcessor_Run(t *testing.T) {
8386
MinDuration: 100 * time.Millisecond,
8487
},
8588
)
89+
defer processor.Stop()
8690

8791
started := time.Now()
8892
// when
@@ -95,6 +99,7 @@ func TestProcessor_Run(t *testing.T) {
9599

96100
t.Run("should run batch with default min duration", func(t *testing.T) {
97101
processor := batch.StartProcessor(batch.Options[empty]{})
102+
defer processor.Stop()
98103

99104
started := time.Now()
100105
// when
@@ -117,6 +122,7 @@ func TestProcessor_Run(t *testing.T) {
117122
return nil
118123
},
119124
})
125+
defer processor.Stop()
120126

121127
key := ""
122128

@@ -146,6 +152,7 @@ func TestProcessor_Run(t *testing.T) {
146152
SaveResource: db.Save,
147153
},
148154
)
155+
defer processor.Stop()
149156
// when
150157
err := processor.Run(key, func(*resource) {})
151158
// then
@@ -172,6 +179,7 @@ func TestProcessor_Run(t *testing.T) {
172179
SaveResource: db.Save,
173180
},
174181
)
182+
defer processor.Stop()
175183
// when
176184
err := processor.Run(key, func(*resource) {})
177185
// then
@@ -196,6 +204,7 @@ func TestProcessor_Run(t *testing.T) {
196204
},
197205
},
198206
)
207+
defer processor.Stop()
199208
// when
200209
err := processor.Run(key, func(empty) {})
201210
// then
@@ -211,6 +220,7 @@ func TestProcessor_Run(t *testing.T) {
211220
},
212221
},
213222
)
223+
defer processor.Stop()
214224

215225
const iterations = 1000
216226

@@ -273,6 +283,34 @@ func TestProcessor_Stop(t *testing.T) {
273283
elapsed := time.Now().Sub(started)
274284
assert.True(t, elapsed < minDuration, "stopped batch should take less time than batch min duration")
275285
})
286+
287+
t.Run("Stop should wait until all batches are finished", func(t *testing.T) {
288+
var operationExecuted sync.WaitGroup
289+
operationExecuted.Add(1)
290+
291+
batchFinished := false
292+
processor := batch.StartProcessor(
293+
batch.Options[empty]{
294+
MinDuration: time.Second,
295+
MaxDuration: time.Second,
296+
SaveResource: func(ctx context.Context, key string, _ empty) error {
297+
<-ctx.Done()
298+
batchFinished = true
299+
return nil
300+
},
301+
},
302+
)
303+
go func() {
304+
_ = processor.Run("key", func(empty) {
305+
operationExecuted.Done()
306+
})
307+
}()
308+
operationExecuted.Wait()
309+
// when
310+
processor.Stop()
311+
// then
312+
assert.True(t, batchFinished)
313+
})
276314
}
277315

278316
type resource struct{ value int }

worker.go

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@ type operation[Resource any] struct {
1515
}
1616

1717
type worker[Resource any] struct {
18-
goRoutineNumber int
19-
operations chan operation[Resource]
20-
loadResource func(context.Context, string) (Resource, error)
21-
saveResource func(context.Context, string, Resource) error
22-
minDuration time.Duration
23-
maxDuration time.Duration
18+
goRoutineNumber int
19+
incomingOperations <-chan operation[Resource]
20+
loadResource func(context.Context, string) (Resource, error)
21+
saveResource func(context.Context, string, Resource) error
22+
minDuration time.Duration
23+
maxDuration time.Duration
2424
}
2525

2626
func (w worker[Resource]) run() {
@@ -34,50 +34,50 @@ func (w worker[Resource]) run() {
3434
select {
3535
case <-ticker.C:
3636
now := time.Now()
37-
for _, b := range batchByDeadline {
38-
if b.deadline.Before(now) {
39-
err := w.saveResource(b.ctx, b.key, b.resource)
40-
b.publishResult(err)
41-
delete(batchByResourceKey, b.key)
37+
for _, _batch := range batchByDeadline {
38+
if _batch.deadline.Before(now) {
39+
err := w.saveResource(_batch.ctx, _batch.key, _batch.resource)
40+
_batch.publishResult(err)
41+
delete(batchByResourceKey, _batch.key)
4242
batchByDeadline = batchByDeadline[1:]
4343
continue
4444
}
4545
}
46-
case op, ok := <-w.operations:
46+
case _operation, ok := <-w.incomingOperations:
4747
if !ok {
48-
for key, b := range batchByResourceKey {
49-
err := w.saveResource(b.ctx, key, b.resource)
50-
b.publishResult(err)
48+
for key, _batch := range batchByResourceKey {
49+
err := w.saveResource(_batch.ctx, key, _batch.resource)
50+
_batch.publishResult(err)
5151
continue
5252
}
5353
return
5454
}
5555

56-
b, found := batchByResourceKey[op.resourceKey]
56+
_batch, found := batchByResourceKey[_operation.resourceKey]
5757
if !found {
5858
ctx, cancel := context.WithTimeout(context.Background(), w.maxDuration)
5959
defer cancel()
6060

6161
now := time.Now()
6262

63-
resource, err := w.loadResource(ctx, op.resourceKey)
63+
resource, err := w.loadResource(ctx, _operation.resourceKey)
6464
if err != nil {
65-
op.result <- err
65+
_operation.result <- err
6666
continue
6767
}
68-
b = &batch[Resource]{
68+
_batch = &batch[Resource]{
6969
ctx: ctx,
70-
key: op.resourceKey,
70+
key: _operation.resourceKey,
7171
resource: resource,
7272
deadline: now.Add(w.minDuration),
7373
}
74-
batchByResourceKey[op.resourceKey] = b
75-
batchByDeadline = append(batchByDeadline, b)
74+
batchByResourceKey[_operation.resourceKey] = _batch
75+
batchByDeadline = append(batchByDeadline, _batch)
7676
}
7777

78-
b.results = append(b.results, op.result)
78+
_batch.results = append(_batch.results, _operation.result)
7979

80-
op.run(b.resource)
80+
_operation.run(_batch.resource)
8181
}
8282
}
8383
}

0 commit comments

Comments (0)