diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index 94d05aa8bc..db2e5aed5d 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -17,6 +17,7 @@ import ( "github.com/databricks/cli/bundle/libraries" "github.com/databricks/cli/bundle/metrics" "github.com/databricks/cli/bundle/permissions" + "github.com/databricks/cli/bundle/resourcesnapshot" "github.com/databricks/cli/bundle/scripts" "github.com/databricks/cli/bundle/statemgmt" "github.com/databricks/cli/libs/cmdio" @@ -116,6 +117,7 @@ func deployCore(ctx context.Context, b *bundle.Bundle, plan *deployplan.Plan, ta statemgmt.Load(targetEngine), metadata.Compute(), metadata.Upload(), + resourcesnapshot.Save(), ) if !logdiag.HasError(ctx) { diff --git a/bundle/resourcesnapshot/snapshot.go b/bundle/resourcesnapshot/snapshot.go new file mode 100644 index 0000000000..b980cd7d86 --- /dev/null +++ b/bundle/resourcesnapshot/snapshot.go @@ -0,0 +1,149 @@ +package resourcesnapshot + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "io" + "io/fs" + "path" + "strconv" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/filer" + "github.com/databricks/cli/libs/log" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/databricks/databricks-sdk-go/service/pipelines" +) + +const snapshotFileName = "resource_snapshots.json" + +// Snapshot stores the state of resources from the last successful deploy. +type Snapshot struct { + Jobs map[string]*jobs.Job `json:"jobs"` + Pipelines map[string]*pipelines.GetPipelineResponse `json:"pipelines"` +} + +func snapshotFilePath(b *bundle.Bundle) string { + return path.Join(b.Config.Workspace.StatePath, snapshotFileName) +} + +type save struct{} + +func Save() bundle.Mutator { + return &save{} +} + +func (s *save) Name() string { + return "resourcesnapshot.Save" +} + +func (s *save) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { + snapshot := &Snapshot{ + Jobs: make(map[string]*jobs.Job), + Pipelines: make(map[string]*pipelines.GetPipelineResponse), + } + + w := b.WorkspaceClient() + + // Fetch and store job snapshots + for key, job := range b.Config.Resources.Jobs { + if job.ID == "" { + log.Debugf(ctx, "Skipping job %s: no ID (not deployed)", key) + continue + } + + jobID, err := strconv.ParseInt(job.ID, 10, 64) + if err != nil { + log.Warnf(ctx, "Skipping job %s: invalid ID %q: %v", key, job.ID, err) + continue + } + + remoteJob, err := w.Jobs.Get(ctx, jobs.GetJobRequest{ + JobId: jobID, + }) + if err != nil { + log.Warnf(ctx, "Failed to fetch job %s (ID: %d): %v", key, jobID, err) + continue + } + + snapshot.Jobs[key] = remoteJob + log.Debugf(ctx, "Saved snapshot for job %s (ID: %d)", key, jobID) + } + + // Fetch and store pipeline snapshots + for key, pipeline := range b.Config.Resources.Pipelines { + if pipeline.ID == "" { + log.Debugf(ctx, "Skipping pipeline %s: no ID (not deployed)", key) + continue + } + + remotePipeline, err := w.Pipelines.Get(ctx, pipelines.GetPipelineRequest{ + PipelineId: pipeline.ID, + }) + if err != nil { + log.Warnf(ctx, "Failed to fetch pipeline %s (ID: %s): %v", key, pipeline.ID, err) + continue + } + + snapshot.Pipelines[key] = remotePipeline + log.Debugf(ctx, "Saved snapshot for pipeline %s (ID: %s)", key, pipeline.ID) + } + + // Marshal snapshot to JSON + data, err := json.MarshalIndent(snapshot, "", " ") + if err != nil { + return diag.FromErr(err) + } + + // Write to WSFS + f, err := filer.NewWorkspaceFilesClient(b.WorkspaceClient(), b.Config.Workspace.StatePath) + if err != nil { + return diag.FromErr(err) + } + + err = f.Write(ctx, snapshotFileName, bytes.NewReader(data), filer.CreateParentDirectories, filer.OverwriteIfExists) + if err != nil { + return diag.FromErr(err) + } + + log.Infof(ctx, "Saved resource snapshots to %s", snapshotFilePath(b)) + return nil +} + +// Load reads the snapshot from WSFS. +// Returns nil if the snapshot file doesn't exist (e.g., first deploy). +func Load(ctx context.Context, b *bundle.Bundle) (*Snapshot, error) { + f, err := filer.NewWorkspaceFilesClient(b.WorkspaceClient(), b.Config.Workspace.StatePath) + if err != nil { + return nil, err + } + + r, err := f.Read(ctx, snapshotFileName) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + log.Debugf(ctx, "No previous snapshot found at %s", snapshotFilePath(b)) + return nil, nil + } + return nil, err + } + defer r.Close() + + data, err := io.ReadAll(r) + if err != nil { + return nil, err + } + + snapshot := &Snapshot{} + err = json.Unmarshal(data, snapshot) + if err != nil { + return nil, err + } + + log.Debugf(ctx, "Loaded snapshot with %d jobs and %d pipelines from %s", + len(snapshot.Jobs), len(snapshot.Pipelines), snapshotFilePath(b)) + + return snapshot, nil +} diff --git a/cmd/bundle/debug.go b/cmd/bundle/debug.go index 2c6658b968..04d320276f 100644 --- a/cmd/bundle/debug.go +++ b/cmd/bundle/debug.go @@ -17,5 +17,6 @@ func newDebugCommand() *cobra.Command { cmd.AddCommand(debug.NewRefSchemaCommand()) cmd.AddCommand(debug.NewPlanCommand()) cmd.AddCommand(debug.NewStatesCommand()) + cmd.AddCommand(debug.NewExpDiffCommand()) return cmd } diff --git a/cmd/bundle/debug/diff_writer.go b/cmd/bundle/debug/diff_writer.go new file mode 100644 index 0000000000..c5677c3ab8 --- /dev/null +++ b/cmd/bundle/debug/diff_writer.go @@ -0,0 +1,441 @@ +package debug + +import ( + "bytes" + "context" + "errors" + "fmt" + "os" + "path/filepath" + "reflect" + "strconv" + "strings" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/cli/libs/dyn/convert" + "github.com/databricks/cli/libs/dyn/yamlloader" + "github.com/databricks/cli/libs/log" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/r3labs/diff/v3" + "gopkg.in/yaml.v3" +) + +// DiffWriter handles writing diff changes back to YAML files +type DiffWriter struct { + bundle *bundle.Bundle + saveToFile bool +} + +// NewDiffWriter creates a new DiffWriter +func NewDiffWriter(b *bundle.Bundle) *DiffWriter { + return &DiffWriter{bundle: b} +} + +// WriteJobDiff writes job diff changes back to the YAML file +func (w *DiffWriter) WriteJobDiff(ctx context.Context, jobKey string, currentState any, changelog diff.Changelog) (*FileChange, error) { + return w.writeResourceDiff(ctx, "jobs", jobKey, currentState, changelog, extractJobSettings) +} + +// WritePipelineDiff writes pipeline diff changes back to the YAML file +func (w *DiffWriter) WritePipelineDiff(ctx context.Context, pipelineKey string, currentState any, changelog diff.Changelog) (*FileChange, error) { + return w.writeResourceDiff(ctx, "pipelines", pipelineKey, currentState, changelog, extractPipelineSpec) +} + +// extractorFunc extracts the relevant settings from the full API response +type extractorFunc func(any) (any, error) + +// extractJobSettings extracts JobSettings from a Job response +func extractJobSettings(state any) (any, error) { + job, ok := state.(*jobs.Job) + if !ok { + return nil, fmt.Errorf("expected *jobs.Job, got %T", state) + } + if job.Settings == nil { + return nil, errors.New("job settings is nil") + } + return job.Settings, nil +} + +// extractPipelineSpec extracts PipelineSpec from a Pipeline response +func extractPipelineSpec(state any) (any, error) { + pipeline, ok := state.(*pipelines.GetPipelineResponse) + if !ok { + return nil, fmt.Errorf("expected *pipelines.GetPipelineResponse, got %T", state) + } + if pipeline.Spec == nil { + return nil, errors.New("pipeline spec is nil") + } + return pipeline.Spec, nil +} + +// unwrapSettingsPath removes the "Settings" or "Spec" wrapper from the path +// SDK responses have job.Settings.Field, but YAML has jobs.my_job.field +func unwrapSettingsPath(path []string) []string { + if len(path) > 0 && (path[0] == "Settings" || path[0] == "Spec") { + return path[1:] + } + return path +} + +// convertChangePathToDynPath converts a changelog path to a dyn.Path +// It handles converting SDK struct field names to JSON tag names using reflection +func convertChangePathToDynPath(path []string, structType reflect.Type) (dyn.Path, error) { + var dynPath dyn.Path + currentType := structType + + for _, segment := range path { + if currentType.Kind() == reflect.Ptr { + currentType = currentType.Elem() + } + + if currentType.Kind() != reflect.Struct { + // Check if segment is a numeric index (for arrays/slices) + if index, err := strconv.Atoi(segment); err == nil { + // It's a numeric index - use Index instead of Key + dynPath = dynPath.Append(dyn.Index(index)) + // Update currentType to element type for arrays/slices + if currentType.Kind() == reflect.Slice || currentType.Kind() == reflect.Array { + currentType = currentType.Elem() + } + continue + } + // For non-numeric segments in non-structs (e.g., map keys), use the segment as-is + dynPath = dynPath.Append(dyn.Key(segment)) + continue + } + + // Find the field in the struct + field, found := currentType.FieldByName(segment) + if !found { + return nil, fmt.Errorf("field %s not found in type %s", segment, currentType.Name()) + } + + // Get the JSON tag name + jsonTag := field.Tag.Get("json") + if jsonTag == "" { + // No JSON tag, use lowercase field name + jsonTag = strings.ToLower(segment) + } else { + // Parse the JSON tag (it may have options like "omitempty") + parts := strings.Split(jsonTag, ",") + jsonTag = parts[0] + } + + dynPath = dynPath.Append(dyn.Key(jsonTag)) + currentType = field.Type + } + + return dynPath, nil +} + +// ensurePathExists ensures all intermediate path segments exist before setting a value +// If an intermediate path doesn't exist, it creates an empty mapping at that location +func ensurePathExists(ctx context.Context, v dyn.Value, path dyn.Path) (dyn.Value, error) { + // Build the path incrementally and ensure each level exists + for i := range path { + intermediatePath := path[:i+1] + + // Check if this path exists + item, err := dyn.GetByPath(v, intermediatePath) + if err != nil || !item.IsValid() { + // Path doesn't exist, create an empty mapping + log.Debugf(ctx, "Creating intermediate path: %s", intermediatePath) + v, err = dyn.SetByPath(v, intermediatePath, dyn.V(dyn.NewMapping())) + if err != nil { + return dyn.InvalidValue, fmt.Errorf("failed to create intermediate path %s: %w", intermediatePath, err) + } + } + } + + return v, nil +} + +// writeResourceDiff writes resource diff changes back to the YAML file +func (w *DiffWriter) writeResourceDiff(ctx context.Context, resourceType, resourceKey string, currentState any, changelog diff.Changelog, extractor extractorFunc) (*FileChange, error) { + // Build the path to the resource in the bundle config + resourcePath := dyn.MustPathFromString(fmt.Sprintf("resources.%s.%s", resourceType, resourceKey)) + + // Get the current config value for this resource + resourceValue, err := dyn.GetByPath(w.bundle.Config.Value(), resourcePath) + if err != nil { + return nil, fmt.Errorf("failed to get resource at path %s: %w", resourcePath, err) + } + + // Get the file location for this resource + location := resourceValue.Location() + if location.File == "" { + return nil, fmt.Errorf("resource %s.%s has no file location", resourceType, resourceKey) + } + + log.Infof(ctx, "Updating %s.%s in %s with %d changes", resourceType, resourceKey, location.File, len(changelog)) + + // Extract the relevant settings from the API response + // (e.g., JobSettings from Job, PipelineSpec from Pipeline) + settings, err := extractor(currentState) + if err != nil { + return nil, fmt.Errorf("failed to extract settings from current state: %w", err) + } + + // Convert the entire remote settings to dyn.Value so we can extract specific field values + remoteValue, err := convert.FromTyped(settings, resourceValue) + if err != nil { + return nil, fmt.Errorf("failed to convert settings to dyn.Value: %w", err) + } + + log.Debugf(ctx, "Converted remote state to dyn.Value") + + // Read the YAML file + originalContent, err := os.ReadFile(location.File) + if err != nil { + return nil, fmt.Errorf("failed to read file %s: %w", location.File, err) + } + + // Parse the YAML file to dyn.Value + fileValue, err := yamlloader.LoadYAML(location.File, bytes.NewReader(originalContent)) + if err != nil { + return nil, fmt.Errorf("failed to parse YAML file %s: %w", location.File, err) + } + + // Get the struct type for path conversion + settingsType := reflect.TypeOf(settings) + if settingsType.Kind() == reflect.Ptr { + settingsType = settingsType.Elem() + } + + // Apply each change from the changelog + updatedFileValue := fileValue + for _, change := range changelog { + // Skip read-only fields (EditMode, Deployment, Format) + if len(change.Path) > 0 { + fieldName := change.Path[0] + if fieldName == "EditMode" || fieldName == "Deployment" || fieldName == "Format" { + log.Debugf(ctx, "Skipping read-only field: %v", change.Path) + continue + } + } + + // Unwrap Settings/Spec wrapper from the path + unwrappedPath := unwrapSettingsPath(change.Path) + if len(unwrappedPath) == 0 { + log.Debugf(ctx, "Skipping empty path after unwrapping") + continue + } + + // Convert to dyn.Path with JSON tag names + dynPath, err := convertChangePathToDynPath(unwrappedPath, settingsType) + if err != nil { + log.Warnf(ctx, "Failed to convert path %v: %v", change.Path, err) + continue + } + + // Prepend the resource path + fullPath := resourcePath.Append(dynPath...) + + log.Debugf(ctx, "Applying change %s at path %s", change.Type, fullPath) + + // Apply the change based on type + switch change.Type { + case "create", "update": + // Extract the value from the remote state + fieldValue, err := dyn.GetByPath(remoteValue, dynPath) + if err != nil { + log.Warnf(ctx, "Failed to get value at path %s: %v", dynPath, err) + continue + } + + // Ensure all intermediate paths exist before setting the value + updatedFileValue, err = ensurePathExists(ctx, updatedFileValue, fullPath) + if err != nil { + log.Warnf(ctx, "Failed to ensure path exists %s: %v", fullPath, err) + continue + } + + // Update the file value + updatedFileValue, err = dyn.SetByPath(updatedFileValue, fullPath, fieldValue) + if err != nil { + log.Warnf(ctx, "Failed to set value at path %s: %v", fullPath, err) + continue + } + + case "delete": + // For delete operations, we need to manually manipulate the mapping + // since dyn doesn't have a DeleteByPath function + log.Debugf(ctx, "Skipping delete operation for path %s (not yet implemented)", fullPath) + // TODO: Implement deletion by reconstructing the parent mapping without the key + } + } + + // Generate the modified content + modifiedContent, err := w.generateYAMLContent(updatedFileValue) + if err != nil { + return nil, fmt.Errorf("failed to generate modified YAML: %w", err) + } + + // Write the updated file + err = w.writeYAMLFile(ctx, location.File, updatedFileValue) + if err != nil { + return nil, err + } + + // Return the file change info + return &FileChange{ + Path: location.File, + OriginalContent: string(originalContent), + ModifiedContent: modifiedContent, + }, nil +} + +// generateYAMLContent converts a dyn.Value to YAML string +func (w *DiffWriter) generateYAMLContent(fileValue dyn.Value) (string, error) { + // Convert to yaml.Node directly from dyn.Value + yamlNode, err := dynValueToYamlNode(fileValue) + if err != nil { + return "", fmt.Errorf("failed to convert to YAML node: %w", err) + } + + // Write the YAML to a buffer + var buf bytes.Buffer + enc := yaml.NewEncoder(&buf) + enc.SetIndent(2) + err = enc.Encode(yamlNode) + if err != nil { + return "", fmt.Errorf("failed to encode YAML: %w", err) + } + + return buf.String(), nil +} + +// writeYAMLFile writes a dyn.Value to a YAML file +func (w *DiffWriter) writeYAMLFile(ctx context.Context, filePath string, fileValue dyn.Value) error { + // Skip writing if saveToFile is false + if !w.saveToFile { + log.Debugf(ctx, "Skipping file write (save flag not set)") + return nil + } + + // Create directory if needed + err := os.MkdirAll(filepath.Dir(filePath), 0o755) + if err != nil { + return fmt.Errorf("failed to create directory: %w", err) + } + + // Create the file + file, err := os.Create(filePath) + if err != nil { + return fmt.Errorf("failed to create file %s: %w", filePath, err) + } + defer file.Close() + + // Convert to yaml.Node directly from dyn.Value + yamlNode, err := dynValueToYamlNode(fileValue) + if err != nil { + return fmt.Errorf("failed to convert to YAML node: %w", err) + } + + // Write the YAML + enc := yaml.NewEncoder(file) + enc.SetIndent(2) + err = enc.Encode(yamlNode) + if err != nil { + return fmt.Errorf("failed to write YAML: %w", err) + } + + log.Infof(ctx, "Successfully updated %s", filePath) + return nil +} + +// dynValueToYamlNode converts a dyn.Value to a yaml.Node +// This is similar to yamlsaver.toYamlNode but handles our use case +func dynValueToYamlNode(v dyn.Value) (*yaml.Node, error) { + return dynValueToYamlNodeWithStyle(v, yaml.Style(0), nil) +} + +func dynValueToYamlNodeWithStyle(v dyn.Value, style yaml.Style, stylesMap map[string]yaml.Style) (*yaml.Node, error) { + switch v.Kind() { + case dyn.KindMap: + m, _ := v.AsMap() + var content []*yaml.Node + + // Sort by location line number to preserve order + pairs := m.Pairs() + for _, pair := range pairs { + pk := pair.Key + pv := pair.Value + keyNode := yaml.Node{Kind: yaml.ScalarNode, Value: pk.MustString(), Style: style} + + // Check if this key has a custom style + var nestedStyle yaml.Style + if stylesMap != nil { + if customStyle, ok := stylesMap[pk.MustString()]; ok { + nestedStyle = customStyle + } else { + nestedStyle = style + } + } else { + nestedStyle = style + } + + valueNode, err := dynValueToYamlNodeWithStyle(pv, nestedStyle, stylesMap) + if err != nil { + return nil, err + } + content = append(content, &keyNode) + content = append(content, valueNode) + } + return &yaml.Node{Kind: yaml.MappingNode, Content: content, Style: style}, nil + + case dyn.KindSequence: + seq, _ := v.AsSequence() + var content []*yaml.Node + for _, item := range seq { + node, err := dynValueToYamlNodeWithStyle(item, style, stylesMap) + if err != nil { + return nil, err + } + content = append(content, node) + } + return &yaml.Node{Kind: yaml.SequenceNode, Content: content, Style: style}, nil + + case dyn.KindNil: + return &yaml.Node{Kind: yaml.ScalarNode, Value: "null", Style: style}, nil + + case dyn.KindString: + s := v.MustString() + // Quote strings that look like scalars + if isScalarLikeString(s) { + return &yaml.Node{Kind: yaml.ScalarNode, Value: s, Style: yaml.DoubleQuotedStyle}, nil + } + return &yaml.Node{Kind: yaml.ScalarNode, Value: s, Style: style}, nil + + case dyn.KindBool: + return &yaml.Node{Kind: yaml.ScalarNode, Value: strconv.FormatBool(v.MustBool()), Style: style}, nil + + case dyn.KindInt: + return &yaml.Node{Kind: yaml.ScalarNode, Value: strconv.FormatInt(v.MustInt(), 10), Style: style}, nil + + case dyn.KindFloat: + return &yaml.Node{Kind: yaml.ScalarNode, Value: fmt.Sprint(v.MustFloat()), Style: style}, nil + + case dyn.KindTime: + return &yaml.Node{Kind: yaml.ScalarNode, Value: v.MustTime().String(), Style: style}, nil + + default: + return nil, fmt.Errorf("unsupported kind: %s", v.Kind()) + } +} + +func isScalarLikeString(s string) bool { + if s == "" || s == "true" || s == "false" { + return true + } + // Check if it's a number + if _, err := strconv.ParseInt(s, 0, 64); err == nil { + return true + } + if _, err := strconv.ParseFloat(s, 64); err == nil { + return true + } + return false +} diff --git a/cmd/bundle/debug/diff_writer_test.go b/cmd/bundle/debug/diff_writer_test.go new file mode 100644 index 0000000000..15f05ac70f --- /dev/null +++ b/cmd/bundle/debug/diff_writer_test.go @@ -0,0 +1,78 @@ +package debug + +import ( + "reflect" + "testing" + + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestConvertChangePathToDynPath_WithArrayIndex(t *testing.T) { + // Test case: path with array index like ["Libraries", "1", "Glob"] + // This simulates the path from r3labs/diff when comparing pipeline libraries + path := []string{"Libraries", "1", "Glob"} + structType := reflect.TypeOf(pipelines.PipelineSpec{}) + + dynPath, err := convertChangePathToDynPath(path, structType) + require.NoError(t, err) + + // Expected path: libraries[1].glob + // Should have 3 components: Key("libraries"), Index(1), Key("glob") + assert.Equal(t, 3, len(dynPath)) + + // First component should be a key "libraries" + assert.Equal(t, dyn.Key("libraries"), dynPath[0]) + + // Second component should be an index 1 + assert.Equal(t, dyn.Index(1), dynPath[1]) + + // Third component should be a key "glob" + assert.Equal(t, dyn.Key("glob"), dynPath[2]) +} + +func TestConvertChangePathToDynPath_WithMultipleArrayIndices(t *testing.T) { + // Test case: path with multiple array indices + path := []string{"Libraries", "0"} + structType := reflect.TypeOf(pipelines.PipelineSpec{}) + + dynPath, err := convertChangePathToDynPath(path, structType) + require.NoError(t, err) + + // Expected path: libraries[0] + assert.Equal(t, 2, len(dynPath)) + assert.Equal(t, dyn.Key("libraries"), dynPath[0]) + assert.Equal(t, dyn.Index(0), dynPath[1]) +} + +func TestConvertChangePathToDynPath_WithoutArrayIndex(t *testing.T) { + // Test case: simple path without array indices + path := []string{"Name"} + structType := reflect.TypeOf(pipelines.PipelineSpec{}) + + dynPath, err := convertChangePathToDynPath(path, structType) + require.NoError(t, err) + + // Expected path: name + assert.Equal(t, 1, len(dynPath)) + assert.Equal(t, dyn.Key("name"), dynPath[0]) +} + +func TestConvertChangePathToDynPath_NestedStructWithArrayIndex(t *testing.T) { + // Test case: array of structs, accessing a field in array element + // ["Libraries", "1", "Notebook", "Path"] + path := []string{"Libraries", "1", "Notebook", "Path"} + structType := reflect.TypeOf(pipelines.PipelineSpec{}) + + dynPath, err := convertChangePathToDynPath(path, structType) + require.NoError(t, err) + + // Expected: libraries[1].notebook.path + assert.Equal(t, 4, len(dynPath)) + assert.Equal(t, dyn.Key("libraries"), dynPath[0]) + assert.Equal(t, dyn.Index(1), dynPath[1]) + assert.Equal(t, dyn.Key("notebook"), dynPath[2]) + assert.Equal(t, dyn.Key("path"), dynPath[3]) +} diff --git a/cmd/bundle/debug/exp_diff.go b/cmd/bundle/debug/exp_diff.go new file mode 100644 index 0000000000..9465c418d2 --- /dev/null +++ b/cmd/bundle/debug/exp_diff.go @@ -0,0 +1,218 @@ +package debug + +import ( + "encoding/json" + "strconv" + + "github.com/databricks/cli/bundle/direct/dresources" + "github.com/databricks/cli/bundle/resourcesnapshot" + "github.com/databricks/cli/cmd/bundle/utils" + "github.com/databricks/cli/cmd/root" + "github.com/databricks/cli/libs/log" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/r3labs/diff/v3" + "github.com/spf13/cobra" +) + +type ResourceDiff struct { + Changes diff.Changelog `json:"changes"` +} + +type FileChange struct { + Path string `json:"path"` + OriginalContent string `json:"originalContent"` + ModifiedContent string `json:"modifiedContent"` +} + +type ChangesSummary struct { + Jobs map[string]*ResourceDiff `json:"jobs,omitempty"` + Pipelines map[string]*ResourceDiff `json:"pipelines,omitempty"` +} + +type DiffOutput struct { + Files []FileChange `json:"files"` + Changes *ChangesSummary `json:"changes"` +} + +func NewExpDiffCommand() *cobra.Command { + var save bool + + cmd := &cobra.Command{ + Use: "exp-diff", + Short: "Show differences between current remote state and last deploy snapshot (experimental)", + Long: `Show differences between the current remote resource state and the state at the time of the last successful deploy. + +This command compares the current state of deployed resources (jobs, pipelines) with snapshots +saved during the last successful deploy. It helps identify configuration drift caused by manual +changes or external modifications. + +Note: This command is experimental and may change without notice.`, + Args: root.NoArgs, + } + + cmd.Flags().BoolVar(&save, "save", false, "Save the diff back to the bundle YAML files") + + cmd.RunE = func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + // Load bundle with resource IDs from state + b, err := utils.ProcessBundle(cmd, utils.ProcessOptions{ + InitIDs: true, + }) + if err != nil { + return err + } + + // Load previous snapshots + snapshot, err := resourcesnapshot.Load(ctx, b) + if err != nil { + return err + } + + output := &DiffOutput{ + Files: []FileChange{}, + Changes: &ChangesSummary{ + Jobs: make(map[string]*ResourceDiff), + Pipelines: make(map[string]*ResourceDiff), + }, + } + + // If no snapshot exists, return empty diff + if snapshot == nil { + log.Debugf(ctx, "No previous snapshot found, skipping diff") + return writeOutput(cmd, output) + } + + w := b.WorkspaceClient() + + // Create diff writer + writer := NewDiffWriter(b) + writer.saveToFile = save + + // Compare jobs + for key, job := range b.Config.Resources.Jobs { + if job.ID == "" { + log.Debugf(ctx, "Skipping job %s: no ID (not deployed)", key) + continue + } + + // Check if we have a previous snapshot for this resource + previousJob, ok := snapshot.Jobs[key] + if !ok { + log.Debugf(ctx, "Skipping job %s: no previous snapshot", key) + continue + } + + jobID, err := strconv.ParseInt(job.ID, 10, 64) + if err != nil { + log.Warnf(ctx, "Skipping job %s: invalid ID %q: %v", key, job.ID, err) + continue + } + + // Fetch current remote state + currentJob, err := w.Jobs.Get(ctx, jobs.GetJobRequest{ + JobId: jobID, + }) + if err != nil { + log.Warnf(ctx, "Failed to fetch job %s (ID: %d): %v", key, jobID, err) + continue + } + + job := dresources.ResourceJob{} + currentJobComparable := job.RemapState(currentJob) + previousJobComparable := job.RemapState(previousJob) + + // Compare previous and current state + changelog, err := diff.Diff(previousJobComparable, currentJobComparable) + if err != nil { + log.Warnf(ctx, "Failed to diff job %s: %v", key, err) + continue + } + + // Only add to output if there are changes + if len(changelog) > 0 { + output.Changes.Jobs[key] = &ResourceDiff{ + Changes: changelog, + } + log.Debugf(ctx, "Found %d changes for job %s", len(changelog), key) + + // Generate file change (will save to file if save flag is set) + fileChange, err := writer.WriteJobDiff(ctx, key, currentJob, changelog) + if err != nil { + log.Warnf(ctx, "Failed to process job %s changes: %v", key, err) + } else if fileChange != nil { + output.Files = append(output.Files, *fileChange) + } + } + } + + // Compare pipelines + for key, pipeline := range b.Config.Resources.Pipelines { + if pipeline.ID == "" { + log.Debugf(ctx, "Skipping pipeline %s: no ID (not deployed)", key) + continue + } + + // Check if we have a previous snapshot for this resource + previousPipeline, ok := snapshot.Pipelines[key] + if !ok { + log.Debugf(ctx, "Skipping pipeline %s: no previous snapshot", key) + continue + } + + // Fetch current remote state + currentPipeline, err := w.Pipelines.Get(ctx, pipelines.GetPipelineRequest{ + PipelineId: pipeline.ID, + }) + if err != nil { + log.Warnf(ctx, "Failed to fetch pipeline %s (ID: %s): %v", key, pipeline.ID, err) + continue + } + + pipeline := dresources.ResourcePipeline{} + currentPipelineComparable := pipeline.RemapState(currentPipeline) + previousPipelineComparable := pipeline.RemapState(previousPipeline) + + // Compare previous and current state + changelog, err := diff.Diff(previousPipelineComparable, currentPipelineComparable) + if err != nil { + log.Warnf(ctx, "Failed to diff pipeline %s: %v", key, err) + continue + } + + // Only add to output if there are changes + if len(changelog) > 0 { + output.Changes.Pipelines[key] = &ResourceDiff{ + Changes: changelog, + } + log.Debugf(ctx, "Found %d changes for pipeline %s", len(changelog), key) + + // Generate file change (will save to file if save flag is set) + fileChange, err := writer.WritePipelineDiff(ctx, key, currentPipeline, changelog) + if err != nil { + log.Warnf(ctx, "Failed to process pipeline %s changes: %v", key, err) + } else if fileChange != nil { + output.Files = append(output.Files, *fileChange) + } + } + } + + return writeOutput(cmd, output) + } + + return cmd +} + +func writeOutput(cmd *cobra.Command, output *DiffOutput) error { + buf, err := json.MarshalIndent(output, "", " ") + if err != nil { + return err + } + + out := cmd.OutOrStdout() + _, _ = out.Write(buf) + _, _ = out.Write([]byte{'\n'}) + + return nil +} diff --git a/go.mod b/go.mod index a0403f81f3..2a1dafba72 100644 --- a/go.mod +++ b/go.mod @@ -40,6 +40,8 @@ require ( gopkg.in/yaml.v3 v3.0.1 ) +require github.com/r3labs/diff/v3 v3.0.2 + require ( cloud.google.com/go/auth v0.16.5 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect @@ -62,6 +64,8 @@ require ( github.com/mattn/go-colorable v0.1.13 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.5.2 // indirect + github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/zclconf/go-cty v1.16.4 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 // indirect diff --git a/go.sum b/go.sum index db02c96f7b..0e565ebffc 100644 --- a/go.sum +++ b/go.sum @@ -119,6 +119,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/quasilyte/go-ruleguard/dsl v0.3.22 h1:wd8zkOhSNr+I+8Qeciml08ivDt1pSXe60+5DqOpCjPE= github.com/quasilyte/go-ruleguard/dsl v0.3.22/go.mod h1:KeCP03KrjuSO0H1kTuZQCWlQPulDV6YMIXmpQss17rU= +github.com/r3labs/diff/v3 v3.0.2 h1:yVuxAY1V6MeM4+HNur92xkS39kB/N+cFi2hMkY06BbA= +github.com/r3labs/diff/v3 v3.0.2/go.mod h1:Cy542hv0BAEmhDYWtGxXRQ4kqRsVIcEjG9gChUlTmkw= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -139,6 +141,10 @@ github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM= github.com/xanzy/ssh-agent v0.3.3/go.mod h1:6dzNDKs0J9rVPHPhaGCukekBHKqfl+L3KghI1Bc68Uw= github.com/zclconf/go-cty v1.16.4 h1:QGXaag7/7dCzb+odlGrgr+YmYZFaOCMW6DEpS+UD1eE= diff --git a/test_bundle/databricks.yml b/test_bundle/databricks.yml new file mode 100644 index 0000000000..8838e6c615 --- /dev/null +++ b/test_bundle/databricks.yml @@ -0,0 +1,12 @@ +bundle: + name: databricks-bundle-test +targets: + dev: + mode: development + default: true + workspace: + host: https://e2-dogfood.staging.cloud.databricks.com + +include: + - job.yaml + - pipeline.yaml diff --git a/test_bundle/job.yaml b/test_bundle/job.yaml new file mode 100644 index 0000000000..7ac6d4ce55 --- /dev/null +++ b/test_bundle/job.yaml @@ -0,0 +1,8 @@ +resources: + jobs: + test_exp_diff_job: + queue: + enabled: true + performance_target: PERFORMANCE_OPTIMIZED + tags: + new: taggg diff --git a/test_bundle/notebook.py b/test_bundle/notebook.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test_bundle/pipeline.yaml b/test_bundle/pipeline.yaml new file mode 100644 index 0000000000..deeb015ff9 --- /dev/null +++ b/test_bundle/pipeline.yaml @@ -0,0 +1,10 @@ +resources: + pipelines: + test_exp_diff_pipeline: + name: test_exp_diff_pipeline + tags: + my_tag: new_tag + new_tag: tag3 + libraries: + - file: + path: ./notebook.py