Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 58 additions & 8 deletions core/environment/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (envs *Manager) GetActiveDetectors() system.IDMap {
return response
}

func (envs *Manager) CreateEnvironment(workflowPath string, userVars map[string]string, public bool, newId uid.ID, autoTransition bool) (uid.ID, error) {
func (envs *Manager) CreateEnvironment(workflowPath string, userVars map[string]string, public bool, newId uid.ID, autoTransition bool) (resultEnvId uid.ID, resultErr error) {
// Before we load the workflow, we get the list of currently active detectors. This query must be performed before
// loading the workflow in order to compare the currently used detectors with the detectors required by the newly
// created environment.
Expand All @@ -214,6 +214,31 @@ func (envs *Manager) CreateEnvironment(workflowPath string, userVars map[string]
_ = json.Unmarshal([]byte(lastRequestUserJ), lastRequestUser)
}

// CreateEnvironment() is not transition from state machine, so we need to emit the same message as in TryTransition
the.EventWriterWithTopic(topic.Environment).WriteEvent(&evpb.Ev_EnvironmentEvent{
EnvironmentId: newId.String(),
State: "PENDING",
Transition: "CREATE",
TransitionStatus: evpb.OpStatus_STARTED,
LastRequestUser: lastRequestUser,
Message: "transition starting",
})

// report error of the CreateEnvironment() in the same way as in TryTransition
defer func() {
if resultErr != nil {
the.EventWriterWithTopic(topic.Environment).WriteEvent(&evpb.Ev_EnvironmentEvent{
EnvironmentId: newId.String(),
Error: resultErr.Error(),
LastRequestUser: lastRequestUser,
Message: "transition error",
State: "PENDING",
Transition: "CREATE",
TransitionStatus: evpb.OpStatus_DONE_ERROR,
})
}
}()

// in case of err==nil, env will be false unless user
// set it to True which will be overwritten in server.go
workflowPublicInfo, err := parseWorkflowPublicInfo(workflowPath)
Expand All @@ -222,7 +247,10 @@ func (envs *Manager) CreateEnvironment(workflowPath string, userVars map[string]
WithField("workflow path", workflowPath).
WithError(err).
Warn("parse workflow public info failed.")
return newId, fmt.Errorf("workflow public info parsing failed: %w", err)

resultEnvId = newId
resultErr = fmt.Errorf("workflow public info parsing failed: %w", err)
return
}

the.EventWriterWithTopic(topic.Environment).WriteEvent(&evpb.Ev_EnvironmentEvent{
Expand Down Expand Up @@ -305,7 +333,9 @@ func (envs *Manager) CreateEnvironment(workflowPath string, userVars map[string]
log.WithError(err).
WithField("partition", gotEnvId.String()).
Logf(logrus.FatalLevel, "environment creation failed")
return gotEnvId, err
resultEnvId = gotEnvId
resultErr = err
return
}

log.WithFields(logrus.Fields{
Expand Down Expand Up @@ -352,21 +382,27 @@ func (envs *Manager) CreateEnvironment(workflowPath string, userVars map[string]
if err != nil {
err = fmt.Errorf("cannot load workflow template: %w", err)

return env.id, err
resultEnvId = env.id
resultErr = err
return
}

// Ensure we provide a very defaulty `detectors` variable
detectors, err := the.ConfSvc().GetDetectorsForHosts(env.GetFLPs())
if err != nil {
err = fmt.Errorf("cannot acquire detectors in loaded workflow template: %w", err)

return env.id, err
resultEnvId = env.id
resultErr = err
return
}
detectorsStr, err := SliceToJSONSlice(detectors)
if err != nil {
err = fmt.Errorf("cannot process detectors in loaded workflow template: %w", err)

return env.id, err
resultEnvId = env.id
resultErr = err
return
}
env.GlobalDefaults.Set("detectors", detectorsStr)

Expand All @@ -379,7 +415,10 @@ func (envs *Manager) CreateEnvironment(workflowPath string, userVars map[string]
for det := range neededDetectors {
if _, contains := alreadyActiveDetectors[det]; contains {
// required detector det is already active in some other environment
return env.id, fmt.Errorf("detector %s is already in use", det.String())

resultEnvId = env.id
resultErr = fmt.Errorf("detector %s is already in use", det.String())
return
}
}

Expand All @@ -396,6 +435,15 @@ func (envs *Manager) CreateEnvironment(workflowPath string, userVars map[string]
WorkflowTemplateInfo: env.GetWorkflowInfo(),
})

the.EventWriterWithTopic(topic.Environment).WriteEvent(&evpb.Ev_EnvironmentEvent{
EnvironmentId: newId.String(),
LastRequestUser: lastRequestUser,
Message: "transition completed successfully",
State: env.CurrentState(),
Transition: "CREATE",
TransitionStatus: evpb.OpStatus_DONE_OK,
})

log.WithField("method", "CreateEnvironment").
WithField("level", infologger.IL_Devel).
Debug("envman write lock")
Expand Down Expand Up @@ -528,7 +576,9 @@ func (envs *Manager) CreateEnvironment(workflowPath string, userVars map[string]
}()
}

return env.id, err
resultEnvId = env.id
resultErr = err
return
}

// Deployment/configuration failure code path starts here
Expand Down