Skip to content

Commit 21cbc5a

Browse files
authored
fix(scheduler/model-gw): failed pipelines never retried (#6917)
* retry failed pipeline on dataflow-engine
* fix: failed pipelines never creating/deleting due to kafka connectivity issues
* copyright
* tests
* copyright
* do not try to create pipeline if not latest version
* PR comments and max retry feature
* PR comments
* remove retry count when pipeline/model deleted
1 parent 579cfff commit 21cbc5a

29 files changed: +2706 −243 lines changed

k8s/helm-charts/seldon-core-v2-setup/templates/_components-deployments.tpl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,9 @@ spec:
526526
- --pprof-port=$(PPROF_PORT)
527527
- --pprof-block-rate=$(PPROF_BLOCK_RATE)
528528
- --pprof-mutex-rate=$(PPROF_MUTEX_RATE)
529+
- --retry-creating-failed-pipelines-tick=$(RETRY_CREATING_FAILED_PIPELINES_TICK)
530+
- --retry-deleting-failed-pipelines-tick=$(RETRY_DELETING_FAILED_PIPELINES_TICK)
531+
- --max-retry-failed-pipelines=$(MAX_RETRY_FAILED_PIPELINES)
529532
command:
530533
- /bin/scheduler
531534
env:
@@ -611,6 +614,12 @@ spec:
611614
value: "0"
612615
- name: PPROF_MUTEX_RATE
613616
value: "0"
617+
- name: RETRY_CREATING_FAILED_PIPELINES_TICK
618+
value: 60s
619+
- name: RETRY_DELETING_FAILED_PIPELINES_TICK
620+
value: 60s
621+
- name: MAX_RETRY_FAILED_PIPELINES
622+
value: "10"
614623
image: '{{ .Values.scheduler.image.registry }}/{{ .Values.scheduler.image.repository
615624
}}:{{ .Values.scheduler.image.tag }}'
616625
imagePullPolicy: '{{ .Values.scheduler.image.pullPolicy }}'

k8s/helm-charts/seldon-core-v2-setup/templates/_components-statefulsets.tpl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,9 @@ spec:
526526
- --pprof-port=$(PPROF_PORT)
527527
- --pprof-block-rate=$(PPROF_BLOCK_RATE)
528528
- --pprof-mutex-rate=$(PPROF_MUTEX_RATE)
529+
- --retry-creating-failed-pipelines-tick=$(RETRY_CREATING_FAILED_PIPELINES_TICK)
530+
- --retry-deleting-failed-pipelines-tick=$(RETRY_DELETING_FAILED_PIPELINES_TICK)
531+
- --max-retry-failed-pipelines=$(MAX_RETRY_FAILED_PIPELINES)
529532
command:
530533
- /bin/scheduler
531534
env:
@@ -611,6 +614,12 @@ spec:
611614
value: "0"
612615
- name: PPROF_MUTEX_RATE
613616
value: "0"
617+
- name: RETRY_CREATING_FAILED_PIPELINES_TICK
618+
value: 60s
619+
- name: RETRY_DELETING_FAILED_PIPELINES_TICK
620+
value: 60s
621+
- name: MAX_RETRY_FAILED_PIPELINES
622+
value: "10"
614623
image: '{{ .Values.scheduler.image.registry }}/{{ .Values.scheduler.image.repository
615624
}}:{{ .Values.scheduler.image.tag }}'
616625
imagePullPolicy: '{{ .Values.scheduler.image.pullPolicy }}'

