Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,9 @@ spec:
- --pprof-port=$(PPROF_PORT)
- --pprof-block-rate=$(PPROF_BLOCK_RATE)
- --pprof-mutex-rate=$(PPROF_MUTEX_RATE)
- --retry-creating-failed-pipelines-tick=$(RETRY_CREATING_FAILED_PIPELINES_TICK)
- --retry-deleting-failed-pipelines-tick=$(RETRY_DELETING_FAILED_PIPELINES_TICK)
- --max-retry-failed-pipelines=$(MAX_RETRY_FAILED_PIPELINES)
command:
- /bin/scheduler
env:
Expand Down Expand Up @@ -611,6 +614,12 @@ spec:
value: "0"
- name: PPROF_MUTEX_RATE
value: "0"
- name: RETRY_CREATING_FAILED_PIPELINES_TICK
value: "60s"
- name: RETRY_DELETING_FAILED_PIPELINES_TICK
value: "60s"
- name: MAX_RETRY_FAILED_PIPELINES
value: "10"
image: '{{ .Values.scheduler.image.registry }}/{{ .Values.scheduler.image.repository
}}:{{ .Values.scheduler.image.tag }}'
imagePullPolicy: '{{ .Values.scheduler.image.pullPolicy }}'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,9 @@ spec:
- --pprof-port=$(PPROF_PORT)
- --pprof-block-rate=$(PPROF_BLOCK_RATE)
- --pprof-mutex-rate=$(PPROF_MUTEX_RATE)
- --retry-creating-failed-pipelines-tick=$(RETRY_CREATING_FAILED_PIPELINES_TICK)
- --retry-deleting-failed-pipelines-tick=$(RETRY_DELETING_FAILED_PIPELINES_TICK)
- --max-retry-failed-pipelines=$(MAX_RETRY_FAILED_PIPELINES)
command:
- /bin/scheduler
env:
Expand Down Expand Up @@ -611,6 +614,12 @@ spec:
value: "0"
- name: PPROF_MUTEX_RATE
value: "0"
- name: RETRY_CREATING_FAILED_PIPELINES_TICK
value: "60s"
- name: RETRY_DELETING_FAILED_PIPELINES_TICK
value: "60s"
- name: MAX_RETRY_FAILED_PIPELINES
value: "10"
image: '{{ .Values.scheduler.image.registry }}/{{ .Values.scheduler.image.repository
}}:{{ .Values.scheduler.image.tag }}'
imagePullPolicy: '{{ .Values.scheduler.image.pullPolicy }}'
Expand Down
9 changes: 9 additions & 0 deletions k8s/yaml/components.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,9 @@ spec:
- --pprof-port=$(PPROF_PORT)
- --pprof-block-rate=$(PPROF_BLOCK_RATE)
- --pprof-mutex-rate=$(PPROF_MUTEX_RATE)
- --retry-creating-failed-pipelines-tick=$(RETRY_CREATING_FAILED_PIPELINES_TICK)
- --retry-deleting-failed-pipelines-tick=$(RETRY_DELETING_FAILED_PIPELINES_TICK)
- --max-retry-failed-pipelines=$(MAX_RETRY_FAILED_PIPELINES)
command:
- /bin/scheduler
env:
Expand Down Expand Up @@ -454,6 +457,12 @@ spec:
value: "0"
- name: PPROF_MUTEX_RATE
value: "0"
- name: RETRY_CREATING_FAILED_PIPELINES_TICK
value: "60s"
- name: RETRY_DELETING_FAILED_PIPELINES_TICK
value: "60s"
- name: MAX_RETRY_FAILED_PIPELINES
value: "10"
image: 'docker.io/seldonio/seldon-scheduler:latest'
imagePullPolicy: 'IfNotPresent'
livenessProbe:
Expand Down
9 changes: 9 additions & 0 deletions operator/config/seldonconfigs/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,9 @@ spec:
- --pprof-port=$(PPROF_PORT)
- --pprof-block-rate=$(PPROF_BLOCK_RATE)
- --pprof-mutex-rate=$(PPROF_MUTEX_RATE)
- --retry-creating-failed-pipelines-tick=$(RETRY_CREATING_FAILED_PIPELINES_TICK)
- --retry-deleting-failed-pipelines-tick=$(RETRY_DELETING_FAILED_PIPELINES_TICK)
- --max-retry-failed-pipelines=$(MAX_RETRY_FAILED_PIPELINES)
command:
- /bin/scheduler
env:
Expand Down Expand Up @@ -386,6 +389,12 @@ spec:
value: "0"
- name: PPROF_MUTEX_RATE
value: "0"
- name: RETRY_CREATING_FAILED_PIPELINES_TICK
value: "60s"
- name: RETRY_DELETING_FAILED_PIPELINES_TICK
value: "60s"
- name: MAX_RETRY_FAILED_PIPELINES
value: "10"
image: seldonio/seldon-scheduler:latest
imagePullPolicy: Always
name: scheduler
Expand Down
79 changes: 46 additions & 33 deletions scheduler/cmd/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,37 +51,40 @@ import (
)

