@@ -21,67 +21,85 @@ type worker[Resource any] struct {
2121 saveResource func (context.Context , string , Resource ) error
2222 minDuration time.Duration
2323 maxDuration time.Duration
24+
25+ batchByResourceKey map [string ]* batch [Resource ]
26+ batchByDeadline []* batch [Resource ]
2427}
2528
26- func (w worker [Resource ]) run () {
27- batchByResourceKey := map [string ]* batch [Resource ]{}
28- var batchByDeadline []* batch [Resource ]
29+ func (w * worker [Resource ]) run () {
30+ w .batchByResourceKey = map [string ]* batch [Resource ]{}
2931
3032 ticker := time .NewTicker (time .Millisecond )
3133 defer ticker .Stop ()
3234
3335 for {
3436 select {
3537 case <- ticker .C :
36- now := time .Now ()
37- for _ , _batch := range batchByDeadline {
38- if _batch .deadline .Before (now ) {
39- err := w .saveResource (_batch .ctx , _batch .key , _batch .resource )
40- _batch .publishResult (err )
41- delete (batchByResourceKey , _batch .key )
42- batchByDeadline = batchByDeadline [1 :]
43- continue
44- }
45- }
38+ w .endBatchesAfterDeadline ()
4639 case _operation , ok := <- w .incomingOperations :
4740 if ! ok {
48- for key , _batch := range batchByResourceKey {
49- err := w .saveResource (_batch .ctx , key , _batch .resource )
50- _batch .publishResult (err )
51- continue
52- }
41+ w .endAllBatches ()
5342 return
5443 }
5544
56- _batch , found := batchByResourceKey [_operation .resourceKey ]
57- if ! found {
58- ctx , cancel := context .WithTimeout (context .Background (), w .maxDuration )
59- defer cancel ()
60-
61- now := time .Now ()
62-
63- resource , err := w .loadResource (ctx , _operation .resourceKey )
64- if err != nil {
65- _operation .result <- err
66- continue
67- }
68- _batch = & batch [Resource ]{
69- ctx : ctx ,
70- key : _operation .resourceKey ,
71- resource : resource ,
72- deadline : now .Add (w .minDuration ),
73- }
74- batchByResourceKey [_operation .resourceKey ] = _batch
75- batchByDeadline = append (batchByDeadline , _batch )
76- }
45+ w .runOperation (_operation )
46+ }
47+ }
48+ }
7749
78- _batch .results = append (_batch .results , _operation .result )
50+ func (w * worker [Resource ]) endBatchesAfterDeadline () {
51+ now := time .Now ()
7952
80- _operation .run (_batch .resource )
53+ for _ , _batch := range w .batchByDeadline {
54+ if _batch .deadline .After (now ) {
55+ return
8156 }
57+
58+ err := w .saveResource (_batch .ctx , _batch .key , _batch .resource )
59+ _batch .publishResult (err )
60+ delete (w .batchByResourceKey , _batch .key )
61+ w .batchByDeadline = w .batchByDeadline [1 :]
8262 }
8363}
8464
65+ func (w * worker [Resource ]) endAllBatches () {
66+ for key , _batch := range w .batchByResourceKey {
67+ err := w .saveResource (_batch .ctx , key , _batch .resource )
68+ _batch .publishResult (err )
69+ }
70+
71+ w .batchByResourceKey = map [string ]* batch [Resource ]{}
72+ w .batchByDeadline = nil
73+ }
74+
75+ func (w * worker [Resource ]) runOperation (_operation operation [Resource ]) {
76+ _batch , found := w .batchByResourceKey [_operation .resourceKey ]
77+ if ! found {
78+ ctx , _ := context .WithTimeout (context .Background (), w .maxDuration )
79+
80+ now := time .Now ()
81+
82+ resource , err := w .loadResource (ctx , _operation .resourceKey )
83+ if err != nil {
84+ _operation .result <- err
85+ return
86+ }
87+
88+ _batch = & batch [Resource ]{
89+ ctx : ctx ,
90+ key : _operation .resourceKey ,
91+ resource : resource ,
92+ deadline : now .Add (w .minDuration ),
93+ }
94+ w .batchByResourceKey [_operation .resourceKey ] = _batch
95+ w .batchByDeadline = append (w .batchByDeadline , _batch )
96+ }
97+
98+ _batch .results = append (_batch .results , _operation .result )
99+
100+ _operation .run (_batch .resource )
101+ }
102+
85103type batch [Resource any ] struct {
86104 ctx context.Context
87105 key string
0 commit comments