k8s/yaml/components.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,9 @@ spec:
373373
- --pprof-port=$(PPROF_PORT)
374374
- --pprof-block-rate=$(PPROF_BLOCK_RATE)
375375
- --pprof-mutex-rate=$(PPROF_MUTEX_RATE)
376+
- --retry-creating-failed-pipelines-tick=$(RETRY_CREATING_FAILED_PIPELINES_TICK)
377+
- --retry-deleting-failed-pipelines-tick=$(RETRY_DELETING_FAILED_PIPELINES_TICK)
378+
- --max-retry-failed-pipelines=$(MAX_RETRY_FAILED_PIPELINES)
376379
command:
377380
- /bin/scheduler
378381
env:
@@ -454,6 +457,12 @@ spec:
454457
value: "0"
455458
- name: PPROF_MUTEX_RATE
456459
value: "0"
460+
- name: RETRY_CREATING_FAILED_PIPELINES_TICK
461+
value: 60s
462+
- name: RETRY_DELETING_FAILED_PIPELINES_TICK
463+
value: 60s
464+
- name: MAX_RETRY_FAILED_PIPELINES
465+
value: "10"
457466
image: 'docker.io/seldonio/seldon-scheduler:latest'
458467
imagePullPolicy: 'IfNotPresent'
459468
livenessProbe:

operator/config/seldonconfigs/default.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,9 @@ spec:
347347
- --pprof-port=$(PPROF_PORT)
348348
- --pprof-block-rate=$(PPROF_BLOCK_RATE)
349349
- --pprof-mutex-rate=$(PPROF_MUTEX_RATE)
350+
- --retry-creating-failed-pipelines-tick=$(RETRY_CREATING_FAILED_PIPELINES_TICK)
351+
- --retry-deleting-failed-pipelines-tick=$(RETRY_DELETING_FAILED_PIPELINES_TICK)
352+
- --max-retry-failed-pipelines=$(MAX_RETRY_FAILED_PIPELINES)
350353
command:
351354
- /bin/scheduler
352355
env:
@@ -386,6 +389,12 @@ spec:
386389
value: "0"
387390
- name: PPROF_MUTEX_RATE
388391
value: "0"
392+
- name: RETRY_CREATING_FAILED_PIPELINES_TICK
393+
value: "60s"
394+
- name: RETRY_DELETING_FAILED_PIPELINES_TICK
395+
value: "60s"
396+
- name: MAX_RETRY_FAILED_PIPELINES
397+
value: "10"
389398
image: seldonio/seldon-scheduler:latest
390399
imagePullPolicy: Always
391400
name: scheduler

scheduler/cmd/scheduler/main.go

Lines changed: 46 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -51,37 +51,40 @@ import (
5151
)
5252

5353
var (
54-
envoyPort uint
55-
agentPort uint
56-
agentMtlsPort uint
57-
schedulerPort uint
58-
schedulerMtlsPort uint
59-
chainerPort uint
60-
healthProbePort uint
61-
namespace string
62-
pipelineGatewayHost string
63-
pipelineGatewayHttpPort int
64-
pipelineGatewayGrpcPort int
65-
logLevel string
66-
tracingConfigPath string
67-
dbPath string
68-
nodeID string
69-
allowPlaintxt bool // scheduler server
70-
autoscalingModelEnabled bool
71-
autoscalingServerEnabled bool
72-
kafkaConfigPath string
73-
scalingConfigPath string
74-
schedulerReadyTimeoutSeconds uint
75-
deletedResourceTTLSeconds uint
76-
serverPackingEnabled bool
77-
serverPackingPercentage float64
78-
accessLogPath string
79-
enableAccessLog bool
80-
includeSuccessfulRequests bool
81-
enablePprof bool
82-
pprofPort int
83-
pprofMutexRate int
84-
pprofBlockRate int
54+
envoyPort uint
55+
agentPort uint
56+
agentMtlsPort uint
57+
schedulerPort uint
58+
schedulerMtlsPort uint
59+
chainerPort uint
60+
healthProbePort uint
61+
namespace string
62+
pipelineGatewayHost string
63+
pipelineGatewayHttpPort int
64+
pipelineGatewayGrpcPort int
65+
logLevel string
66+
tracingConfigPath string
67+
dbPath string
68+
nodeID string
69+
allowPlaintxt bool // scheduler server
70+
autoscalingModelEnabled bool
71+
autoscalingServerEnabled bool
72+
kafkaConfigPath string
73+
scalingConfigPath string
74+
schedulerReadyTimeoutSeconds uint
75+
deletedResourceTTLSeconds uint
76+
serverPackingEnabled bool
77+
serverPackingPercentage float64
78+
accessLogPath string
79+
enableAccessLog bool
80+
includeSuccessfulRequests bool
81+
enablePprof bool
82+
pprofPort int
83+
pprofMutexRate int
84+
pprofBlockRate int
85+
retryFailedCreatingPipelinesTick time.Duration
86+
retryFailedDeletePipelinesTick time.Duration
87+
maxRetryFailedPipelines uint
8588
)
8689