var (
envoyPort uint
agentPort uint
agentMtlsPort uint
schedulerPort uint
schedulerMtlsPort uint
chainerPort uint
healthProbePort uint
namespace string
pipelineGatewayHost string
pipelineGatewayHttpPort int
pipelineGatewayGrpcPort int
logLevel string
tracingConfigPath string
dbPath string
nodeID string
allowPlaintxt bool // scheduler server
autoscalingModelEnabled bool
autoscalingServerEnabled bool
kafkaConfigPath string
scalingConfigPath string
schedulerReadyTimeoutSeconds uint
deletedResourceTTLSeconds uint
serverPackingEnabled bool
serverPackingPercentage float64
accessLogPath string
enableAccessLog bool
includeSuccessfulRequests bool
enablePprof bool
pprofPort int
pprofMutexRate int
pprofBlockRate int
envoyPort uint
agentPort uint
agentMtlsPort uint
schedulerPort uint
schedulerMtlsPort uint
chainerPort uint
healthProbePort uint
namespace string
pipelineGatewayHost string
pipelineGatewayHttpPort int
pipelineGatewayGrpcPort int
logLevel string
tracingConfigPath string
dbPath string
nodeID string
allowPlaintxt bool // scheduler server
autoscalingModelEnabled bool
autoscalingServerEnabled bool
kafkaConfigPath string
scalingConfigPath string
schedulerReadyTimeoutSeconds uint
deletedResourceTTLSeconds uint
serverPackingEnabled bool
serverPackingPercentage float64
accessLogPath string
enableAccessLog bool
includeSuccessfulRequests bool
enablePprof bool
pprofPort int
pprofMutexRate int
pprofBlockRate int
retryFailedCreatingPipelinesTick time.Duration
retryFailedDeletePipelinesTick time.Duration
maxRetryFailedPipelines uint
)

