Skip to content

Commit d1a1a14

Browse files
committed
compress
1 parent cd6139e commit d1a1a14

File tree

12 files changed

+316
-54
lines changed

12 files changed

+316
-54
lines changed

job/field.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,33 @@ type TTR time.Duration
1818

1919
// Body job body
2020
type Body string
21+
22+
// Empty interface
23+
type Empty interface {
24+
IsEmpty() bool
25+
}
26+
27+
// IsEmpty topic is empty
28+
func (t Topic) IsEmpty() bool {
29+
return t == ""
30+
}
31+
32+
// IsEmpty id is empty
33+
func (id ID) IsEmpty() bool {
34+
return id == ""
35+
}
36+
37+
// IsEmpty body is empty
38+
func (b Body) IsEmpty() bool {
39+
return b == ""
40+
}
41+
42+
// IsEmpty delay is empty
43+
func (d Delay) IsEmpty() bool {
44+
return d == 0
45+
}
46+
47+
// IsEmpty ttr is empty
48+
func (t TTR) IsEmpty() bool {
49+
return t == 0
50+
}

job/job.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,3 +104,8 @@ func (j *Job) Lock() error {
104104
func (j *Job) Unlock() (bool, error) {
105105
return j.Mutex.Unlock()
106106
}
107+
108+
// SetVersion set job version by nano ts
109+
func (j *Job) SetVersion(ts int64) {
110+
j.Version.t = time.Unix(ts/1e9, ts%1e9)
111+
}

job/version.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ func (v Version) String() string {
2020
return strconv.FormatInt(v.t.UnixNano(), 10)
2121
}
2222

23+
// UInt64 function
24+
func (v Version) UInt64() uint64 {
25+
return uint64(v.t.UnixNano())
26+
}
27+
2328
// LoadVersion load version from a string
2429
func LoadVersion(vs string) (Version, error) {
2530
version := Version{}

pkg/encode/compress.go

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package encode
2+
3+
import (
4+
"bytes"
5+
"encoding/binary"
6+
"github.com/changsongl/delay-queue/job"
7+
)
8+
9+
// tag s
10+
const (
11+
TagID uint64 = iota
12+
TagTopic
13+
TagDelay
14+
TagTTR
15+
TagBody
16+
TagVersion
17+
)
18+
19+
const (
20+
// TagLength tag length
21+
TagLength = 1
22+
23+
// MaxUInt64Length variant max uint64 length
24+
MaxUInt64Length = 10
25+
)
26+
27+
type compress struct {
28+
}
29+
30+
// NewCompress create a json encoder
31+
func NewCompress() Encoder {
32+
return &compress{}
33+
}
34+
35+
// Encode compress encode
36+
func (c *compress) Encode(j *job.Job) ([]byte, error) {
37+
buf := make([]byte, c.bufLength(j))
38+
written := 0
39+
if !j.Delay.IsEmpty() {
40+
written += c.PutUInt64(TagDelay, uint64(j.Delay), buf[written:])
41+
}
42+
if !j.TTR.IsEmpty() {
43+
written += c.PutUInt64(TagTTR, uint64(j.TTR), buf[written:])
44+
}
45+
written += c.PutUInt64(TagVersion, j.Version.UInt64(), buf[written:])
46+
47+
if !j.ID.IsEmpty() {
48+
written += c.PutString(TagID, string(j.ID), buf[written:])
49+
}
50+
if !j.Body.IsEmpty() {
51+
written += c.PutString(TagBody, string(j.Body), buf[written:])
52+
}
53+
if !j.Topic.IsEmpty() {
54+
written += c.PutString(TagTopic, string(j.Topic), buf[written:])
55+
}
56+
57+
return buf[:written], nil
58+
}
59+
60+
// Decode compress decode
61+
func (c *compress) Decode(b []byte, j *job.Job) error {
62+
index := 0
63+
for index < len(b) {
64+
tag, err := binary.ReadUvarint(bytes.NewBuffer(b[index:]))
65+
if err != nil {
66+
return err
67+
}
68+
index++
69+
70+
switch tag {
71+
case TagID:
72+
id, indexInc := c.ReadString(b[index:])
73+
j.ID = job.ID(id)
74+
index += indexInc
75+
case TagTopic:
76+
topic, indexInc := c.ReadString(b[index:])
77+
j.Topic = job.Topic(topic)
78+
index += indexInc
79+
case TagBody:
80+
body, indexInc := c.ReadString(b[index:])
81+
j.Body = job.Body(body)
82+
index += indexInc
83+
case TagTTR:
84+
ttr, indexInc := c.ReadUint64(b[index:])
85+
j.TTR = job.TTR(ttr)
86+
index += indexInc
87+
case TagDelay:
88+
delay, indexInc := c.ReadUint64(b[index:])
89+
j.Delay = job.Delay(delay)
90+
index += indexInc
91+
case TagVersion:
92+
ts, indexInc := c.ReadUint64(b[index:])
93+
j.SetVersion(int64(ts))
94+
index += indexInc
95+
}
96+
}
97+
return nil
98+
}
99+
100+
func (c *compress) bufLength(j *job.Job) int {
101+
l := (TagLength+MaxUInt64Length)*5 + len(j.ID) + len(j.Topic)
102+
if j.Body != "" {
103+
l += TagLength + MaxUInt64Length + len(j.Topic)
104+
}
105+
return l
106+
}
107+
108+
func (c *compress) ReadUint64(buf []byte) (uint64, int) {
109+
return binary.Uvarint(buf)
110+
}
111+
112+
func (c *compress) PutUInt64(tag uint64, num uint64, buf []byte) int {
113+
written := binary.PutUvarint(buf, tag)
114+
written += binary.PutUvarint(buf[written:], num)
115+
return written
116+
}
117+
118+
func (c *compress) ReadString(buf []byte) (string, int) {
119+
l, inc := binary.Uvarint(buf)
120+
end := inc + int(l)
121+
return string(buf[inc:end]), end
122+
}
123+
124+
func (c *compress) PutString(tag uint64, str string, buf []byte) int {
125+
l := len(str)
126+
written := binary.PutUvarint(buf, tag)
127+
written += binary.PutUvarint(buf[written:], uint64(l))
128+
chs := make([]uint8, 0, l)
129+
for _, ch := range str {
130+
chs = append(chs, uint8(ch))
131+
}
132+
133+
for _, ch := range []byte(str) {
134+
buf[written] = ch
135+
written++
136+
}
137+
138+
return written
139+
}

pkg/encode/compress_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package encode
2+
3+
import (
4+
"fmt"
5+
"github.com/changsongl/delay-queue/job"
6+
"github.com/changsongl/delay-queue/pkg/lock"
7+
"github.com/stretchr/testify/require"
8+
"testing"
9+
)
10+
11+
func TestCompressEncoder(t *testing.T) {
12+
encoder := NewCompress()
13+
j, err := job.New("jobTopic11", "1222", 10, 5, "", func(name string) lock.Locker {
14+
return nil
15+
})
16+
require.NoError(t, err)
17+
18+
str, err := encoder.Encode(j)
19+
require.NoError(t, err)
20+
fmt.Println(str)
21+
22+
jDecode := &job.Job{}
23+
err = encoder.Decode(str, jDecode)
24+
fmt.Printf("%+v", jDecode)
25+
26+
require.NoError(t, err)
27+
require.Equal(t, j.ID, jDecode.ID)
28+
require.Equal(t, j.TTR, jDecode.TTR)
29+
require.Equal(t, j.Delay, jDecode.Delay)
30+
require.Equal(t, j.Topic, jDecode.Topic)
31+
require.True(t, j.Version.Equal(jDecode.Version))
32+
require.Equal(t, j.Body, jDecode.Body)
33+
require.Equal(t, j.Version.String(), jDecode.Version.String())
34+
}

pkg/encode/encode.go

Lines changed: 5 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,11 @@
11
package encode
22

3-
import "encoding/json"
3+
import (
4+
"github.com/changsongl/delay-queue/job"
5+
)
46

57
// Encoder encoder interface
68
type Encoder interface {
7-
Encode(interface{}) (string, error)
8-
Decode(string, interface{}) error
9-
}
10-
11-
// implemented Encoder interface
12-
type jsonEncoder struct {
13-
}
14-
15-
// New create an encoder
16-
func New() Encoder {
17-
return &jsonEncoder{}
18-
}
19-
20-
// Encode encode function
21-
func (j jsonEncoder) Encode(i interface{}) (string, error) {
22-
bytes, err := json.Marshal(i)
23-
if err != nil {
24-
return "", err
25-
}
26-
27-
return string(bytes), nil
28-
}
29-
30-
// Decode decode function
31-
func (j jsonEncoder) Decode(s string, i interface{}) error {
32-
return json.Unmarshal([]byte(s), i)
9+
Encode(*job.Job) ([]byte, error)
10+
Decode([]byte, *job.Job) error
3311
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package encode
2+
3+
import (
4+
"github.com/changsongl/delay-queue/job"
5+
"github.com/changsongl/delay-queue/pkg/lock"
6+
"github.com/stretchr/testify/require"
7+
"testing"
8+
)
9+
10+
// BenchmarkCompressEncode 1000000000 0.000011 ns/op len(32)
11+
func BenchmarkCompressEncode(t *testing.B) {
12+
encoder := NewCompress()
13+
j, err := job.New("jobTopic", "131223", 1, 1, "", func(name string) lock.Locker {
14+
return nil
15+
})
16+
require.NoError(t, err)
17+
18+
bytes, _ := encoder.Encode(j)
19+
20+
_ = encoder.Decode(bytes, j)
21+
}
22+
23+
// BenchmarkJSONEncode 1000000000 0.000024 ns/op len(92)
24+
func BenchmarkJSONEncode(t *testing.B) {
25+
encoder := NewJSON()
26+
j, err := job.New("jobTopic", "131223", 1, 1, "", func(name string) lock.Locker {
27+
return nil
28+
})
29+
require.NoError(t, err)
30+
31+
bytes, _ := encoder.Encode(j)
32+
_ = encoder.Decode(bytes, j)
33+
}

pkg/encode/encode_test.go

Lines changed: 0 additions & 24 deletions
This file was deleted.

pkg/encode/json.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package encode
2+
3+
import (
4+
"encoding/json"
5+
"github.com/changsongl/delay-queue/job"
6+
)
7+
8+
// implemented Encoder interface
9+
type jsonEncoder struct {
10+
}
11+
12+
// NewJSON create a json encoder
13+
func NewJSON() Encoder {
14+
return &jsonEncoder{}
15+
}
16+
17+
// Encode encode function
18+
func (j jsonEncoder) Encode(i *job.Job) ([]byte, error) {
19+
bytes, err := json.Marshal(i)
20+
if err != nil {
21+
return nil, err
22+
}
23+
24+
return bytes, nil
25+
}
26+
27+
// Decode decode function
28+
func (j jsonEncoder) Decode(b []byte, i *job.Job) error {
29+
return json.Unmarshal(b, i)
30+
}

pkg/encode/json_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package encode
2+
3+
import (
4+
"fmt"
5+
"github.com/changsongl/delay-queue/job"
6+
"github.com/changsongl/delay-queue/pkg/lock"
7+
"github.com/stretchr/testify/require"
8+
"testing"
9+
)
10+
11+
func TestEncoder(t *testing.T) {
12+
encoder := NewJSON()
13+
j, err := job.New("jobTopic", "1", 1, 1, "", func(name string) lock.Locker {
14+
return nil
15+
})
16+
require.NoError(t, err)
17+
18+
str, err := encoder.Encode(j)
19+
require.NoError(t, err)
20+
21+
jDecode := &job.Job{}
22+
err = encoder.Decode(str, jDecode)
23+
fmt.Println(j.Version.String())
24+
25+
require.NoError(t, err)
26+
require.Equal(t, j.ID, jDecode.ID)
27+
require.Equal(t, j.TTR, jDecode.TTR)
28+
require.Equal(t, j.Delay, jDecode.Delay)
29+
require.Equal(t, j.Topic, jDecode.Topic)
30+
require.True(t, j.Version.Equal(jDecode.Version))
31+
require.Equal(t, j.Body, jDecode.Body)
32+
require.Equal(t, j.Version.String(), jDecode.Version.String())
33+
}

0 commit comments

Comments
 (0)