|
1 | 1 | package octopus |
2 | 2 |
|
| 3 | +import ( |
| 4 | + "fmt" |
| 5 | + "time" |
| 6 | +) |
| 7 | + |
3 | 8 | func (o *octopus) makeIngestPipe(inChSet *ingestPipeChSet, opChSet *NodeChSet) { |
4 | | - go channelConnector(inChSet, opChSet) |
5 | | - go setupStringIngestPipe(inChSet, opChSet) |
| 9 | + go channelConnector(inChSet, opChSet, o.timeToQuit, o.masterQuitCh) |
| 10 | + go setupStringIngestPipe(inChSet, opChSet, o.masterQuitCh) |
6 | 11 | } |
7 | 12 |
|
8 | | -func setupStringIngestPipe(inChSet *ingestPipeChSet, nodeOpChSet *NodeChSet) { |
| 13 | +func setupStringIngestPipe(inChSet *ingestPipeChSet, nodeOpChSet *NodeChSet, |
| 14 | + masterQuitCh chan int) { |
9 | 15 | for { |
10 | 16 | select { |
11 | 17 | case str := <-inChSet.StrCh: |
12 | 18 | { |
13 | 19 | nodeOpChSet.NodeCh <- createNode("", str, 1) |
14 | 20 | } |
15 | | - case i := <-inChSet.QuitCh: |
16 | | - { |
17 | | - nodeOpChSet.QuitCh <- i |
18 | | - } |
| 21 | + // case i := <-inChSet.QuitCh: |
| 22 | + // { |
| 23 | + // nodeOpChSet.QuitCh <- i |
| 24 | + // masterQuitCh <- i |
| 25 | + // } |
19 | 26 | } |
20 | 27 | } |
21 | 28 | } |
22 | 29 |
|
23 | | -func channelConnector(inChSet *ingestPipeChSet, opChSet *NodeChSet) { |
| 30 | +func channelConnector(inChSet *ingestPipeChSet, opChSet *NodeChSet, |
| 31 | + timeOut time.Duration, masterQuitCh chan int) { |
24 | 32 | for { |
25 | 33 | select { |
26 | 34 | case node := <-inChSet.NodeCh: |
27 | 35 | opChSet.NodeCh <- node |
28 | 36 | case i := <-inChSet.QuitCh: |
29 | | - opChSet.QuitCh <- i |
| 37 | + { |
| 38 | + fmt.Println("Quit Received on Ingest Channel") |
| 39 | + opChSet.QuitCh <- i |
| 40 | + masterQuitCh <- i |
| 41 | + } |
| 42 | + case <-time.After(timeOut * time.Second): |
| 43 | + { |
| 44 | + fmt.Println("Timeout Triggered in Ingest Channel") |
| 45 | + opChSet.QuitCh <- 1 |
| 46 | + return |
| 47 | + } |
30 | 48 | } |
31 | 49 | } |
32 | 50 | } |
0 commit comments