const (
Expand Down Expand Up @@ -172,6 +175,11 @@ func init() {
flag.IntVar(&pprofPort, "pprof-port", 6060, "pprof HTTP server port")
flag.IntVar(&pprofBlockRate, "pprof-block-rate", 0, "pprof block rate")
flag.IntVar(&pprofMutexRate, "pprof-mutex-rate", 0, "pprof mutex rate")

// frequency to retry creating/deleting pipelines which failed to create/delete
flag.DurationVar(&retryFailedCreatingPipelinesTick, "retry-creating-failed-pipelines-tick", time.Minute, "tick interval for re-attempting to create pipelines which failed to create")
flag.DurationVar(&retryFailedDeletePipelinesTick, "retry-deleting-failed-pipelines-tick", time.Minute, "tick interval for re-attempting to delete pipelines which failed to terminate")
flag.UintVar(&maxRetryFailedPipelines, "max-retry-failed-pipelines", 10, "max number of retry attempts to create/terminate pipelines which failed to create/terminate")
}

func getNamespace() string {
Expand Down Expand Up @@ -322,8 +330,11 @@ func main() {
logger.WithError(err).Fatal("Failed to start data engine chainer server")
}
defer cs.Stop()

ctx, stopPipelinePollers := context.WithCancel(context.Background())
defer stopPipelinePollers()
go func() {
err := cs.StartGrpcServer(chainerPort)
err := cs.StartGrpcServer(ctx, retryFailedCreatingPipelinesTick, retryFailedDeletePipelinesTick, maxRetryFailedPipelines, chainerPort)
if err != nil {
log.WithError(err).Fatalf("Chainer server start error")
}
Expand Down Expand Up @@ -382,7 +393,8 @@ func main() {
)
defer s.Stop()

err = s.StartGrpcServers(allowPlaintxt, schedulerPort, schedulerMtlsPort)
err = s.StartGrpcServers(ctx, allowPlaintxt, schedulerPort, schedulerMtlsPort, retryFailedCreatingPipelinesTick,
retryFailedDeletePipelinesTick, maxRetryFailedPipelines)
if err != nil {
logger.WithError(err).Fatal("Failed to start server gRPC servers")
}
Expand Down Expand Up @@ -421,6 +433,7 @@ func main() {
s.StopSendServerEvents()
s.StopSendExperimentEvents()
s.StopSendPipelineEvents()
stopPipelinePollers()
s.StopSendControlPlaneEvents()
as.StopAgentStreams()

Expand Down
5 changes: 4 additions & 1 deletion scheduler/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,10 @@ require (
sigs.k8s.io/yaml v1.5.0 // indirect
)

tool go.uber.org/mock/mockgen
tool (
go.uber.org/mock/mockgen
golang.org/x/tools/cmd/stringer
)

replace github.com/seldonio/seldon-core/components/tls/v2 => ../components/tls

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ func GetPipelineStatus(
messageStr += fmt.Sprintf("%d/%d failed ", failedCount, len(streams))
}

failedTerminatingCount := cr.GetCountResourceWithStatus(pipelineName, pipeline.PipelineFailedTerminating)
if failedTerminatingCount > 0 {
messageStr += fmt.Sprintf("%d/%d failed terminating", failedTerminatingCount, len(streams))
}

rebalancingCount := cr.GetCountResourceWithStatus(pipelineName, pipeline.PipelineRebalancing)
if rebalancingCount > 0 {
messageStr += fmt.Sprintf("%d/%d rebalancing ", rebalancingCount, len(streams))
Expand All @@ -170,8 +175,8 @@ func GetPipelineStatus(
}

if message.Update.Op == chainer.PipelineUpdateMessage_Delete {
if failedCount > 0 {
return pipeline.PipelineFailed, messageStr
if failedTerminatingCount > 0 {
return pipeline.PipelineFailedTerminating, messageStr
}
if terminatedCount == len(streams) {
return pipeline.PipelineTerminated, messageStr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,17 @@ func TestIsMessageOutdated(t *testing.T) {
func TestGetPipelineStatus(t *testing.T) {
g := gomega.NewGomegaWithT(t)

type expect struct {
status pipeline.PipelineStatus
msg string
}

tests := []struct {
name string
op chainer.PipelineUpdateMessage_PipelineOperation
statuses map[string]pipeline.PipelineStatus
expected pipeline.PipelineStatus
expect expect
msg string
}{
{
name: "create creating",
Expand All @@ -163,7 +169,7 @@ func TestGetPipelineStatus(t *testing.T) {
"a": pipeline.PipelineReady,
"b": pipeline.PipelineStatusUnknown,
},
expected: pipeline.PipelineCreating,
expect: expect{status: pipeline.PipelineCreating, msg: "1/2 ready "},
},
{
name: "create ready (all ready)",
Expand All @@ -172,7 +178,7 @@ func TestGetPipelineStatus(t *testing.T) {
"a": pipeline.PipelineReady,
"b": pipeline.PipelineReady,
},
expected: pipeline.PipelineReady,
expect: expect{status: pipeline.PipelineReady, msg: "2/2 ready "},
},
{
name: "create creating (some ready)",
Expand All @@ -181,7 +187,7 @@ func TestGetPipelineStatus(t *testing.T) {
"a": pipeline.PipelineReady,
"b": pipeline.PipelineFailed,
},
expected: pipeline.PipelineReady,
expect: expect{status: pipeline.PipelineReady, msg: "1/2 ready 1/2 failed "},
},
{
name: "create failed",
Expand All @@ -190,7 +196,7 @@ func TestGetPipelineStatus(t *testing.T) {
"a": pipeline.PipelineFailed,
"b": pipeline.PipelineFailed,
},
expected: pipeline.PipelineFailed,
expect: expect{status: pipeline.PipelineFailed, msg: "2/2 failed "},
},
{
name: "delete terminating",
Expand All @@ -199,15 +205,15 @@ func TestGetPipelineStatus(t *testing.T) {
"a": pipeline.PipelineTerminated,
"b": pipeline.PipelineStatusUnknown,
},
expected: pipeline.PipelineTerminating,
expect: expect{status: pipeline.PipelineTerminating, msg: "1/2 terminated "},
},
{
name: "delete failed",
op: chainer.PipelineUpdateMessage_Delete,
statuses: map[string]pipeline.PipelineStatus{
"a": pipeline.PipelineFailed,
"a": pipeline.PipelineFailedTerminating,
},
expected: pipeline.PipelineFailed,
expect: expect{status: pipeline.PipelineFailedTerminating, msg: "1/1 failed terminating"},
},
{
name: "rebalance failed",
Expand All @@ -216,7 +222,7 @@ func TestGetPipelineStatus(t *testing.T) {
"a": pipeline.PipelineFailed,
"b": pipeline.PipelineFailed,
},
expected: pipeline.PipelineFailed,
expect: expect{status: pipeline.PipelineFailed, msg: "2/2 failed "},
},
{
name: "rebalanced",
Expand All @@ -225,7 +231,7 @@ func TestGetPipelineStatus(t *testing.T) {
"a": pipeline.PipelineReady,
"b": pipeline.PipelineReady,
},
expected: pipeline.PipelineReady,
expect: expect{status: pipeline.PipelineReady, msg: "2/2 ready "},
},
{
name: "rebalanced (some ready)",
Expand All @@ -234,7 +240,7 @@ func TestGetPipelineStatus(t *testing.T) {
"a": pipeline.PipelineReady,
"b": pipeline.PipelineFailed,
},
expected: pipeline.PipelineReady,
expect: expect{status: pipeline.PipelineReady, msg: "1/2 ready 1/2 failed "},
},
{
name: "rebalancing all",
Expand All @@ -243,7 +249,7 @@ func TestGetPipelineStatus(t *testing.T) {
"a": pipeline.PipelineRebalancing,
"b": pipeline.PipelineRebalancing,
},
expected: pipeline.PipelineRebalancing,
expect: expect{status: pipeline.PipelineRebalancing, msg: "2/2 rebalancing "},
},
{
name: "rebalancing some",
Expand All @@ -252,7 +258,24 @@ func TestGetPipelineStatus(t *testing.T) {
"a": pipeline.PipelineReady,
"b": pipeline.PipelineRebalancing,
},
expected: pipeline.PipelineRebalancing,
expect: expect{status: pipeline.PipelineRebalancing, msg: "1/2 ready 1/2 rebalancing "},
},
{
name: "delete failed",
op: chainer.PipelineUpdateMessage_Delete,
statuses: map[string]pipeline.PipelineStatus{
"a": pipeline.PipelineFailedTerminating,
},
expect: expect{status: pipeline.PipelineFailedTerminating, msg: "1/1 failed terminating"},
},
{
name: "delete failed and pipeline failed to create",
op: chainer.PipelineUpdateMessage_Delete,
statuses: map[string]pipeline.PipelineStatus{
"a": pipeline.PipelineFailedTerminating,
"b": pipeline.PipelineFailed,
},
expect: expect{status: pipeline.PipelineFailedTerminating, msg: "1/2 failed 1/2 failed terminating"},
},
}

Expand All @@ -275,8 +298,9 @@ func TestGetPipelineStatus(t *testing.T) {
},
}

status, _ := GetPipelineStatus(cr, "p1", msg)
g.Expect(status).To(gomega.Equal(test.expected))
status, outputMsg := GetPipelineStatus(cr, "p1", msg)
g.Expect(status).To(gomega.Equal(test.expect.status))
g.Expect(outputMsg).To(gomega.Equal(test.expect.msg))
})
}
}
Loading
Loading