@@ -127,28 +127,21 @@ func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consume
127127 select {
128128 case <- time .After (retryDelay ):
129129 retryAttemptNumber ++
130- func () {
131- defer func () {
132- if r := recover (); r != nil {
133- // Panic occurred, treat as error by not clearing hasRetry
134- }
135- }()
136- err := consumer .Consume (retryItem )
137- if err != nil {
138- // Still failing, increase delay (exponential backoff)
139- retryDelay = retryDelay * 2
140- if retryDelay > q .retryConfig .delay {
141- retryDelay = q .retryConfig .delay
142- }
143- } else {
144- // Success! Clear retry state and decrement size
145- hasRetry = false
146- retryItem = nil
147- retryDelay = time .Millisecond * 100
148- retryAttemptNumber = 0
149- q .size .Add (- 1 )
130+ err := consumer .Consume (retryItem )
131+ if err != nil {
132+ // Still failing, increase delay (exponential backoff)
133+ retryDelay = retryDelay * 2
134+ if retryDelay > q .retryConfig .delay {
135+ retryDelay = q .retryConfig .delay
150136 }
151- }()
137+ } else {
138+ // Success! Clear retry state and decrement size
139+ hasRetry = false
140+ retryItem = nil
141+ retryDelay = time .Millisecond * 100
142+ retryAttemptNumber = 0
143+ q .size .Add (- 1 )
144+ }
152145 case <- q .stopCh :
153146 // Queue is closing
154147 if hasRetry {
@@ -161,24 +154,15 @@ func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consume
161154 select {
162155 case item , ok := <- queue :
163156 if ok {
164- func () {
165- defer func () {
166- if r := recover (); r != nil {
167- // Panic occurred, treat as error
168- retryItem = item
169- hasRetry = true
170- }
171- }()
172- err := consumer .Consume (item )
173- if err != nil {
174- // Failed, set as retry item
175- retryItem = item
176- hasRetry = true
177- } else {
178- // Success, decrement size
179- q .size .Add (- 1 )
180- }
181- }()
157+ err := consumer .Consume (item )
158+ if err != nil {
159+ // Failed, set as retry item
160+ retryItem = item
161+ hasRetry = true
162+ } else {
163+ // Success, decrement size
164+ q .size .Add (- 1 )
165+ }
182166 } else {
183167 // channel closed, finish worker
184168 return
@@ -194,14 +178,7 @@ func (q *BoundedQueue) StartConsumersWithFactory(num int, factory func() Consume
194178 select {
195179 case item , ok := <- queue :
196180 if ok {
197- func () {
198- defer func () {
199- if r := recover (); r != nil {
200- // Panic occurred, but with retry disabled we still decrement size
201- }
202- }()
203- _ = consumer .Consume (item )
204- }()
181+ _ = consumer .Consume (item )
205182 q .size .Add (- 1 )
206183 } else {
207184 // channel closed, finish worker
0 commit comments