Skip to content

Commit c12c887

Browse files
committed
oct design v 1.4.3 - clean up contd
1 parent e23927a commit c12c887

File tree

2 files changed

+3
-40
lines changed

2 files changed

+3
-40
lines changed

octopus/pipe_spl_ingest.go

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
11
package octopus
22

3-
import (
4-
"time"
5-
)
6-
73
func (o *octopus) makeIngestPipe(inChSet *ingestPipeChSet, opChSet *NodeChSet) {
8-
go channelConnector(inChSet, opChSet, o.timeToQuit, o.masterQuitCh)
4+
go channelConnector(inChSet, opChSet)
95
go setupStringIngestPipe(inChSet, opChSet, o.masterQuitCh)
106
}
117

@@ -26,36 +22,11 @@ func setupStringIngestPipe(inChSet *ingestPipeChSet, nodeOpChSet *NodeChSet,
2622
}
2723
}
2824

29-
func channelConnector(inChSet *ingestPipeChSet, opChSet *NodeChSet,
30-
timeOut time.Duration, masterQuitCh chan int) {
31-
// timeOutTimer := time.NewTimer(timeOut)
25+
func channelConnector(inChSet *ingestPipeChSet, opChSet *NodeChSet) {
3226
for {
33-
// timeOutCh = time.After(timeOut * time.Second)
34-
// timeOutCh = time.NewTimer(timeOut)
3527
select {
3628
case node := <-inChSet.NodeCh:
3729
opChSet.NodeCh <- node
38-
// if !timeOutTimer.Stop() {
39-
// <-timeOutTimer.C
40-
// }
41-
// log.Println("abc")
42-
// timeOutTimer.Reset(timeOut)
43-
// case i := <-inChSet.QuitCh:
44-
// {
45-
// fmt.Println("Quit Received on Ingest Channel")
46-
// opChSet.QuitCh <- i
47-
// masterQuitCh <- i
48-
// if !timeOutTimer.Stop() {
49-
// <-timeOutTimer.C
50-
// }
51-
// timeOutTimer.Reset(timeOut)
52-
// }
53-
// case <-timeOutTimer.C:
54-
// fmt.Println("Timeout Triggered in Ingest Channel")
55-
// opChSet.QuitCh <- 1
56-
// masterQuitCh <- 1
57-
// return
58-
5930
}
6031
}
6132
}

octopus/stdpipefunc.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,8 @@ type stdFunc func(*Node, *NodeChSet)
99
func stdLinearNodeFunc(stdFn stdFunc, outChSet *NodeChSet) *NodeChSet {
1010
listenCh := make(chan *Node)
1111
listenQuitCh := make(chan int, 1)
12-
listenChSet := &NodeChSet{
13-
NodeCh: listenCh,
14-
StdChannels: &StdChannels{
15-
QuitCh: listenQuitCh,
16-
},
17-
}
18-
// listenChSet := MakeNodeChSet(listenCh, listenQuitCh)
12+
listenChSet := MakeNodeChSet(listenCh, listenQuitCh)
1913
go func() {
20-
// defer close(listenCh)
21-
// defer close(listenQuitCh)
2214
for {
2315
select {
2416
case node := <-listenCh:

0 commit comments

Comments
 (0)