Skip to content

Commit 68dcd47

Browse files
committed
support compress encode
1 parent d8d6d95 commit 68dcd47

File tree

8 files changed

+79
-95
lines changed

8 files changed

+79
-95
lines changed

cmd/delayqueue/main.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"github.com/changsongl/delay-queue/bucket"
99
"github.com/changsongl/delay-queue/config"
1010
"github.com/changsongl/delay-queue/dispatch"
11-
"github.com/changsongl/delay-queue/pkg/encode"
1211
"github.com/changsongl/delay-queue/pkg/log"
1312
client "github.com/changsongl/delay-queue/pkg/redis"
1413
"github.com/changsongl/delay-queue/pool"
@@ -127,14 +126,8 @@ func run() int {
127126
disp := dispatch.NewDispatch(l,
128127
func() (bucket.Bucket, pool.Pool, queue.Queue, timer.Timer) {
129128
cli := client.New(conf.Redis)
130-
var e encode.Encoder
131-
if conf.DelayQueue.Encoder == config.EncoderCompress {
132-
e = encode.NewCompress()
133-
} else {
134-
e = encode.NewJSON()
135-
}
136129

137-
s := redis.NewStore(cli, e)
130+
s := redis.NewStore(cli)
138131

139132
b := bucket.New(s, l, conf.DelayQueue.BucketSize, conf.DelayQueue.BucketName)
140133
if maxFetchNum := conf.DelayQueue.BucketMaxFetchNum; maxFetchNum != 0 {

config/config.go

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package config
33
import (
44
encodeJson "encoding/json"
55
"errors"
6-
"fmt"
76
"github.com/changsongl/delay-queue/config/decode"
87
"github.com/changsongl/delay-queue/config/decode/json"
98
"github.com/changsongl/delay-queue/config/decode/yaml"
@@ -32,13 +31,6 @@ const (
3231
RedisModeCluster RedisMode = "cluster"
3332
)
3433

35-
const (
36-
// EncoderJSON json
37-
EncoderJSON = "json"
38-
// EncoderCompress compress like protobbuf
39-
EncoderCompress = "compress"
40-
)
41-
4234
// default configurations
4335
const (
4436
// delay queue configuration
@@ -83,8 +75,6 @@ type DelayQueue struct {
8375
// fetch delay(ms), if there are still job in the bucket after the fetch
8476
// it will delay timer_fetch_delay ms for next fetch. Default is not wait.
8577
TimerFetchDelay int `yaml:"timer_fetch_delay,omitempty" json:"timer_fetch_delay,omitempty"`
86-
// encoder: json, compress(fast diy encode, similar to protobuf)
87-
Encoder string `yaml:"encoder,omitempty" json:"encoder,omitempty"`
8878
}
8979

9080
// Redis redis configuration
@@ -142,7 +132,6 @@ func New() *Conf {
142132
QueueName: DefaultDQQueueName,
143133
BucketMaxFetchNum: DefaultDQBucketMaxFetchNum,
144134
TimerFetchInterval: DefaultTimerFetchInterval,
145-
Encoder: EncoderCompress,
146135
},
147136
Redis: Redis{
148137
Network: DefaultRedisNetwork,
@@ -175,10 +164,6 @@ func (c *Conf) Load(file string, fileType FileType) error {
175164
return err
176165
}
177166

178-
if err = c.IsValid(); err != nil {
179-
return err
180-
}
181-
182167
return nil
183168
}
184169

@@ -202,14 +187,6 @@ func (c *Conf) getDecoderByFileType(fileType FileType) (decode.Decoder, error) {
202187
return nil, errors.New("invalid file type")
203188
}
204189

205-
// IsValid check conf is valid
206-
func (c *Conf) IsValid() error {
207-
if c.DelayQueue.Encoder != EncoderJSON && c.DelayQueue.Encoder != EncoderCompress {
208-
return fmt.Errorf("invalid encoder %s (%s, %s)", c.DelayQueue.Encoder, EncoderJSON, EncoderCompress)
209-
}
210-
return nil
211-
}
212-
213190
// String config string
214191
func (c *Conf) String() string {
215192
bytes, _ := encodeJson.Marshal(c)

pkg/encode/compress_test.go

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,10 @@
11
package encode
22

33
import (
4-
"github.com/changsongl/delay-queue/job"
5-
"github.com/changsongl/delay-queue/pkg/lock"
6-
"github.com/stretchr/testify/require"
74
"testing"
85
)
96

107
func TestCompressEncoder(t *testing.T) {
118
encoder := NewCompress()
12-
j, err := job.New("jobTopic11", "哈哈大萨达撒多", 10213213211, 521321312312, "萨达大所多敖德萨多", func(name string) lock.Locker {
13-
return nil
14-
})
15-
require.NoError(t, err)
16-
17-
str, err := encoder.Encode(j)
18-
require.NoError(t, err)
19-
20-
t.Log(string(str))
21-
jDecode := &job.Job{}
22-
err = encoder.Decode(str, jDecode)
23-
24-
require.NoError(t, err)
25-
require.Equal(t, j.ID, jDecode.ID)
26-
require.Equal(t, j.TTR, jDecode.TTR)
27-
require.Equal(t, j.Delay, jDecode.Delay)
28-
require.Equal(t, j.Topic, jDecode.Topic)
29-
require.True(t, j.Version.Equal(jDecode.Version))
30-
require.Equal(t, j.Body, jDecode.Body)
31-
require.Equal(t, j.Version.String(), jDecode.Version.String())
9+
runEncodeTest(t, encoder)
3210
}

pkg/encode/encode_benchmark_test.go

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,10 @@ import (
77
"testing"
88
)
99

10-
var body = `
11-
The Bible is not a single book but a collection of books, whose complex development is not completely understood. The books began as songs and stories orally transmitted from generation to generation before being written down in a process that began sometime around the start of the first millennium BCE and continued for over a thousand years. The Bible was written and compiled by many people, from a variety of disparate cultures, most of whom are unknown.[18] British biblical scholar John K. Riches wrote:[19]
12-
[T]he biblical texts were produced over a period in which the living conditions of the writers – political, cultural, economic, and ecological – varied enormously. There are texts which reflect a nomadic existence, texts from people with an established monarchy and Temple cult, texts from exile, texts born out of fierce oppression by foreign rulers, courtly texts, texts from wandering charismatic preachers, texts from those who give themselves the airs of sophisticated Hellenistic writers. It is a time-span which encompasses the compositions of Homer, Plato, Aristotle, Thucydides, Sophocles, Caesar, Cicero, and Catullus. It is a period which sees the rise and fall of the Assyrian empire (twelfth to seventh century) and of the Persian empire (sixth to fourth century), Alexander's campaigns (336–326), the rise of Rome and its domination of the Mediterranean (fourth century to the founding of the Principate, 27 BCE), the destruction of the Jerusalem Temple (70 CE), and the extension of Roman rule to parts of Scotland (84 CE).
13-
Hebrew Bible from 1300. page 20, Genesis.
14-
Hebrew Bible from 1300. Genesis.
15-
Considered to be scriptures (sacred, authoritative religious texts), the books were compiled by different religious communities into various biblical canons (official collections of scriptures). The earliest compilation, containing the first five books of the Bible and called the Torah (meaning "law", "instruction", or "teaching") or Pentateuch ("five books"), was accepted as Jewish canon by the 5th century BCE. A second collection of narrative histories and prophesies, called the Nevi'im ("prophets"), was canonized in the 3rd century BCE. A third collection called the Ketuvim ("writings"), containing psalms, proverbs, and narrative histories, was canonized sometime between the 2nd century BCE and the 2nd century CE. These three collections were written mostly in Hebrew, with some parts in Aramaic, and together form the Hebrew Bible or "TaNaKh" (a portmanteau of "Torah", "Nevi'im", and "Ketuvim").[20]
16-
Greek-speaking Jews in Alexandria and elsewhere in the Jewish diaspora considered additional scriptures, composed between 200 BCE and 100 CE and not included in the Hebrew Bible, to be canon. These additional texts were included in a translation of the Hebrew Bible into Koine Greek (common Greek spoken by ordinary people) known as the Septuagint (meaning "the work of the seventy"), which began as a translation of the Torah made around 250 BCE and continued to develop for several centuries. The Septuagint contained all of the books of the Hebrew Bible, re-organized and with some textual differences, with the additional scriptures interspersed throughout.[21]
17-
Saint Paul Writing His Epistles, 16th-century painting.
18-
During the rise of Christianity in the 1st century CE, new scriptures were written in Greek about the life and teachings of Jesus Christ, who Christians believed was the messiah prophesized in the books of the Hebrew Bible. Two collections of these new scriptures – the Pauline epistles and the Gospels – were accepted as canon by the end of the 2nd century CE. A third collection, the catholic epistles, were canonized over the next few centuries. Christians called these new scriptures the "New Testament", and began referring to the Septuagint as the "Old Testament".[22]
19-
Between 385 and 405 CE, the early Christian church translated its canon into Vulgar Latin (the common Latin spoken by ordinary people), a translation known as the Vulgate, which included in its Old Testament the books that were in the Septuagint but not in the Hebrew Bible. The Vulgate introduced stability to the Bible, but also began the East-West Schism between Latin-speaking Western Christianity (led by the Catholic Church) and multi-lingual Eastern Christianity (led by the Eastern Orthodox Church). Christian denominations' biblical canons varied not only in the language of the books, but also in their selection, organization, and text.[23]
20-
Jewish rabbis began developing a standard Hebrew Bible in the 1st century CE, maintained since the middle of the first millennium by the Masoretes, and called the Masoretic Text. Christians have held ecumenical councils to standardize their biblical canon since the 4th century CE. The Council of Trent (1545–63), held by the Catholic Church in response to the Protestant Reformation, authorized the Vulgate as its official Latin translation of the Bible. The Church deemed the additional books in its Old Testament that were interspersed among the Hebrew Bible books to be "deuterocanonical" (meaning part of a second or later canon). Protestant Bibles either separated these books into a separate section called the "Apocrypha" (meaning "hidden away") between the Old and New Testaments, or omitted them altogether. The 17th-century Protestant King James Version was the most ubiquitous English Bible of all time, but it has largely been superseded by modern translations.[24]`
21-
2210
// BenchmarkCompressEncode 1000000000 0.000011 ns/op len(32)
2311
func BenchmarkCompressEncode(t *testing.B) {
2412
encoder := NewCompress()
25-
j, err := job.New("jobTopic", "131223", 1, 1, job.Body(body), func(name string) lock.Locker {
13+
j, err := job.New("jobTopic", "131223", 1, 1, job.Body(longText), func(name string) lock.Locker {
2614
return nil
2715
})
2816
require.NoError(t, err)
@@ -35,7 +23,7 @@ func BenchmarkCompressEncode(t *testing.B) {
3523
// BenchmarkJSONEncode 1000000000 0.000024 ns/op len(92)
3624
func BenchmarkJSONEncode(t *testing.B) {
3725
encoder := NewJSON()
38-
j, err := job.New("jobTopic", "131223", 1, 1, job.Body(body), func(name string) lock.Locker {
26+
j, err := job.New("jobTopic", "131223", 1, 1, job.Body(longText), func(name string) lock.Locker {
3927
return nil
4028
})
4129
require.NoError(t, err)

pkg/encode/encode_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package encode
2+
3+
import (
4+
"github.com/changsongl/delay-queue/job"
5+
"github.com/changsongl/delay-queue/pkg/lock"
6+
"github.com/stretchr/testify/require"
7+
"testing"
8+
)
9+
10+
var longText = `
11+
The Bible is not a single book but a collection of books, whose complex development is not completely understood. The books began as songs and stories orally transmitted from generation to generation before being written down in a process that began sometime around the start of the first millennium BCE and continued for over a thousand years. The Bible was written and compiled by many people, from a variety of disparate cultures, most of whom are unknown.[18] British biblical scholar John K. Riches wrote:[19]
12+
[T]he biblical texts were produced over a period in which the living conditions of the writers – political, cultural, economic, and ecological – varied enormously. There are texts which reflect a nomadic existence, texts from people with an established monarchy and Temple cult, texts from exile, texts born out of fierce oppression by foreign rulers, courtly texts, texts from wandering charismatic preachers, texts from those who give themselves the airs of sophisticated Hellenistic writers. It is a time-span which encompasses the compositions of Homer, Plato, Aristotle, Thucydides, Sophocles, Caesar, Cicero, and Catullus. It is a period which sees the rise and fall of the Assyrian empire (twelfth to seventh century) and of the Persian empire (sixth to fourth century), Alexander's campaigns (336–326), the rise of Rome and its domination of the Mediterranean (fourth century to the founding of the Principate, 27 BCE), the destruction of the Jerusalem Temple (70 CE), and the extension of Roman rule to parts of Scotland (84 CE).
13+
Hebrew Bible from 1300. page 20, Genesis.
14+
Hebrew Bible from 1300. Genesis.
15+
Considered to be scriptures (sacred, authoritative religious texts), the books were compiled by different religious communities into various biblical canons (official collections of scriptures). The earliest compilation, containing the first five books of the Bible and called the Torah (meaning "law", "instruction", or "teaching") or Pentateuch ("five books"), was accepted as Jewish canon by the 5th century BCE. A second collection of narrative histories and prophesies, called the Nevi'im ("prophets"), was canonized in the 3rd century BCE. A third collection called the Ketuvim ("writings"), containing psalms, proverbs, and narrative histories, was canonized sometime between the 2nd century BCE and the 2nd century CE. These three collections were written mostly in Hebrew, with some parts in Aramaic, and together form the Hebrew Bible or "TaNaKh" (a portmanteau of "Torah", "Nevi'im", and "Ketuvim").[20]
16+
Greek-speaking Jews in Alexandria and elsewhere in the Jewish diaspora considered additional scriptures, composed between 200 BCE and 100 CE and not included in the Hebrew Bible, to be canon. These additional texts were included in a translation of the Hebrew Bible into Koine Greek (common Greek spoken by ordinary people) known as the Septuagint (meaning "the work of the seventy"), which began as a translation of the Torah made around 250 BCE and continued to develop for several centuries. The Septuagint contained all of the books of the Hebrew Bible, re-organized and with some textual differences, with the additional scriptures interspersed throughout.[21]
17+
Saint Paul Writing His Epistles, 16th-century painting.
18+
During the rise of Christianity in the 1st century CE, new scriptures were written in Greek about the life and teachings of Jesus Christ, who Christians believed was the messiah prophesized in the books of the Hebrew Bible. Two collections of these new scriptures – the Pauline epistles and the Gospels – were accepted as canon by the end of the 2nd century CE. A third collection, the catholic epistles, were canonized over the next few centuries. Christians called these new scriptures the "New Testament", and began referring to the Septuagint as the "Old Testament".[22]
19+
Between 385 and 405 CE, the early Christian church translated its canon into Vulgar Latin (the common Latin spoken by ordinary people), a translation known as the Vulgate, which included in its Old Testament the books that were in the Septuagint but not in the Hebrew Bible. The Vulgate introduced stability to the Bible, but also began the East-West Schism between Latin-speaking Western Christianity (led by the Catholic Church) and multi-lingual Eastern Christianity (led by the Eastern Orthodox Church). Christian denominations' biblical canons varied not only in the language of the books, but also in their selection, organization, and text.[23]
20+
Jewish rabbis began developing a standard Hebrew Bible in the 1st century CE, maintained since the middle of the first millennium by the Masoretes, and called the Masoretic Text. Christians have held ecumenical councils to standardize their biblical canon since the 4th century CE. The Council of Trent (1545–63), held by the Catholic Church in response to the Protestant Reformation, authorized the Vulgate as its official Latin translation of the Bible. The Church deemed the additional books in its Old Testament that were interspersed among the Hebrew Bible books to be "deuterocanonical" (meaning part of a second or later canon). Protestant Bibles either separated these books into a separate section called the "Apocrypha" (meaning "hidden away") between the Old and New Testaments, or omitted them altogether. The 17th-century Protestant King James Version was the most ubiquitous English Bible of all time, but it has largely been superseded by modern translations.[24]`
21+
22+
func runEncodeTest(t *testing.T, encoder Encoder) {
23+
for _, j := range testCases(t) {
24+
str, err := encoder.Encode(j)
25+
require.NoError(t, err)
26+
27+
jDecode := &job.Job{}
28+
err = encoder.Decode(str, jDecode)
29+
t.Logf("%+v", jDecode)
30+
31+
require.NoError(t, err)
32+
require.Equal(t, j.ID, jDecode.ID)
33+
require.Equal(t, j.TTR, jDecode.TTR)
34+
require.Equal(t, j.Delay, jDecode.Delay)
35+
require.Equal(t, j.Topic, jDecode.Topic)
36+
require.True(t, j.Version.Equal(jDecode.Version))
37+
require.Equal(t, j.Body, jDecode.Body)
38+
require.Equal(t, j.Version.String(), jDecode.Version.String())
39+
}
40+
}
41+
42+
func testCases(t *testing.T) []*job.Job {
43+
jEmpty, err := job.New("jobTopic", "1sdsa", 0, 0, "", func(name string) lock.Locker {
44+
return nil
45+
})
46+
require.NoError(t, err)
47+
48+
jAllEmpty := &job.Job{}
49+
50+
jShort, err := job.New("jobTopicdsad", "1sdsadsads", 10,
51+
20, "gfdgsdas", func(name string) lock.Locker {
52+
return nil
53+
},
54+
)
55+
require.NoError(t, err)
56+
57+
jLong, err := job.New(job.Topic(longText), job.ID(longText), 10000000000,
58+
10000000000, job.Body(longText), func(name string) lock.Locker {
59+
return nil
60+
},
61+
)
62+
require.NoError(t, err)
63+
64+
return []*job.Job{
65+
jEmpty,
66+
jAllEmpty,
67+
jShort,
68+
jLong,
69+
}
70+
}

pkg/encode/json_test.go

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,10 @@
11
package encode
22

33
import (
4-
"fmt"
5-
"github.com/changsongl/delay-queue/job"
6-
"github.com/changsongl/delay-queue/pkg/lock"
7-
"github.com/stretchr/testify/require"
84
"testing"
95
)
106

117
func TestEncoder(t *testing.T) {
128
encoder := NewJSON()
13-
j, err := job.New("jobTopic", "1", 1, 1, "", func(name string) lock.Locker {
14-
return nil
15-
})
16-
require.NoError(t, err)
17-
18-
str, err := encoder.Encode(j)
19-
require.NoError(t, err)
20-
21-
jDecode := &job.Job{}
22-
err = encoder.Decode(str, jDecode)
23-
fmt.Println(j.Version.String())
24-
25-
require.NoError(t, err)
26-
require.Equal(t, j.ID, jDecode.ID)
27-
require.Equal(t, j.TTR, jDecode.TTR)
28-
require.Equal(t, j.Delay, jDecode.Delay)
29-
require.Equal(t, j.Topic, jDecode.Topic)
30-
require.True(t, j.Version.Equal(jDecode.Version))
31-
require.Equal(t, j.Body, jDecode.Body)
32-
require.Equal(t, j.Version.String(), jDecode.Version.String())
9+
runEncodeTest(t, encoder)
3310
}

store/redis/store.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,6 @@ func (s storage) GetLock(name string) lock.Locker {
2020
}
2121

2222
// NewStore return a redis storage
23-
func NewStore(r redis.Redis, e encode.Encoder) store.Store {
24-
return &storage{rds: r, encoder: e}
23+
func NewStore(r redis.Redis) store.Store {
24+
return &storage{rds: r, encoder: encode.NewCompress()}
2525
}

test/integration/integration_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func TestDelayQueueAddAndRemove(t *testing.T) {
3131
for i := 0; i < Jobs; i++ {
3232
delayTime := rand.Intn(DelayTimeSeconds)
3333
id := fmt.Sprintf("test-%d", i)
34-
j, err := job.New(topic, id, job.DelayOption(time.Duration(delayTime)*time.Second))
34+
j, err := job.New(topic, id, job.DelayOption(time.Duration(delayTime)*time.Second), job.BodyOption("a body"))
3535
require.NoError(t, err)
3636

3737
err = AddJobRecord(key, id)
@@ -50,6 +50,7 @@ func TestDelayQueueAddAndRemove(t *testing.T) {
5050
ch := c.Consume()
5151
for jobMsg := range ch {
5252
id := jobMsg.GetID()
53+
//t.Logf("%+v", jobMsg)
5354
err := DeleteJobRecord(key, id)
5455
require.NoError(t, err)
5556

0 commit comments

Comments
 (0)