Skip to content

Commit 3451b68

Browse files
committed
feat: add consume loop function
1 parent 029d429 commit 3451b68

File tree

2 files changed

+82
-11
lines changed

2 files changed

+82
-11
lines changed

pkg/kafka/README.md

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func main() {
3131
defer p.Close()
3232

3333
// Case 1: send sarama.ProducerMessage type message
34-
msg := testData[0].(*sarama.ProducerMessage) // testData is https://github.com/go-dev-frame/sponge/blob/main/pkg/kafka/producer_test.go#L18
34+
msg := testData[0].(*sarama.ProducerMessage) // testData is https://github.com/go-dev-frame/sponge/blob/main/pkg/kafka/producer_test.go#L19
3535
partition, offset, err := p.SendMessage(msg)
3636
if err != nil {
3737
fmt.Println(err)
@@ -73,7 +73,7 @@ func main() {
7373
kafka.AsyncProducerWithVersion(sarama.V3_6_0_0),
7474
kafka.AsyncProducerWithRequiredAcks(sarama.WaitForLocal),
7575
kafka.AsyncProducerWithFlushMessages(50),
76-
kafka.AsyncProducerWithFlushFrequency(time.milliseconds*500),
76+
kafka.AsyncProducerWithFlushFrequency(time.Millisecond*500),
7777
)
7878
if err != nil {
7979
fmt.Println(err)
@@ -82,7 +82,7 @@ func main() {
8282
defer p.Close()
8383

8484
// Case 1: send sarama.ProducerMessage type message, supports multiple messages
85-
msg := testData[0].(*sarama.ProducerMessage) // testData is https://github.com/go-dev-frame/sponge/blob/main/pkg/kafka/producer_test.go#L18
85+
msg := testData[0].(*sarama.ProducerMessage) // testData is https://github.com/go-dev-frame/sponge/blob/main/pkg/kafka/producer_test.go#L19
8686
err = p.SendMessage(msg, msg)
8787
if err != nil {
8888
fmt.Println(err)
@@ -112,6 +112,7 @@ package main
112112
import (
113113
"fmt"
114114
"time"
115+
"context"
115116
"github.com/IBM/sarama"
116117
"github.com/go-dev-frame/sponge/pkg/kafka"
117118
)
@@ -130,11 +131,11 @@ func main() {
130131
defer cg.Close()
131132

132133
// Case 1: consume default handle message
133-
go cg.Consume(context.Background(), []string{testTopic}, handleMsgFn) // handleMsgFn is https://github.com/go-dev-frame/sponge/blob/main/pkg/kafka/consumer_test.go#L19
134+
go cg.ConsumeLoop(context.Background(), []string{testTopic}, handleMsgFn) // handleMsgFn is https://github.com/go-dev-frame/sponge/blob/main/pkg/kafka/consumer_test.go#L19
134135

135136
// Case 2: consume custom handle message
136-
go cg.ConsumeCustom(context.Background(), []string{testTopic}, &myConsumerGroupHandler{ // myConsumerGroupHandler is https://github.com/go-dev-frame/sponge/blob/main/pkg/kafka/consumer_test.go#L26
137-
autoCommitEnable: cg.autoCommitEnable,
137+
go cg.ConsumeCustomLoop(context.Background(), []string{testTopic}, &myConsumerGroupHandler{ // myConsumerGroupHandler is https://github.com/go-dev-frame/sponge/blob/main/pkg/kafka/consumer_test.go#L26
138+
autoCommitEnable: true,
138139
})
139140

140141
<-time.After(time.Minute) // wait exit
@@ -150,9 +151,10 @@ package main
150151

151152
import (
152153
"fmt"
154+
"time"
155+
"context"
153156
"github.com/IBM/sarama"
154157
"github.com/go-dev-frame/sponge/pkg/kafka"
155-
"time"
156158
)
157159

158160
func main() {
@@ -167,7 +169,7 @@ func main() {
167169
defer c.Close()
168170

169171
// Case 1: consume one partition
170-
go c.ConsumePartition(context.Background(), testTopic, 0, sarama.OffsetNewest, handleMsgFn) // // handleMsgFn is https://github.com/go-dev-frame/sponge/blob/main/pkg/kafka/consumer_test.go#L19
172+
go c.ConsumePartition(context.Background(), testTopic, 0, sarama.OffsetNewest, handleMsgFn) // handleMsgFn is https://github.com/go-dev-frame/sponge/blob/main/pkg/kafka/consumer_test.go#L19
171173

172174
// Case 2: consume all partition
173175
c.ConsumeAllPartition(context.Background(), testTopic, sarama.OffsetNewest, handleMsgFn)

pkg/kafka/consumer.go

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package kafka
22

33
import (
44
"context"
5+
"time"
56

67
"github.com/IBM/sarama"
78
"go.uber.org/zap"
@@ -51,6 +52,40 @@ func InitConsumerGroup(addrs []string, groupID string, opts ...ConsumerOption) (
5152
}, nil
5253
}
5354

55+
// ConsumeLoop consume messages in a loop, with rebalanced handling
56+
func (c *ConsumerGroup) ConsumeLoop(ctx context.Context, topics []string, handleMessageFn HandleMessageFn) {
57+
backoff := time.Second
58+
maxBackoff := 30 * time.Second
59+
60+
for {
61+
err := c.Consume(ctx, topics, handleMessageFn)
62+
if err != nil {
63+
select {
64+
case <-time.After(backoff):
65+
case <-ctx.Done():
66+
return
67+
}
68+
69+
if backoff < maxBackoff {
70+
backoff *= 2
71+
if backoff > maxBackoff {
72+
backoff = maxBackoff
73+
}
74+
}
75+
continue
76+
}
77+
78+
// Normal exit (mostly due to rebalanced), should reset the backoff and wait for the next time to join the consumption group
79+
backoff = time.Second
80+
81+
if ctx.Err() != nil {
82+
return
83+
}
84+
85+
c.zapLogger.Info("rebalanced, starting new session", zap.String("group_id", c.groupID), zap.Strings("topics", topics))
86+
}
87+
}
88+
5489
// Consume consume messages
5590
func (c *ConsumerGroup) Consume(ctx context.Context, topics []string, handleMessageFn HandleMessageFn) error {
5691
handler := &defaultConsumerHandler{
@@ -68,11 +103,45 @@ func (c *ConsumerGroup) Consume(ctx context.Context, topics []string, handleMess
68103
return nil
69104
}
70105

106+
// ConsumeCustomLoop consume messages for custom handler in a loop, with rebalanced handling
107+
func (c *ConsumerGroup) ConsumeCustomLoop(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) {
108+
backoff := time.Second
109+
maxBackoff := 30 * time.Second
110+
111+
for {
112+
err := c.ConsumeCustom(ctx, topics, handler)
113+
if err != nil {
114+
select {
115+
case <-time.After(backoff):
116+
case <-ctx.Done():
117+
return
118+
}
119+
120+
if backoff < maxBackoff {
121+
backoff *= 2
122+
if backoff > maxBackoff {
123+
backoff = maxBackoff
124+
}
125+
}
126+
continue
127+
}
128+
129+
// Normal exit (mostly due to rebalanced), should reset the backoff and wait for the next time to join the consumption group
130+
backoff = time.Second
131+
132+
if ctx.Err() != nil {
133+
return
134+
}
135+
136+
c.zapLogger.Info("rebalanced, starting new session", zap.String("group_id", c.groupID), zap.Strings("topics", topics))
137+
}
138+
}
139+
71140
// ConsumeCustom consume messages for custom handler, you need to implement the sarama.ConsumerGroupHandler interface
72141
func (c *ConsumerGroup) ConsumeCustom(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error {
73142
err := c.Group.Consume(ctx, topics, handler)
74143
if err != nil {
75-
c.zapLogger.Error("failed to consume messages", zap.String("group_id", c.groupID), zap.Strings("topics", topics), zap.Error(err))
144+
c.zapLogger.Error("failed to consume custom messages", zap.String("group_id", c.groupID), zap.Strings("topics", topics), zap.Error(err))
76145
return err
77146
}
78147
return nil
@@ -192,11 +261,11 @@ func (c *Consumer) ConsumePartition(ctx context.Context, topic string, partition
192261
for {
193262
select {
194263
case msg := <-pc.Messages():
195-
err := handleFn(msg)
264+
err = handleFn(msg)
196265
if err != nil {
197266
c.zapLogger.Warn("failed to handle message", zap.Error(err), zap.String("topic", topic), zap.Int32("partition", partition), zap.Int64("offset", msg.Offset))
198267
}
199-
case err := <-pc.Errors():
268+
case err = <-pc.Errors():
200269
c.zapLogger.Error("partition consumer error", zap.Error(err))
201270
case <-ctx.Done():
202271
return

0 commit comments

Comments
 (0)