8790
const (
@@ -172,6 +175,11 @@ func init() {
172175
flag.IntVar(&pprofPort, "pprof-port", 6060, "pprof HTTP server port")
173176
flag.IntVar(&pprofBlockRate, "pprof-block-rate", 0, "pprof block rate")
174177
flag.IntVar(&pprofMutexRate, "pprof-mutex-rate", 0, "pprof mutex rate")
178+
179+
// frequency to retry creating/deleting pipelines which failed to create/delete
180+
flag.DurationVar(&retryFailedCreatingPipelinesTick, "retry-creating-failed-pipelines-tick", time.Minute, "tick interval for re-attempting to create pipelines which failed to create")
181+
flag.DurationVar(&retryFailedDeletePipelinesTick, "retry-deleting-failed-pipelines-tick", time.Minute, "tick interval for re-attempting to delete pipelines which failed to terminate")
182+
flag.UintVar(&maxRetryFailedPipelines, "max-retry-failed-pipelines", 10, "max number of retry attempts to create/terminate pipelines which failed to create/terminate")
175183
}
176184

177185
func getNamespace() string {
@@ -322,8 +330,11 @@ func main() {
322330
logger.WithError(err).Fatal("Failed to start data engine chainer server")
323331
}
324332
defer cs.Stop()
333+
334+
ctx, stopPipelinePollers := context.WithCancel(context.Background())
335+
defer stopPipelinePollers()
325336
go func() {
326-
err := cs.StartGrpcServer(chainerPort)
337+
err := cs.StartGrpcServer(ctx, retryFailedCreatingPipelinesTick, retryFailedDeletePipelinesTick, maxRetryFailedPipelines, chainerPort)
327338
if err != nil {
328339
log.WithError(err).Fatalf("Chainer server start error")
329340
}
@@ -382,7 +393,8 @@ func main() {
382393
)
383394
defer s.Stop()
384395

385-
err = s.StartGrpcServers(allowPlaintxt, schedulerPort, schedulerMtlsPort)
396+
err = s.StartGrpcServers(ctx, allowPlaintxt, schedulerPort, schedulerMtlsPort, retryFailedCreatingPipelinesTick,
397+
retryFailedDeletePipelinesTick, maxRetryFailedPipelines)
386398
if err != nil {
387399
logger.WithError(err).Fatal("Failed to start server gRPC servers")
388400
}
@@ -421,6 +433,7 @@ func main() {
421433
s.StopSendServerEvents()
422434
s.StopSendExperimentEvents()
423435
s.StopSendPipelineEvents()
436+
stopPipelinePollers()
424437
s.StopSendControlPlaneEvents()
425438
as.StopAgentStreams()
426439

scheduler/go.mod

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,10 @@ require (
148148
sigs.k8s.io/yaml v1.5.0 // indirect
149149
)
150150

151-
tool go.uber.org/mock/mockgen
151+
tool (
152+
go.uber.org/mock/mockgen
153+
golang.org/x/tools/cmd/stringer
154+
)
152155

153156
replace github.com/seldonio/seldon-core/components/tls/v2 => ../components/tls
154157

scheduler/pkg/kafka/conflict-resolution/conflict_resolution.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,11 @@ func GetPipelineStatus(
146146
messageStr += fmt.Sprintf("%d/%d failed ", failedCount, len(streams))
147147
}
148148

149+
failedTerminatingCount := cr.GetCountResourceWithStatus(pipelineName, pipeline.PipelineFailedTerminating)
150+
if failedTerminatingCount > 0 {
151+
messageStr += fmt.Sprintf("%d/%d failed terminating", failedTerminatingCount, len(streams))
152+
}
153+
149154
rebalancingCount := cr.GetCountResourceWithStatus(pipelineName, pipeline.PipelineRebalancing)
150155
if rebalancingCount > 0 {
151156
messageStr += fmt.Sprintf("%d/%d rebalancing ", rebalancingCount, len(streams))
@@ -170,8 +175,8 @@ func GetPipelineStatus(
170175
}
171176

172177
if message.Update.Op == chainer.PipelineUpdateMessage_Delete {
173-
if failedCount > 0 {
174-
return pipeline.PipelineFailed, messageStr
178+
if failedTerminatingCount > 0 {
179+
return pipeline.PipelineFailedTerminating, messageStr
175180
}
176181
if terminatedCount == len(streams) {
177182
return pipeline.PipelineTerminated, messageStr

scheduler/pkg/kafka/conflict-resolution/conflict_resolution_test.go

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -150,11 +150,17 @@ func TestIsMessageOutdated(t *testing.T) {
150150
func TestGetPipelineStatus(t *testing.T) {
151151
g := gomega.NewGomegaWithT(t)
152152

153+
type expect struct {
154+
status pipeline.PipelineStatus
155+
msg string
156+
}
157+
153158
tests := []struct {
154159
name string
155160
op chainer.PipelineUpdateMessage_PipelineOperation
156161
statuses map[string]pipeline.PipelineStatus
157-
expected pipeline.PipelineStatus
162+
expect expect
163+
msg string
158164
}{
159165
{
160166
name: "create creating",
@@ -163,7 +169,7 @@ func TestGetPipelineStatus(t *testing.T) {
163169
"a": pipeline.PipelineReady,
164170
"b": pipeline.PipelineStatusUnknown,
165171
},
166-
expected: pipeline.PipelineCreating,
172+
expect: expect{status: pipeline.PipelineCreating, msg: "1/2 ready "},
167173
},
168174
{
169175
name: "create ready (all ready)",
@@ -172,7 +178,7 @@ func TestGetPipelineStatus(t *testing.T) {
172178
"a": pipeline.PipelineReady,
173179
"b": pipeline.PipelineReady,
174180
},
175-
expected: pipeline.PipelineReady,
181+
expect: expect{status: pipeline.PipelineReady, msg: "2/2 ready "},
176182
},
177183
{
178184
name: "create creating (some ready)",
@@ -181,7 +187,7 @@ func TestGetPipelineStatus(t *testing.T) {
181187
"a": pipeline.PipelineReady,
182188
"b": pipeline.PipelineFailed,
183189
},
184-
expected: pipeline.PipelineReady,
190+
expect: expect{status: pipeline.PipelineReady, msg: "1/2 ready 1/2 failed "},
185191
},
186192
{
187193
name: "create failed",
@@ -190,7 +196,7 @@ func TestGetPipelineStatus(t *testing.T) {
190196
"a": pipeline.PipelineFailed,
191197
"b": pipeline.PipelineFailed,
192198
},
193-
expected: pipeline.PipelineFailed,
199+
expect: expect{status: pipeline.PipelineFailed, msg: "2/2 failed "},
194200
},
195201
{
196202
name: "delete terminating",
@@ -199,15 +205,15 @@ func TestGetPipelineStatus(t *testing.T) {
199205
"a": pipeline.PipelineTerminated,
200206
"b": pipeline.PipelineStatusUnknown,
201207
},
202-
expected: pipeline.PipelineTerminating,
208+
expect: expect{status: pipeline.PipelineTerminating, msg: "1/2 terminated "},
203209
},
204210
{
205211
name: "delete failed",
206212
op: chainer.PipelineUpdateMessage_Delete,
207213
statuses: map[string]pipeline.PipelineStatus{
208-
"a": pipeline.PipelineFailed,
214+
"a": pipeline.PipelineFailedTerminating,
209215
},
210-
expected: pipeline.PipelineFailed,
216+
expect: expect{status: pipeline.PipelineFailedTerminating, msg: "1/1 failed terminating"},
211217
},
212218
{
213219
name: "rebalance failed",
@@ -216,7 +222,7 @@ func TestGetPipelineStatus(t *testing.T) {
216222
"a": pipeline.PipelineFailed,
217223
"b": pipeline.PipelineFailed,
218224
},
219-
expected: pipeline.PipelineFailed,
225+
expect: expect{status: pipeline.PipelineFailed, msg: "2/2 failed "},
220226
},
221227
{
222228
name: "rebalanced",
@@ -225,7 +231,7 @@ func TestGetPipelineStatus(t *testing.T) {
225231
"a": pipeline.PipelineReady,
226232
"b": pipeline.PipelineReady,
227233
},
228-
expected: pipeline.PipelineReady,
234+
expect: expect{status: pipeline.PipelineReady, msg: "2/2 ready "},
229235
},
230236
{
231237
name: "rebalanced (some ready)",
@@ -234,7 +240,7 @@ func TestGetPipelineStatus(t *testing.T) {
234240
"a": pipeline.PipelineReady,
235241
"b": pipeline.PipelineFailed,
236242
},
237-
expected: pipeline.PipelineReady,
243+
expect: expect{status: pipeline.PipelineReady, msg: "1/2 ready 1/2 failed "},
238244
},
239245
{
240246
name: "rebalancing all",
@@ -243,7 +249,7 @@ func TestGetPipelineStatus(t *testing.T) {
243249
"a": pipeline.PipelineRebalancing,
244250
"b": pipeline.PipelineRebalancing,
245251
},
246-
expected: pipeline.PipelineRebalancing,
252+
expect: expect{status: pipeline.PipelineRebalancing, msg: "2/2 rebalancing "},
247253
},
248254
{
249255
name: "rebalancing some",
@@ -252,7 +258,24 @@ func TestGetPipelineStatus(t *testing.T) {
252258
"a": pipeline.PipelineReady,
253259
"b": pipeline.PipelineRebalancing,
254260
},
255-
expected: pipeline.PipelineRebalancing,
261+
expect: expect{status: pipeline.PipelineRebalancing, msg: "1/2 ready 1/2 rebalancing "},
262+
},
263+
{
264+
name: "delete failed",
265+
op: chainer.PipelineUpdateMessage_Delete,
266+
statuses: map[string]pipeline.PipelineStatus{
267+
"a": pipeline.PipelineFailedTerminating,
268+
},
269+
expect: expect{status: pipeline.PipelineFailedTerminating, msg: "1/1 failed terminating"},
270+
},
271+
{
272+
name: "delete failed and pipeline failed to create",
273+
op: chainer.PipelineUpdateMessage_Delete,
274+
statuses: map[string]pipeline.PipelineStatus{
275+
"a": pipeline.PipelineFailedTerminating,
276+
"b": pipeline.PipelineFailed,
277+
},
278+
expect: expect{status: pipeline.PipelineFailedTerminating, msg: "1/2 failed 1/2 failed terminating"},
256279
},
257280
}
258281

@@ -275,8 +298,9 @@ func TestGetPipelineStatus(t *testing.T) {
275298
},
276299
}
277300

278-
status, _ := GetPipelineStatus(cr, "p1", msg)
279-
g.Expect(status).To(gomega.Equal(test.expected))
301+
status, outputMsg := GetPipelineStatus(cr, "p1", msg)
302+
g.Expect(status).To(gomega.Equal(test.expect.status))
303+
g.Expect(outputMsg).To(gomega.Equal(test.expect.msg))
280304
})
281305
}
282306
}

0 commit comments

Comments (0)