Skip to content

Commit 8a4edd4

Browse files
committed
design octopus v1.3.1 - max delay code
1 parent 09d931b commit 8a4edd4

File tree

1 file changed

+36
-0
lines changed

1 file changed

+36
-0
lines changed

octopus/pipe_spl_maxdelay.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package octopus
2+
3+
import (
4+
"fmt"
5+
"time"
6+
)
7+
8+
func (o *octopus) makeMaxDelayPipe(opNodeChSet *NodeChSet) *NodeChSet {
9+
listenNodeCh := make(chan *Node)
10+
listenQuitCh := make(chan int, 1)
11+
listenChSet := MakeNodeChSet(listenNodeCh, listenQuitCh)
12+
go connectWithTimeout(listenNodeCh, listenQuitCh, opNodeChSet, o.timeToQuit)
13+
return listenChSet
14+
}
15+
16+
func connectWithTimeout(listenNodeCh <-chan *Node, listenQuitCh <-chan int,
17+
opNodeChSet *NodeChSet, timeoutDuration time.Duration) {
18+
timer := time.NewTimer(timeoutDuration)
19+
for {
20+
select {
21+
case node := <-listenNodeCh:
22+
opNodeChSet.NodeCh <- node
23+
if !timer.Stop() {
24+
<-timer.C
25+
}
26+
timer.Reset(timeoutDuration)
27+
case i := <-listenQuitCh:
28+
opNodeChSet.QuitCh <- i
29+
return
30+
case <-timer.C:
31+
fmt.Println("Timeout Triggered in MaxDelayTimeout Channel")
32+
opNodeChSet.QuitCh <- 1
33+
return
34+
}
35+
}
36+
}

0 commit comments

Comments
 (0)