Skip to content

Commit 2c5b0fc

Browse files
committed
增加Redis连接、读取、写入超时配置
1 parent 47a5dbd commit 2c5b0fc

File tree

5 files changed

+44
-27
lines changed

5 files changed

+44
-27
lines changed

cmd/cmd.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/ouqiang/delay-queue/delayqueue"
1010
"log"
1111
"fmt"
12+
_ "net/http/pprof"
1213
)
1314

1415
type Cmd struct {}

config/config.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ const (
1919
DefaultRedisPassword = ""
2020
DefaultRedisMaxIdle = 30
2121
DefaultRedisMaxActive = 0
22+
DefaultRedisConnectTimeout = 5
23+
DefaultRedisReadTimeout = 1
24+
DefaultRedisWriteTimeout = 1
2225
)
2326

2427
type Config struct {
@@ -33,6 +36,9 @@ type RedisConfig struct {
3336
Password string
3437
MaxIdle int // 连接池最大空闲连接数
3538
MaxActive int // 连接池最大激活连接数
39+
ConnectTimeout int // 连接超时, 单位秒
40+
ReadTimeout int // 读取超时, 单位秒
41+
WriteTimeout int // 写入超时, 单位秒
3642
}
3743

3844
func Init(path string) {
@@ -59,6 +65,9 @@ func (config *Config) parse(path string) {
5965
config.Redis.Password = section.Key("redis.password").MustString(DefaultRedisPassword)
6066
config.Redis.MaxIdle = section.Key("redis.max_idle").MustInt(DefaultRedisMaxIdle)
6167
config.Redis.MaxActive = section.Key("redis.max_active").MustInt(DefaultRedisMaxActive)
68+
config.Redis.ConnectTimeout = section.Key("redis.connect_timeout").MustInt(DefaultRedisConnectTimeout)
69+
config.Redis.ReadTimeout = section.Key("redis.read_timeout").MustInt(DefaultRedisReadTimeout)
70+
config.Redis.WriteTimeout = section.Key("redis.write_timeout").MustInt(DefaultRedisWriteTimeout)
6271
}
6372

6473

@@ -70,4 +79,7 @@ func (config *Config) initDefaultConfig() {
7079
config.Redis.Password = DefaultRedisPassword
7180
config.Redis.MaxIdle = DefaultRedisMaxIdle
7281
config.Redis.MaxActive = DefaultRedisMaxActive
82+
config.Redis.ConnectTimeout = DefaultRedisConnectTimeout
83+
config.Redis.ReadTimeout = DefaultRedisReadTimeout
84+
config.Redis.WriteTimeout = DefaultRedisWriteTimeout
7385
}

delayqueue/delay_queue.go

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"github.com/ouqiang/delay-queue/config"
66
"fmt"
77
"log"
8+
"errors"
89
)
910
const (
1011
BucketRedisKey = "dq_bucket_%d"
@@ -13,27 +14,34 @@ const (
1314
var (
1415
// 每个定时器对应一个bucket
1516
timers []*time.Ticker
16-
// 保存待放入bucket中的job
17-
queue chan Job
1817
// bucket名称chan
1918
bucketNameChan chan string
2019
)
2120

2221
func Init() {
2322
RedisPool = initRedisPool()
2423
initTimers()
25-
queue = make(chan Job, 100)
2624
bucketNameChan = generateBucketName()
27-
go waitJob()
2825
}
2926

3027
// 添加一个Job到队列中
31-
func Push(job Job) {
28+
func Push(job Job) error {
3229
if job.Id == "" || job.Topic == "" || job.Delay < 0 || job.TTR <= 0 {
33-
return
30+
return errors.New("invalid job")
3431
}
3532

36-
queue <- job
33+
err := putJob(job.Id, job)
34+
if err != nil {
35+
log.Printf("添加job到job pool失败#job-%+v#%s", job, err.Error())
36+
return err
37+
}
38+
err = pushToBucket(<-bucketNameChan, job.Delay, job.Id)
39+
if err != nil {
40+
log.Printf("添加job到bucket失败#job-%+v#%s", job, err.Error())
41+
return err
42+
}
43+
44+
return nil
3745
}
3846

3947
// 获取Job
@@ -70,22 +78,6 @@ func Remove(jobId string) error {
7078
return removeJob(jobId)
7179
}
7280

73-
74-
func waitJob() {
75-
var err error
76-
for job := range queue {
77-
err = putJob(job.Id, job)
78-
if err != nil {
79-
log.Printf("添加job到job pool失败#job-%+v#%s", job, err.Error())
80-
continue
81-
}
82-
err = pushToBucket(<-bucketNameChan, job.Delay, job.Id)
83-
if err != nil {
84-
log.Printf("添加job到bucket失败#job-%+v#%s", job, err.Error())
85-
}
86-
}
87-
}
88-
8981
// 轮询获取Job名称, 使job分布到不同bucket中, 提高扫描速度
9082
func generateBucketName() (chan string) {
9183
c := make(chan string)

delayqueue/redis.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,13 @@ func initRedisPool() *redis.Pool {
2727

2828
// 连接redis
2929
func redisDial() (redis.Conn, error) {
30-
conn, err := redis.Dial("tcp", config.Setting.Redis.Host)
30+
conn, err := redis.Dial(
31+
"tcp",
32+
config.Setting.Redis.Host,
33+
redis.DialConnectTimeout(time.Duration(config.Setting.Redis.ConnectTimeout) * time.Second),
34+
redis.DialReadTimeout(time.Duration(config.Setting.Redis.ReadTimeout) * time.Second),
35+
redis.DialWriteTimeout(time.Duration(config.Setting.Redis.WriteTimeout) * time.Second),
36+
)
3137
if err != nil {
3238
log.Printf("连接redis失败#%s", err.Error())
3339
return nil, err
@@ -43,10 +49,12 @@ func redisDial() (redis.Conn, error) {
4349

4450
_, err = conn.Do("SELECT", config.Setting.Redis.Db)
4551
if err != nil {
52+
conn.Close()
4653
log.Printf("redis选择数据库失败#%s", err.Error())
54+
return nil, err
4755
}
4856

49-
return conn, err
57+
return conn, nil
5058
}
5159

5260
// 从池中取出连接后,判断连接是否有效

routers/routers.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,13 @@ func Push(resp http.ResponseWriter, req *http.Request) {
5151

5252
log.Printf("add job#%+v\n", job)
5353
job.Delay = time.Now().Unix() + job.Delay
54-
delayqueue.Push(job)
54+
err = delayqueue.Push(job)
5555

56-
resp.Write(generateSuccessBody("添加成功", nil))
56+
if err != nil {
57+
resp.Write(generateFailureBody("添加失败"))
58+
} else {
59+
resp.Write(generateSuccessBody("添加成功", nil))
60+
}
5761
}
5862

5963
// 获取job

0 commit comments

Comments
 (0)