@@ -31,7 +31,7 @@ func TestDelayQueueAddAndRemove(t *testing.T) {
3131 for i := 0 ; i < Jobs ; i ++ {
3232 delayTime := rand .Intn (DelayTimeSeconds )
3333 id := fmt .Sprintf ("test-%d" , i )
34- j , err := job .New (topic , id , job .JobDelayOption (time .Duration (delayTime )* time .Second ))
34+ j , err := job .New (topic , id , job .DelayOption (time .Duration (delayTime )* time .Second ))
3535 require .NoError (t , err )
3636
3737 err = AddJobRecord (key , id )
@@ -49,7 +49,7 @@ func TestDelayQueueAddAndRemove(t *testing.T) {
4949 c := consumer .New (cli , topic , consumer .WorkerNumOption (1 ))
5050 ch := c .Consume ()
5151 for jobMsg := range ch {
52- id := jobMsg .GetId ()
52+ id := jobMsg .GetID ()
5353 err := DeleteJobRecord (key , id )
5454 require .NoError (t , err )
5555
@@ -78,7 +78,7 @@ func TestDelayQueueTTR(t *testing.T) {
7878 t .Parallel ()
7979
8080 topic , id := "TestDelayQueueTTR-topic" , "000"
81- j , err := job .New (topic , id , job .JobDelayOption (10 * time .Second ), job .JobTTROption (5 * time .Second ))
81+ j , err := job .New (topic , id , job .DelayOption (10 * time .Second ), job .TTROption (5 * time .Second ))
8282 require .NoError (t , err )
8383
8484 cli := client .NewClient (DelayQueueAddr )
@@ -94,7 +94,7 @@ func TestDelayQueueTTR(t *testing.T) {
9494 c := consumer .New (cli , topic , consumer .WorkerNumOption (2 ))
9595 ch := c .Consume ()
9696 for jobMsg := range ch {
97- jobID := jobMsg .GetId ()
97+ jobID := jobMsg .GetID ()
9898 t .Logf ("Receive job(id: %s): %d" , jobID , time .Now ().Unix ())
9999 if id == jobID {
100100 v := atomic .LoadInt64 (& num )
@@ -109,42 +109,41 @@ func TestDelayQueueTTR(t *testing.T) {
109109 require .LessOrEqual (t , int64 (4 ), num , "retry time should be equal" )
110110}
111111
112- // TODO: test block
113- // Testing ttr, consume but don't finish or delete.
114- // Message should be consume again.
115- //func TestDelayQueueBlockPop(t *testing.T) {
116- // t.Parallel()
117- //
118- // topic, id := "TestDelayQueueBlockPop-topic", "111"
119- // j, err := job.New(topic, id, job.JobDelayOption(0*time.Second))
120- // require.NoError(t, err)
121- //
122- // blockTime := 5 * time.Second
123- //
124- // cli := client.NewClient(DelayQueueAddr)
125- //
126- // var totalTime time.Duration = 0
127- // go func() {
128- // // consume jobs
129- // c := consumer.New(cli, topic, consumer.WorkerNumOption(1))
130- // ch := c.Consume()
131- // startTime := time.Now()
132- // for jobMsg := range ch {
133- // jobID := jobMsg.GetId()
134- // t.Logf("Receive job(id: %s): %d", jobID, time.Now().Unix())
135- // if id == jobID {
136- // totalTime += time.Since(startTime)
137- // }
138- // }
139- // }()
140- //
141- // time.Sleep(blockTime - 3*time.Second)
142- // t.Logf("Add job: %d", time.Now().Unix())
143- // err = cli.AddJob(j)
144- // require.NoError(t, err)
145- //
146- // time.Sleep(blockTime)
147- // t.Log("total-time", totalTime)
148- // require.Greater(t, totalTime, time.Duration(0))
149- // require.LessOrEqual(t, totalTime, blockTime)
150- //}
112+ //Testing ttr, consume but don't finish or delete.
113+ //Message should be consume again.
114+ func TestDelayQueueBlockPop (t * testing.T ) {
115+ t .Parallel ()
116+
117+ topic , id := "TestDelayQueueBlockPop-topic" , "111"
118+ j , err := job .New (topic , id , job .DelayOption (0 * time .Second ))
119+ require .NoError (t , err )
120+
121+ blockTime := 5 * time .Second
122+
123+ cli := client .NewClient (DelayQueueAddr )
124+
125+ var totalTime time.Duration = 0
126+ go func () {
127+ // consume jobs
128+ c := consumer .New (cli , topic , consumer .WorkerNumOption (1 ), consumer .PopTimeoutOption (blockTime ))
129+ ch := c .Consume ()
130+ startTime := time .Now ()
131+ for jobMsg := range ch {
132+ jobID := jobMsg .GetID ()
133+ t .Logf ("Receive job(id: %s): %d" , jobID , time .Now ().Unix ())
134+ if id == jobID {
135+ totalTime += time .Since (startTime )
136+ }
137+ }
138+ }()
139+
140+ time .Sleep (blockTime - 3 * time .Second )
141+ t .Logf ("Add job: %d" , time .Now ().Unix ())
142+ err = cli .AddJob (j )
143+ require .NoError (t , err )
144+
145+ time .Sleep (blockTime )
146+ t .Log ("total-time" , totalTime )
147+ require .Greater (t , totalTime , time .Duration (0 ))
148+ require .LessOrEqual (t , totalTime , blockTime )
149+ }
0 commit comments