Skip to content

Commit 0404718

Browse files
authored
Merge pull request #30 from changsongl/develop
feat: encoder, compress encoder like protobuf.
2 parents 9cbff86 + 68dcd47 commit 0404718

File tree

15 files changed

+99
-66
lines changed

15 files changed

+99
-66
lines changed

cmd/delayqueue/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ func run() int {
126126
disp := dispatch.NewDispatch(l,
127127
func() (bucket.Bucket, pool.Pool, queue.Queue, timer.Timer) {
128128
cli := client.New(conf.Redis)
129+
129130
s := redis.NewStore(cli)
130131

131132
b := bucket.New(s, l, conf.DelayQueue.BucketSize, conf.DelayQueue.BucketName)

config/config.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,7 @@ func (c *Conf) Load(file string, fileType FileType) error {
160160
return err
161161
}
162162

163-
err = c.load(bts, decoder.DecodeFunc())
164-
if err != nil {
163+
if err = c.load(bts, decoder.DecodeFunc()); err != nil {
165164
return err
166165
}
167166

config/config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ delay_queue:
99
timer_fetch_interval: 1000 # fetching job interval(ms), decrease interval may get better throughout.
1010
timer_fetch_delay: 0 # fetch delay(ms), if there are still job in the bucket after the fetch,
1111
# it will delay timer_fetch_delay ms for next fetch. Default is not wait.
12+
#encoder: "json" # encoder: json, compress(default, similar to protobuf)
1213

1314
# redis config
1415
redis:

config/config.yaml.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ delay_queue:
99
timer_fetch_interval: 2000 # fetching job interval(ms), decrease interval may get better throughout.
1010
timer_fetch_delay: 0 # fetch delay(ms), if there are still job in the bucket after the fetch,
1111
# it will delay timer_fetch_delay ms for next fetch. Default is not wait.
12+
encoder: "json" # encoder: json, compress(fast diy encode, similar to protobuf)
1213

1314
# redis config
1415
redis:

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ require (
1111
github.com/go-redsync/redsync/v4 v4.0.4
1212
github.com/golang/mock v1.5.0
1313
github.com/gomodule/redigo v2.0.0+incompatible // indirect
14+
github.com/json-iterator/go v1.1.11
1415
github.com/leodido/go-urn v1.2.1 // indirect
1516
github.com/prometheus/client_golang v1.9.0
1617
github.com/stretchr/testify v1.7.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,8 @@ github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/u
184184
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
185185
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
186186
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
187+
github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ=
188+
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
187189
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
188190
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
189191
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=

job/job.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@ import (
1010

1111
// Job job for delay queue
1212
type Job struct {
13-
Topic Topic `json:"topic"`
14-
ID ID `json:"id"`
15-
Delay Delay `json:"delay"`
16-
TTR TTR `json:"ttr"`
17-
Body Body `json:"body"`
18-
Version Version `json:"version"`
13+
Topic Topic `json:"topic,omitempty"`
14+
ID ID `json:"id,omitempty"`
15+
Delay Delay `json:"delay,omitempty"`
16+
TTR TTR `json:"ttr,omitempty"`
17+
Body Body `json:"body,omitempty"`
18+
Version Version `json:"version,omitempty"`
1919
Mutex lock.Locker `json:"-"`
2020
}
2121

pkg/encode/compress.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"github.com/changsongl/delay-queue/job"
77
)
88

9+
// |tag|[length]|data
10+
911
// tag s
1012
const (
1113
TagID uint64 = iota
@@ -32,7 +34,7 @@ func NewCompress() Encoder {
3234
return &compress{}
3335
}
3436

35-
// Encode compress encode
37+
// Encode compress encode, not using reflect
3638
func (c *compress) Encode(j *job.Job) ([]byte, error) {
3739
buf := make([]byte, c.bufLength(j))
3840
written := 0
@@ -42,8 +44,6 @@ func (c *compress) Encode(j *job.Job) ([]byte, error) {
4244
if !j.TTR.IsEmpty() {
4345
written += c.PutUInt64(TagTTR, uint64(j.TTR), buf[written:])
4446
}
45-
written += c.PutUInt64(TagVersion, j.Version.UInt64(), buf[written:])
46-
4747
if !j.ID.IsEmpty() {
4848
written += c.PutString(TagID, string(j.ID), buf[written:])
4949
}
@@ -53,6 +53,7 @@ func (c *compress) Encode(j *job.Job) ([]byte, error) {
5353
if !j.Topic.IsEmpty() {
5454
written += c.PutString(TagTopic, string(j.Topic), buf[written:])
5555
}
56+
written += c.PutUInt64(TagVersion, j.Version.UInt64(), buf[written:])
5657

5758
return buf[:written], nil
5859
}
@@ -100,7 +101,7 @@ func (c *compress) Decode(b []byte, j *job.Job) error {
100101
func (c *compress) bufLength(j *job.Job) int {
101102
l := (TagLength+MaxUInt64Length)*5 + len(j.ID) + len(j.Topic)
102103
if j.Body != "" {
103-
l += TagLength + MaxUInt64Length + len(j.Topic)
104+
l += TagLength + MaxUInt64Length + len(j.Body)
104105
}
105106
return l
106107
}

pkg/encode/compress_test.go

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,10 @@
11
package encode
22

33
import (
4-
"fmt"
5-
"github.com/changsongl/delay-queue/job"
6-
"github.com/changsongl/delay-queue/pkg/lock"
7-
"github.com/stretchr/testify/require"
84
"testing"
95
)
106

117
func TestCompressEncoder(t *testing.T) {
128
encoder := NewCompress()
13-
j, err := job.New("jobTopic11", "1222", 10, 5, "", func(name string) lock.Locker {
14-
return nil
15-
})
16-
require.NoError(t, err)
17-
18-
str, err := encoder.Encode(j)
19-
require.NoError(t, err)
20-
fmt.Println(str)
21-
22-
jDecode := &job.Job{}
23-
err = encoder.Decode(str, jDecode)
24-
fmt.Printf("%+v", jDecode)
25-
26-
require.NoError(t, err)
27-
require.Equal(t, j.ID, jDecode.ID)
28-
require.Equal(t, j.TTR, jDecode.TTR)
29-
require.Equal(t, j.Delay, jDecode.Delay)
30-
require.Equal(t, j.Topic, jDecode.Topic)
31-
require.True(t, j.Version.Equal(jDecode.Version))
32-
require.Equal(t, j.Body, jDecode.Body)
33-
require.Equal(t, j.Version.String(), jDecode.Version.String())
9+
runEncodeTest(t, encoder)
3410
}

pkg/encode/encode_benchmark_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
// BenchmarkCompressEncode 1000000000 0.000011 ns/op len(32)
1111
func BenchmarkCompressEncode(t *testing.B) {
1212
encoder := NewCompress()
13-
j, err := job.New("jobTopic", "131223", 1, 1, "", func(name string) lock.Locker {
13+
j, err := job.New("jobTopic", "131223", 1, 1, job.Body(longText), func(name string) lock.Locker {
1414
return nil
1515
})
1616
require.NoError(t, err)
@@ -23,7 +23,7 @@ func BenchmarkCompressEncode(t *testing.B) {
2323
// BenchmarkJSONEncode 1000000000 0.000024 ns/op len(92)
2424
func BenchmarkJSONEncode(t *testing.B) {
2525
encoder := NewJSON()
26-
j, err := job.New("jobTopic", "131223", 1, 1, "", func(name string) lock.Locker {
26+
j, err := job.New("jobTopic", "131223", 1, 1, job.Body(longText), func(name string) lock.Locker {
2727
return nil
2828
})
2929
require.NoError(t, err)

0 commit comments

Comments
 (0)