From e5b8487b5351de7367af42652c33c55c1ed67a57 Mon Sep 17 00:00:00 2001 From: Ilya Kuznetsov Date: Wed, 12 Nov 2025 18:13:29 +0100 Subject: [PATCH 01/10] Init --- cmd/bundle/debug.go | 1 + cmd/bundle/debug/exp_diff.go | 38 ++++++++++++++++++++++++++++++++++++ databricks.yml | 10 ++++++++++ 3 files changed, 49 insertions(+) create mode 100644 cmd/bundle/debug/exp_diff.go create mode 100644 databricks.yml diff --git a/cmd/bundle/debug.go b/cmd/bundle/debug.go index 2c6658b968..04d320276f 100644 --- a/cmd/bundle/debug.go +++ b/cmd/bundle/debug.go @@ -17,5 +17,6 @@ func newDebugCommand() *cobra.Command { cmd.AddCommand(debug.NewRefSchemaCommand()) cmd.AddCommand(debug.NewPlanCommand()) cmd.AddCommand(debug.NewStatesCommand()) + cmd.AddCommand(debug.NewExpDiffCommand()) return cmd } diff --git a/cmd/bundle/debug/exp_diff.go b/cmd/bundle/debug/exp_diff.go new file mode 100644 index 0000000000..042443dc62 --- /dev/null +++ b/cmd/bundle/debug/exp_diff.go @@ -0,0 +1,38 @@ +package debug + +import ( + "encoding/json" + + "github.com/databricks/cli/cmd/bundle/utils" + "github.com/databricks/cli/cmd/root" + "github.com/spf13/cobra" +) + +func NewExpDiffCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "exp-diff", + Short: "Show resolved bundle configuration in JSON format (experimental)", + Long: "Show the resolved bundle configuration after all variable substitutions and includes have been processed. This command is experimental and may change without notice.", + Args: root.NoArgs, + } + + cmd.RunE = func(cmd *cobra.Command, args []string) error { + b, err := utils.ProcessBundle(cmd, utils.ProcessOptions{}) + if err != nil { + return err + } + + buf, err := json.MarshalIndent(b.Config.Value().AsAny(), "", " ") + if err != nil { + return err + } + + out := cmd.OutOrStdout() + _, _ = out.Write(buf) + _, _ = out.Write([]byte{'\n'}) + + return nil + } + + return cmd +} diff --git a/databricks.yml b/databricks.yml new file mode 100644 index 0000000000..a72219a4be --- /dev/null +++ b/databricks.yml @@ -0,0 +1,10 @@ +bundle: + name: databricks-bundle-test + +targets: + dev: + mode: development + default: true + + workspace: + host: https://e2-dogfood.staging.cloud.databricks.com From 2c1ce8455122549c8847d3d1a3c7b9af9075df27 Mon Sep 17 00:00:00 2001 From: Ilya Kuznetsov Date: Wed, 12 Nov 2025 20:45:29 +0100 Subject: [PATCH 02/10] Implementation --- bundle/phases/deploy.go | 2 + bundle/resourcesnapshot/snapshot.go | 149 +++++++++++++++++++++++++++ cmd/bundle/debug/exp_diff.go | 151 ++++++++++++++++++++++++++-- databricks.yml | 10 -- go.mod | 4 + go.sum | 6 ++ test_bundle/databricks.yml | 25 +++++ 7 files changed, 328 insertions(+), 19 deletions(-) create mode 100644 bundle/resourcesnapshot/snapshot.go delete mode 100644 databricks.yml create mode 100644 test_bundle/databricks.yml diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index 94d05aa8bc..04d9590257 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -14,6 +14,7 @@ import ( "github.com/databricks/cli/bundle/deploy/metadata" "github.com/databricks/cli/bundle/deploy/terraform" "github.com/databricks/cli/bundle/deployplan" + "github.com/databricks/cli/bundle/resourcesnapshot" "github.com/databricks/cli/bundle/libraries" "github.com/databricks/cli/bundle/metrics" "github.com/databricks/cli/bundle/permissions" @@ -116,6 +117,7 @@ func deployCore(ctx context.Context, b *bundle.Bundle, plan *deployplan.Plan, ta statemgmt.Load(targetEngine), metadata.Compute(), metadata.Upload(), + resourcesnapshot.Save(), ) if !logdiag.HasError(ctx) { diff --git a/bundle/resourcesnapshot/snapshot.go b/bundle/resourcesnapshot/snapshot.go new file mode 100644 index 0000000000..311ad5974c --- /dev/null +++ b/bundle/resourcesnapshot/snapshot.go @@ -0,0 +1,149 @@ +package resourcesnapshot + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "io" + "io/fs" + "path" + "strconv" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/filer" + "github.com/databricks/cli/libs/log" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/databricks/databricks-sdk-go/service/pipelines" +) + +const snapshotFileName = "resource_snapshots.json" + +// Snapshot stores the state of resources from the last successful deploy. +type Snapshot struct { + Jobs map[string]*jobs.Job `json:"jobs"` + Pipelines map[string]*pipelines.GetPipelineResponse `json:"pipelines"` +} + +func snapshotFilePath(b *bundle.Bundle) string { + return path.Join(b.Config.Workspace.StatePath, snapshotFileName) +} + +type save struct{} + +func Save() bundle.Mutator { + return &save{} +} + +func (s *save) Name() string { + return "resourcesnapshot.Save" +} + +func (s *save) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { + snapshot := &Snapshot{ + Jobs: make(map[string]*jobs.Job), + Pipelines: make(map[string]*pipelines.GetPipelineResponse), + } + + w := b.WorkspaceClient() + + // Fetch and store job snapshots + for key, job := range b.Config.Resources.Jobs { + if job.ID == "" { + log.Debugf(ctx, "Skipping job %s: no ID (not deployed)", key) + continue + } + + jobID, err := strconv.ParseInt(job.ID, 10, 64) + if err != nil { + log.Warnf(ctx, "Skipping job %s: invalid ID %q: %v", key, job.ID, err) + continue + } + + remoteJob, err := w.Jobs.Get(ctx, jobs.GetJobRequest{ + JobId: jobID, + }) + if err != nil { + log.Warnf(ctx, "Failed to fetch job %s (ID: %d): %v", key, jobID, err) + continue + } + + snapshot.Jobs[key] = remoteJob + log.Debugf(ctx, "Saved snapshot for job %s (ID: %d)", key, jobID) + } + + // Fetch and store pipeline snapshots + for key, pipeline := range b.Config.Resources.Pipelines { + if pipeline.ID == "" { + log.Debugf(ctx, "Skipping pipeline %s: no ID (not deployed)", key) + continue + } + + remotePipeline, err := w.Pipelines.Get(ctx, pipelines.GetPipelineRequest{ + PipelineId: pipeline.ID, + }) + if err != nil { + log.Warnf(ctx, "Failed to fetch pipeline %s (ID: %s): %v", key, pipeline.ID, err) + continue + } + + snapshot.Pipelines[key] = remotePipeline + log.Debugf(ctx, "Saved snapshot for pipeline %s (ID: %s)", key, pipeline.ID) + } + + // Marshal snapshot to JSON + data, err := json.MarshalIndent(snapshot, "", " ") + if err != nil { + return diag.FromErr(err) + } + + // Write to WSFS + f, err := filer.NewWorkspaceFilesClient(b.WorkspaceClient(), b.Config.Workspace.StatePath) + if err != nil { + return diag.FromErr(err) + } + + err = f.Write(ctx, snapshotFileName, bytes.NewReader(data), filer.CreateParentDirectories, filer.OverwriteIfExists) + if err != nil { + return diag.FromErr(err) + } + + log.Infof(ctx, "Saved resource snapshots to %s", snapshotFilePath(b)) + return nil +} + +// Load reads the snapshot from WSFS. +// Returns nil if the snapshot file doesn't exist (e.g., first deploy). +func Load(ctx context.Context, b *bundle.Bundle) (*Snapshot, error) { + f, err := filer.NewWorkspaceFilesClient(b.WorkspaceClient(), b.Config.Workspace.StatePath) + if err != nil { + return nil, err + } + + r, err := f.Read(ctx, snapshotFileName) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + log.Debugf(ctx, "No previous snapshot found at %s", snapshotFilePath(b)) + return nil, nil + } + return nil, err + } + defer r.Close() + + data, err := io.ReadAll(r) + if err != nil { + return nil, err + } + + snapshot := &Snapshot{} + err = json.Unmarshal(data, snapshot) + if err != nil { + return nil, err + } + + log.Debugf(ctx, "Loaded snapshot with %d jobs and %d pipelines from %s", + len(snapshot.Jobs), len(snapshot.Pipelines), snapshotFilePath(b)) + + return snapshot, nil +} diff --git a/cmd/bundle/debug/exp_diff.go b/cmd/bundle/debug/exp_diff.go index 042443dc62..ad24fa5858 100644 --- a/cmd/bundle/debug/exp_diff.go +++ b/cmd/bundle/debug/exp_diff.go @@ -2,37 +2,170 @@ package debug import ( "encoding/json" + "strconv" + "github.com/databricks/cli/bundle/resourcesnapshot" "github.com/databricks/cli/cmd/bundle/utils" "github.com/databricks/cli/cmd/root" + "github.com/databricks/cli/libs/log" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/r3labs/diff/v3" "github.com/spf13/cobra" ) +type ResourceDiff struct { + Changes diff.Changelog `json:"changes"` +} + +type DiffOutput struct { + Jobs map[string]*ResourceDiff `json:"jobs,omitempty"` + Pipelines map[string]*ResourceDiff `json:"pipelines,omitempty"` +} + func NewExpDiffCommand() *cobra.Command { cmd := &cobra.Command{ Use: "exp-diff", - Short: "Show resolved bundle configuration in JSON format (experimental)", - Long: "Show the resolved bundle configuration after all variable substitutions and includes have been processed. This command is experimental and may change without notice.", - Args: root.NoArgs, + Short: "Show differences between current remote state and last deploy snapshot (experimental)", + Long: `Show differences between the current remote resource state and the state at the time of the last successful deploy. + +This command compares the current state of deployed resources (jobs, pipelines) with snapshots +saved during the last successful deploy. It helps identify configuration drift caused by manual +changes or external modifications. + +Note: This command is experimental and may change without notice.`, + Args: root.NoArgs, } cmd.RunE = func(cmd *cobra.Command, args []string) error { - b, err := utils.ProcessBundle(cmd, utils.ProcessOptions{}) + ctx := cmd.Context() + + // Load bundle with resource IDs from state + b, err := utils.ProcessBundle(cmd, utils.ProcessOptions{ + InitIDs: true, + }) if err != nil { return err } - buf, err := json.MarshalIndent(b.Config.Value().AsAny(), "", " ") + // Load previous snapshots + snapshot, err := resourcesnapshot.Load(ctx, b) if err != nil { return err } - out := cmd.OutOrStdout() - _, _ = out.Write(buf) - _, _ = out.Write([]byte{'\n'}) + output := &DiffOutput{ + Jobs: make(map[string]*ResourceDiff), + Pipelines: make(map[string]*ResourceDiff), + } + + // If no snapshot exists, return empty diff + if snapshot == nil { + log.Debugf(ctx, "No previous snapshot found, skipping diff") + return writeOutput(cmd, output) + } + + w := b.WorkspaceClient() + + // Compare jobs + for key, job := range b.Config.Resources.Jobs { + if job.ID == "" { + log.Debugf(ctx, "Skipping job %s: no ID (not deployed)", key) + continue + } + + // Check if we have a previous snapshot for this resource + previousJob, ok := snapshot.Jobs[key] + if !ok { + log.Debugf(ctx, "Skipping job %s: no previous snapshot", key) + continue + } + + jobID, err := strconv.ParseInt(job.ID, 10, 64) + if err != nil { + log.Warnf(ctx, "Skipping job %s: invalid ID %q: %v", key, job.ID, err) + continue + } + + // Fetch current remote state + currentJob, err := w.Jobs.Get(ctx, jobs.GetJobRequest{ + JobId: jobID, + }) + if err != nil { + log.Warnf(ctx, "Failed to fetch job %s (ID: %d): %v", key, jobID, err) + continue + } - return nil + // Compare previous and current state + changelog, err := diff.Diff(previousJob, currentJob) + if err != nil { + log.Warnf(ctx, "Failed to diff job %s: %v", key, err) + continue + } + + // Only add to output if there are changes + if len(changelog) > 0 { + output.Jobs[key] = &ResourceDiff{ + Changes: changelog, + } + log.Debugf(ctx, "Found %d changes for job %s", len(changelog), key) + } + } + + // Compare pipelines + for key, pipeline := range b.Config.Resources.Pipelines { + if pipeline.ID == "" { + log.Debugf(ctx, "Skipping pipeline %s: no ID (not deployed)", key) + continue + } + + // Check if we have a previous snapshot for this resource + previousPipeline, ok := snapshot.Pipelines[key] + if !ok { + log.Debugf(ctx, "Skipping pipeline %s: no previous snapshot", key) + continue + } + + // Fetch current remote state + currentPipeline, err := w.Pipelines.Get(ctx, pipelines.GetPipelineRequest{ + PipelineId: pipeline.ID, + }) + if err != nil { + log.Warnf(ctx, "Failed to fetch pipeline %s (ID: %s): %v", key, pipeline.ID, err) + continue + } + + // Compare previous and current state + changelog, err := diff.Diff(previousPipeline, currentPipeline) + if err != nil { + log.Warnf(ctx, "Failed to diff pipeline %s: %v", key, err) + continue + } + + // Only add to output if there are changes + if len(changelog) > 0 { + output.Pipelines[key] = &ResourceDiff{ + Changes: changelog, + } + log.Debugf(ctx, "Found %d changes for pipeline %s", len(changelog), key) + } + } + + return writeOutput(cmd, output) } return cmd } + +func writeOutput(cmd *cobra.Command, output *DiffOutput) error { + buf, err := json.MarshalIndent(output, "", " ") + if err != nil { + return err + } + + out := cmd.OutOrStdout() + _, _ = out.Write(buf) + _, _ = out.Write([]byte{'\n'}) + + return nil +} diff --git a/databricks.yml b/databricks.yml deleted file mode 100644 index a72219a4be..0000000000 --- a/databricks.yml +++ /dev/null @@ -1,10 +0,0 @@ -bundle: - name: databricks-bundle-test - -targets: - dev: - mode: development - default: true - - workspace: - host: https://e2-dogfood.staging.cloud.databricks.com diff --git a/go.mod b/go.mod index a0403f81f3..2a1dafba72 100644 --- a/go.mod +++ b/go.mod @@ -40,6 +40,8 @@ require ( gopkg.in/yaml.v3 v3.0.1 ) +require github.com/r3labs/diff/v3 v3.0.2 + require ( cloud.google.com/go/auth v0.16.5 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect @@ -62,6 +64,8 @@ require ( github.com/mattn/go-colorable v0.1.13 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.5.2 // indirect + github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/zclconf/go-cty v1.16.4 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 // indirect diff --git a/go.sum b/go.sum index db02c96f7b..0e565ebffc 100644 --- a/go.sum +++ b/go.sum @@ -119,6 +119,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/quasilyte/go-ruleguard/dsl v0.3.22 h1:wd8zkOhSNr+I+8Qeciml08ivDt1pSXe60+5DqOpCjPE= github.com/quasilyte/go-ruleguard/dsl v0.3.22/go.mod h1:KeCP03KrjuSO0H1kTuZQCWlQPulDV6YMIXmpQss17rU= +github.com/r3labs/diff/v3 v3.0.2 h1:yVuxAY1V6MeM4+HNur92xkS39kB/N+cFi2hMkY06BbA= +github.com/r3labs/diff/v3 v3.0.2/go.mod h1:Cy542hv0BAEmhDYWtGxXRQ4kqRsVIcEjG9gChUlTmkw= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -139,6 +141,10 @@ github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM= github.com/xanzy/ssh-agent v0.3.3/go.mod h1:6dzNDKs0J9rVPHPhaGCukekBHKqfl+L3KghI1Bc68Uw= github.com/zclconf/go-cty v1.16.4 h1:QGXaag7/7dCzb+odlGrgr+YmYZFaOCMW6DEpS+UD1eE= diff --git a/test_bundle/databricks.yml b/test_bundle/databricks.yml new file mode 100644 index 0000000000..d4372c4282 --- /dev/null +++ b/test_bundle/databricks.yml @@ -0,0 +1,25 @@ +bundle: + name: databricks-bundle-test + +targets: + dev: + mode: development + default: true + + workspace: + host: https://e2-dogfood.staging.cloud.databricks.com + +resources: + jobs: + test_exp_diff_job: + name: test_exp_diff_job + tags: + my_tag: my_value + tasks: + - task_key: main + notebook_task: + notebook_path: /Workspace/Users/${workspace.current_user.userName}/test_notebook + new_cluster: + num_workers: 0 + spark_version: 13.3.x-scala2.12 + node_type_id: i3.xlarge From 056189d9a48c36dea45ed3fab132a0a7168831cb Mon Sep 17 00:00:00 2001 From: Ilya Kuznetsov Date: Thu, 13 Nov 2025 10:55:50 +0100 Subject: [PATCH 03/10] Cleanup in config --- test_bundle/databricks.yml | 8 -------- 1 file changed, 8 deletions(-) diff --git a/test_bundle/databricks.yml b/test_bundle/databricks.yml index d4372c4282..1108ff6dd5 100644 --- a/test_bundle/databricks.yml +++ b/test_bundle/databricks.yml @@ -15,11 +15,3 @@ resources: name: test_exp_diff_job tags: my_tag: my_value - tasks: - - task_key: main - notebook_task: - notebook_path: /Workspace/Users/${workspace.current_user.userName}/test_notebook - new_cluster: - num_workers: 0 - spark_version: 13.3.x-scala2.12 - node_type_id: i3.xlarge From 8983645c82f9d0bdc05e0a24b0805e74ff0d1813 Mon Sep 17 00:00:00 2001 From: Ilya Kuznetsov Date: Thu, 13 Nov 2025 11:00:08 +0100 Subject: [PATCH 04/10] included file --- test_bundle/databricks.yml | 10 ++-------- test_bundle/job.yaml | 20 ++++++++++++++++++++ 2 files changed, 22 insertions(+), 8 deletions(-) create mode 100644 test_bundle/job.yaml diff --git a/test_bundle/databricks.yml b/test_bundle/databricks.yml index 1108ff6dd5..a8b4ff4a60 100644 --- a/test_bundle/databricks.yml +++ b/test_bundle/databricks.yml @@ -1,17 +1,11 @@ bundle: name: databricks-bundle-test - targets: dev: mode: development default: true - workspace: host: https://e2-dogfood.staging.cloud.databricks.com -resources: - jobs: - test_exp_diff_job: - name: test_exp_diff_job - tags: - my_tag: my_value +include: + - job.yaml diff --git a/test_bundle/job.yaml b/test_bundle/job.yaml new file mode 100644 index 0000000000..477ac21b04 --- /dev/null +++ b/test_bundle/job.yaml @@ -0,0 +1,20 @@ +resources: + jobs: + test_exp_diff_job: + deployment: + kind: BUNDLE + metadata_file_path: /Workspace/Users/ilya.kuznetsov@databricks.com/.bundle/databricks-bundle-test/dev/state/metadata.json + edit_mode: EDITABLE + email_notifications: {} + format: MULTI_TASK + max_concurrent_runs: 4 + name: '[dev ilya_kuznetsov] test_exp_diff_job' + queue: + enabled: true + run_as: + user_name: ilya.kuznetsov@databricks.com + tags: + dev: ilya_kuznetsov + my_tag: my_value3 + timeout_seconds: 0 + webhook_notifications: {} From 291304d9c24aac706081552b9c89074986fafee3 Mon Sep 17 00:00:00 2001 From: Ilya Kuznetsov Date: Thu, 13 Nov 2025 12:06:44 +0100 Subject: [PATCH 05/10] Write to files --- bundle/phases/deploy.go | 2 +- bundle/resourcesnapshot/snapshot.go | 2 +- cmd/bundle/debug/diff_writer.go | 251 ++++++++++++++++++++++++++++ cmd/bundle/debug/exp_diff.go | 26 +++ test_bundle/job.yaml | 4 +- 5 files changed, 281 insertions(+), 4 deletions(-) create mode 100644 cmd/bundle/debug/diff_writer.go diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index 04d9590257..db2e5aed5d 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -14,10 +14,10 @@ import ( "github.com/databricks/cli/bundle/deploy/metadata" "github.com/databricks/cli/bundle/deploy/terraform" "github.com/databricks/cli/bundle/deployplan" - "github.com/databricks/cli/bundle/resourcesnapshot" "github.com/databricks/cli/bundle/libraries" "github.com/databricks/cli/bundle/metrics" "github.com/databricks/cli/bundle/permissions" + "github.com/databricks/cli/bundle/resourcesnapshot" "github.com/databricks/cli/bundle/scripts" "github.com/databricks/cli/bundle/statemgmt" "github.com/databricks/cli/libs/cmdio" diff --git a/bundle/resourcesnapshot/snapshot.go b/bundle/resourcesnapshot/snapshot.go index 311ad5974c..b980cd7d86 100644 --- a/bundle/resourcesnapshot/snapshot.go +++ b/bundle/resourcesnapshot/snapshot.go @@ -22,7 +22,7 @@ const snapshotFileName = "resource_snapshots.json" // Snapshot stores the state of resources from the last successful deploy. type Snapshot struct { - Jobs map[string]*jobs.Job `json:"jobs"` + Jobs map[string]*jobs.Job `json:"jobs"` Pipelines map[string]*pipelines.GetPipelineResponse `json:"pipelines"` } diff --git a/cmd/bundle/debug/diff_writer.go b/cmd/bundle/debug/diff_writer.go new file mode 100644 index 0000000000..bfd7e8ef0c --- /dev/null +++ b/cmd/bundle/debug/diff_writer.go @@ -0,0 +1,251 @@ +package debug + +import ( + "bytes" + "context" + "errors" + "fmt" + "os" + "path/filepath" + "strconv" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/cli/libs/dyn/convert" + "github.com/databricks/cli/libs/dyn/yamlloader" + "github.com/databricks/cli/libs/log" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/databricks/databricks-sdk-go/service/pipelines" + "gopkg.in/yaml.v3" +) + +// DiffWriter handles writing diff changes back to YAML files +type DiffWriter struct { + bundle *bundle.Bundle +} + +// NewDiffWriter creates a new DiffWriter +func NewDiffWriter(b *bundle.Bundle) *DiffWriter { + return &DiffWriter{bundle: b} +} + +// WriteJobDiff writes job diff changes back to the YAML file +func (w *DiffWriter) WriteJobDiff(ctx context.Context, jobKey string, currentState any) error { + return w.writeResourceDiff(ctx, "jobs", jobKey, currentState, extractJobSettings) +} + +// WritePipelineDiff writes pipeline diff changes back to the YAML file +func (w *DiffWriter) WritePipelineDiff(ctx context.Context, pipelineKey string, currentState any) error { + return w.writeResourceDiff(ctx, "pipelines", pipelineKey, currentState, extractPipelineSpec) +} + +// extractorFunc extracts the relevant settings from the full API response +type extractorFunc func(any) (any, error) + +// extractJobSettings extracts JobSettings from a Job response +func extractJobSettings(state any) (any, error) { + job, ok := state.(*jobs.Job) + if !ok { + return nil, fmt.Errorf("expected *jobs.Job, got %T", state) + } + if job.Settings == nil { + return nil, errors.New("job settings is nil") + } + return job.Settings, nil +} + +// extractPipelineSpec extracts PipelineSpec from a Pipeline response +func extractPipelineSpec(state any) (any, error) { + pipeline, ok := state.(*pipelines.GetPipelineResponse) + if !ok { + return nil, fmt.Errorf("expected *pipelines.GetPipelineResponse, got %T", state) + } + if pipeline.Spec == nil { + return nil, errors.New("pipeline spec is nil") + } + return pipeline.Spec, nil +} + +// writeResourceDiff writes resource diff changes back to the YAML file +func (w *DiffWriter) writeResourceDiff(ctx context.Context, resourceType, resourceKey string, currentState any, extractor extractorFunc) error { + // Build the path to the resource in the bundle config + resourcePath := dyn.MustPathFromString(fmt.Sprintf("resources.%s.%s", resourceType, resourceKey)) + + // Get the current config value for this resource + resourceValue, err := dyn.GetByPath(w.bundle.Config.Value(), resourcePath) + if err != nil { + return fmt.Errorf("failed to get resource at path %s: %w", resourcePath, err) + } + + // Get the file location for this resource + location := resourceValue.Location() + if location.File == "" { + return fmt.Errorf("resource %s.%s has no file location", resourceType, resourceKey) + } + + log.Infof(ctx, "Updating %s.%s in %s", resourceType, resourceKey, location.File) + + // Extract the relevant settings from the API response + // (e.g., JobSettings from Job, PipelineSpec from Pipeline) + settings, err := extractor(currentState) + if err != nil { + return fmt.Errorf("failed to extract settings from current state: %w", err) + } + + // Convert the settings to dyn.Value, using the current resource value as reference + // to preserve locations and structure + updatedValue, err := convert.FromTyped(settings, resourceValue) + if err != nil { + return fmt.Errorf("failed to convert settings to dyn.Value: %w", err) + } + + log.Debugf(ctx, "Converted remote state to dyn.Value") + + // Update the YAML file with the new resource value + return w.updateYAMLFile(ctx, location.File, resourcePath, updatedValue) +} + +// updateYAMLFile updates a specific resource in a YAML file +func (w *DiffWriter) updateYAMLFile(ctx context.Context, filePath string, resourcePath dyn.Path, newValue dyn.Value) error { + // Read the existing file + content, err := os.ReadFile(filePath) + if err != nil { + return fmt.Errorf("failed to read file %s: %w", filePath, err) + } + + // Parse the YAML file to dyn.Value + fileValue, err := yamlloader.LoadYAML(filePath, bytes.NewReader(content)) + if err != nil { + return fmt.Errorf("failed to parse YAML file %s: %w", filePath, err) + } + + // Update the resource in the file value + updatedFileValue, err := dyn.SetByPath(fileValue, resourcePath, newValue) + if err != nil { + return fmt.Errorf("failed to update resource at path %s: %w", resourcePath, err) + } + + // Write back to the file using the internal encode method + // We need to write the file manually since SaveAsYAML expects .AsAny() + // but our value may contain types that dyn.V() can't handle + err = os.MkdirAll(filepath.Dir(filePath), 0o755) + if err != nil { + return fmt.Errorf("failed to create directory: %w", err) + } + + file, err := os.Create(filePath) + if err != nil { + return fmt.Errorf("failed to create file %s: %w", filePath, err) + } + defer file.Close() + + // Convert to yaml.Node directly from dyn.Value + yamlNode, err := dynValueToYamlNode(updatedFileValue) + if err != nil { + return fmt.Errorf("failed to convert to YAML node: %w", err) + } + + enc := yaml.NewEncoder(file) + enc.SetIndent(2) + err = enc.Encode(yamlNode) + if err != nil { + return fmt.Errorf("failed to write YAML: %w", err) + } + + log.Infof(ctx, "Successfully updated %s", filePath) + return nil +} + +// dynValueToYamlNode converts a dyn.Value to a yaml.Node +// This is similar to yamlsaver.toYamlNode but handles our use case +func dynValueToYamlNode(v dyn.Value) (*yaml.Node, error) { + return dynValueToYamlNodeWithStyle(v, yaml.Style(0), nil) +} + +func dynValueToYamlNodeWithStyle(v dyn.Value, style yaml.Style, stylesMap map[string]yaml.Style) (*yaml.Node, error) { + switch v.Kind() { + case dyn.KindMap: + m, _ := v.AsMap() + var content []*yaml.Node + + // Sort by location line number to preserve order + pairs := m.Pairs() + for _, pair := range pairs { + pk := pair.Key + pv := pair.Value + keyNode := yaml.Node{Kind: yaml.ScalarNode, Value: pk.MustString(), Style: style} + + // Check if this key has a custom style + var nestedStyle yaml.Style + if stylesMap != nil { + if customStyle, ok := stylesMap[pk.MustString()]; ok { + nestedStyle = customStyle + } else { + nestedStyle = style + } + } else { + nestedStyle = style + } + + valueNode, err := dynValueToYamlNodeWithStyle(pv, nestedStyle, stylesMap) + if err != nil { + return nil, err + } + content = append(content, &keyNode) + content = append(content, valueNode) + } + return &yaml.Node{Kind: yaml.MappingNode, Content: content, Style: style}, nil + + case dyn.KindSequence: + seq, _ := v.AsSequence() + var content []*yaml.Node + for _, item := range seq { + node, err := dynValueToYamlNodeWithStyle(item, style, stylesMap) + if err != nil { + return nil, err + } + content = append(content, node) + } + return &yaml.Node{Kind: yaml.SequenceNode, Content: content, Style: style}, nil + + case dyn.KindNil: + return &yaml.Node{Kind: yaml.ScalarNode, Value: "null", Style: style}, nil + + case dyn.KindString: + s := v.MustString() + // Quote strings that look like scalars + if isScalarLikeString(s) { + return &yaml.Node{Kind: yaml.ScalarNode, Value: s, Style: yaml.DoubleQuotedStyle}, nil + } + return &yaml.Node{Kind: yaml.ScalarNode, Value: s, Style: style}, nil + + case dyn.KindBool: + return &yaml.Node{Kind: yaml.ScalarNode, Value: strconv.FormatBool(v.MustBool()), Style: style}, nil + + case dyn.KindInt: + return &yaml.Node{Kind: yaml.ScalarNode, Value: strconv.FormatInt(v.MustInt(), 10), Style: style}, nil + + case dyn.KindFloat: + return &yaml.Node{Kind: yaml.ScalarNode, Value: fmt.Sprint(v.MustFloat()), Style: style}, nil + + case dyn.KindTime: + return &yaml.Node{Kind: yaml.ScalarNode, Value: v.MustTime().String(), Style: style}, nil + + default: + return nil, fmt.Errorf("unsupported kind: %s", v.Kind()) + } +} + +func isScalarLikeString(s string) bool { + if s == "" || s == "true" || s == "false" { + return true + } + // Check if it's a number + if _, err := strconv.ParseInt(s, 0, 64); err == nil { + return true + } + if _, err := strconv.ParseFloat(s, 64); err == nil { + return true + } + return false +} diff --git a/cmd/bundle/debug/exp_diff.go b/cmd/bundle/debug/exp_diff.go index ad24fa5858..f0b26167b7 100644 --- a/cmd/bundle/debug/exp_diff.go +++ b/cmd/bundle/debug/exp_diff.go @@ -24,6 +24,8 @@ type DiffOutput struct { } func NewExpDiffCommand() *cobra.Command { + var save bool + cmd := &cobra.Command{ Use: "exp-diff", Short: "Show differences between current remote state and last deploy snapshot (experimental)", @@ -37,6 +39,8 @@ Note: This command is experimental and may change without notice.`, Args: root.NoArgs, } + cmd.Flags().BoolVar(&save, "save", false, "Save the diff back to the bundle YAML files") + cmd.RunE = func(cmd *cobra.Command, args []string) error { ctx := cmd.Context() @@ -67,6 +71,12 @@ Note: This command is experimental and may change without notice.`, w := b.WorkspaceClient() + // Create diff writer if save flag is set + var writer *DiffWriter + if save { + writer = NewDiffWriter(b) + } + // Compare jobs for key, job := range b.Config.Resources.Jobs { if job.ID == "" { @@ -109,6 +119,14 @@ Note: This command is experimental and may change without notice.`, Changes: changelog, } log.Debugf(ctx, "Found %d changes for job %s", len(changelog), key) + + // Save changes back to YAML if save flag is set + if writer != nil { + err = writer.WriteJobDiff(ctx, key, currentJob) + if err != nil { + log.Warnf(ctx, "Failed to save job %s changes: %v", key, err) + } + } } } @@ -148,6 +166,14 @@ Note: This command is experimental and may change without notice.`, Changes: changelog, } log.Debugf(ctx, "Found %d changes for pipeline %s", len(changelog), key) + + // Save changes back to YAML if save flag is set + if writer != nil { + err = writer.WritePipelineDiff(ctx, key, currentPipeline) + if err != nil { + log.Warnf(ctx, "Failed to save pipeline %s changes: %v", key, err) + } + } } } diff --git a/test_bundle/job.yaml b/test_bundle/job.yaml index 477ac21b04..8b373f95f4 100644 --- a/test_bundle/job.yaml +++ b/test_bundle/job.yaml @@ -8,13 +8,13 @@ resources: email_notifications: {} format: MULTI_TASK max_concurrent_runs: 4 - name: '[dev ilya_kuznetsov] test_exp_diff_job' + name: '[dev ilya_kuznetsov]' queue: enabled: true run_as: user_name: ilya.kuznetsov@databricks.com tags: dev: ilya_kuznetsov - my_tag: my_value3 + my_tag: new_tag_2 timeout_seconds: 0 webhook_notifications: {} From c76888ed63faae084bdcc475c19a6ac840df46d2 Mon Sep 17 00:00:00 2001 From: Ilya Kuznetsov Date: Thu, 13 Nov 2025 14:14:31 +0100 Subject: [PATCH 06/10] Write to file partially --- cmd/bundle/debug/diff_writer.go | 188 +++++++++++++++++++++++++++----- cmd/bundle/debug/exp_diff.go | 18 ++- test_bundle/job.yaml | 15 --- 3 files changed, 173 insertions(+), 48 deletions(-) diff --git a/cmd/bundle/debug/diff_writer.go b/cmd/bundle/debug/diff_writer.go index bfd7e8ef0c..05387732bc 100644 --- a/cmd/bundle/debug/diff_writer.go +++ b/cmd/bundle/debug/diff_writer.go @@ -7,7 +7,9 @@ import ( "fmt" "os" "path/filepath" + "reflect" "strconv" + "strings" "github.com/databricks/cli/bundle" "github.com/databricks/cli/libs/dyn" @@ -16,6 +18,7 @@ import ( "github.com/databricks/cli/libs/log" "github.com/databricks/databricks-sdk-go/service/jobs" "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/r3labs/diff/v3" "gopkg.in/yaml.v3" ) @@ -30,13 +33,13 @@ func NewDiffWriter(b *bundle.Bundle) *DiffWriter { } // WriteJobDiff writes job diff changes back to the YAML file -func (w *DiffWriter) WriteJobDiff(ctx context.Context, jobKey string, currentState any) error { - return w.writeResourceDiff(ctx, "jobs", jobKey, currentState, extractJobSettings) +func (w *DiffWriter) WriteJobDiff(ctx context.Context, jobKey string, currentState any, changelog diff.Changelog) error { + return w.writeResourceDiff(ctx, "jobs", jobKey, currentState, changelog, extractJobSettings) } // WritePipelineDiff writes pipeline diff changes back to the YAML file -func (w *DiffWriter) WritePipelineDiff(ctx context.Context, pipelineKey string, currentState any) error { - return w.writeResourceDiff(ctx, "pipelines", pipelineKey, currentState, extractPipelineSpec) +func (w *DiffWriter) WritePipelineDiff(ctx context.Context, pipelineKey string, currentState any, changelog diff.Changelog) error { + return w.writeResourceDiff(ctx, "pipelines", pipelineKey, currentState, changelog, extractPipelineSpec) } // extractorFunc extracts the relevant settings from the full API response @@ -66,8 +69,89 @@ func extractPipelineSpec(state any) (any, error) { return pipeline.Spec, nil } +// filterReadOnlyFields filters out read-only fields from the changelog +func filterReadOnlyFields(ctx context.Context, changelog diff.Changelog) diff.Changelog { + var filtered diff.Changelog + for _, change := range changelog { + if len(change.Path) == 0 { + continue + } + + // Skip read-only job fields + if len(change.Path) >= 2 && change.Path[0] == "Settings" { + fieldName := change.Path[1] + if fieldName == "EditMode" || fieldName == "Deployment" || fieldName == "Format" { + log.Debugf(ctx, "Skipping read-only field: %v", change.Path) + continue + } + } + + // Skip read-only pipeline fields + if len(change.Path) >= 2 && change.Path[0] == "Spec" { + fieldName := change.Path[1] + if fieldName == "Deployment" { + log.Debugf(ctx, "Skipping read-only field: %v", change.Path) + continue + } + } + + filtered = append(filtered, change) + } + return filtered +} + +// unwrapSettingsPath removes the "Settings" or "Spec" wrapper from the path +// SDK responses have job.Settings.Field, but YAML has jobs.my_job.field +func unwrapSettingsPath(path []string) []string { + if len(path) > 0 && (path[0] == "Settings" || path[0] == "Spec") { + return path[1:] + } + return path +} + +// convertChangePathToDynPath converts a changelog path to a dyn.Path +// It handles converting SDK struct field names to JSON tag names using reflection +func convertChangePathToDynPath(path []string, structType reflect.Type) (dyn.Path, error) { + var dynPath dyn.Path + currentType := structType + + for _, segment := range path { + if currentType.Kind() == reflect.Ptr { + currentType = currentType.Elem() + } + + if currentType.Kind() != reflect.Struct { + // For non-struct types (maps, slices, etc.), use the segment as-is + dynPath = dynPath.Append(dyn.Key(segment)) + continue + } + + // Find the field in the struct + field, found := currentType.FieldByName(segment) + if !found { + return nil, fmt.Errorf("field %s not found in type %s", segment, currentType.Name()) + } + + // Get the JSON tag name + jsonTag := field.Tag.Get("json") + if jsonTag == "" { + // No JSON tag, use lowercase field name + jsonTag = strings.ToLower(segment) + } else { + // Parse the JSON tag (it may have options like "omitempty") + parts := strings.Split(jsonTag, ",") + jsonTag = parts[0] + } + + dynPath = dynPath.Append(dyn.Key(jsonTag)) + currentType = field.Type + } + + return dynPath, nil +} + // writeResourceDiff writes resource diff changes back to the YAML file -func (w *DiffWriter) writeResourceDiff(ctx context.Context, resourceType, resourceKey string, currentState any, extractor extractorFunc) error { +func (w *DiffWriter) writeResourceDiff(ctx context.Context, resourceType, resourceKey string, currentState any, changelog diff.Changelog, extractor extractorFunc) error { // Build the path to the resource in the bundle config resourcePath := dyn.MustPathFromString(fmt.Sprintf("resources.%s.%s", resourceType, resourceKey)) @@ -83,7 +167,7 @@ func (w *DiffWriter) writeResourceDiff(ctx context.Context, resourceType, resour return fmt.Errorf("resource %s.%s has no file location", resourceType, resourceKey) } - log.Infof(ctx, "Updating %s.%s in %s", resourceType, resourceKey, location.File) + log.Infof(ctx, "Updating %s.%s in %s with %d changes", resourceType, resourceKey, location.File, len(changelog)) // Extract the relevant settings from the API response // (e.g., JobSettings from Job, PipelineSpec from Pipeline) @@ -92,47 +176,92 @@ func (w *DiffWriter) writeResourceDiff(ctx context.Context, resourceType, resour return fmt.Errorf("failed to extract settings from current state: %w", err) } - // Convert the settings to dyn.Value, using the current resource value as reference - // to preserve locations and structure - updatedValue, err := convert.FromTyped(settings, resourceValue) + // Convert the entire remote settings to dyn.Value so we can extract specific field values + remoteValue, err := convert.FromTyped(settings, resourceValue) if err != nil { return fmt.Errorf("failed to convert settings to dyn.Value: %w", err) } log.Debugf(ctx, "Converted remote state to dyn.Value") - // Update the YAML file with the new resource value - return w.updateYAMLFile(ctx, location.File, resourcePath, updatedValue) -} - -// updateYAMLFile updates a specific resource in a YAML file -func (w *DiffWriter) updateYAMLFile(ctx context.Context, filePath string, resourcePath dyn.Path, newValue dyn.Value) error { - // Read the existing file - content, err := os.ReadFile(filePath) + // Read the YAML file + content, err := os.ReadFile(location.File) if err != nil { - return fmt.Errorf("failed to read file %s: %w", filePath, err) + return fmt.Errorf("failed to read file %s: %w", location.File, err) } // Parse the YAML file to dyn.Value - fileValue, err := yamlloader.LoadYAML(filePath, bytes.NewReader(content)) + fileValue, err := yamlloader.LoadYAML(location.File, bytes.NewReader(content)) if err != nil { - return fmt.Errorf("failed to parse YAML file %s: %w", filePath, err) + return fmt.Errorf("failed to parse YAML file %s: %w", location.File, err) } - // Update the resource in the file value - updatedFileValue, err := dyn.SetByPath(fileValue, resourcePath, newValue) - if err != nil { - return fmt.Errorf("failed to update resource at path %s: %w", resourcePath, err) + // Get the struct type for path conversion + settingsType := reflect.TypeOf(settings) + if settingsType.Kind() == reflect.Ptr { + settingsType = settingsType.Elem() } - // Write back to the file using the internal encode method - // We need to write the file manually since SaveAsYAML expects .AsAny() - // but our value may contain types that dyn.V() can't handle - err = os.MkdirAll(filepath.Dir(filePath), 0o755) + // Apply each change from the changelog + updatedFileValue := fileValue + for _, change := range changelog { + // Unwrap Settings/Spec wrapper from the path + unwrappedPath := unwrapSettingsPath(change.Path) + if len(unwrappedPath) == 0 { + log.Debugf(ctx, "Skipping empty path after unwrapping") + continue + } + + // Convert to dyn.Path with JSON tag names + dynPath, err := convertChangePathToDynPath(unwrappedPath, settingsType) + if err != nil { + log.Warnf(ctx, "Failed to convert path %v: %v", change.Path, err) + continue + } + + // Prepend the resource path + fullPath := resourcePath.Append(dynPath...) + + log.Debugf(ctx, "Applying change %s at path %s", change.Type, fullPath) + + // Apply the change based on type + switch change.Type { + case "create", "update": + // Extract the value from the remote state + fieldValue, err := dyn.GetByPath(remoteValue, dynPath) + if err != nil { + log.Warnf(ctx, "Failed to get value at path %s: %v", dynPath, err) + continue + } + + // Update the file value + updatedFileValue, err = dyn.SetByPath(updatedFileValue, fullPath, fieldValue) + if err != nil { + log.Warnf(ctx, "Failed to set value at path %s: %v", fullPath, err) + continue + } + + case "delete": + // For delete operations, we need to manually manipulate the mapping + // since dyn doesn't have a DeleteByPath function + log.Debugf(ctx, "Skipping delete operation for path %s (not yet implemented)", fullPath) + // TODO: Implement deletion by reconstructing the parent mapping without the key + } + } + + // Write the updated file + return w.writeYAMLFile(ctx, location.File, updatedFileValue) +} + +// writeYAMLFile writes a dyn.Value to a YAML file +func (w *DiffWriter) writeYAMLFile(ctx context.Context, filePath string, fileValue dyn.Value) error { + // Create directory if needed + err := os.MkdirAll(filepath.Dir(filePath), 0o755) if err != nil { return fmt.Errorf("failed to create directory: %w", err) } + // Create the file file, err := os.Create(filePath) if err != nil { return fmt.Errorf("failed to create file %s: %w", filePath, err) @@ -140,11 +269,12 @@ func (w *DiffWriter) updateYAMLFile(ctx context.Context, filePath string, resour defer file.Close() // Convert to yaml.Node directly from dyn.Value - yamlNode, err := dynValueToYamlNode(updatedFileValue) + yamlNode, err := dynValueToYamlNode(fileValue) if err != nil { return fmt.Errorf("failed to convert to YAML node: %w", err) } + // Write the YAML enc := yaml.NewEncoder(file) enc.SetIndent(2) err = enc.Encode(yamlNode) diff --git a/cmd/bundle/debug/exp_diff.go b/cmd/bundle/debug/exp_diff.go index f0b26167b7..b34bf3a649 100644 --- a/cmd/bundle/debug/exp_diff.go +++ b/cmd/bundle/debug/exp_diff.go @@ -4,6 +4,7 @@ import ( "encoding/json" "strconv" + "github.com/databricks/cli/bundle/direct/dresources" "github.com/databricks/cli/bundle/resourcesnapshot" "github.com/databricks/cli/cmd/bundle/utils" "github.com/databricks/cli/cmd/root" @@ -106,8 +107,12 @@ Note: This command is experimental and may change without notice.`, continue } + job := dresources.ResourceJob{} + currentJobComparable := job.RemapState(currentJob) + previousJobComparable := job.RemapState(previousJob) + // Compare previous and current state - changelog, err := diff.Diff(previousJob, currentJob) + changelog, err := diff.Diff(previousJobComparable, currentJobComparable) if err != nil { log.Warnf(ctx, "Failed to diff job %s: %v", key, err) continue @@ -122,7 +127,8 @@ Note: This command is experimental and may change without notice.`, // Save changes back to YAML if save flag is set if writer != nil { - err = writer.WriteJobDiff(ctx, key, currentJob) + // Filter out read-only fields before saving + err = writer.WriteJobDiff(ctx, key, currentJob, changelog) if err != nil { log.Warnf(ctx, "Failed to save job %s changes: %v", key, err) } @@ -153,8 +159,12 @@ Note: This command is experimental and may change without notice.`, continue } + pipeline := dresources.ResourcePipeline{} + currentPipelineComparable := pipeline.RemapState(currentPipeline) + previousPipelineComparable := pipeline.RemapState(previousPipeline) + // Compare previous and current state - changelog, err := diff.Diff(previousPipeline, currentPipeline) + changelog, err := diff.Diff(previousPipelineComparable, currentPipelineComparable) if err != nil { log.Warnf(ctx, "Failed to diff pipeline %s: %v", key, err) continue @@ -169,7 +179,7 @@ Note: This command is experimental and may change without notice.`, // Save changes back to YAML if save flag is set if writer != nil { - err = writer.WritePipelineDiff(ctx, key, currentPipeline) + err = writer.WritePipelineDiff(ctx, key, currentPipeline, changelog) if err != nil { log.Warnf(ctx, "Failed to save pipeline %s changes: %v", key, err) } diff --git a/test_bundle/job.yaml b/test_bundle/job.yaml index 8b373f95f4..221511c7a6 100644 --- a/test_bundle/job.yaml +++ b/test_bundle/job.yaml @@ -1,20 +1,5 @@ resources: jobs: test_exp_diff_job: - deployment: - kind: BUNDLE - metadata_file_path: /Workspace/Users/ilya.kuznetsov@databricks.com/.bundle/databricks-bundle-test/dev/state/metadata.json - edit_mode: EDITABLE - email_notifications: {} - format: MULTI_TASK - max_concurrent_runs: 4 - name: '[dev ilya_kuznetsov]' queue: enabled: true - run_as: - user_name: ilya.kuznetsov@databricks.com - tags: - dev: ilya_kuznetsov - my_tag: new_tag_2 - timeout_seconds: 0 - webhook_notifications: {} From 7a8bd5d1322863d4f92245de1cee9d0ddd77f8da Mon Sep 17 00:00:00 2001 From: Ilya Kuznetsov Date: Thu, 13 Nov 2025 14:23:25 +0100 Subject: [PATCH 07/10] Fix --- cmd/bundle/debug/diff_writer.go | 60 ++++++++++++++++----------------- test_bundle/job.yaml | 3 ++ 2 files changed, 32 insertions(+), 31 deletions(-) diff --git a/cmd/bundle/debug/diff_writer.go b/cmd/bundle/debug/diff_writer.go index 05387732bc..288ff9f6fb 100644 --- a/cmd/bundle/debug/diff_writer.go +++ b/cmd/bundle/debug/diff_writer.go @@ -69,37 +69,6 @@ func extractPipelineSpec(state any) (any, error) { return pipeline.Spec, nil } -// filterReadOnlyFields filters out read-only fields from the changelog -func filterReadOnlyFields(ctx context.Context, changelog diff.Changelog) diff.Changelog { - var filtered diff.Changelog - for _, change := range changelog { - if len(change.Path) == 0 { - continue - } - - // Skip read-only job fields - if len(change.Path) >= 2 && change.Path[0] == "Settings" { - fieldName := change.Path[1] - if fieldName == "EditMode" || fieldName == "Deployment" || fieldName == "Format" { - log.Debugf(ctx, "Skipping read-only field: %v", change.Path) - continue - } - } - - // Skip read-only pipeline fields - if len(change.Path) >= 2 && change.Path[0] == "Spec" { - fieldName := change.Path[1] - if fieldName == "Deployment" { - log.Debugf(ctx, "Skipping read-only field: %v", change.Path) - continue - } - } - - filtered = append(filtered, change) - } - return filtered -} - // unwrapSettingsPath removes the "Settings" or "Spec" wrapper from the path // SDK responses have job.Settings.Field, but YAML has jobs.my_job.field func unwrapSettingsPath(path []string) []string { @@ -150,6 +119,28 @@ func convertChangePathToDynPath(path []string, structType reflect.Type) (dyn.Pat return dynPath, nil } +// ensurePathExists ensures all intermediate path segments exist before setting a value +// If an intermediate path doesn't exist, it creates an empty mapping at that location +func ensurePathExists(ctx context.Context, v dyn.Value, path dyn.Path) (dyn.Value, error) { + // Build the path incrementally and ensure each level exists + for i := range path { + intermediatePath := path[:i+1] + + // Check if this path exists + item, err := dyn.GetByPath(v, intermediatePath) + if err != nil || !item.IsValid() { + // Path doesn't exist, create an empty mapping + log.Debugf(ctx, "Creating intermediate path: %s", intermediatePath) + v, err = dyn.SetByPath(v, intermediatePath, dyn.V(dyn.NewMapping())) + if err != nil { + return dyn.InvalidValue, fmt.Errorf("failed to create intermediate path %s: %w", intermediatePath, err) + } + } + } + + return v, nil +} + // writeResourceDiff writes resource diff changes back to the YAML file func (w *DiffWriter) writeResourceDiff(ctx context.Context, resourceType, resourceKey string, currentState any, changelog diff.Changelog, extractor extractorFunc) error { // Build the path to the resource in the bundle config @@ -234,6 +225,13 @@ func (w *DiffWriter) writeResourceDiff(ctx context.Context, resourceType, resour continue } + // Ensure all intermediate paths exist before setting the value + updatedFileValue, err = ensurePathExists(ctx, updatedFileValue, fullPath) + if err != nil { + log.Warnf(ctx, "Failed to ensure path exists %s: %v", fullPath, err) + continue + } + // Update the file value updatedFileValue, err = dyn.SetByPath(updatedFileValue, fullPath, fieldValue) if err != nil { diff --git a/test_bundle/job.yaml b/test_bundle/job.yaml index 221511c7a6..e56caa0def 100644 --- a/test_bundle/job.yaml +++ b/test_bundle/job.yaml @@ -3,3 +3,6 @@ resources: test_exp_diff_job: queue: enabled: true + edit_mode: EDITABLE + tags: + my_tag: new_tag_2 From a4c98d15952860680573994dbb1a66c24bbd1f9e Mon Sep 17 00:00:00 2001 From: Ilya Kuznetsov Date: Thu, 13 Nov 2025 14:25:33 +0100 Subject: [PATCH 08/10] Fix --- cmd/bundle/debug/diff_writer.go | 9 +++++++++ test_bundle/job.yaml | 3 --- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/cmd/bundle/debug/diff_writer.go b/cmd/bundle/debug/diff_writer.go index 288ff9f6fb..15041994c1 100644 --- a/cmd/bundle/debug/diff_writer.go +++ b/cmd/bundle/debug/diff_writer.go @@ -196,6 +196,15 @@ func (w *DiffWriter) writeResourceDiff(ctx context.Context, resourceType, resour // Apply each change from the changelog updatedFileValue := fileValue for _, change := range changelog { + // Skip read-only fields (EditMode, Deployment, Format) + if len(change.Path) > 0 { + fieldName := change.Path[0] + if fieldName == "EditMode" || fieldName == "Deployment" || fieldName == "Format" { + log.Debugf(ctx, "Skipping read-only field: %v", change.Path) + continue + } + } + // Unwrap Settings/Spec wrapper from the path unwrappedPath := unwrapSettingsPath(change.Path) if len(unwrappedPath) == 0 { diff --git a/test_bundle/job.yaml b/test_bundle/job.yaml index e56caa0def..221511c7a6 100644 --- a/test_bundle/job.yaml +++ b/test_bundle/job.yaml @@ -3,6 +3,3 @@ resources: test_exp_diff_job: queue: enabled: true - edit_mode: EDITABLE - tags: - my_tag: new_tag_2 From c500732f54bf99e1e6217ee4318edb0f57a441ff Mon Sep 17 00:00:00 2001 From: Ilya Kuznetsov Date: Thu, 13 Nov 2025 14:48:53 +0100 Subject: [PATCH 09/10] Working prototype --- cmd/bundle/debug/diff_writer.go | 69 ++++++++++++++++++++++++++------- cmd/bundle/debug/exp_diff.go | 57 ++++++++++++++++----------- test_bundle/databricks.yml | 1 + test_bundle/job.yaml | 3 ++ test_bundle/notebook.py | 0 test_bundle/pipeline.yaml | 10 +++++ 6 files changed, 104 insertions(+), 36 deletions(-) create mode 100644 test_bundle/notebook.py create mode 100644 test_bundle/pipeline.yaml diff --git a/cmd/bundle/debug/diff_writer.go b/cmd/bundle/debug/diff_writer.go index 15041994c1..5f42c69c2b 100644 --- a/cmd/bundle/debug/diff_writer.go +++ b/cmd/bundle/debug/diff_writer.go @@ -24,7 +24,8 @@ import ( // DiffWriter handles writing diff changes back to YAML files type DiffWriter struct { - bundle *bundle.Bundle + bundle *bundle.Bundle + saveToFile bool } // NewDiffWriter creates a new DiffWriter @@ -33,12 +34,12 @@ func NewDiffWriter(b *bundle.Bundle) *DiffWriter { } // WriteJobDiff writes job diff changes back to the YAML file -func (w *DiffWriter) WriteJobDiff(ctx context.Context, jobKey string, currentState any, changelog diff.Changelog) error { +func (w *DiffWriter) WriteJobDiff(ctx context.Context, jobKey string, currentState any, changelog diff.Changelog) (*FileChange, error) { return w.writeResourceDiff(ctx, "jobs", jobKey, currentState, changelog, extractJobSettings) } // WritePipelineDiff writes pipeline diff changes back to the YAML file -func (w *DiffWriter) WritePipelineDiff(ctx context.Context, pipelineKey string, currentState any, changelog diff.Changelog) error { +func (w *DiffWriter) WritePipelineDiff(ctx context.Context, pipelineKey string, currentState any, changelog diff.Changelog) (*FileChange, error) { return w.writeResourceDiff(ctx, "pipelines", pipelineKey, currentState, changelog, extractPipelineSpec) } @@ -142,20 +143,20 @@ func ensurePathExists(ctx context.Context, v dyn.Value, path dyn.Path) (dyn.Valu } // writeResourceDiff writes resource diff changes back to the YAML file -func (w *DiffWriter) writeResourceDiff(ctx context.Context, resourceType, resourceKey string, currentState any, changelog diff.Changelog, extractor extractorFunc) error { +func (w *DiffWriter) writeResourceDiff(ctx context.Context, resourceType, resourceKey string, currentState any, changelog diff.Changelog, extractor extractorFunc) (*FileChange, error) { // Build the path to the resource in the bundle config resourcePath := dyn.MustPathFromString(fmt.Sprintf("resources.%s.%s", resourceType, resourceKey)) // Get the current config value for this resource resourceValue, err := dyn.GetByPath(w.bundle.Config.Value(), resourcePath) if err != nil { - return fmt.Errorf("failed to get resource at path %s: %w", resourcePath, err) + return nil, fmt.Errorf("failed to get resource at path %s: %w", resourcePath, err) } // Get the file location for this resource location := resourceValue.Location() if location.File == "" { - return fmt.Errorf("resource %s.%s has no file location", resourceType, resourceKey) + return nil, fmt.Errorf("resource %s.%s has no file location", resourceType, resourceKey) } log.Infof(ctx, "Updating %s.%s in %s with %d changes", resourceType, resourceKey, location.File, len(changelog)) @@ -164,27 +165,27 @@ func (w *DiffWriter) writeResourceDiff(ctx context.Context, resourceType, resour // (e.g., JobSettings from Job, PipelineSpec from Pipeline) settings, err := extractor(currentState) if err != nil { - return fmt.Errorf("failed to extract settings from current state: %w", err) + return nil, fmt.Errorf("failed to extract settings from current state: %w", err) } // Convert the entire remote settings to dyn.Value so we can extract specific field values remoteValue, err := convert.FromTyped(settings, resourceValue) if err != nil { - return fmt.Errorf("failed to convert settings to dyn.Value: %w", err) + return nil, fmt.Errorf("failed to convert settings to dyn.Value: %w", err) } log.Debugf(ctx, "Converted remote state to dyn.Value") // Read the YAML file - content, err := os.ReadFile(location.File) + originalContent, err := os.ReadFile(location.File) if err != nil { - return fmt.Errorf("failed to read file %s: %w", location.File, err) + return nil, fmt.Errorf("failed to read file %s: %w", location.File, err) } // Parse the YAML file to dyn.Value - fileValue, err := yamlloader.LoadYAML(location.File, bytes.NewReader(content)) + fileValue, err := yamlloader.LoadYAML(location.File, bytes.NewReader(originalContent)) if err != nil { - return fmt.Errorf("failed to parse YAML file %s: %w", location.File, err) + return nil, fmt.Errorf("failed to parse YAML file %s: %w", location.File, err) } // Get the struct type for path conversion @@ -256,12 +257,54 @@ func (w *DiffWriter) writeResourceDiff(ctx context.Context, resourceType, resour } } + // Generate the modified content + modifiedContent, err := w.generateYAMLContent(updatedFileValue) + if err != nil { + return nil, fmt.Errorf("failed to generate modified YAML: %w", err) + } + // Write the updated file - return w.writeYAMLFile(ctx, location.File, updatedFileValue) + err = w.writeYAMLFile(ctx, location.File, updatedFileValue) + if err != nil { + return nil, err + } + + // Return the file change info + return &FileChange{ + Path: location.File, + OriginalContent: string(originalContent), + ModifiedContent: modifiedContent, + }, nil +} + +// generateYAMLContent converts a dyn.Value to YAML string +func (w *DiffWriter) generateYAMLContent(fileValue dyn.Value) (string, error) { + // Convert to yaml.Node directly from dyn.Value + yamlNode, err := dynValueToYamlNode(fileValue) + if err != nil { + return "", fmt.Errorf("failed to convert to YAML node: %w", err) + } + + // Write the YAML to a buffer + var buf bytes.Buffer + enc := yaml.NewEncoder(&buf) + enc.SetIndent(2) + err = enc.Encode(yamlNode) + if err != nil { + return "", fmt.Errorf("failed to encode YAML: %w", err) + } + + return buf.String(), nil } // writeYAMLFile writes a dyn.Value to a YAML file func (w *DiffWriter) writeYAMLFile(ctx context.Context, filePath string, fileValue dyn.Value) error { + // Skip writing if saveToFile is false + if !w.saveToFile { + log.Debugf(ctx, "Skipping file write (save flag not set)") + return nil + } + // Create directory if needed err := os.MkdirAll(filepath.Dir(filePath), 0o755) if err != nil { diff --git a/cmd/bundle/debug/exp_diff.go b/cmd/bundle/debug/exp_diff.go index b34bf3a649..9465c418d2 100644 --- a/cmd/bundle/debug/exp_diff.go +++ b/cmd/bundle/debug/exp_diff.go @@ -19,11 +19,22 @@ type ResourceDiff struct { Changes diff.Changelog `json:"changes"` } -type DiffOutput struct { +type FileChange struct { + Path string `json:"path"` + OriginalContent string `json:"originalContent"` + ModifiedContent string `json:"modifiedContent"` +} + +type ChangesSummary struct { Jobs map[string]*ResourceDiff `json:"jobs,omitempty"` Pipelines map[string]*ResourceDiff `json:"pipelines,omitempty"` } +type DiffOutput struct { + Files []FileChange `json:"files"` + Changes *ChangesSummary `json:"changes"` +} + func NewExpDiffCommand() *cobra.Command { var save bool @@ -60,8 +71,11 @@ Note: This command is experimental and may change without notice.`, } output := &DiffOutput{ - Jobs: make(map[string]*ResourceDiff), - Pipelines: make(map[string]*ResourceDiff), + Files: []FileChange{}, + Changes: &ChangesSummary{ + Jobs: make(map[string]*ResourceDiff), + Pipelines: make(map[string]*ResourceDiff), + }, } // If no snapshot exists, return empty diff @@ -72,11 +86,9 @@ Note: This command is experimental and may change without notice.`, w := b.WorkspaceClient() - // Create diff writer if save flag is set - var writer *DiffWriter - if save { - writer = NewDiffWriter(b) - } + // Create diff writer + writer := NewDiffWriter(b) + writer.saveToFile = save // Compare jobs for key, job := range b.Config.Resources.Jobs { @@ -120,18 +132,17 @@ Note: This command is experimental and may change without notice.`, // Only add to output if there are changes if len(changelog) > 0 { - output.Jobs[key] = &ResourceDiff{ + output.Changes.Jobs[key] = &ResourceDiff{ Changes: changelog, } log.Debugf(ctx, "Found %d changes for job %s", len(changelog), key) - // Save changes back to YAML if save flag is set - if writer != nil { - // Filter out read-only fields before saving - err = writer.WriteJobDiff(ctx, key, currentJob, changelog) - if err != nil { - log.Warnf(ctx, "Failed to save job %s changes: %v", key, err) - } + // Generate file change (will save to file if save flag is set) + fileChange, err := writer.WriteJobDiff(ctx, key, currentJob, changelog) + if err != nil { + log.Warnf(ctx, "Failed to process job %s changes: %v", key, err) + } else if fileChange != nil { + output.Files = append(output.Files, *fileChange) } } } @@ -172,17 +183,17 @@ Note: This command is experimental and may change without notice.`, // Only add to output if there are changes if len(changelog) > 0 { - output.Pipelines[key] = &ResourceDiff{ + output.Changes.Pipelines[key] = &ResourceDiff{ Changes: changelog, } log.Debugf(ctx, "Found %d changes for pipeline %s", len(changelog), key) - // Save changes back to YAML if save flag is set - if writer != nil { - err = writer.WritePipelineDiff(ctx, key, currentPipeline, changelog) - if err != nil { - log.Warnf(ctx, "Failed to save pipeline %s changes: %v", key, err) - } + // Generate file change (will save to file if save flag is set) + fileChange, err := writer.WritePipelineDiff(ctx, key, currentPipeline, changelog) + if err != nil { + log.Warnf(ctx, "Failed to process pipeline %s changes: %v", key, err) + } else if fileChange != nil { + output.Files = append(output.Files, *fileChange) } } } diff --git a/test_bundle/databricks.yml b/test_bundle/databricks.yml index a8b4ff4a60..8838e6c615 100644 --- a/test_bundle/databricks.yml +++ b/test_bundle/databricks.yml @@ -9,3 +9,4 @@ targets: include: - job.yaml + - pipeline.yaml diff --git a/test_bundle/job.yaml b/test_bundle/job.yaml index 221511c7a6..7ac6d4ce55 100644 --- a/test_bundle/job.yaml +++ b/test_bundle/job.yaml @@ -3,3 +3,6 @@ resources: test_exp_diff_job: queue: enabled: true + performance_target: PERFORMANCE_OPTIMIZED + tags: + new: taggg diff --git a/test_bundle/notebook.py b/test_bundle/notebook.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test_bundle/pipeline.yaml b/test_bundle/pipeline.yaml new file mode 100644 index 0000000000..deeb015ff9 --- /dev/null +++ b/test_bundle/pipeline.yaml @@ -0,0 +1,10 @@ +resources: + pipelines: + test_exp_diff_pipeline: + name: test_exp_diff_pipeline + tags: + my_tag: new_tag + new_tag: tag3 + libraries: + - file: + path: ./notebook.py From 7f67c2931e26247c228ce7ffea612cf21f0e07f8 Mon Sep 17 00:00:00 2001 From: Ilya Kuznetsov Date: Mon, 17 Nov 2025 16:40:42 +0100 Subject: [PATCH 10/10] Fix diff --- cmd/bundle/debug/diff_writer.go | 12 ++++- cmd/bundle/debug/diff_writer_test.go | 78 ++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+), 1 deletion(-) create mode 100644 cmd/bundle/debug/diff_writer_test.go diff --git a/cmd/bundle/debug/diff_writer.go b/cmd/bundle/debug/diff_writer.go index 5f42c69c2b..c5677c3ab8 100644 --- a/cmd/bundle/debug/diff_writer.go +++ b/cmd/bundle/debug/diff_writer.go @@ -91,7 +91,17 @@ func convertChangePathToDynPath(path []string, structType reflect.Type) (dyn.Pat } if currentType.Kind() != reflect.Struct { - // For non-struct types (maps, slices, etc.), use the segment as-is + // Check if segment is a numeric index (for arrays/slices) + if index, err := strconv.Atoi(segment); err == nil { + // It's a numeric index - use Index instead of Key + dynPath = dynPath.Append(dyn.Index(index)) + // Update currentType to element type for arrays/slices + if currentType.Kind() == reflect.Slice || currentType.Kind() == reflect.Array { + currentType = currentType.Elem() + } + continue + } + // For non-numeric segments in non-structs (e.g., map keys), use the segment as-is dynPath = dynPath.Append(dyn.Key(segment)) continue } diff --git a/cmd/bundle/debug/diff_writer_test.go b/cmd/bundle/debug/diff_writer_test.go new file mode 100644 index 0000000000..15f05ac70f --- /dev/null +++ b/cmd/bundle/debug/diff_writer_test.go @@ -0,0 +1,78 @@ +package debug + +import ( + "reflect" + "testing" + + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestConvertChangePathToDynPath_WithArrayIndex(t *testing.T) { + // Test case: path with array index like ["Libraries", "1", "Glob"] + // This simulates the path from r3labs/diff when comparing pipeline libraries + path := []string{"Libraries", "1", "Glob"} + structType := reflect.TypeOf(pipelines.PipelineSpec{}) + + dynPath, err := convertChangePathToDynPath(path, structType) + require.NoError(t, err) + + // Expected path: libraries[1].glob + // Should have 3 components: Key("libraries"), Index(1), Key("glob") + assert.Equal(t, 3, len(dynPath)) + + // First component should be a key "libraries" + assert.Equal(t, dyn.Key("libraries"), dynPath[0]) + + // Second component should be an index 1 + assert.Equal(t, dyn.Index(1), dynPath[1]) + + // Third component should be a key "glob" + assert.Equal(t, dyn.Key("glob"), dynPath[2]) +} + +func TestConvertChangePathToDynPath_WithMultipleArrayIndices(t *testing.T) { + // Test case: path with multiple array indices + path := []string{"Libraries", "0"} + structType := reflect.TypeOf(pipelines.PipelineSpec{}) + + dynPath, err := convertChangePathToDynPath(path, structType) + require.NoError(t, err) + + // Expected path: libraries[0] + assert.Equal(t, 2, len(dynPath)) + assert.Equal(t, dyn.Key("libraries"), dynPath[0]) + assert.Equal(t, dyn.Index(0), dynPath[1]) +} + +func TestConvertChangePathToDynPath_WithoutArrayIndex(t *testing.T) { + // Test case: simple path without array indices + path := []string{"Name"} + structType := reflect.TypeOf(pipelines.PipelineSpec{}) + + dynPath, err := convertChangePathToDynPath(path, structType) + require.NoError(t, err) + + // Expected path: name + assert.Equal(t, 1, len(dynPath)) + assert.Equal(t, dyn.Key("name"), dynPath[0]) +} + +func TestConvertChangePathToDynPath_NestedStructWithArrayIndex(t *testing.T) { + // Test case: array of structs, accessing a field in array element + // ["Libraries", "1", "Notebook", "Path"] + path := []string{"Libraries", "1", "Notebook", "Path"} + structType := reflect.TypeOf(pipelines.PipelineSpec{}) + + dynPath, err := convertChangePathToDynPath(path, structType) + require.NoError(t, err) + + // Expected: libraries[1].notebook.path + assert.Equal(t, 4, len(dynPath)) + assert.Equal(t, dyn.Key("libraries"), dynPath[0]) + assert.Equal(t, dyn.Index(1), dynPath[1]) + assert.Equal(t, dyn.Key("notebook"), dynPath[2]) + assert.Equal(t, dyn.Key("path"), dynPath[3]) +}