Skip to content

Commit d8d6d95

Browse files
committed
add encoder
1 parent 9cbff86 commit d8d6d95

File tree

12 files changed

+71
-22
lines changed

12 files changed

+71
-22
lines changed

cmd/delayqueue/main.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/changsongl/delay-queue/bucket"
99
"github.com/changsongl/delay-queue/config"
1010
"github.com/changsongl/delay-queue/dispatch"
11+
"github.com/changsongl/delay-queue/pkg/encode"
1112
"github.com/changsongl/delay-queue/pkg/log"
1213
client "github.com/changsongl/delay-queue/pkg/redis"
1314
"github.com/changsongl/delay-queue/pool"
@@ -126,7 +127,14 @@ func run() int {
126127
disp := dispatch.NewDispatch(l,
127128
func() (bucket.Bucket, pool.Pool, queue.Queue, timer.Timer) {
128129
cli := client.New(conf.Redis)
129-
s := redis.NewStore(cli)
130+
var e encode.Encoder
131+
if conf.DelayQueue.Encoder == config.EncoderCompress {
132+
e = encode.NewCompress()
133+
} else {
134+
e = encode.NewJSON()
135+
}
136+
137+
s := redis.NewStore(cli, e)
130138

131139
b := bucket.New(s, l, conf.DelayQueue.BucketSize, conf.DelayQueue.BucketName)
132140
if maxFetchNum := conf.DelayQueue.BucketMaxFetchNum; maxFetchNum != 0 {

config/config.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package config
33
import (
44
encodeJson "encoding/json"
55
"errors"
6+
"fmt"
67
"github.com/changsongl/delay-queue/config/decode"
78
"github.com/changsongl/delay-queue/config/decode/json"
89
"github.com/changsongl/delay-queue/config/decode/yaml"
@@ -31,6 +32,13 @@ const (
3132
RedisModeCluster RedisMode = "cluster"
3233
)
3334

35+
const (
36+
// EncoderJSON json
37+
EncoderJSON = "json"
38+
// EncoderCompress compress like protobbuf
39+
EncoderCompress = "compress"
40+
)
41+
3442
// default configurations
3543
const (
3644
// delay queue configuration
@@ -75,6 +83,8 @@ type DelayQueue struct {
7583
// fetch delay(ms), if there are still job in the bucket after the fetch
7684
// it will delay timer_fetch_delay ms for next fetch. Default is not wait.
7785
TimerFetchDelay int `yaml:"timer_fetch_delay,omitempty" json:"timer_fetch_delay,omitempty"`
86+
// encoder: json, compress(fast diy encode, similar to protobuf)
87+
Encoder string `yaml:"encoder,omitempty" json:"encoder,omitempty"`
7888
}
7989

8090
// Redis redis configuration
@@ -132,6 +142,7 @@ func New() *Conf {
132142
QueueName: DefaultDQQueueName,
133143
BucketMaxFetchNum: DefaultDQBucketMaxFetchNum,
134144
TimerFetchInterval: DefaultTimerFetchInterval,
145+
Encoder: EncoderCompress,
135146
},
136147
Redis: Redis{
137148
Network: DefaultRedisNetwork,
@@ -160,8 +171,11 @@ func (c *Conf) Load(file string, fileType FileType) error {
160171
return err
161172
}
162173

163-
err = c.load(bts, decoder.DecodeFunc())
164-
if err != nil {
174+
if err = c.load(bts, decoder.DecodeFunc()); err != nil {
175+
return err
176+
}
177+
178+
if err = c.IsValid(); err != nil {
165179
return err
166180
}
167181

@@ -188,6 +202,14 @@ func (c *Conf) getDecoderByFileType(fileType FileType) (decode.Decoder, error) {
188202
return nil, errors.New("invalid file type")
189203
}
190204

205+
// IsValid check conf is valid
206+
func (c *Conf) IsValid() error {
207+
if c.DelayQueue.Encoder != EncoderJSON && c.DelayQueue.Encoder != EncoderCompress {
208+
return fmt.Errorf("invalid encoder %s (%s, %s)", c.DelayQueue.Encoder, EncoderJSON, EncoderCompress)
209+
}
210+
return nil
211+
}
212+
191213
// String config string
192214
func (c *Conf) String() string {
193215
bytes, _ := encodeJson.Marshal(c)

config/config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ delay_queue:
99
timer_fetch_interval: 1000 # fetching job interval(ms), decrease interval may get better throughout.
1010
timer_fetch_delay: 0 # fetch delay(ms), if there are still job in the bucket after the fetch,
1111
# it will delay timer_fetch_delay ms for next fetch. Default is not wait.
12+
#encoder: "json" # encoder: json, compress(default, similar to protobuf)
1213

1314
# redis config
1415
redis:

config/config.yaml.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ delay_queue:
99
timer_fetch_interval: 2000 # fetching job interval(ms), decrease interval may get better throughout.
1010
timer_fetch_delay: 0 # fetch delay(ms), if there are still job in the bucket after the fetch,
1111
# it will delay timer_fetch_delay ms for next fetch. Default is not wait.
12+
encoder: "json" # encoder: json, compress(fast diy encode, similar to protobuf)
1213

1314
# redis config
1415
redis:

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ require (
1111
github.com/go-redsync/redsync/v4 v4.0.4
1212
github.com/golang/mock v1.5.0
1313
github.com/gomodule/redigo v2.0.0+incompatible // indirect
14+
github.com/json-iterator/go v1.1.11
1415
github.com/leodido/go-urn v1.2.1 // indirect
1516
github.com/prometheus/client_golang v1.9.0
1617
github.com/stretchr/testify v1.7.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,8 @@ github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/u
184184
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
185185
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
186186
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
187+
github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ=
188+
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
187189
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
188190
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
189191
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=

job/job.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@ import (
1010

1111
// Job job for delay queue
1212
type Job struct {
13-
Topic Topic `json:"topic"`
14-
ID ID `json:"id"`
15-
Delay Delay `json:"delay"`
16-
TTR TTR `json:"ttr"`
17-
Body Body `json:"body"`
18-
Version Version `json:"version"`
13+
Topic Topic `json:"topic,omitempty"`
14+
ID ID `json:"id,omitempty"`
15+
Delay Delay `json:"delay,omitempty"`
16+
TTR TTR `json:"ttr,omitempty"`
17+
Body Body `json:"body,omitempty"`
18+
Version Version `json:"version,omitempty"`
1919
Mutex lock.Locker `json:"-"`
2020
}
2121

pkg/encode/compress.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"github.com/changsongl/delay-queue/job"
77
)
88

9+
// |tag|[length]|data
10+
911
// tag s
1012
const (
1113
TagID uint64 = iota
@@ -32,7 +34,7 @@ func NewCompress() Encoder {
3234
return &compress{}
3335
}
3436

35-
// Encode compress encode
37+
// Encode compress encode, not using reflect
3638
func (c *compress) Encode(j *job.Job) ([]byte, error) {
3739
buf := make([]byte, c.bufLength(j))
3840
written := 0
@@ -42,8 +44,6 @@ func (c *compress) Encode(j *job.Job) ([]byte, error) {
4244
if !j.TTR.IsEmpty() {
4345
written += c.PutUInt64(TagTTR, uint64(j.TTR), buf[written:])
4446
}
45-
written += c.PutUInt64(TagVersion, j.Version.UInt64(), buf[written:])
46-
4747
if !j.ID.IsEmpty() {
4848
written += c.PutString(TagID, string(j.ID), buf[written:])
4949
}
@@ -53,6 +53,7 @@ func (c *compress) Encode(j *job.Job) ([]byte, error) {
5353
if !j.Topic.IsEmpty() {
5454
written += c.PutString(TagTopic, string(j.Topic), buf[written:])
5555
}
56+
written += c.PutUInt64(TagVersion, j.Version.UInt64(), buf[written:])
5657

5758
return buf[:written], nil
5859
}
@@ -100,7 +101,7 @@ func (c *compress) Decode(b []byte, j *job.Job) error {
100101
func (c *compress) bufLength(j *job.Job) int {
101102
l := (TagLength+MaxUInt64Length)*5 + len(j.ID) + len(j.Topic)
102103
if j.Body != "" {
103-
l += TagLength + MaxUInt64Length + len(j.Topic)
104+
l += TagLength + MaxUInt64Length + len(j.Body)
104105
}
105106
return l
106107
}

pkg/encode/compress_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package encode
22

33
import (
4-
"fmt"
54
"github.com/changsongl/delay-queue/job"
65
"github.com/changsongl/delay-queue/pkg/lock"
76
"github.com/stretchr/testify/require"
@@ -10,18 +9,17 @@ import (
109

1110
func TestCompressEncoder(t *testing.T) {
1211
encoder := NewCompress()
13-
j, err := job.New("jobTopic11", "1222", 10, 5, "", func(name string) lock.Locker {
12+
j, err := job.New("jobTopic11", "哈哈大萨达撒多", 10213213211, 521321312312, "萨达大所多敖德萨多", func(name string) lock.Locker {
1413
return nil
1514
})
1615
require.NoError(t, err)
1716

1817
str, err := encoder.Encode(j)
1918
require.NoError(t, err)
20-
fmt.Println(str)
2119

20+
t.Log(string(str))
2221
jDecode := &job.Job{}
2322
err = encoder.Decode(str, jDecode)
24-
fmt.Printf("%+v", jDecode)
2523

2624
require.NoError(t, err)
2725
require.Equal(t, j.ID, jDecode.ID)

pkg/encode/encode_benchmark_test.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,22 @@ import (
77
"testing"
88
)
99

10+
var body = `
11+
The Bible is not a single book but a collection of books, whose complex development is not completely understood. The books began as songs and stories orally transmitted from generation to generation before being written down in a process that began sometime around the start of the first millennium BCE and continued for over a thousand years. The Bible was written and compiled by many people, from a variety of disparate cultures, most of whom are unknown.[18] British biblical scholar John K. Riches wrote:[19]
12+
[T]he biblical texts were produced over a period in which the living conditions of the writers – political, cultural, economic, and ecological – varied enormously. There are texts which reflect a nomadic existence, texts from people with an established monarchy and Temple cult, texts from exile, texts born out of fierce oppression by foreign rulers, courtly texts, texts from wandering charismatic preachers, texts from those who give themselves the airs of sophisticated Hellenistic writers. It is a time-span which encompasses the compositions of Homer, Plato, Aristotle, Thucydides, Sophocles, Caesar, Cicero, and Catullus. It is a period which sees the rise and fall of the Assyrian empire (twelfth to seventh century) and of the Persian empire (sixth to fourth century), Alexander's campaigns (336–326), the rise of Rome and its domination of the Mediterranean (fourth century to the founding of the Principate, 27 BCE), the destruction of the Jerusalem Temple (70 CE), and the extension of Roman rule to parts of Scotland (84 CE).
13+
Hebrew Bible from 1300. page 20, Genesis.
14+
Hebrew Bible from 1300. Genesis.
15+
Considered to be scriptures (sacred, authoritative religious texts), the books were compiled by different religious communities into various biblical canons (official collections of scriptures). The earliest compilation, containing the first five books of the Bible and called the Torah (meaning "law", "instruction", or "teaching") or Pentateuch ("five books"), was accepted as Jewish canon by the 5th century BCE. A second collection of narrative histories and prophesies, called the Nevi'im ("prophets"), was canonized in the 3rd century BCE. A third collection called the Ketuvim ("writings"), containing psalms, proverbs, and narrative histories, was canonized sometime between the 2nd century BCE and the 2nd century CE. These three collections were written mostly in Hebrew, with some parts in Aramaic, and together form the Hebrew Bible or "TaNaKh" (a portmanteau of "Torah", "Nevi'im", and "Ketuvim").[20]
16+
Greek-speaking Jews in Alexandria and elsewhere in the Jewish diaspora considered additional scriptures, composed between 200 BCE and 100 CE and not included in the Hebrew Bible, to be canon. These additional texts were included in a translation of the Hebrew Bible into Koine Greek (common Greek spoken by ordinary people) known as the Septuagint (meaning "the work of the seventy"), which began as a translation of the Torah made around 250 BCE and continued to develop for several centuries. The Septuagint contained all of the books of the Hebrew Bible, re-organized and with some textual differences, with the additional scriptures interspersed throughout.[21]
17+
Saint Paul Writing His Epistles, 16th-century painting.
18+
During the rise of Christianity in the 1st century CE, new scriptures were written in Greek about the life and teachings of Jesus Christ, who Christians believed was the messiah prophesized in the books of the Hebrew Bible. Two collections of these new scriptures – the Pauline epistles and the Gospels – were accepted as canon by the end of the 2nd century CE. A third collection, the catholic epistles, were canonized over the next few centuries. Christians called these new scriptures the "New Testament", and began referring to the Septuagint as the "Old Testament".[22]
19+
Between 385 and 405 CE, the early Christian church translated its canon into Vulgar Latin (the common Latin spoken by ordinary people), a translation known as the Vulgate, which included in its Old Testament the books that were in the Septuagint but not in the Hebrew Bible. The Vulgate introduced stability to the Bible, but also began the East-West Schism between Latin-speaking Western Christianity (led by the Catholic Church) and multi-lingual Eastern Christianity (led by the Eastern Orthodox Church). Christian denominations' biblical canons varied not only in the language of the books, but also in their selection, organization, and text.[23]
20+
Jewish rabbis began developing a standard Hebrew Bible in the 1st century CE, maintained since the middle of the first millennium by the Masoretes, and called the Masoretic Text. Christians have held ecumenical councils to standardize their biblical canon since the 4th century CE. The Council of Trent (1545–63), held by the Catholic Church in response to the Protestant Reformation, authorized the Vulgate as its official Latin translation of the Bible. The Church deemed the additional books in its Old Testament that were interspersed among the Hebrew Bible books to be "deuterocanonical" (meaning part of a second or later canon). Protestant Bibles either separated these books into a separate section called the "Apocrypha" (meaning "hidden away") between the Old and New Testaments, or omitted them altogether. The 17th-century Protestant King James Version was the most ubiquitous English Bible of all time, but it has largely been superseded by modern translations.[24]`
21+
1022
// BenchmarkCompressEncode 1000000000 0.000011 ns/op len(32)
1123
func BenchmarkCompressEncode(t *testing.B) {
1224
encoder := NewCompress()
13-
j, err := job.New("jobTopic", "131223", 1, 1, "", func(name string) lock.Locker {
25+
j, err := job.New("jobTopic", "131223", 1, 1, job.Body(body), func(name string) lock.Locker {
1426
return nil
1527
})
1628
require.NoError(t, err)
@@ -23,7 +35,7 @@ func BenchmarkCompressEncode(t *testing.B) {
2335
// BenchmarkJSONEncode 1000000000 0.000024 ns/op len(92)
2436
func BenchmarkJSONEncode(t *testing.B) {
2537
encoder := NewJSON()
26-
j, err := job.New("jobTopic", "131223", 1, 1, "", func(name string) lock.Locker {
38+
j, err := job.New("jobTopic", "131223", 1, 1, job.Body(body), func(name string) lock.Locker {
2739
return nil
2840
})
2941
require.NoError(t, err)

0 commit comments

Comments
 (0)