Skip to content

Commit ee7cbe1

Browse files
committed
Use a goroutine pool for payload ingest. Fixes #763
1 parent a77a36e commit ee7cbe1

File tree

9 files changed

+123
-150
lines changed

9 files changed

+123
-150
lines changed

edge-apis/pool.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"math/rand/v2"
2525
"net"
2626
"net/url"
27+
"slices"
2728
"sync/atomic"
2829
"time"
2930
)
@@ -35,7 +36,7 @@ type ApiClientTransport struct {
3536

3637
// ClientTransportPool abstracts the concept of multiple `runtime.ClientTransport` (openapi interface) representing one
3738
// target OpenZiti network. In situations where controllers are running in HA mode (multiple controllers) this
38-
// interface can attempt to try different controller during outages or partitioning.
39+
// interface can try a different controller during outages or partitioning.
3940
type ClientTransportPool interface {
4041
runtime.ClientTransport
4142

@@ -265,6 +266,5 @@ func selectAndRemoveRandom[T any](slice []T, zero T) (selected T, modifiedSlice
265266
rng := rand.New(rand.NewPCG(seed, seed))
266267
index := rng.IntN(len(slice))
267268
selected = slice[index]
268-
modifiedSlice = append(slice[:index], slice[index+1:]...)
269-
return selected, modifiedSlice
269+
return selected, slices.Delete(slice, index, index+1)
270270
}

xgress/circuit_inspections.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,12 @@ type InspectDetail struct {
6262
Goroutines []string `json:"goroutines"`
6363
Sequence uint64 `json:"sequence"`
6464
Flags string `json:"flags"`
65+
LastSizeSent uint32 `json:"lastSizeSent"`
6566
}
6667

6768
type SendBufferDetail struct {
6869
WindowSize uint32 `json:"windowSize"`
70+
QueuedPayloadCount int `json:"queuedPayloadCount"`
6971
LinkSendBufferSize uint32 `json:"linkSendBufferSize"`
7072
LinkRecvBufferSize uint32 `json:"linkRecvBufferSize"`
7173
Accumulator uint32 `json:"accumulator"`
@@ -85,7 +87,6 @@ type SendBufferDetail struct {
8587
type RecvBufferDetail struct {
8688
Size uint32 `json:"size"`
8789
PayloadCount uint32 `json:"payloadCount"`
88-
LastSizeSent uint32 `json:"lastSizeSent"`
8990
Sequence int32 `json:"sequence"`
9091
MaxSequence int32 `json:"maxSequence"`
9192
NextPayload string `json:"nextPayload"`

xgress/link_receive_buffer.go

Lines changed: 45 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,24 @@ import (
2121
"github.com/emirpasic/gods/trees/btree"
2222
"github.com/emirpasic/gods/utils"
2323
"github.com/michaelquigley/pfxlog"
24+
"sync"
2425
"sync/atomic"
25-
"time"
2626
)
2727

2828
type LinkReceiveBuffer struct {
29-
tree *btree.Tree
30-
sequence int32
31-
maxSequence int32
32-
size uint32
33-
lastBufferSizeSent uint32
29+
sync.Mutex
30+
tree *btree.Tree
31+
sequence int32
32+
maxSequence int32
33+
size uint32
34+
txQueue chan *Payload
3435
}
3536

36-
func NewLinkReceiveBuffer() *LinkReceiveBuffer {
37+
func NewLinkReceiveBuffer(txQueueSize int32) *LinkReceiveBuffer {
3738
return &LinkReceiveBuffer{
3839
tree: btree.NewWith(10240, utils.Int32Comparator),
3940
sequence: -1,
41+
txQueue: make(chan *Payload, txQueueSize),
4042
}
4143
}
4244

@@ -45,6 +47,9 @@ func (buffer *LinkReceiveBuffer) Size() uint32 {
4547
}
4648

4749
func (buffer *LinkReceiveBuffer) ReceiveUnordered(x *Xgress, payload *Payload, maxSize uint32) bool {
50+
buffer.Lock()
51+
defer buffer.Unlock()
52+
4853
if payload.GetSequence() <= buffer.sequence {
4954
x.dataPlane.GetMetrics().MarkDuplicatePayload()
5055
return true
@@ -67,47 +72,56 @@ func (buffer *LinkReceiveBuffer) ReceiveUnordered(x *Xgress, payload *Payload, m
6772
} else {
6873
x.dataPlane.GetMetrics().MarkDuplicatePayload()
6974
}
75+
76+
buffer.queueNext()
77+
7078
return true
7179
}
7280

73-
func (buffer *LinkReceiveBuffer) PeekHead() *Payload {
81+
func (buffer *LinkReceiveBuffer) queueNext() {
7482
if val := buffer.tree.LeftValue(); val != nil {
7583
payload := val.(*Payload)
7684
if payload.Sequence == buffer.sequence+1 {
77-
return payload
85+
select {
86+
case buffer.txQueue <- payload:
87+
buffer.tree.Remove(payload.Sequence)
88+
buffer.sequence = payload.Sequence
89+
default:
90+
}
7891
}
7992
}
80-
return nil
8193
}
8294

83-
func (buffer *LinkReceiveBuffer) Remove(payload *Payload) {
84-
buffer.tree.Remove(payload.Sequence)
85-
buffer.sequence = payload.Sequence
86-
}
95+
func (buffer *LinkReceiveBuffer) NextPayload(closeNotify <-chan struct{}) *Payload {
96+
select {
97+
case payload := <-buffer.txQueue:
98+
return payload
99+
default:
100+
}
87101

88-
func (buffer *LinkReceiveBuffer) getLastBufferSizeSent() uint32 {
89-
return atomic.LoadUint32(&buffer.lastBufferSizeSent)
90-
}
102+
buffer.Lock()
103+
buffer.queueNext()
104+
buffer.Unlock()
91105

92-
func (buffer *LinkReceiveBuffer) Inspect(x *Xgress) *RecvBufferDetail {
93-
timeout := time.After(100 * time.Millisecond)
94-
inspectEvent := &receiveBufferInspectEvent{
95-
buffer: buffer,
96-
notifyComplete: make(chan *RecvBufferDetail, 1),
106+
select {
107+
case payload := <-buffer.txQueue:
108+
return payload
109+
case <-closeNotify:
97110
}
98111

99-
if x.dataPlane.GetPayloadIngester().inspect(inspectEvent, timeout) {
100-
select {
101-
case result := <-inspectEvent.notifyComplete:
102-
return result
103-
case <-timeout:
104-
}
112+
// closed, check if there's anything pending in the queue
113+
select {
114+
case payload := <-buffer.txQueue:
115+
return payload
116+
default:
117+
return nil
105118
}
106-
107-
return buffer.inspectIncomplete()
108119
}
109120

110-
func (buffer *LinkReceiveBuffer) inspectComplete() *RecvBufferDetail {
121+
func (buffer *LinkReceiveBuffer) Inspect() *RecvBufferDetail {
122+
buffer.Lock()
123+
defer buffer.Unlock()
124+
111125
nextPayload := "none"
112126
if head := buffer.tree.LeftValue(); head != nil {
113127
payload := head.(*Payload)
@@ -117,31 +131,9 @@ func (buffer *LinkReceiveBuffer) inspectComplete() *RecvBufferDetail {
117131
return &RecvBufferDetail{
118132
Size: buffer.Size(),
119133
PayloadCount: uint32(buffer.tree.Size()),
120-
LastSizeSent: buffer.getLastBufferSizeSent(),
121134
Sequence: buffer.sequence,
122135
MaxSequence: buffer.maxSequence,
123136
NextPayload: nextPayload,
124137
AcquiredSafely: true,
125138
}
126139
}
127-
128-
func (buffer *LinkReceiveBuffer) inspectIncomplete() *RecvBufferDetail {
129-
return &RecvBufferDetail{
130-
Size: buffer.Size(),
131-
LastSizeSent: buffer.getLastBufferSizeSent(),
132-
Sequence: buffer.sequence,
133-
MaxSequence: buffer.maxSequence,
134-
NextPayload: "unsafe to check",
135-
AcquiredSafely: false,
136-
}
137-
}
138-
139-
type receiveBufferInspectEvent struct {
140-
buffer *LinkReceiveBuffer
141-
notifyComplete chan *RecvBufferDetail
142-
}
143-
144-
func (self *receiveBufferInspectEvent) handle() {
145-
result := self.buffer.inspectComplete()
146-
self.notifyComplete <- result
147-
}

xgress/payload_ingester.go

Lines changed: 39 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,60 +1,56 @@
11
package xgress
22

3-
import "time"
4-
5-
type payloadEntry struct {
6-
payload *Payload
7-
x *Xgress
8-
}
3+
import (
4+
"fmt"
5+
"github.com/michaelquigley/pfxlog"
6+
"github.com/openziti/foundation/v2/goroutines"
7+
"github.com/sirupsen/logrus"
8+
"runtime/debug"
9+
"time"
10+
)
911

1012
type PayloadIngester struct {
11-
payloadIngest chan *payloadEntry
12-
payloadSendReq chan *Xgress
13-
receiveBufferInspects chan *receiveBufferInspectEvent
14-
closeNotify <-chan struct{}
13+
pool goroutines.Pool
1514
}
1615

1716
func NewPayloadIngester(closeNotify <-chan struct{}) *PayloadIngester {
18-
pi := &PayloadIngester{
19-
payloadIngest: make(chan *payloadEntry, 16),
20-
payloadSendReq: make(chan *Xgress, 16),
21-
receiveBufferInspects: make(chan *receiveBufferInspectEvent, 4),
22-
closeNotify: closeNotify,
17+
return NewPayloadIngesterWithConfig(1, closeNotify)
18+
}
19+
20+
func NewPayloadIngesterWithConfig(maxWorkers uint32, closeNotify <-chan struct{}) *PayloadIngester {
21+
if maxWorkers < 1 {
22+
maxWorkers = 1
23+
}
24+
poolConfig := goroutines.PoolConfig{
25+
QueueSize: uint32(64),
26+
MinWorkers: 1,
27+
MaxWorkers: maxWorkers,
28+
IdleTime: 30 * time.Second,
29+
CloseNotify: closeNotify,
30+
PanicHandler: func(err interface{}) {
31+
pfxlog.Logger().WithField(logrus.ErrorKey, err).WithField("backtrace", string(debug.Stack())).Error("panic during payload ingest")
32+
},
33+
WorkerFunction: payloadIngesterWorker,
2334
}
2435

25-
go pi.run()
36+
pool, err := goroutines.NewPool(poolConfig)
37+
if err != nil {
38+
panic(fmt.Errorf("error creating payload ingester handler pool (%w)", err))
39+
}
40+
41+
pi := &PayloadIngester{
42+
pool: pool,
43+
}
2644

2745
return pi
2846
}
2947

30-
func (self *PayloadIngester) inspect(evt *receiveBufferInspectEvent, timeout <-chan time.Time) bool {
31-
select {
32-
case self.receiveBufferInspects <- evt:
33-
return true
34-
case <-self.closeNotify:
35-
case <-timeout:
36-
}
37-
return false
48+
func payloadIngesterWorker(_ uint32, f func()) {
49+
f()
3850
}
3951

4052
func (self *PayloadIngester) ingest(payload *Payload, x *Xgress) {
41-
self.payloadIngest <- &payloadEntry{
42-
payload: payload,
43-
x: x,
44-
}
45-
}
46-
47-
func (self *PayloadIngester) run() {
48-
for {
49-
select {
50-
case payloadEntry := <-self.payloadIngest:
51-
payloadEntry.x.acceptPayload(payloadEntry.payload)
52-
case x := <-self.payloadSendReq:
53-
x.queueSends()
54-
case evt := <-self.receiveBufferInspects:
55-
evt.handle()
56-
case <-self.closeNotify:
57-
return
58-
}
59-
}
53+
_ = self.pool.Queue(func() {
54+
x.acceptPayload(payload)
55+
})
6056
}

0 commit comments

Comments
 (0)