Commit cdc0c97
fix(scheduler): not all models deployed to Servers when minReplicas on Model is set (#6885)
* fix: not all models deployed to Servers when minReplicas on Model is set
* log error
* tests
* copyright
* PR comment
* remove dead code
* PR comments
1 parent 078d8e9 commit cdc0c97

File tree: 13 files changed, +828 / -32 lines


ansible/README.dev.md

Lines changed: 22 additions & 0 deletions

@@ -245,6 +245,28 @@ stringData:
   client_id: "a-secret-client-id"
 ```
 
+## Install Delve - experimental
+
+Install Delve (the Go debugger) onto the Kind node with:
+
+```shell
+./install-delve.sh
+```
+
+You can then `docker exec` into the Kind node and attach Delve to the process you want to debug. For example, to attach to the scheduler:
+
+```shell
+dlv --listen=:40000 --headless=true --api-version=2 --log attach $(pgrep -f "^/bin/scheduler")
+```
+
+This listens on port `40000`; the Kind cluster setup playbook is configured to expose this port and map it to localhost. It attaches to the PID of the scheduler as determined by `$(pgrep -f "^/bin/scheduler")`.
+
+You then need to configure your IDE to connect to the remote debugger on `localhost:40000`. The debug session does crash on occasion and needs restarting; making it more stable still needs investigation. A potential improvement is to build a custom image from the Kind base image with Delve and any other required packages preinstalled.
+
 ## Mounting local (host) path into the rclone container of a Server pod
 
 For this, you first need to enable local mounts into kind (configuring the `kind_local_mount`,

ansible/install-delve.sh

Lines changed: 32 additions & 0 deletions

@@ -0,0 +1,32 @@
+#!/bin/bash
+
+CLUSTER_NAME="seldon"
+GO_VERSION="1.24.7"
+
+echo "Installing Go and Delve in kind cluster: $CLUSTER_NAME"
+
+NODES=$(kind get nodes --name "$CLUSTER_NAME")
+
+for node in $NODES; do
+  echo "Setting up $node..."
+
+  docker exec "$node" bash -c "
+    # Install Go
+    curl -sL https://go.dev/dl/go${GO_VERSION}.linux-amd64.tar.gz -o /tmp/go.tar.gz && \
+    rm -rf /usr/local/go && \
+    tar -C /usr/local -xzf /tmp/go.tar.gz && \
+    rm /tmp/go.tar.gz && \
+
+    # Install Delve
+    /usr/local/go/bin/go install github.com/go-delve/delve/cmd/dlv@latest && \
+
+    # Create symlinks
+    ln -sf /usr/local/go/bin/go /usr/local/bin/go && \
+    ln -sf /root/go/bin/dlv /usr/local/bin/dlv && \
+
+    # Verify
+    echo '=== Versions ===' && go version && dlv version
+  "
+
+  echo "✓ Setup complete on $node"
+done

ansible/playbooks/templates/default-kind-cluster-config.yaml.j2

Lines changed: 4 additions & 0 deletions

@@ -3,6 +3,10 @@ apiVersion: kind.x-k8s.io/v1alpha4
 kind: Cluster
 nodes:
 - role: control-plane
+  extraPortMappings:
+  - containerPort: 40000
+    hostPort: 40000
+    protocol: TCP
 {% if kind_local_mount %}
   extraMounts:
   - hostPath: {{ kind_host_path }}

scheduler/go.mod

Lines changed: 1 addition & 0 deletions

@@ -16,6 +16,7 @@ require (
 	github.com/go-playground/validator/v10 v10.27.0
 	github.com/google/go-cmp v0.7.0
 	github.com/gorilla/mux v1.8.1
+	github.com/gotidy/ptr v1.4.0
 	github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
 	github.com/jarcoal/httpmock v1.4.0
 	github.com/knadh/koanf/parsers/yaml v1.1.0

scheduler/go.sum

Lines changed: 2 additions & 0 deletions

@@ -298,6 +298,8 @@ github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
 github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
 github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5THxAzdVpqr6/geYxZytqFMBCOtn/ujyeo=
 github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA=
+github.com/gotidy/ptr v1.4.0 h1:7++suUs+HNHMnyz6/AW3SE+4EnBhupPSQTSI7QNijVc=
+github.com/gotidy/ptr v1.4.0/go.mod h1:MjRBG6/IETiiZGWI8LrRtISXEji+8b/jigmj2q0mEyM=
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI=
 github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8=
 github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 h1:X5VWvz21y3gzm9Nw/kaUeku/1+uBhcekkmy4IkffJww=
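The new `github.com/gotidy/ptr` dependency is the kind of helper typically pulled in by tests that need pointers to literal values (for example optional replica counts on a deployment spec). The sketch below only illustrates that pattern with a local, hypothetical helper; it does not assert the package's actual API.

```go
// Minimal sketch of why a pointer-helper package is handy in tests: Go does
// not allow taking the address of a literal directly, so optional (pointer)
// fields need a small constructor. The "of" helper here is hypothetical and
// only illustrates the pattern.
package main

import "fmt"

type deploymentSpec struct {
	Replicas    *int32 // desired number of replicas (optional)
	MinReplicas *int32 // minimum acceptable replicas (optional)
}

// of returns a pointer to its argument (hypothetical helper).
func of[T any](v T) *T { return &v }

func main() {
	spec := deploymentSpec{
		Replicas:    of(int32(3)),
		MinReplicas: of(int32(1)),
	}
	fmt.Printf("replicas=%d min=%d\n", *spec.Replicas, *spec.MinReplicas)
}
```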

scheduler/pkg/agent/server.go

Lines changed: 26 additions & 14 deletions

@@ -406,6 +406,7 @@ func (s *Server) Subscribe(request *pb.AgentSubscribeRequest, stream pb.AgentSer
     defer mu.(*sync.Mutex).Unlock()
 
     logger.Infof("Received subscribe request from %s:%d", request.ServerName, request.ReplicaIdx)
+    defer logger.Infof("Agent subscribe stream closed for %s:%d", request.ServerName, request.ReplicaIdx)
 
     fin := make(chan bool)
 
@@ -419,29 +420,31 @@ func (s *Server) Subscribe(request *pb.AgentSubscribeRequest, stream pb.AgentSer
     s.logger.Debugf("Add Server Replica %+v with config %+v", request, request.ReplicaConfig)
     err := s.store.AddServerReplica(request)
     if err != nil {
+        s.logger.WithError(err).WithField("req", request).Error("Failed to add server replica")
         return err
     }
+
     err = s.scheduleModelsFromRequest(request)
     if err != nil {
+        s.logger.WithError(err).WithField("req", request).Error("Failed to schedule models")
         return err
     }
 
     ctx := stream.Context()
     // Keep this scope alive because once this scope exits - the stream is closed
-    for {
-        select {
-        case <-fin:
-            logger.Infof("Closing stream for replica: %s:%d", request.ServerName, request.ReplicaIdx)
-            return nil
-        case <-ctx.Done():
-            logger.Infof("Client replica %s:%d has disconnected", request.ServerName, request.ReplicaIdx)
-            s.mutex.Lock()
-            delete(s.agents, key)
-            s.mutex.Unlock()
-            s.removeServerReplicaImpl(request.GetServerName(), int(request.GetReplicaIdx())) // this is non-blocking beyond rescheduling models on removed server
-            return nil
-        }
+    select {
+    case <-fin:
+        logger.Infof("Closing stream for replica: %s:%d", request.ServerName, request.ReplicaIdx)
+    case <-ctx.Done():
+        logger.WithError(ctx.Err()).Warnf("Client replica %s:%d has disconnected", request.ServerName, request.ReplicaIdx)
+        s.mutex.Lock()
+        delete(s.agents, key)
+        s.mutex.Unlock()
+        logger.WithField("request", request).Info("Removing server replica and re-scheduling model(s)")
+        s.removeServerReplicaImpl(request.GetServerName(), int(request.GetReplicaIdx())) // this is non-blocking beyond rescheduling models on removed server
     }
+
+    return nil
 }
 
 func (s *Server) StopAgentStreams() {
@@ -476,18 +479,27 @@ func (s *Server) removeServerReplicaImpl(serverName string, serverReplicaIdx int
     if err != nil {
         s.logger.WithError(err).Errorf("Failed to remove replica and redeploy models for %s:%d", serverName, serverReplicaIdx)
     }
+
     s.logger.Debugf("Removing models %v from server %s:%d", modelsChanged, serverName, serverReplicaIdx)
     for _, modelName := range modelsChanged {
+        s.logger.WithField("model", modelName).Debug("Scheduling model")
         err = s.scheduler.Schedule(modelName)
         if err != nil {
-            s.logger.Debugf("Failed to reschedule model %s when server %s replica %d disconnected", modelName, serverName, serverReplicaIdx)
+            s.logger.WithError(err).Debugf("Failed to reschedule model %s when server %s replica %d disconnected", modelName, serverName, serverReplicaIdx)
+            continue
        }
+        s.logger.WithField("model", modelName).Debug("Scheduling complete")
     }
+
     // retry failed models
     // this is perhaps counterintuitive, but we want to retry failed models on other servers
     // specifically in the case of model state `LoadFailed` and the server replica disconnects, we want to reconcile
     // the model state with the new set of active servers
     // note that this will also retry `ScheduleFailed`, which is a side effect of calling `ScheduleFailedModels`
+    s.logger.WithFields(log.Fields{
+        "serverName": serverName,
+        "replicaID":  serverReplicaIdx,
+    }).Debug("Scheduling failed models")
     if _, err := s.scheduler.ScheduleFailedModels(); err != nil {
         s.logger.WithError(err).Errorf("Failed to reschedule failed models when server %s replica %d disconnected", serverName, serverReplicaIdx)
     }
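For context on the `Subscribe` change above: both branches of the new `select` fall through to a single `return nil`, so the old surrounding `for` loop was unnecessary. A minimal sketch of that stream-lifetime pattern, using hypothetical names rather than the agent server's real types:

```go
// Minimal sketch of the stream-lifetime pattern: the handler goroutine must
// stay blocked until either the server asks the stream to close (fin) or the
// client disconnects (ctx done), because returning from the handler closes
// the stream. Names here are hypothetical, not the real agent API.
package main

import (
	"context"
	"fmt"
)

func handleStream(ctx context.Context, fin <-chan struct{}, onDisconnect func()) error {
	// A single select is enough: both branches end the stream.
	select {
	case <-fin:
		fmt.Println("stream closed by server shutdown")
	case <-ctx.Done():
		fmt.Println("client disconnected:", ctx.Err())
		onDisconnect() // e.g. drop the replica and reschedule its models
	}
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate a client disconnect
	_ = handleStream(ctx, make(chan struct{}), func() { fmt.Println("rescheduling models") })
}
```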

scheduler/pkg/scheduler/scheduler.go

Lines changed: 37 additions & 18 deletions

@@ -29,11 +29,12 @@ import (
 const serverScaleupEventSource = "scheduler.server.scaleup"
 
 type SimpleScheduler struct {
-    muSortAndUpdate sync.Mutex
-    store           store.ModelStore
-    logger          log.FieldLogger
-    synchroniser    synchroniser.Synchroniser
-    eventHub        *coordinator.EventHub
+    muSortAndUpdate  sync.Mutex
+    store            store.ModelStore
+    logger           log.FieldLogger
+    synchroniser     synchroniser.Synchroniser
+    eventHub         *coordinator.EventHub
+    muScheduleFailed sync.Mutex
     SchedulerConfig
 }
 
@@ -80,19 +81,32 @@ func (s *SimpleScheduler) Schedule(modelKey string) error {
 }
 
 func (s *SimpleScheduler) ScheduleFailedModels() ([]string, error) {
-    s.synchroniser.WaitReady()
+    if !s.synchroniser.IsReady() {
+        s.logger.Debug("Waiting for servers to connect")
+        s.synchroniser.WaitReady()
+        s.logger.Debug("Waiting for servers complete")
+    }
+
+    s.muScheduleFailed.Lock()
+    defer s.muScheduleFailed.Unlock()
+
     failedModels, err := s.getFailedModels()
     if err != nil {
         return nil, err
     }
+
+    if len(failedModels) > 0 {
+        s.logger.WithField("failed_models", failedModels).Debug("Got failed models to schedule")
+    }
+
     var updatedModels []string
     for _, modelName := range failedModels {
         _, err := s.scheduleToServer(modelName)
         if err != nil {
-            s.logger.Debugf("Failed to schedule failed model %s", modelName)
-        } else {
-            updatedModels = append(updatedModels, modelName)
+            s.logger.WithError(err).Debugf("Failed to schedule failed model %s", modelName)
+            continue
         }
+        updatedModels = append(updatedModels, modelName)
     }
     return updatedModels, nil
 }
@@ -107,17 +121,20 @@ func (s *SimpleScheduler) getFailedModels() ([]string, error) {
     if err != nil {
         return nil, err
     }
+
     var failedModels []string
     for _, model := range models {
         version := model.GetLatest()
         if version != nil {
             versionState := version.ModelState()
             if versionState.State == store.ModelFailed || versionState.State == store.ScheduleFailed ||
-                (versionState.State == store.ModelAvailable && versionState.AvailableReplicas < version.GetDeploymentSpec().GetReplicas()) {
+                ((versionState.State == store.ModelAvailable || versionState.State == store.ModelProgressing) &&
+                    versionState.AvailableReplicas < version.GetDeploymentSpec().GetReplicas()) {
                 failedModels = append(failedModels, model.Name)
             }
         }
     }
+
     return failedModels, nil
 }
 
@@ -205,10 +222,11 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) (*coordinator.Serve
         Debug("Identified candidate servers for model")
 
     // The main logic of trying to find a server for the model is as follows:
-    // 1. If there are enough replicas on a server, schedule the model
-    // 2. If there are not enough replicas on a server, try to schedule with min replicas. In this case we actually should get
+    // 1. If there are enough replicas of a server, schedule the model
+    // 2. If there are not enough replicas of a server, try to schedule with min replicas. In this case we actually should get
     //    the models loaded on all the replicas of the servers (assuming min replicas is less than the number of replicas on the server)
-    //    we also mark the model in this case as failed to schedule so that if the infra changes in the future we can try to reschedule
+    //    we mark the model as failed to schedule only if we failed to schedule on both desired replicas and min replicas,
+    //    so that if the infra changes in the future we can try to re-schedule
 
     // For each server filter and sort replicas and attempt schedule if enough replicas
     ok := s.findAndUpdateToServers(filteredServers, latestModel, desiredReplicas, desiredReplicas)
@@ -246,10 +264,11 @@ func (s *SimpleScheduler) scheduleToServer(modelName string) (*coordinator.Serve
     return serverEvent, nil
 }
 
-func (s *SimpleScheduler) findAndUpdateToServers(filteredServers []*store.ServerSnapshot, latestModel *store.ModelVersion, desiredReplicas, minReplicas int) bool {
+func (s *SimpleScheduler) findAndUpdateToServers(filteredServers []*store.ServerSnapshot, latestModel *store.ModelVersion, desiredReplicas, desiredMinReplicas int) bool {
     modelName := latestModel.GetMeta().GetName()
     logger := s.logger.WithField("func", "findAndUpdateToServers").WithField("model", modelName)
     ok := false
+
     for _, candidateServer := range filteredServers {
         logger.WithField("server", candidateServer.Name).Debug("Checking compatibility with candidate server")
         var candidateReplicas *sorters.CandidateServer
@@ -259,21 +278,21 @@ func (s *SimpleScheduler) findAndUpdateToServers(filteredServers []*store.Server
         s.muSortAndUpdate.Lock()
         candidateReplicas = s.filterReplicas(latestModel, candidateServer)
         numServerReplicas := len(candidateReplicas.ChosenReplicas)
-        if numServerReplicas < minReplicas {
+        if numServerReplicas < desiredMinReplicas {
            logger.
                WithField("server", candidateServer.Name).
                WithField("available_replicas", numServerReplicas).
                WithField("desired_replicas", desiredReplicas).
-               WithField("min_replicas", minReplicas).
+               WithField("min_replicas", desiredMinReplicas).
                Debug("Skipping server due to insufficient suitable replicas")
 
            s.muSortAndUpdate.Unlock()
            continue
        }
 
        s.sortReplicas(candidateReplicas)
-       numReplicas := minReplicas
-       if minReplicas != desiredReplicas {
+       numReplicas := desiredMinReplicas
+       if desiredMinReplicas != desiredReplicas {
            numReplicas = min(numServerReplicas, desiredReplicas) // we have more replicas for the server than min, so we can use all of them
        }
        err := s.store.UpdateLoadedModels(
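The desired/min replica fallback that this commit adjusts can be hard to follow from the diff alone. Below is a standalone sketch of the per-server decision, using a hypothetical helper (`pickReplicaCount`) rather than the scheduler's real types; the two-pass desired-then-min behaviour follows the comments in `scheduleToServer` above.

```go
// Standalone illustration of the desired/min replica fallback. Names here are
// hypothetical; the real scheduler works on store.ServerSnapshot and
// store.ModelVersion inside findAndUpdateToServers.
package main

import "fmt"

// pickReplicaCount mirrors the per-server decision: a server with fewer
// suitable replicas than desiredMinReplicas is skipped; otherwise the model is
// placed on min(available, desiredReplicas) replicas, which can be fewer than
// desired when scheduling falls back to min replicas.
func pickReplicaCount(available, desiredReplicas, desiredMinReplicas int) (int, bool) {
	if available < desiredMinReplicas {
		return 0, false // skip this server
	}
	numReplicas := desiredMinReplicas
	if desiredMinReplicas != desiredReplicas {
		numReplicas = min(available, desiredReplicas)
	}
	return numReplicas, true
}

func main() {
	// A model wants 3 replicas with minReplicas 1, and the server has 2
	// suitable replicas. The first pass (min == desired == 3) fails, the
	// second pass (min == 1) places the model on 2 replicas, and the widened
	// getFailedModels condition keeps it eligible for ScheduleFailedModels
	// until it reaches its desired count.
	if _, ok := pickReplicaCount(2, 3, 3); !ok {
		fmt.Println("desired-replica pass failed, falling back to min replicas")
	}
	if n, ok := pickReplicaCount(2, 3, 1); ok {
		fmt.Printf("min-replica pass placed the model on %d replicas\n", n)
	}
}
```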
