From 7b87bfbaafd26a035aac9b77e36dbbd55fccfb2d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Michal=20Tich=C3=A1k?=
Date: Wed, 14 May 2025 17:28:33 +0200
Subject: [PATCH] [core] deployment retry properly retries only failed tasks

Previously, a failed deployment attempt resubmitted the full set of
requested tasks, so tasks that had already been deployed could be
started a second time. Each retry now resubmits only the descriptors
that came back undeployed or undeployable from the previous attempt,
while successfully deployed tasks are accumulated across attempts and
appended to the roster immediately, so task updates can be distributed
properly. The scheduler no longer skips offer processing when some
descriptors are already known to be undeployable, and the maximum
number of attempts per deploy request is raised from 3 to 5.
---
 core/task/manager.go        |  53 ++-
 core/task/scheduler.go      | 736 ++++++++++++++++++------------------
 core/task/schedulerstate.go |   2 +-
 3 files changed, 399 insertions(+), 392 deletions(-)

diff --git a/core/task/manager.go b/core/task/manager.go
index de705fe2..f9727ec7 100644
--- a/core/task/manager.go
+++ b/core/task/manager.go
@@ -28,12 +28,13 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"github.com/AliceO2Group/Control/common/utils/safeacks"
 	"os"
 	"strings"
 	"sync"
 	"time"
 
+	"github.com/AliceO2Group/Control/common/utils/safeacks"
+
 	"github.com/AliceO2Group/Control/apricot"
 	"github.com/AliceO2Group/Control/common/event"
 	"github.com/AliceO2Group/Control/common/gera"
@@ -494,7 +495,12 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
 	undeployableNonCriticalDescriptors := make(Descriptors, 0)
 	undeployableCriticalDescriptors := make(Descriptors, 0)
 
-	deployedTasks := make(DeploymentMap)
+	// deployment may be retried several times on failure, and we must not
+	// rerun tasks that are already running
+	tasksToRunThisAttempt := make(Descriptors, len(tasksToRun))
+	copy(tasksToRunThisAttempt, tasksToRun)
+
+	allDeployedTasks := make(DeploymentMap)
 	if len(tasksToRun) > 0 {
 		// Alright, so we have some descriptors whose requirements should be met with
 		// new Tasks we're about to deploy here.
@@ -524,45 +530,49 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
 		undeployableNonCriticalDescriptors = make(Descriptors, 0)
 		undeployableCriticalDescriptors = make(Descriptors, 0)
 
-		deployedTasks = make(DeploymentMap)
-
 		outcomeCh := make(chan ResourceOffersOutcome)
 		m.tasksToDeploy <- &ResourceOffersDeploymentRequest{
-			tasksToDeploy: tasksToRun,
+			tasksToDeploy: tasksToRunThisAttempt,
 			envId:         envId,
 			outcomeCh:     outcomeCh,
 		} // buffered channel, does not block
 
 		log.WithField("partition", envId).
-			Debugf("scheduler has been sent request to deploy %d tasks", len(tasksToRun))
+			Debugf("scheduler has been sent request to deploy %d tasks", len(tasksToRunThisAttempt))
 
 		timeReviveOffers := time.Now()
 		timeDeployMu := time.Now()
 		m.reviveOffersTrg <- struct{}{} // signal scheduler to revive offers
 		<-m.reviveOffersTrg             // we only continue when it's done
 		utils.TimeTrack(timeReviveOffers, "acquireTasks: revive offers",
-			log.WithField("tasksToRun", len(tasksToRun)).
+			log.WithField("tasksToRunThisAttempt", len(tasksToRunThisAttempt)).
 				WithField("partition", envId))
 
 		roOutcome := <-outcomeCh // blocks until a verdict from resourceOffers comes in
 
 		utils.TimeTrack(timeDeployMu, "acquireTasks: deployment critical section",
-			log.WithField("tasksToRun", len(tasksToRun)).
+			log.WithField("tasksToRunThisAttempt", len(tasksToRunThisAttempt)).
 				WithField("partition", envId))
 
-		deployedTasks = roOutcome.deployed
+		deployedThisAttempt := roOutcome.deployed
 		undeployedDescriptors = roOutcome.undeployed
 		undeployableDescriptors = roOutcome.undeployable
 
-		logWithId.WithField("tasks", deployedTasks).
-			Debugf("resourceOffers is done, %d new tasks running", len(deployedTasks))
+		logWithId.WithField("tasks", deployedThisAttempt).
+			Debugf("resourceOffers is done, %d new tasks running", len(deployedThisAttempt))
 
-		if len(deployedTasks) != len(tasksToRun) {
+		for deployedTask, deployedDescriptor := range deployedThisAttempt {
+			allDeployedTasks[deployedTask] = deployedDescriptor
+			// add deployed tasks to roster, so updates can be distributed properly
+			m.roster.append(deployedTask)
+		}
+
+		if len(deployedThisAttempt) != len(tasksToRunThisAttempt) {
 			// ↑ Not all roles could be deployed. If some were critical,
 			//   we cannot proceed with running this environment. Either way,
 			//   we keep the roles running since they might be useful in the future.
 			logWithId.WithField("level", infologger.IL_Devel).
-				Errorf("environment deployment failure: %d tasks requested for deployment, but %d deployed", len(tasksToRun), len(deployedTasks))
+				Errorf("environment deployment failure: %d tasks requested for deployment, but %d deployed", len(tasksToRunThisAttempt), len(deployedThisAttempt))
 
 			for _, desc := range undeployedDescriptors {
 				if desc.TaskRole.GetTaskTraits().Critical == true {
@@ -586,7 +596,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
 			if deploymentSuccess {
 				// ↑ means all the required critical processes are now running,
 				//   and we are ready to update the envId
-				for taskPtr, descriptor := range deployedTasks {
+				for taskPtr, descriptor := range deployedThisAttempt {
 					taskPtr.SetParent(descriptor.TaskRole)
 
 					// Ensure everything is filled out properly
 					if !taskPtr.IsLocked() {
@@ -596,6 +606,9 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
 				}
 				break DEPLOYMENT_ATTEMPTS_LOOP
 			}
+			tasksToRunThisAttempt = make(Descriptors, 0, len(undeployableDescriptors)+len(undeployedDescriptors))
+			tasksToRunThisAttempt = append(tasksToRunThisAttempt, undeployedDescriptors...)
+			tasksToRunThisAttempt = append(tasksToRunThisAttempt, undeployableDescriptors...)
 
 			log.WithField("partition", envId).
 				WithField("level", infologger.IL_Devel).
@@ -604,12 +617,14 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
 		}
 	}
 
+	log.Infof("Successfully deployed %d/%d tasks", len(allDeployedTasks), len(tasksToRun))
+
 	{
 		logWithIdDev := logWithId.WithField("level", infologger.IL_Devel)
 		logDescriptors("critical task deployment impossible: ", logWithIdDev.Errorf, undeployableCriticalDescriptors)
 		logDescriptors("critical task deployment failure: ", logWithIdDev.Errorf, undeployedCriticalDescriptors)
-		logDescriptors("non-critical task deployment failure: ", logWithIdDev.Warningf, undeployedNonCriticalDescriptors)
+		logDescriptors("non-critical task deployment impossible: ", logWithIdDev.Warningf, undeployableNonCriticalDescriptors)
 	}
 
@@ -624,7 +639,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
 	if !deploymentSuccess {
 		var deployedTaskIds []string
-		for taskPtr := range deployedTasks {
+		for taskPtr := range allDeployedTasks {
 			taskPtr.SetParent(nil)
 			deployedTaskIds = append(deployedTaskIds, taskPtr.taskId)
 		}
@@ -638,12 +653,8 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
 		}
 	}
 
-	// Finally, we write to the roster. Point of no return!
- for taskPtr := range deployedTasks { - m.roster.append(taskPtr) - } if deploymentSuccess { - for taskPtr := range deployedTasks { + for taskPtr := range allDeployedTasks { taskPtr.GetParent().SetTask(taskPtr) } for taskPtr, descriptor := range tasksAlreadyRunning { diff --git a/core/task/scheduler.go b/core/task/scheduler.go index aeea651f..c5088cc4 100644 --- a/core/task/scheduler.go +++ b/core/task/scheduler.go @@ -84,7 +84,8 @@ var schedEventsCh = make(chan scheduler.Event_Type) func runSchedulerController(ctx context.Context, state *schedulerState, - fidStore store.Singleton) error { + fidStore store.Singleton, +) error { // Set up communication from controller to state machine. go func() { for { @@ -275,7 +276,6 @@ func (state *schedulerState) incomingMessageHandler() events.HandlerFunc { // only one entry in the list, we signal back to commandqueue // otherwise, we log and ignore. return func(ctx context.Context, e *scheduler.Event) (err error) { - mesosMessage := e.GetMessage() if mesosMessage == nil { err = errors.New("message handler got bad MESSAGE") @@ -339,7 +339,7 @@ func (state *schedulerState) incomingMessageHandler() events.HandlerFunc { return } state.taskman.internalEventCh <- ev - //state.handleDeviceEvent(ev) + // state.handleDeviceEvent(ev) } else { log.WithFields(logrus.Fields{ "type": incomingEvent.Type.String(), @@ -440,7 +440,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han timeResourceOffersCall := time.Now() var ( offers = e.GetOffers().GetOffers() - callOption = calls.RefuseSeconds(time.Second) //calls.RefuseSecondsWithJitter(state.random, state.config.maxRefuseSeconds) + callOption = calls.RefuseSeconds(time.Second) // calls.RefuseSecondsWithJitter(state.random, state.config.maxRefuseSeconds) tasksLaunchedThisCycle = 0 offersDeclined = 0 ) @@ -449,16 +449,18 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han var ( prettyOffers []string offerIds []string + hostnames []string ) for i := range offers { prettyOffer, _ := json.MarshalIndent(offers[i], "", "\t") prettyOffers = append(prettyOffers, string(prettyOffer)) offerIds = append(offerIds, offers[i].ID.Value) + hostnames = append(hostnames, offers[i].Hostname) } log.WithPrefix("scheduler").WithFields(logrus.Fields{ - "offerIds": strings.Join(offerIds, ", "), - //"offers": strings.Join(prettyOffers, "\n"), - "offers": len(offerIds), + "offerIds": strings.Join(offerIds, ", "), + "hostnames": strings.Join(hostnames, ", "), + "offersCount": len(offerIds), }). Trace("received offers") } @@ -589,6 +591,10 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han } offerDescriptorsPrematchToDeploy[offer.ID] = append(offerDescriptorsPrematchToDeploy[offer.ID], descriptor) descriptorsStillToDeploy = append(descriptorsStillToDeploy[:i], descriptorsStillToDeploy[i+1:]...) + log.WithField("partition", envId.String()). + WithField("level", infologger.IL_Devel). + WithField("descriptor", descriptor.TaskClassName). + WithField("hostname", offer.Hostname).Trace("descriptor matched") } else { // We have a constraint on the machine_id, but we didn't find any offer that matches it. // We can bail out early. @@ -611,418 +617,410 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han // concurrent stuff starts to happen here. 
descriptorsMu := sync.Mutex{} - if len(descriptorsUndeployable) == 0 { // we still have hope to deploy something - - // Parallelized offer processing, exhaustive search for each offer - var offerWaitGroup sync.WaitGroup - offerWaitGroup.Add(len(offers)) - - for offerIndex := range offers { - go func(offerIndex int) { - defer offerWaitGroup.Done() - - offer := offers[offerIndex] - - timeSingleOffer := time.Now() - var ( - remainingResourcesInOffer = mesos.Resources(offer.Resources) - taskInfosToLaunchForCurrentOffer = make([]mesos.TaskInfo, 0) - tasksDeployedForCurrentOffer = make(DeploymentMap) - targetExecutorId = mesos.ExecutorID{} - ) - - // If there are no executors provided by the offer, - // we start a new one by generating a new ID - if len(offer.ExecutorIDs) == 0 { - targetExecutorId.Value = uid.New().String() + // Parallelized offer processing, exhaustive search for each offer + var offerWaitGroup sync.WaitGroup + offerWaitGroup.Add(len(offers)) + + for offerIndex := range offers { + go func(offerIndex int) { + defer offerWaitGroup.Done() + + offer := offers[offerIndex] + + timeSingleOffer := time.Now() + var ( + remainingResourcesInOffer = mesos.Resources(offer.Resources) + taskInfosToLaunchForCurrentOffer = make([]mesos.TaskInfo, 0) + tasksDeployedForCurrentOffer = make(DeploymentMap) + targetExecutorId = mesos.ExecutorID{} + ) + + // If there are no executors provided by the offer, + // we start a new one by generating a new ID + if len(offer.ExecutorIDs) == 0 { + targetExecutorId.Value = uid.New().String() + log.WithField("executorId", targetExecutorId.Value). + WithField("offerHost", offer.GetHostname()). + WithField("level", infologger.IL_Support). + Info("received offer without executor ID, will start new executor if accepted") + } else { + targetExecutorId.Value = offer.ExecutorIDs[0].Value + if len(offer.ExecutorIDs) == 1 { log.WithField("executorId", targetExecutorId.Value). WithField("offerHost", offer.GetHostname()). WithField("level", infologger.IL_Support). - Info("received offer without executor ID, will start new executor if accepted") - } else { - targetExecutorId.Value = offer.ExecutorIDs[0].Value - if len(offer.ExecutorIDs) == 1 { - log.WithField("executorId", targetExecutorId.Value). - WithField("offerHost", offer.GetHostname()). - WithField("level", infologger.IL_Support). - Info("received offer with one executor ID, will use existing executor") - } else if len(offer.ExecutorIDs) > 1 { - log.WithField("executorId", targetExecutorId.Value). - WithField("executorIds", offer.ExecutorIDs). - WithField("offerHost", offer.GetHostname()). - WithField("level", infologger.IL_Support). - Warn("received offer with more than one executor ID, will use first one") - } + Info("received offer with one executor ID, will use existing executor") + } else if len(offer.ExecutorIDs) > 1 { + log.WithField("executorId", targetExecutorId.Value). + WithField("executorIds", offer.ExecutorIDs). + WithField("offerHost", offer.GetHostname()). + WithField("level", infologger.IL_Support). + Warn("received offer with more than one executor ID, will use first one") } + } - host := offer.GetHostname() - var detector string - detector, err = apricot.Instance().GetDetectorForHost(host) - if err != nil { - detector = "" - } + host := offer.GetHostname() + var detector string + detector, err = apricot.Instance().GetDetectorForHost(host) + if err != nil { + detector = "" + } - log.WithPrefix("scheduler"). 
- WithFields(logrus.Fields{ - "offerId": offer.ID.Value, - "offerHost": host, - "resources": remainingResourcesInOffer.String(), - "partition": envId.String(), - "detector": detector, - }). - Debug("processing offer") - - remainingResourcesFlattened := resources.Flatten(remainingResourcesInOffer) - - // avoid the expense of computing these if we can... - if viper.GetBool("summaryMetrics") && viper.GetBool("mesosResourceTypeMetrics") { - for name, resType := range resources.TypesOf(remainingResourcesFlattened...) { - if resType == mesos.SCALAR { - sum, _ := name.Sum(remainingResourcesFlattened...) - state.metricsAPI.offeredResources(sum.GetScalar().GetValue(), name.String()) - } + log.WithPrefix("scheduler"). + WithFields(logrus.Fields{ + "offerId": offer.ID.Value, + "offerHost": host, + "resources": remainingResourcesInOffer.String(), + "partition": envId.String(), + "detector": detector, + }). + Debug("processing offer") + + remainingResourcesFlattened := resources.Flatten(remainingResourcesInOffer) + + // avoid the expense of computing these if we can... + if viper.GetBool("summaryMetrics") && viper.GetBool("mesosResourceTypeMetrics") { + for name, resType := range resources.TypesOf(remainingResourcesFlattened...) { + if resType == mesos.SCALAR { + sum, _ := name.Sum(remainingResourcesFlattened...) + state.metricsAPI.offeredResources(sum.GetScalar().GetValue(), name.String()) } } + } - log.WithPrefix("scheduler"). - WithField("partition", envId.String()). - WithField("detector", detector). - Trace("state lock to process descriptors to deploy") - - timeDescriptorsSection := time.Now() - descriptorsMu.Lock() + log.WithPrefix("scheduler"). + WithField("partition", envId.String()). + WithField("detector", detector). + Trace("state lock to process descriptors to deploy") - descriptorsPrematchToDeploy, prematchedDescriptorsExistForThisOffer := offerDescriptorsPrematchToDeploy[offer.ID] - if prematchedDescriptorsExistForThisOffer { - FOR_PREMATCH_DESCRIPTORS: - for i := len(descriptorsPrematchToDeploy) - 1; i >= 0; i-- { - descriptor := descriptorsPrematchToDeploy[i] + timeDescriptorsSection := time.Now() + descriptorsMu.Lock() - descriptorDetector, ok := descriptor.TaskRole.GetVars().Get("detector") - if !ok { - descriptorDetector = "" - } + descriptorsPrematchToDeploy, prematchedDescriptorsExistForThisOffer := offerDescriptorsPrematchToDeploy[offer.ID] + if prematchedDescriptorsExistForThisOffer { + FOR_PREMATCH_DESCRIPTORS: + for i := len(descriptorsPrematchToDeploy) - 1; i >= 0; i-- { + descriptor := descriptorsPrematchToDeploy[i] - offerAttributes := constraint.Attributes(offer.Attributes) - if !offerAttributes.Satisfy(descriptorConstraints[descriptor]) { - if viper.GetBool("veryVerbose") { - log.WithPrefix("scheduler"). - WithField("partition", envId.String()). - WithField("detector", descriptorDetector). - WithFields(logrus.Fields{ - "taskClass": descriptor.TaskClassName, - "constraints": descriptorConstraints[descriptor], - "offerId": offer.ID.Value, - "resources": remainingResourcesInOffer.String(), - "attributes": offerAttributes.String(), - }). - Trace("descriptor constraints not satisfied by pre-matched offer attributes, descriptor undeployable") - } - - // we know this descriptor will never be satisfiable, no point in continuing - descriptorsPrematchToDeploy = append(descriptorsPrematchToDeploy[:i], descriptorsPrematchToDeploy[i+1:]...) - descriptorsUndeployable = append(descriptorsUndeployable, descriptor) - - break FOR_PREMATCH_DESCRIPTORS - } - log.WithPrefix("scheduler"). 
- WithField("partition", envId.String()). - WithField("detector", descriptorDetector). - Debug("pre-matched offer attributes satisfy constraints") + descriptorDetector, ok := descriptor.TaskRole.GetVars().Get("detector") + if !ok { + descriptorDetector = "" + } - var wants *Wants - wants, err = state.taskman.GetWantsForDescriptor(descriptor, envId) - if err != nil { + offerAttributes := constraint.Attributes(offer.Attributes) + if !offerAttributes.Satisfy(descriptorConstraints[descriptor]) { + if viper.GetBool("veryVerbose") { log.WithPrefix("scheduler"). - WithError(err). WithField("partition", envId.String()). WithField("detector", descriptorDetector). WithFields(logrus.Fields{ - "class": descriptor.TaskClassName, - "constraints": descriptor.RoleConstraints.String(), - "level": infologger.IL_Devel, - "offerHost": offer.Hostname, + "taskClass": descriptor.TaskClassName, + "constraints": descriptorConstraints[descriptor], + "offerId": offer.ID.Value, + "resources": remainingResourcesInOffer.String(), + "attributes": offerAttributes.String(), }). - Error("invalid task class: no task class or no resource demands for pre-matched descriptor, WILL NOT BE DEPLOYED") - - // we know this descriptor will never be satisfiable, no point in continuing - descriptorsPrematchToDeploy = append(descriptorsPrematchToDeploy[:i], descriptorsPrematchToDeploy[i+1:]...) - descriptorsUndeployable = append(descriptorsUndeployable, descriptor) - - break FOR_PREMATCH_DESCRIPTORS - } - if !Resources(remainingResourcesInOffer).Satisfy(wants) { - if viper.GetBool("veryVerbose") { - log.WithPrefix("scheduler"). - WithField("partition", envId.String()). - WithField("detector", descriptorDetector). - WithFields(logrus.Fields{ - "taskClass": descriptor.TaskClassName, - "wants": *wants, - "offerId": offer.ID.Value, - "resources": remainingResourcesInOffer.String(), - "level": infologger.IL_Devel, - "offerHost": offer.Hostname, - }). - Warn("descriptor wants not satisfied by pre-matched offer resources") - } - - // we know this descriptor will never be satisfiable, no point in continuing - descriptorsPrematchToDeploy = append(descriptorsPrematchToDeploy[:i], descriptorsPrematchToDeploy[i+1:]...) - descriptorsUndeployable = append(descriptorsUndeployable, descriptor) - - break FOR_PREMATCH_DESCRIPTORS - } - - var limits *Limits - limits = state.taskman.GetLimitsForDescriptor(descriptor, envId) - - // Point of no return, we start subtracting resources - taskPtr, mesosTaskInfo := makeTaskForMesosResources( - state, - &offer, - descriptor, - wants, - limits, - remainingResourcesInOffer, - machinesUsed, - targetExecutorId, - envId, - descriptorDetector, - offerIDsToDecline, - ) - if taskPtr == nil || mesosTaskInfo == nil { - break FOR_PREMATCH_DESCRIPTORS + Trace("descriptor constraints not satisfied by pre-matched offer attributes, descriptor undeployable") } - log.WithPrefix("scheduler"). - WithField("partition", envId.String()). - WithField("detector", descriptorDetector). 
- WithFields(logrus.Fields{ - "name": mesosTaskInfo.Name, - "taskId": mesosTaskInfo.TaskID.Value, - "offerId": offer.ID.Value, - "executorId": state.executor.ExecutorID.Value, - "limits": mesosTaskInfo.Limits, - }).Debug("launching task") - - taskPtr.SendEvent(&event.TaskEvent{ - Name: taskPtr.GetName(), - TaskID: mesosTaskInfo.TaskID.Value, - State: "LAUNCHED", - Hostname: taskPtr.hostname, - ClassName: taskPtr.GetClassName(), - }) - - taskInfosToLaunchForCurrentOffer = append(taskInfosToLaunchForCurrentOffer, *mesosTaskInfo) + // we know this descriptor will never be satisfiable, no point in continuing descriptorsPrematchToDeploy = append(descriptorsPrematchToDeploy[:i], descriptorsPrematchToDeploy[i+1:]...) - tasksDeployedForCurrentOffer[taskPtr] = descriptor + descriptorsUndeployable = append(descriptorsUndeployable, descriptor) + break FOR_PREMATCH_DESCRIPTORS } - } // end FOR_PREMATCH_DESCRIPTORS - - if len(descriptorsUndeployable) == 0 { // still hope of deploying something - - // We iterate down over the descriptors, and we remove them as we match - FOR_DESCRIPTORS: - for i := len(descriptorsStillToDeploy) - 1; i >= 0; i-- { - descriptor := descriptorsStillToDeploy[i] - - descriptorDetector, ok := descriptor.TaskRole.GetVars().Get("detector") - if !ok { - descriptorDetector = "" - } + log.WithPrefix("scheduler"). + WithField("partition", envId.String()). + WithField("detector", descriptorDetector). + Debug("pre-matched offer attributes satisfy constraints") - offerAttributes := constraint.Attributes(offer.Attributes) - if !offerAttributes.Satisfy(descriptorConstraints[descriptor]) { - if viper.GetBool("veryVerbose") { - log.WithPrefix("scheduler"). - WithField("partition", envId.String()). - WithField("detector", descriptorDetector). - WithFields(logrus.Fields{ - "taskClass": descriptor.TaskClassName, - "constraints": descriptorConstraints[descriptor], - "offerId": offer.ID.Value, - "resources": remainingResourcesInOffer.String(), - "attributes": offerAttributes.String(), - }). - Trace("descriptor constraints not satisfied by offer attributes") - } - continue FOR_DESCRIPTORS // next descriptor - } + var wants *Wants + wants, err = state.taskman.GetWantsForDescriptor(descriptor, envId) + if err != nil { log.WithPrefix("scheduler"). + WithError(err). WithField("partition", envId.String()). WithField("detector", descriptorDetector). - Debug("offer attributes satisfy constraints") + WithFields(logrus.Fields{ + "class": descriptor.TaskClassName, + "constraints": descriptor.RoleConstraints.String(), + "level": infologger.IL_Devel, + "offerHost": offer.Hostname, + }). + Error("invalid task class: no task class or no resource demands for pre-matched descriptor, WILL NOT BE DEPLOYED") + + // we know this descriptor will never be satisfiable, no point in continuing + descriptorsPrematchToDeploy = append(descriptorsPrematchToDeploy[:i], descriptorsPrematchToDeploy[i+1:]...) + descriptorsUndeployable = append(descriptorsUndeployable, descriptor) - var wants *Wants - wants, err = state.taskman.GetWantsForDescriptor(descriptor, envId) - if err != nil { + break FOR_PREMATCH_DESCRIPTORS + } + if !Resources(remainingResourcesInOffer).Satisfy(wants) { + if viper.GetBool("veryVerbose") { log.WithPrefix("scheduler"). - WithError(err). WithField("partition", envId.String()). WithField("detector", descriptorDetector). 
WithFields(logrus.Fields{ - "class": descriptor.TaskClassName, - "constraints": descriptor.RoleConstraints.String(), - "level": infologger.IL_Devel, - "offerHost": offer.Hostname, + "taskClass": descriptor.TaskClassName, + "wants": *wants, + "offerId": offer.ID.Value, + "resources": remainingResourcesInOffer.String(), + "level": infologger.IL_Devel, + "offerHost": offer.Hostname, }). - Error("invalid task class: no task class or no resource demands for descriptor, WILL NOT BE DEPLOYED") - continue FOR_DESCRIPTORS // next descriptor - } - if !Resources(remainingResourcesInOffer).Satisfy(wants) { - if viper.GetBool("veryVerbose") { - log.WithPrefix("scheduler"). - WithField("partition", envId.String()). - WithField("detector", descriptorDetector). - WithFields(logrus.Fields{ - "taskClass": descriptor.TaskClassName, - "wants": *wants, - "offerId": offer.ID.Value, - "resources": remainingResourcesInOffer.String(), - "level": infologger.IL_Devel, - "offerHost": offer.Hostname, - }). - Warn("descriptor wants not satisfied by offer resources") - } - continue FOR_DESCRIPTORS // next descriptor + Warn("descriptor wants not satisfied by pre-matched offer resources") } - var limits *Limits - limits = state.taskman.GetLimitsForDescriptor(descriptor, envId) - - // Point of no return, we start subtracting resources - taskPtr, mesosTaskInfo := makeTaskForMesosResources( - state, - &offer, - descriptor, - wants, - limits, - remainingResourcesInOffer, - machinesUsed, - targetExecutorId, - envId, - descriptorDetector, - offerIDsToDecline, - ) - if taskPtr == nil || mesosTaskInfo == nil { - continue FOR_DESCRIPTORS // next descriptor - } + // we know this descriptor will never be satisfiable, no point in continuing + descriptorsPrematchToDeploy = append(descriptorsPrematchToDeploy[:i], descriptorsPrematchToDeploy[i+1:]...) + descriptorsUndeployable = append(descriptorsUndeployable, descriptor) + + break FOR_PREMATCH_DESCRIPTORS + } + + var limits *Limits + limits = state.taskman.GetLimitsForDescriptor(descriptor, envId) + + // Point of no return, we start subtracting resources + taskPtr, mesosTaskInfo := makeTaskForMesosResources( + state, + &offer, + descriptor, + wants, + limits, + remainingResourcesInOffer, + machinesUsed, + targetExecutorId, + envId, + descriptorDetector, + offerIDsToDecline, + ) + if taskPtr == nil || mesosTaskInfo == nil { + break FOR_PREMATCH_DESCRIPTORS + } + + log.WithPrefix("scheduler"). + WithField("partition", envId.String()). + WithField("detector", descriptorDetector). + WithFields(logrus.Fields{ + "name": mesosTaskInfo.Name, + "taskId": mesosTaskInfo.TaskID.Value, + "offerId": offer.ID.Value, + "executorId": state.executor.ExecutorID.Value, + "limits": mesosTaskInfo.Limits, + }).Debug("launching task") + + taskPtr.SendEvent(&event.TaskEvent{ + Name: taskPtr.GetName(), + TaskID: mesosTaskInfo.TaskID.Value, + State: "LAUNCHED", + Hostname: taskPtr.hostname, + ClassName: taskPtr.GetClassName(), + }) + + taskInfosToLaunchForCurrentOffer = append(taskInfosToLaunchForCurrentOffer, *mesosTaskInfo) + descriptorsPrematchToDeploy = append(descriptorsPrematchToDeploy[:i], descriptorsPrematchToDeploy[i+1:]...) 
+ tasksDeployedForCurrentOffer[taskPtr] = descriptor + } + } // end FOR_PREMATCH_DESCRIPTORS + + // We iterate down over the descriptors, and we remove them as we match + FOR_DESCRIPTORS: + for i := len(descriptorsStillToDeploy) - 1; i >= 0; i-- { + descriptor := descriptorsStillToDeploy[i] + + descriptorDetector, ok := descriptor.TaskRole.GetVars().Get("detector") + if !ok { + descriptorDetector = "" + } + + offerAttributes := constraint.Attributes(offer.Attributes) + if !offerAttributes.Satisfy(descriptorConstraints[descriptor]) { + if viper.GetBool("veryVerbose") { log.WithPrefix("scheduler"). WithField("partition", envId.String()). WithField("detector", descriptorDetector). WithFields(logrus.Fields{ - "name": mesosTaskInfo.Name, - "taskId": mesosTaskInfo.TaskID.Value, - "offerId": offer.ID.Value, - "executorId": state.executor.ExecutorID.Value, - "limits": mesosTaskInfo.Limits, - }).Debug("launching task") - - taskPtr.SendEvent(&event.TaskEvent{ - Name: taskPtr.GetName(), - TaskID: mesosTaskInfo.TaskID.Value, - State: "LAUNCHED", - Hostname: taskPtr.hostname, - ClassName: taskPtr.GetClassName(), - }) - - taskInfosToLaunchForCurrentOffer = append(taskInfosToLaunchForCurrentOffer, *mesosTaskInfo) - descriptorsStillToDeploy = append(descriptorsStillToDeploy[:i], descriptorsStillToDeploy[i+1:]...) - tasksDeployedForCurrentOffer[taskPtr] = descriptor - - } // end FOR_DESCRIPTORS + "taskClass": descriptor.TaskClassName, + "constraints": descriptorConstraints[descriptor], + "offerId": offer.ID.Value, + "resources": remainingResourcesInOffer.String(), + "attributes": offerAttributes.String(), + }). + Trace("descriptor constraints not satisfied by offer attributes") + } + continue FOR_DESCRIPTORS // next descriptor } - descriptorsMu.Unlock() - - utils.TimeTrack(timeDescriptorsSection, "resourceOffers: single offer descriptors section", log. - WithField("partition", envId.String()). - WithField("offerHost", host). - WithField("tasksDeployed", len(tasksDeployedForCurrentOffer)). - WithField("descriptorsStillToDeploy", len(descriptorsStillToDeploy)). - WithField("offers", len(offers))) - timeOfferAcceptance := time.Now() - log.WithPrefix("scheduler"). - WithField("offerHost", host). - WithField("detector", detector). WithField("partition", envId.String()). - Trace("state unlock") - - // build ACCEPT call to launch all of the tasks we've assembled - accept := calls.Accept( - calls.OfferOperations{calls.OpLaunch(taskInfosToLaunchForCurrentOffer...)}.WithOffers(offer.ID), - ).With(callOption) // handles refuseSeconds etc. + WithField("detector", descriptorDetector). + Debug("offer attributes satisfy constraints") - // send ACCEPT call to mesos - err = calls.CallNoData(ctx, state.cli, accept) + var wants *Wants + wants, err = state.taskman.GetWantsForDescriptor(descriptor, envId) if err != nil { log.WithPrefix("scheduler"). WithError(err). - WithField("detector", detector). WithField("partition", envId.String()). - WithField("offerHost", host). - Error("failed to launch tasks") - // FIXME: we probably need to react to a failed ACCEPT here - } else { - if n := len(taskInfosToLaunchForCurrentOffer); n > 0 { - tasksLaunchedThisCycle += n + WithField("detector", descriptorDetector). + WithFields(logrus.Fields{ + "class": descriptor.TaskClassName, + "constraints": descriptor.RoleConstraints.String(), + "level": infologger.IL_Devel, + "offerHost": offer.Hostname, + }). 
+ Error("invalid task class: no task class or no resource demands for descriptor, WILL NOT BE DEPLOYED") + continue FOR_DESCRIPTORS // next descriptor + } + if !Resources(remainingResourcesInOffer).Satisfy(wants) { + if viper.GetBool("veryVerbose") { log.WithPrefix("scheduler"). - WithField("tasks", n). WithField("partition", envId.String()). - WithField("detector", detector). - WithField("level", infologger.IL_Support). - WithField("offerHost", offer.Hostname). - WithField("executorId", targetExecutorId.Value). - Infof("launch request sent to %s: %d tasks", offer.Hostname, n) - for _, taskInfo := range taskInfosToLaunchForCurrentOffer { - log.WithPrefix("scheduler"). - WithFields(logrus.Fields{ - "executorId": taskInfo.GetExecutor().ExecutorID.Value, - "executorName": taskInfo.GetExecutor().GetName(), - "agentId": taskInfo.GetAgentID().Value, - "taskId": taskInfo.GetTaskID().Value, - "level": infologger.IL_Devel, - }). - WithField("offerHost", offer.Hostname). - WithField("partition", envId.String()). - WithField("detector", detector). - Debug("task launch requested") - } - - tasksDeployedMutex.Lock() - // update deployment map - for k, v := range tasksDeployedForCurrentOffer { - tasksDeployed[k] = v - } - tasksDeployedMutex.Unlock() - } else { - offersDeclined++ + WithField("detector", descriptorDetector). + WithFields(logrus.Fields{ + "taskClass": descriptor.TaskClassName, + "wants": *wants, + "offerId": offer.ID.Value, + "resources": remainingResourcesInOffer.String(), + "level": infologger.IL_Devel, + "offerHost": offer.Hostname, + }). + Warn("descriptor wants not satisfied by offer resources") } + continue FOR_DESCRIPTORS // next descriptor } - utils.TimeTrack(timeOfferAcceptance, "resourceOffers: single offer acceptance section", log. - WithField("partition", envId.String()). - WithField("detector", detector). - WithField("tasksDeployed", len(tasksDeployedForCurrentOffer)). - WithField("descriptorsStillToDeploy", len(descriptorsStillToDeploy)). - WithField("offers", len(offers)). - WithField("offerHost", offer.Hostname)) - utils.TimeTrack(timeSingleOffer, "resourceOffers: process and accept host offer", log. + var limits *Limits + limits = state.taskman.GetLimitsForDescriptor(descriptor, envId) + + // Point of no return, we start subtracting resources + taskPtr, mesosTaskInfo := makeTaskForMesosResources( + state, + &offer, + descriptor, + wants, + limits, + remainingResourcesInOffer, + machinesUsed, + targetExecutorId, + envId, + descriptorDetector, + offerIDsToDecline, + ) + if taskPtr == nil || mesosTaskInfo == nil { + continue FOR_DESCRIPTORS // next descriptor + } + + log.WithPrefix("scheduler"). WithField("partition", envId.String()). + WithField("detector", descriptorDetector). + WithFields(logrus.Fields{ + "name": mesosTaskInfo.Name, + "taskId": mesosTaskInfo.TaskID.Value, + "offerId": offer.ID.Value, + "executorId": state.executor.ExecutorID.Value, + "limits": mesosTaskInfo.Limits, + }).Debug("launching task") + + taskPtr.SendEvent(&event.TaskEvent{ + Name: taskPtr.GetName(), + TaskID: mesosTaskInfo.TaskID.Value, + State: "LAUNCHED", + Hostname: taskPtr.hostname, + ClassName: taskPtr.GetClassName(), + }) + + taskInfosToLaunchForCurrentOffer = append(taskInfosToLaunchForCurrentOffer, *mesosTaskInfo) + descriptorsStillToDeploy = append(descriptorsStillToDeploy[:i], descriptorsStillToDeploy[i+1:]...) 
+ tasksDeployedForCurrentOffer[taskPtr] = descriptor + + } // end FOR_DESCRIPTORS + descriptorsMu.Unlock() + + utils.TimeTrack(timeDescriptorsSection, "resourceOffers: single offer descriptors section", log. + WithField("partition", envId.String()). + WithField("offerHost", host). + WithField("tasksDeployed", len(tasksDeployedForCurrentOffer)). + WithField("descriptorsStillToDeploy", len(descriptorsStillToDeploy)). + WithField("offers", len(offers))) + timeOfferAcceptance := time.Now() + + log.WithPrefix("scheduler"). + WithField("offerHost", host). + WithField("detector", detector). + WithField("partition", envId.String()). + Trace("state unlock") + + // build ACCEPT call to launch all of the tasks we've assembled + accept := calls.Accept( + calls.OfferOperations{calls.OpLaunch(taskInfosToLaunchForCurrentOffer...)}.WithOffers(offer.ID), + ).With(callOption) // handles refuseSeconds etc. + + // send ACCEPT call to mesos + err = calls.CallNoData(ctx, state.cli, accept) + if err != nil { + log.WithPrefix("scheduler"). + WithError(err). WithField("detector", detector). - WithField("tasksDeployed", len(tasksDeployedForCurrentOffer)). - WithField("descriptorsStillToDeploy", len(descriptorsStillToDeploy)). - WithField("offers", len(offers)). - WithField("offerHost", offer.Hostname)) + WithField("partition", envId.String()). + WithField("offerHost", host). + Error("failed to launch tasks") + // FIXME: we probably need to react to a failed ACCEPT here + } else { + if n := len(taskInfosToLaunchForCurrentOffer); n > 0 { + tasksLaunchedThisCycle += n + log.WithPrefix("scheduler"). + WithField("tasks", n). + WithField("partition", envId.String()). + WithField("detector", detector). + WithField("level", infologger.IL_Support). + WithField("offerHost", offer.Hostname). + WithField("executorId", targetExecutorId.Value). + Infof("launch request sent to %s: %d tasks", offer.Hostname, n) + for _, taskInfo := range taskInfosToLaunchForCurrentOffer { + log.WithPrefix("scheduler"). + WithFields(logrus.Fields{ + "executorId": taskInfo.GetExecutor().ExecutorID.Value, + "executorName": taskInfo.GetExecutor().GetName(), + "agentId": taskInfo.GetAgentID().Value, + "taskId": taskInfo.GetTaskID().Value, + "level": infologger.IL_Devel, + }). + WithField("offerHost", offer.Hostname). + WithField("partition", envId.String()). + WithField("detector", detector). + Debug("task launch requested") + } - }(offerIndex) // end for offer closure - } // end for _, offer := range offers - offerWaitGroup.Wait() + tasksDeployedMutex.Lock() + // update deployment map + for k, v := range tasksDeployedForCurrentOffer { + tasksDeployed[k] = v + } + tasksDeployedMutex.Unlock() + } else { + offersDeclined++ + } + } + utils.TimeTrack(timeOfferAcceptance, "resourceOffers: single offer acceptance section", log. + WithField("partition", envId.String()). + WithField("detector", detector). + WithField("tasksDeployed", len(tasksDeployedForCurrentOffer)). + WithField("descriptorsStillToDeploy", len(descriptorsStillToDeploy)). + WithField("offers", len(offers)). + WithField("offerHost", offer.Hostname)) - } + utils.TimeTrack(timeSingleOffer, "resourceOffers: process and accept host offer", log. + WithField("partition", envId.String()). + WithField("detector", detector). + WithField("tasksDeployed", len(tasksDeployedForCurrentOffer)). + WithField("descriptorsStillToDeploy", len(descriptorsStillToDeploy)). + WithField("offers", len(offers)). 
+ WithField("offerHost", offer.Hostname)) + }(offerIndex) // end for offer closure + } // end for _, offer := range offers + offerWaitGroup.Wait() utils.TimeTrack(timeForOffers, "resourceOffers: pre-processing done to for_offers done (concurrent)", log. WithField("partition", envId.String()). @@ -1339,7 +1337,6 @@ func makeTaskForMesosResources( descriptorDetector string, offerIDsToDecline map[mesos.OfferID]struct{}, ) (*Task, *mesos.TaskInfo) { - bindMap := make(channel.BindMap) for _, ch := range wants.InboundChannels { if ch.Addressing == channel.IPC { @@ -1372,7 +1369,7 @@ func makeTaskForMesosResources( Attributes: offer.Attributes, Hostname: offer.Hostname, } - state.taskman.AgentCache.Update(agentForCache) //thread safe + state.taskman.AgentCache.Update(agentForCache) // thread safe machinesUsed[offer.Hostname] = struct{}{} taskPtr := state.taskman.newTaskForMesosOffer(offer, descriptor, bindMap, targetExecutorId) @@ -1570,12 +1567,11 @@ func makeTaskForMesosResources( ldLibPath, ok := agentForCache.Attributes.Get("executor_env_LD_LIBRARY_PATH") mesosTaskInfo.Executor.Command.Environment = &mesos.Environment{} if ok { - mesosTaskInfo.Executor.Command.Environment.Variables = - append(mesosTaskInfo.Executor.Command.Environment.Variables, - mesos.Environment_Variable{ - Name: "LD_LIBRARY_PATH", - Value: proto.String(ldLibPath), - }) + mesosTaskInfo.Executor.Command.Environment.Variables = append(mesosTaskInfo.Executor.Command.Environment.Variables, + mesos.Environment_Variable{ + Name: "LD_LIBRARY_PATH", + Value: proto.String(ldLibPath), + }) } return taskPtr, &mesosTaskInfo diff --git a/core/task/schedulerstate.go b/core/task/schedulerstate.go index e85e01ee..86970754 100644 --- a/core/task/schedulerstate.go +++ b/core/task/schedulerstate.go @@ -50,7 +50,7 @@ import ( const ( MAX_CONCURRENT_DEPLOY_REQUESTS = 100 - MAX_ATTEMPTS_PER_DEPLOY_REQUEST = 3 + MAX_ATTEMPTS_PER_DEPLOY_REQUEST = 5 SLEEP_LENGTH_BETWEEN_PER_DEPLOY_REQUESTS = 1 // in seconds )
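
-- 
Reviewer addendum, illustrative only, not part of the commit:

The core of the change is the new shape of DEPLOYMENT_ATTEMPTS_LOOP in
core/task/manager.go: each attempt resubmits only the descriptors that
failed previously, and successes accumulate across attempts. Below is a
minimal, self-contained Go sketch of that pattern. Descriptor and
deployOnce are hypothetical stand-ins for task.Descriptors and the
resourceOffers round trip; the constants mirror the new values in
core/task/schedulerstate.go.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	maxAttemptsPerDeployRequest = 5               // cf. MAX_ATTEMPTS_PER_DEPLOY_REQUEST
	sleepBetweenDeployRequests  = 1 * time.Second // cf. SLEEP_LENGTH_BETWEEN_PER_DEPLOY_REQUESTS
)

// Descriptor stands in for task.Descriptor.
type Descriptor struct{ Name string }

// deployOnce stands in for one scheduler round trip: it reports which
// descriptors were deployed and which failed this attempt.
func deployOnce(todo []Descriptor) (deployed, failed []Descriptor) {
	for _, d := range todo {
		if rand.Intn(2) == 0 { // pretend roughly half the tasks fail
			failed = append(failed, d)
			continue
		}
		deployed = append(deployed, d)
	}
	return deployed, failed
}

func main() {
	tasksToRun := []Descriptor{{"readout"}, {"qc"}, {"dpl"}, {"flp"}}

	// Accumulate everything deployed so far (allDeployedTasks in the
	// patch); each new attempt carries only what is still missing.
	allDeployed := make([]Descriptor, 0, len(tasksToRun))
	thisAttempt := append([]Descriptor(nil), tasksToRun...)

	for attempt := 1; attempt <= maxAttemptsPerDeployRequest; attempt++ {
		deployed, failed := deployOnce(thisAttempt)
		allDeployed = append(allDeployed, deployed...)
		fmt.Printf("attempt %d: %d deployed, %d failed\n", attempt, len(deployed), len(failed))
		if len(failed) == 0 {
			break
		}
		thisAttempt = failed // retry only the failed tasks
		time.Sleep(sleepBetweenDeployRequests)
	}
	fmt.Printf("successfully deployed %d/%d tasks\n", len(allDeployed), len(tasksToRun))
}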
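
The scheduler.go portion of the diff is mostly a reindentation: the two
"if len(descriptorsUndeployable) == 0" guards are gone, so every offer
is now processed even when some descriptors are already known to be
undeployable. Since the whole block moved, here is a sketch of the
concurrency structure worth re-reviewing, with Offer, Descriptor and
matches as hypothetical stand-ins for the real types and the
constraint/resource checks:

package main

import (
	"fmt"
	"sync"
)

type Offer struct{ Hostname string }

type Descriptor struct{ Class string }

// matches stands in for the attribute-constraint and resource checks.
func matches(o Offer, d Descriptor) bool { return true }

func main() {
	offers := []Offer{{"flp001"}, {"flp002"}}
	toDeploy := []Descriptor{{"readout"}, {"qc"}}

	var (
		mu sync.Mutex     // cf. descriptorsMu: guards the shared descriptor list
		wg sync.WaitGroup // one goroutine per offer, joined at the end
	)
	wg.Add(len(offers))
	for i := range offers {
		go func(i int) {
			defer wg.Done()
			offer := offers[i]

			// Matching and removal happen under the mutex because the
			// descriptor list is shared by all offer goroutines; iterating
			// downwards keeps the remaining indices valid after removal.
			mu.Lock()
			for j := len(toDeploy) - 1; j >= 0; j-- {
				if matches(offer, toDeploy[j]) {
					fmt.Printf("%s -> %s\n", toDeploy[j].Class, offer.Hostname)
					toDeploy = append(toDeploy[:j], toDeploy[j+1:]...)
				}
			}
			mu.Unlock()

			// The ACCEPT (or decline) call for this offer follows here,
			// outside the critical section.
		}(i)
	}
	wg.Wait()
}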