Skip to content

Commit 997c07a

Browse files
committed
chore(poller): avoid warn log on shutdown
Signed-off-by: Shijie Sheng <liouvetren@gmail.com>
1 parent b9b333e commit 997c07a

File tree

2 files changed

+71
-4
lines changed

2 files changed

+71
-4
lines changed

internal/internal_worker_base.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ type (
139139
pollLimiter *rate.Limiter
140140
taskLimiter *rate.Limiter
141141
limiterContext context.Context
142-
limiterContextCancel func()
142+
limiterContextCancel context.CancelCauseFunc
143143
retrier *backoff.ConcurrentRetrier // Service errors back off retrier
144144
logger *zap.Logger
145145
metricsScope tally.Scope
@@ -168,7 +168,7 @@ func createPollRetryPolicy() backoff.RetryPolicy {
168168
}
169169

170170
func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope tally.Scope, sessionTokenBucket *sessionTokenBucket) *baseWorker {
171-
ctx, cancel := context.WithCancel(context.Background())
171+
ctx, cancel := context.WithCancelCause(context.Background())
172172
logger = logger.With(zap.String(tagWorkerType, options.workerType))
173173
metricsScope = tagScope(metricsScope, tagWorkerType, options.workerType)
174174

@@ -324,7 +324,10 @@ func (bw *baseWorker) pollTask() {
324324
if pErr := bw.concurrency.PollerPermit.Acquire(bw.limiterContext); pErr == nil {
325325
defer bw.concurrency.PollerPermit.Release()
326326
} else {
327-
bw.logger.Warn("poller permit acquire error", zap.Error(pErr))
327+
if !errors.Is(context.Cause(bw.limiterContext), errShutdown) { // don't log on shutdown
328+
bw.logger.Warn("poller permit acquire error", zap.Error(pErr))
329+
}
330+
return // don't try to poll without a permit, will retry on next poll
328331
}
329332

330333
bw.retrier.Throttle()
@@ -418,7 +421,7 @@ func (bw *baseWorker) Stop() {
418421
return
419422
}
420423
close(bw.shutdownCh)
421-
bw.limiterContextCancel()
424+
bw.limiterContextCancel(errShutdown)
422425
bw.concurrencyAutoScaler.Stop()
423426

424427
if success := util.AwaitWaitGroup(&bw.shutdownWG, bw.options.shutdownTimeout); !success {
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package internal
2+
3+
import (
4+
"errors"
5+
"testing"
6+
7+
// "github.com/golang/mock/gomock"
8+
"github.com/stretchr/testify/assert"
9+
"github.com/uber-go/tally"
10+
"go.uber.org/zap"
11+
"go.uber.org/zap/zapcore"
12+
"go.uber.org/zap/zaptest/observer"
13+
14+
"go.uber.org/cadence/internal/common/debug"
15+
)
16+
17+
func TestBaseWorker_pollTask_no_warnLogOnShutdown(t *testing.T) {
18+
core, observed := observer.New(zapcore.InfoLevel)
19+
logger := zap.New(core, zap.Development())
20+
worker := newBaseWorker(baseWorkerOptions{
21+
maxConcurrentTask: 1,
22+
pollerCountWithoutAutoScaling: 1,
23+
identity: "test-identity",
24+
pollerTracker: debug.NewNoopPollerTracker(),
25+
taskWorker: &testTaskWorker{},
26+
}, logger, tally.NoopScope, nil)
27+
28+
// mock the worker started
29+
worker.Start()
30+
worker.Stop()
31+
worker.pollTask()
32+
33+
assert.Equal(t, 0, observed.FilterMessage("poller permit acquire error").Len())
34+
}
35+
36+
func TestBaseWorker_processTask_warnLogOnOtherError(t *testing.T) {
37+
core, observed := observer.New(zapcore.InfoLevel)
38+
logger := zap.New(core, zap.Development())
39+
worker := newBaseWorker(baseWorkerOptions{
40+
maxConcurrentTask: 1,
41+
pollerCountWithoutAutoScaling: 1,
42+
identity: "test-identity",
43+
pollerTracker: debug.NewNoopPollerTracker(),
44+
taskWorker: &testTaskWorker{},
45+
}, logger, tally.NoopScope, nil)
46+
47+
// mock the worker started
48+
worker.Start()
49+
worker.limiterContextCancel(errors.New("test error"))
50+
worker.pollTask()
51+
worker.Stop()
52+
53+
assert.LessOrEqual(t, 1, observed.FilterMessage("poller permit acquire error").Len())
54+
}
55+
56+
type testTaskWorker struct{}
57+
58+
func (t *testTaskWorker) PollTask() (interface{}, error) {
59+
return nil, errors.New("poll in test will fail")
60+
}
61+
62+
func (t *testTaskWorker) ProcessTask(task interface{}) error {
63+
return nil
64+
}

0 commit comments

Comments
 (0)