diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 84ba610..56d3a21 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -20,7 +20,8 @@ stackstate-backup-cli/ │ ├── root.go # Root command and global flags │ ├── version/ # Version information command │ ├── elasticsearch/ # Elasticsearch backup/restore commands -│ └── stackgraph/ # Stackgraph backup/restore commands +│ ├── stackgraph/ # Stackgraph backup/restore commands +│ └── victoriametrics/ # VictoriaMetrics backup/restore commands │ ├── internal/ # Internal packages (Layers 0-3) │ ├── foundation/ # Layer 0: Core utilities @@ -35,7 +36,8 @@ stackstate-backup-cli/ │ │ │ ├── orchestration/ # Layer 2: Workflows │ │ ├── portforward/ # Port-forwarding orchestration -│ │ └── scale/ # Deployment scaling workflows +│ │ ├── scale/ # Deployment/StatefulSet scaling workflows +│ │ └── restore/ # Restore job orchestration │ │ │ ├── app/ # Layer 3: Dependency Container │ │ └── app.go # Application context and dependency injection @@ -62,7 +64,8 @@ stackstate-backup-cli/ **Key Packages**: - `cmd/elasticsearch/`: Elasticsearch snapshot/restore commands (configure, list-snapshots, list-indices, restore-snapshot) -- `cmd/stackgraph/`: Stackgraph backup/restore commands (list, restore) +- `cmd/stackgraph/`: Stackgraph backup/restore commands (list, restore, check-and-finalize) +- `cmd/victoriametrics/`: VictoriaMetrics backup/restore commands (list, restore, check-and-finalize) - `cmd/version/`: Version information **Dependency Rules**: @@ -117,7 +120,8 @@ appCtx.Formatter **Key Packages**: - `portforward/`: Manages Kubernetes port-forwarding lifecycle -- `scale/`: Deployment scaling workflows with detailed logging +- `scale/`: Deployment and StatefulSet scaling workflows with detailed logging +- `restore/`: Restore job orchestration (confirmation, job lifecycle, finalization, resource management) **Dependency Rules**: - ✅ Can import: `internal/foundation/*`, `internal/clients/*` @@ -167,7 +171,7 @@ appCtx.Formatter ``` 1. User invokes CLI command - └─> cmd/elasticsearch/restore-snapshot.go + └─> cmd/victoriametrics/restore.go (or stackgraph/restore.go) │ 2. Parse flags and validate input └─> Cobra command receives global flags @@ -177,16 +181,17 @@ appCtx.Formatter ├─> internal/clients/k8s/ (K8s client) ├─> internal/foundation/config/ (Load from ConfigMap/Secret) ├─> internal/clients/s3/ (S3/Minio client) - ├─> internal/clients/elasticsearch/ (ES client) ├─> internal/foundation/logger/ (Logger) └─> internal/foundation/output/ (Formatter) │ 4. Execute business logic with injected dependencies └─> runRestore(appCtx) - ├─> internal/orchestration/scale/ (Scale down) - ├─> internal/orchestration/portforward/ (Port-forward) - ├─> internal/clients/elasticsearch/ (Restore snapshot) - └─> internal/orchestration/scale/ (Scale up) + ├─> internal/orchestration/restore/ (User confirmation) + ├─> internal/orchestration/scale/ (Scale down StatefulSets) + ├─> internal/orchestration/restore/ (Ensure resources: ConfigMaps, Secrets) + ├─> internal/clients/k8s/ (Create restore Job) + ├─> internal/orchestration/restore/ (Wait for completion & cleanup) + └─> internal/orchestration/scale/ (Scale up StatefulSets) │ 5. Format and display results └─> appCtx.Formatter.PrintTable() or PrintJSON() @@ -262,15 +267,50 @@ defer close(pf.StopChan) // Automatic cleanup ### 5. 
Scale Down/Up Pattern -Deployments are scaled down before restore operations and scaled up afterward: +Deployments and StatefulSets are scaled down before restore operations and scaled up afterward: ```go // Example usage -scaledDeployments, _ := scale.ScaleDown(k8sClient, namespace, selector, log) -defer scale.ScaleUp(k8sClient, namespace, scaledDeployments, log) +scaledResources, _ := scale.ScaleDown(k8sClient, namespace, selector, log) +defer scale.ScaleUpFromAnnotations(k8sClient, namespace, selector, log) ``` -### 6. Structured Logging +**Note**: Scaling now supports both Deployments and StatefulSets through a unified interface. + +### 6. Restore Orchestration Pattern + +Common restore operations are centralized in the `restore` orchestration layer: + +```go +// User confirmation +if !restore.PromptForConfirmation() { +return fmt.Errorf("operation cancelled") +} + +// Wait for job completion and cleanup +restore.PrintWaitingMessage(log, "service-name", jobName, namespace) +err := restore.WaitAndCleanup(k8sClient, namespace, jobName, log, cleanupPVC) + +// Check and finalize background jobs +err := restore.CheckAndFinalize(restore.CheckAndFinalizeParams{ +K8sClient: k8sClient, +Namespace: namespace, +JobName: jobName, +ServiceName: "service-name", +ScaleSelector: config.ScaleDownLabelSelector, +CleanupPVC: true, +WaitForJob: false, +Log: log, +}) +``` + +**Benefits**: + +- Eliminates duplicate code between Stackgraph and VictoriaMetrics restore commands +- Consistent user experience across services +- Centralized job lifecycle management and cleanup + +### 7. Structured Logging All operations use structured logging with consistent levels: diff --git a/README.md b/README.md index 7f9a2e9..1cc244f 100644 --- a/README.md +++ b/README.md @@ -9,8 +9,9 @@ This CLI tool replaces the legacy Bash-based backup/restore scripts with a singl **Current Support:** - Elasticsearch snapshots and restores - Stackgraph backups and restores +- VictoriaMetrics backups and restores -**Planned:** VictoriaMetrics, ClickHouse, Configuration backups +**Planned:** ClickHouse, Configuration backups ## Installation @@ -112,11 +113,76 @@ sts-backup stackgraph restore --namespace [--archive | --late **Flags:** - `--archive` - Specific archive name to restore (e.g., sts-backup-20210216-0300.graph) - `--latest` - Restore from the most recent backup -- `--force` - Force delete existing data during restore - `--background` - Run restore job in background without waiting for completion +- `--yes, -y` - Skip confirmation prompt **Note**: Either `--archive` or `--latest` must be specified (mutually exclusive). +#### check-and-finalize + +Check the status of a background Stackgraph restore job and clean up resources. + +```bash +sts-backup stackgraph check-and-finalize --namespace --job [--wait] +``` + +**Flags:** + +- `--job, -j` - Stackgraph restore job name (required) +- `--wait, -w` - Wait for job to complete before cleanup + +**Use Case**: This command is useful when a restore job was started with `--background` flag or was interrupted ( +Ctrl+C). + +### victoriametrics + +Manage VictoriaMetrics backups and restores. + +#### list + +List available VictoriaMetrics backups from S3/Minio. + +```bash +sts-backup victoriametrics list --namespace +``` + +**Note**: In HA mode, backups from both instances (victoria-metrics-0 and victoria-metrics-1) are listed. The restore +command accepts either backup to restore both instances. + +#### restore + +Restore VictoriaMetrics from a backup archive. 
Automatically scales down affected StatefulSets before restore and scales +them back up afterward. + +```bash +sts-backup victoriametrics restore --namespace [--archive | --latest] [flags] +``` + +**Flags:** + +- `--archive` - Specific backup name to restore (e.g., sts-victoria-metrics-backup/victoria-metrics-0-20251030143500) +- `--latest` - Restore from the most recent backup +- `--background` - Run restore job in background without waiting for completion +- `--yes, -y` - Skip confirmation prompt + +**Note**: Either `--archive` or `--latest` must be specified (mutually exclusive). + +#### check-and-finalize + +Check the status of a background VictoriaMetrics restore job and clean up resources. + +```bash +sts-backup victoriametrics check-and-finalize --namespace --job [--wait] +``` + +**Flags:** + +- `--job, -j` - VictoriaMetrics restore job name (required) +- `--wait, -w` - Wait for job to complete before cleanup + +**Use Case**: This command is useful when a restore job was started with `--background` flag or was interrupted ( +Ctrl+C). + ## Configuration The CLI uses configuration from Kubernetes ConfigMaps and Secrets with the following precedence: @@ -194,9 +260,14 @@ See [internal/foundation/config/testdata/validConfigMapConfig.yaml](internal/fou │ │ ├── list-indices.go # List indices │ │ ├── list-snapshots.go # List snapshots │ │ └── restore-snapshot.go # Restore snapshot -│ └── stackgraph/ # Stackgraph subcommands +│ ├── stackgraph/ # Stackgraph subcommands +│ │ ├── list.go # List backups +│ │ ├── restore.go # Restore backup +│ │ └── check-and-finalize.go # Check and finalize restore job +│ └── victoriametrics/ # VictoriaMetrics subcommands │ ├── list.go # List backups -│ └── restore.go # Restore backup +│ ├── restore.go # Restore backup +│ └── check-and-finalize.go # Check and finalize restore job ├── internal/ # Internal packages (Layers 0-3) │ ├── foundation/ # Layer 0: Core utilities │ │ ├── config/ # Configuration management @@ -208,7 +279,12 @@ See [internal/foundation/config/testdata/validConfigMapConfig.yaml](internal/fou │ │ └── s3/ # S3/Minio client │ ├── orchestration/ # Layer 2: Workflows │ │ ├── portforward/ # Port-forwarding lifecycle -│ │ └── scale/ # Deployment scaling +│ │ ├── scale/ # Deployment/StatefulSet scaling +│ │ └── restore/ # Restore job orchestration +│ │ ├── confirmation.go # User confirmation prompts +│ │ ├── finalize.go # Job status check and cleanup +│ │ ├── job.go # Job lifecycle management +│ │ └── resources.go # Restore resource management │ ├── app/ # Layer 3: Dependency container │ │ └── app.go # Application context and DI │ └── scripts/ # Embedded bash scripts diff --git a/cmd/elasticsearch/list_snapshots_test.go b/cmd/elasticsearch/list_snapshots_test.go index 21ca38e..1766370 100644 --- a/cmd/elasticsearch/list_snapshots_test.go +++ b/cmd/elasticsearch/list_snapshots_test.go @@ -49,6 +49,26 @@ stackgraph: memory: "2Gi" pvc: size: "10Gi" +victoriaMetrics: + S3Locations: + - bucket: vm-backup + prefix: victoria-metrics-0 + - bucket: vm-backup + prefix: victoria-metrics-1 + restore: + haMode: "mirror" + persistentVolumeClaimPrefix: "database-victoria-metrics-" + scaleDownLabelSelector: "app=victoria-metrics" + job: + image: vm-backup:latest + waitImage: wait:latest + resources: + limits: + cpu: "1" + memory: "2Gi" + requests: + cpu: "500m" + memory: "1Gi" ` // mockESClient is a simple mock for testing commands diff --git a/cmd/root.go b/cmd/root.go index 258e85e..daf729b 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -7,6 +7,7 @@ import ( 
"github.com/stackvista/stackstate-backup-cli/cmd/elasticsearch" "github.com/stackvista/stackstate-backup-cli/cmd/stackgraph" "github.com/stackvista/stackstate-backup-cli/cmd/version" + "github.com/stackvista/stackstate-backup-cli/cmd/victoriametrics" "github.com/stackvista/stackstate-backup-cli/internal/foundation/config" ) @@ -39,6 +40,10 @@ func init() { addBackupConfigFlags(stackgraphCmd) rootCmd.AddCommand(stackgraphCmd) + victoriaMetricsCmd := victoriametrics.Cmd(flags) + addBackupConfigFlags(victoriaMetricsCmd) + rootCmd.AddCommand(victoriaMetricsCmd) + // Add commands that don't need backup config flags rootCmd.AddCommand(version.Cmd()) } diff --git a/cmd/stackgraph/check_and_finalize.go b/cmd/stackgraph/check_and_finalize.go index 03a2637..fc47df9 100644 --- a/cmd/stackgraph/check_and_finalize.go +++ b/cmd/stackgraph/check_and_finalize.go @@ -7,8 +7,7 @@ import ( "github.com/spf13/cobra" "github.com/stackvista/stackstate-backup-cli/internal/app" "github.com/stackvista/stackstate-backup-cli/internal/foundation/config" - "github.com/stackvista/stackstate-backup-cli/internal/orchestration/scale" - batchv1 "k8s.io/api/batch/v1" + "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restore" ) // Check and finalize command flags @@ -53,92 +52,14 @@ Examples: } func runCheckAndFinalize(appCtx *app.Context) error { - // Get job - appCtx.Logger.Infof("Checking status of job: %s", checkJobName) - job, err := appCtx.K8sClient.GetJob(appCtx.Namespace, checkJobName) - if err != nil { - return fmt.Errorf("failed to get job '%s': %w (job may not exist or has been deleted)", checkJobName, err) - } - - // Check if job is already complete - completed, succeeded := isJobComplete(job) - - if completed { - // Job already finished - print status and cleanup - return handleCompletedJob(appCtx, checkJobName, succeeded) - } - - // Job still running - if waitForJob { - // Wait for completion, then cleanup - return waitAndFinalize(appCtx, checkJobName) - } - - // Not waiting - just print status - printRunningJobStatus(appCtx.Logger, checkJobName, appCtx.Namespace, job.Status.Active) - return nil -} - -// isJobComplete checks if job is in a terminal state -func isJobComplete(job *batchv1.Job) (completed bool, succeeded bool) { - if job.Status.Succeeded > 0 { - return true, true - } - if job.Status.Failed > 0 { - return true, false - } - return false, false -} - -// handleCompletedJob handles a job that's already complete -func handleCompletedJob(appCtx *app.Context, jobName string, succeeded bool) error { - appCtx.Logger.Println() - if succeeded { - appCtx.Logger.Successf("Job completed successfully: %s", jobName) - appCtx.Logger.Println() - - // Scale up deployments that were scaled down before restore - scaleDownLabelSelector := appCtx.Config.Stackgraph.Restore.ScaleDownLabelSelector - if err := scale.ScaleUpFromAnnotations(appCtx.K8sClient, appCtx.Namespace, scaleDownLabelSelector, appCtx.Logger); err != nil { - appCtx.Logger.Warningf("Failed to scale up deployments: %v", err) - } - } else { - appCtx.Logger.Errorf("Job failed: %s", jobName) - appCtx.Logger.Println() - appCtx.Logger.Infof("Fetching logs...") - appCtx.Logger.Println() - if err := printJobLogs(appCtx.K8sClient, appCtx.Namespace, jobName, appCtx.Logger); err != nil { - appCtx.Logger.Warningf("Failed to fetch logs: %v", err) - } - } - - // Cleanup resources - appCtx.Logger.Println() - return cleanupRestoreResources(appCtx.K8sClient, appCtx.Namespace, jobName, appCtx.Logger) -} - -// waitAndFinalize waits for job completion and 
then cleans up -func waitAndFinalize(appCtx *app.Context, jobName string) error { - printWaitingMessage(appCtx.Logger, jobName, appCtx.Namespace) - - if err := waitForJobCompletion(appCtx.K8sClient, appCtx.Namespace, jobName, appCtx.Logger); err != nil { - appCtx.Logger.Errorf("Job failed: %v", err) - // Still cleanup even if failed - appCtx.Logger.Println() - _ = cleanupRestoreResources(appCtx.K8sClient, appCtx.Namespace, jobName, appCtx.Logger) - return err - } - - appCtx.Logger.Println() - appCtx.Logger.Successf("Job completed successfully: %s", jobName) - appCtx.Logger.Println() - - // Scale up deployments that were scaled down before restore - scaleDownLabelSelector := appCtx.Config.Stackgraph.Restore.ScaleDownLabelSelector - if err := scale.ScaleUpFromAnnotations(appCtx.K8sClient, appCtx.Namespace, scaleDownLabelSelector, appCtx.Logger); err != nil { - appCtx.Logger.Warningf("Failed to scale up deployments: %v", err) - } - - appCtx.Logger.Println() - return cleanupRestoreResources(appCtx.K8sClient, appCtx.Namespace, jobName, appCtx.Logger) + return restore.CheckAndFinalize(restore.CheckAndFinalizeParams{ + K8sClient: appCtx.K8sClient, + Namespace: appCtx.Namespace, + JobName: checkJobName, + ServiceName: "stackgraph", + ScaleSelector: appCtx.Config.Stackgraph.Restore.ScaleDownLabelSelector, + CleanupPVC: true, + WaitForJob: waitForJob, + Log: appCtx.Logger, + }) } diff --git a/cmd/stackgraph/restore.go b/cmd/stackgraph/restore.go index 9a301c5..cc49992 100644 --- a/cmd/stackgraph/restore.go +++ b/cmd/stackgraph/restore.go @@ -1,13 +1,11 @@ package stackgraph import ( - "bufio" "context" "fmt" "os" "sort" "strconv" - "strings" "time" "github.com/aws/aws-sdk-go-v2/aws" @@ -19,19 +17,15 @@ import ( "github.com/stackvista/stackstate-backup-cli/internal/foundation/config" "github.com/stackvista/stackstate-backup-cli/internal/foundation/logger" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/portforward" + "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restore" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/scale" - "github.com/stackvista/stackstate-backup-cli/internal/scripts" corev1 "k8s.io/api/core/v1" ) const ( - jobNameTemplate = "stackgraph-restore" - minioKeysSecretName = "suse-observability-backup-cli-minio-keys" //nolint:gosec // This is a Kubernetes secret name, not a credential - restoreScriptsConfigMap = "suse-observability-backup-cli-restore-scripts" - defaultJobCompletionTimeout = 30 * time.Minute - defaultJobStatusCheckInterval = 10 * time.Second - configMapDefaultFileMode = 0755 - purgeStackgraphDataFlag = "-force" + jobNameTemplate = "stackgraph-restore" + configMapDefaultFileMode = 0755 + purgeStackgraphDataFlag = "-force" ) // Restore command flags @@ -93,7 +87,7 @@ func runRestore(appCtx *app.Context) error { appCtx.Logger.Infof("Namespace: %s", appCtx.Namespace) appCtx.Logger.Println() - if !promptForConfirmation() { + if !restore.PromptForConfirmation() { return fmt.Errorf("restore operation cancelled by user") } } @@ -118,7 +112,7 @@ func runRestore(appCtx *app.Context) error { // Setup Kubernetes resources for restore job appCtx.Logger.Println() - if err := ensureRestoreResources(appCtx.K8sClient, appCtx.Namespace, appCtx.Config, appCtx.Logger); err != nil { + if err := restore.EnsureRestoreResources(appCtx.K8sClient, appCtx.Namespace, appCtx.Config, appCtx.Logger); err != nil { return err } @@ -135,120 +129,17 @@ func runRestore(appCtx *app.Context) error { appCtx.Logger.Successf("Restore job created: 
%s", jobName) if background { - printRunningJobStatus(appCtx.Logger, jobName, appCtx.Namespace, 0) + restore.PrintRunningJobStatus(appCtx.Logger, "stackgraph", jobName, appCtx.Namespace, 0) return nil } return waitAndCleanupRestoreJob(appCtx.K8sClient, appCtx.Namespace, jobName, appCtx.Logger) } -// ensureRestoreResources ensures that required Kubernetes resources exist for the restore job -func ensureRestoreResources(k8sClient *k8s.Client, namespace string, config *config.Config, log *logger.Logger) error { - // Ensure backup scripts ConfigMap exists - log.Infof("Ensuring backup scripts ConfigMap exists...") - - scriptNames, err := scripts.ListScripts() - if err != nil { - return fmt.Errorf("failed to list embedded scripts: %w", err) - } - - scriptsData := make(map[string]string) - for _, scriptName := range scriptNames { - scriptContent, err := scripts.GetScript(scriptName) - if err != nil { - return fmt.Errorf("failed to get script %s: %w", scriptName, err) - } - scriptsData[scriptName] = string(scriptContent) - } - - configMapLabels := k8s.MergeLabels(config.Kubernetes.CommonLabels, map[string]string{}) - if _, err := k8sClient.EnsureConfigMap(namespace, restoreScriptsConfigMap, scriptsData, configMapLabels); err != nil { - return fmt.Errorf("failed to ensure backup scripts ConfigMap: %w", err) - } - log.Successf("Backup scripts ConfigMap ready") - - // Ensure Minio keys secret exists - log.Infof("Ensuring Minio keys secret exists...") - - secretData := map[string][]byte{ - "accesskey": []byte(config.Minio.AccessKey), - "secretkey": []byte(config.Minio.SecretKey), - } - - secretLabels := k8s.MergeLabels(config.Kubernetes.CommonLabels, map[string]string{}) - if _, err := k8sClient.EnsureSecret(namespace, minioKeysSecretName, secretData, secretLabels); err != nil { - return fmt.Errorf("failed to ensure Minio keys secret: %w", err) - } - log.Successf("Minio keys secret ready") - - return nil -} - -// printWaitingMessage prints waiting message with instructions for interruption -func printWaitingMessage(log *logger.Logger, jobName, namespace string) { - log.Println() - log.Infof("Waiting for restore job to complete (this may take several minutes)...") - log.Println() - log.Infof("You can safely interrupt this command with Ctrl+C.") - log.Infof("To check status, scale up the required deployments and cleanup later, run:") - log.Infof(" sts-backup stackgraph check-and-finalize --job %s --wait -n %s", jobName, namespace) -} - -// printRunningJobStatus prints status and instructions for a running job -func printRunningJobStatus(log *logger.Logger, jobName, namespace string, activePods int32) { - log.Println() - log.Infof("Job is running in background: %s", jobName) - if activePods > 0 { - log.Infof(" Active pods: %d", activePods) - } - log.Println() - log.Infof("Monitoring commands:") - log.Infof(" kubectl logs --follow job/%s -n %s", jobName, namespace) - log.Infof(" kubectl get job %s -n %s", jobName, namespace) - log.Println() - log.Infof("To wait for completion, scaling up the necessary deployments and cleanup, run:") - log.Infof(" sts-backup stackgraph check-and-finalize --job %s --wait -n %s", jobName, namespace) -} - -// cleanupRestoreResources cleans up job and PVC resources -func cleanupRestoreResources(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger) error { - log.Infof("Cleaning up job and PVC...") - - // Delete job - if err := k8sClient.DeleteJob(namespace, jobName); err != nil { - log.Warningf("Failed to delete job: %v", err) - } else { - log.Successf("Job 
deleted: %s", jobName) - } - - // Delete PVC (same name as job) - if err := k8sClient.DeletePVC(namespace, jobName); err != nil { - log.Warningf("Failed to delete PVC: %v", err) - } else { - log.Successf("PVC deleted: %s", jobName) - } - - return nil -} - // waitAndCleanupRestoreJob waits for job completion and cleans up resources func waitAndCleanupRestoreJob(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger) error { - printWaitingMessage(log, jobName, namespace) - - if err := waitForJobCompletion(k8sClient, namespace, jobName, log); err != nil { - log.Errorf("Job failed: %v", err) - log.Println() - log.Infof("Cleanup commands:") - log.Infof(" kubectl delete job,pvc %s -n %s", jobName, namespace) - return err - } - - log.Println() - log.Successf("Restore completed successfully") - - // Cleanup job and PVC using shared function - log.Println() - return cleanupRestoreResources(k8sClient, namespace, jobName, log) + restore.PrintWaitingMessage(log, "stackgraph", jobName, namespace) + return restore.WaitAndCleanup(k8sClient, namespace, jobName, log, true) } // getLatestBackup retrieves the most recent backup from S3 @@ -346,21 +237,16 @@ func createRestoreJob(k8sClient *k8s.Client, namespace, jobName, backupFile stri // Build job spec using configuration spec := k8s.BackupJobSpec{ - Name: jobName, - Labels: jobLabels, - ImagePullSecrets: k8s.ConvertImagePullSecrets(config.Stackgraph.Restore.Job.ImagePullSecrets), - SecurityContext: k8s.ConvertPodSecurityContext(&config.Stackgraph.Restore.Job.SecurityContext), - NodeSelector: config.Stackgraph.Restore.Job.NodeSelector, - Tolerations: k8s.ConvertTolerations(config.Stackgraph.Restore.Job.Tolerations), - Affinity: k8s.ConvertAffinity(config.Stackgraph.Restore.Job.Affinity), - ContainerSecurityContext: k8s.ConvertSecurityContext(config.Stackgraph.Restore.Job.ContainerSecurityContext), - Image: config.Stackgraph.Restore.Job.Image, - Command: []string{"/backup-restore-scripts/restore-stackgraph-backup.sh"}, - Env: buildRestoreEnvVars(backupFile, config), - Resources: k8s.ConvertResources(config.Stackgraph.Restore.Job.Resources), - VolumeMounts: buildRestoreVolumeMounts(), - InitContainers: buildRestoreInitContainers(config), - Volumes: buildRestoreVolumes(jobName, config, defaultMode), + Name: jobName, + Labels: jobLabels, + ImagePullSecrets: k8s.ConvertImagePullSecrets(config.Stackgraph.Restore.Job.ImagePullSecrets), + SecurityContext: k8s.ConvertPodSecurityContext(&config.Stackgraph.Restore.Job.SecurityContext), + NodeSelector: config.Stackgraph.Restore.Job.NodeSelector, + Tolerations: k8s.ConvertTolerations(config.Stackgraph.Restore.Job.Tolerations), + Affinity: k8s.ConvertAffinity(config.Stackgraph.Restore.Job.Affinity), + Containers: buildRestoreContainers(backupFile, config), + InitContainers: buildRestoreInitContainers(config), + Volumes: buildRestoreVolumes(jobName, config, defaultMode), } // Create job @@ -409,6 +295,7 @@ func buildRestoreInitContainers(config *config.Config) []corev1.Container { "-c", fmt.Sprintf("/entrypoint -c %s:%d -t 300", config.Minio.Service.Name, config.Minio.Service.Port), }, + SecurityContext: k8s.ConvertSecurityContext(config.Stackgraph.Restore.Job.ContainerSecurityContext), }, } } @@ -431,7 +318,7 @@ func buildRestoreVolumes(jobName string, config *config.Config, defaultMode int3 VolumeSource: corev1.VolumeSource{ ConfigMap: &corev1.ConfigMapVolumeSource{ LocalObjectReference: corev1.LocalObjectReference{ - Name: restoreScriptsConfigMap, + Name: restore.RestoreScriptsConfigMap, }, DefaultMode: 
&defaultMode, }, @@ -441,7 +328,7 @@ func buildRestoreVolumes(jobName string, config *config.Config, defaultMode int3 Name: "minio-keys", VolumeSource: corev1.VolumeSource{ Secret: &corev1.SecretVolumeSource{ - SecretName: minioKeysSecretName, + SecretName: restore.MinioKeysSecretName, }, }, }, @@ -456,88 +343,18 @@ func buildRestoreVolumes(jobName string, config *config.Config, defaultMode int3 } } -// waitForJobCompletion waits for a Kubernetes job to complete -func waitForJobCompletion(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger) error { - timeout := time.After(defaultJobCompletionTimeout) - ticker := time.NewTicker(defaultJobStatusCheckInterval) - defer ticker.Stop() - - for { - select { - case <-timeout: - return fmt.Errorf("timeout waiting for job to complete") - case <-ticker.C: - job, err := k8sClient.GetJob(namespace, jobName) - if err != nil { - return fmt.Errorf("failed to get job status: %w", err) - } - - if job.Status.Succeeded > 0 { - return nil - } - - if job.Status.Failed > 0 { - // Get and print logs from failed job - log.Println() - log.Errorf("Job failed. Fetching logs...") - log.Println() - if err := printJobLogs(k8sClient, namespace, jobName, log); err != nil { - log.Warningf("Failed to fetch job logs: %v", err) - } - return fmt.Errorf("job failed") - } - - log.Debugf("Job status: Active=%d, Succeeded=%d, Failed=%d", - job.Status.Active, job.Status.Succeeded, job.Status.Failed) - } - } -} - -// printJobLogs retrieves and prints logs from all containers in a job's pods -func printJobLogs(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger) error { - // Get logs from all pods in the job - allPodLogs, err := k8sClient.GetJobLogs(namespace, jobName) - if err != nil { - return err - } - - // Print logs from each pod - for _, podLogs := range allPodLogs { - log.Infof("=== Logs from pod: %s ===", podLogs.PodName) - log.Println() - - // Print logs from each container - for _, containerLog := range podLogs.ContainerLogs { - containerType := "container" - if containerLog.IsInit { - containerType = "init container" - } - - log.Infof("--- Logs from %s: %s ---", containerType, containerLog.Name) - - // Print the actual logs - if containerLog.Logs != "" { - fmt.Println(containerLog.Logs) - } else { - log.Infof("(no logs)") - } - log.Println() - } - } - - return nil -} - -// promptForConfirmation prompts the user for confirmation and returns true if they confirm -func promptForConfirmation() bool { - reader := bufio.NewReader(os.Stdin) - fmt.Print("Do you want to continue? 
(yes/no): ") - - response, err := reader.ReadString('\n') - if err != nil { - return false +// buildRestoreContainers constructs containers for the restore job +func buildRestoreContainers(backupFile string, config *config.Config) []corev1.Container { + return []corev1.Container{ + { + Name: "restore", + Image: config.Stackgraph.Restore.Job.Image, + ImagePullPolicy: corev1.PullIfNotPresent, + SecurityContext: k8s.ConvertSecurityContext(config.Stackgraph.Restore.Job.ContainerSecurityContext), + Command: []string{"/backup-restore-scripts/restore-stackgraph-backup.sh"}, + Env: buildRestoreEnvVars(backupFile, config), + Resources: k8s.ConvertResources(config.Stackgraph.Restore.Job.Resources), + VolumeMounts: buildRestoreVolumeMounts(), + }, } - - response = strings.TrimSpace(strings.ToLower(response)) - return response == "yes" || response == "y" } diff --git a/cmd/victoriametrics/check_and_finalize.go b/cmd/victoriametrics/check_and_finalize.go new file mode 100644 index 0000000..04924bb --- /dev/null +++ b/cmd/victoriametrics/check_and_finalize.go @@ -0,0 +1,65 @@ +package victoriametrics + +import ( + "fmt" + "os" + + "github.com/spf13/cobra" + "github.com/stackvista/stackstate-backup-cli/internal/app" + "github.com/stackvista/stackstate-backup-cli/internal/foundation/config" + "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restore" +) + +// Check and finalize command flags +var ( + checkJobName string + waitForJob bool +) + +func checkAndFinalizeCmd(globalFlags *config.CLIGlobalFlags) *cobra.Command { + cmd := &cobra.Command{ + Use: "check-and-finalize", + Short: "Check and finalize a VictoriaMetrics restore job", + Long: `Check the status of a background VictoriaMetrics restore job and clean up resources. + +This command is useful when a restore job was started with --background flag or was interrupted (Ctrl+C). +It will check the job status, print logs if it failed, and clean up the job resources. 
+ +Examples: + # Check job status without waiting + sts-backup victoriametrics check-and-finalize --job victoriametrics-restore-20250128t143000 -n my-namespace + + # Wait for job completion and cleanup + sts-backup victoriametrics check-and-finalize --job victoriametrics-restore-20250128t143000 --wait -n my-namespace`, + Run: func(_ *cobra.Command, _ []string) { + appCtx, err := app.NewContext(globalFlags) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + if err := runCheckAndFinalize(appCtx); err != nil { + _, _ = fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + }, + } + + cmd.Flags().StringVarP(&checkJobName, "job", "j", "", "VictoriaMetrics restore job name (required)") + cmd.Flags().BoolVarP(&waitForJob, "wait", "w", false, "Wait for job to complete before cleanup") + _ = cmd.MarkFlagRequired("job") + + return cmd +} + +func runCheckAndFinalize(appCtx *app.Context) error { + return restore.CheckAndFinalize(restore.CheckAndFinalizeParams{ + K8sClient: appCtx.K8sClient, + Namespace: appCtx.Namespace, + JobName: checkJobName, + ServiceName: "victoria-metrics", + ScaleSelector: appCtx.Config.VictoriaMetrics.Restore.ScaleDownLabelSelector, + CleanupPVC: false, + WaitForJob: waitForJob, + Log: appCtx.Logger, + }) +} diff --git a/cmd/victoriametrics/list.go b/cmd/victoriametrics/list.go new file mode 100644 index 0000000..ea5d8d7 --- /dev/null +++ b/cmd/victoriametrics/list.go @@ -0,0 +1,127 @@ +package victoriametrics + +import ( + "context" + "fmt" + "os" + "sort" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/spf13/cobra" + "github.com/stackvista/stackstate-backup-cli/internal/app" + s3client "github.com/stackvista/stackstate-backup-cli/internal/clients/s3" + "github.com/stackvista/stackstate-backup-cli/internal/foundation/config" + "github.com/stackvista/stackstate-backup-cli/internal/foundation/output" + "github.com/stackvista/stackstate-backup-cli/internal/orchestration/portforward" +) + +const ( + vmBackupSuccessFile = "backup_complete.ignore" + vmHaMirrorMode = "mirror" +) + +func listCmd(globalFlags *config.CLIGlobalFlags) *cobra.Command { + return &cobra.Command{ + Use: "list", + Short: "List available VictoriaMetrics backups from S3/Minio", + Run: func(_ *cobra.Command, _ []string) { + appCtx, err := app.NewContext(globalFlags) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + if err := runList(appCtx); err != nil { + _, _ = fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + }, + } +} + +func runList(appCtx *app.Context) error { + // Setup port-forward to Minio + serviceName := appCtx.Config.Minio.Service.Name + localPort := appCtx.Config.Minio.Service.LocalPortForwardPort + remotePort := appCtx.Config.Minio.Service.Port + + pf, err := portforward.SetupPortForward(appCtx.K8sClient, appCtx.Namespace, serviceName, localPort, remotePort, appCtx.Logger) + if err != nil { + return err + } + defer close(pf.StopChan) + + var vmBackups []s3client.Object + // List objects in bucket + appCtx.Logger.Infof("Listing VictoriaMetrics backups in bucket ...") + if appCtx.Config.VictoriaMetrics.Restore.HaMode == vmHaMirrorMode { + appCtx.Logger.Println() + appCtx.Logger.Infof("NOTE: In HA mode, backups from both instances (victoria-metrics-0 and victoria-metrics-1) are listed.") + appCtx.Logger.Infof(" The restore command accepts either backup to restore both instances.") + } + appCtx.Logger.Println() + for _, s3Location := range 
appCtx.Config.VictoriaMetrics.S3Locations {
+    bucket := s3Location.Bucket
+    prefix := s3Location.Prefix
+
+    input := &s3.ListObjectsV2Input{
+      Bucket: aws.String(bucket),
+      Prefix: aws.String(prefix),
+      Delimiter: aws.String("/"),
+    }
+
+    result, err := appCtx.S3Client.ListObjectsV2(context.Background(), input)
+    if err != nil {
+      return fmt.Errorf("failed to list S3 objects: %w", err)
+    }
+
+    for _, key := range s3client.FilterByCommonPrefix(result.CommonPrefixes) {
+      vmBackups = append(vmBackups, s3client.Object{
+        Key: fmt.Sprintf("%s/%s", bucket, key.Key),
+        LastModified: getVMBackupTime(appCtx.S3Client, bucket, key.Key),
+      })
+    }
+  }
+
+  if len(vmBackups) == 0 {
+    appCtx.Formatter.PrintMessage("No backups found")
+    return nil
+  }
+
+  sort.Slice(vmBackups, func(i, j int) bool {
+    return vmBackups[i].LastModified.After(vmBackups[j].LastModified)
+  })
+
+  table := output.Table{
+    Headers: []string{"NAME ({bucket}/{instance}-{created})", "UPDATED"},
+    Rows: make([][]string, 0, len(vmBackups)),
+  }
+
+  for _, obj := range vmBackups {
+    row := []string{
+      obj.Key,
+      obj.LastModified.Format("2006-01-02 15:04:05 MST"),
+    }
+    table.Rows = append(table.Rows, row)
+  }
+
+  return appCtx.Formatter.PrintTable(table)
+}
+
+// getVMBackupTime returns the completion time of a VM backup: the LastModified timestamp of its
+// backup_complete.ignore marker object; backup keys look like victoria-metrics-(0|1)-20251030143500
+func getVMBackupTime(s3client s3client.Interface, bucket, key string) time.Time {
+  input := &s3.ListObjectsV2Input{
+    Bucket: aws.String(bucket),
+    Prefix: aws.String(key + "/" + vmBackupSuccessFile),
+  }
+
+  result, err := s3client.ListObjectsV2(context.Background(), input)
+  if err != nil || len(result.Contents) != 1 {
+    return time.Time{}
+  }
+  vmbackup := result.Contents[0]
+
+  return *vmbackup.LastModified
+}
diff --git a/cmd/victoriametrics/restore.go b/cmd/victoriametrics/restore.go
new file mode 100644
index 0000000..fb1c007
--- /dev/null
+++ b/cmd/victoriametrics/restore.go
@@ -0,0 +1,353 @@
+package victoriametrics
+
+import (
+  "context"
+  "fmt"
+  "os"
+  "sort"
+  "time"
+
+  "github.com/aws/aws-sdk-go-v2/aws"
+  "github.com/aws/aws-sdk-go-v2/service/s3"
+  "github.com/spf13/cobra"
+  "github.com/stackvista/stackstate-backup-cli/internal/app"
+  "github.com/stackvista/stackstate-backup-cli/internal/clients/k8s"
+  s3client "github.com/stackvista/stackstate-backup-cli/internal/clients/s3"
+  "github.com/stackvista/stackstate-backup-cli/internal/foundation/config"
+  "github.com/stackvista/stackstate-backup-cli/internal/foundation/logger"
+  "github.com/stackvista/stackstate-backup-cli/internal/orchestration/portforward"
+  "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restore"
+  "github.com/stackvista/stackstate-backup-cli/internal/orchestration/scale"
+  corev1 "k8s.io/api/core/v1"
+)
+
+const (
+  jobNameTemplate          = "victoriametrics-restore"
+  configMapDefaultFileMode = 0755
+)
+
+// Restore command flags
+var (
+  archiveName      string
+  useLatest        bool
+  background       bool
+  skipConfirmation bool
+)
+
+func restoreCmd(globalFlags *config.CLIGlobalFlags) *cobra.Command {
+  cmd := &cobra.Command{
+    Use:   "restore",
+    Short: "Restore VictoriaMetrics from a backup archive",
+    Long: `Restore VictoriaMetrics data from a backup archive stored in S3/Minio.
Can use --latest or --archive to specify which backup to restore.`,
+    Run: func(_ *cobra.Command, _ []string) {
+      appCtx, err := app.NewContext(globalFlags)
+      if err != nil {
+        _, _ = fmt.Fprintf(os.Stderr, "error: %v\n", err)
+        os.Exit(1)
+      }
+      if err := runRestore(appCtx); err != nil {
+        _, _ = fmt.Fprintf(os.Stderr, "error: %v\n", err)
+        os.Exit(1)
+      }
+    },
+  }
+
+  cmd.Flags().StringVar(&archiveName, "archive", "", "Specific archive to restore (e.g., sts-victoria-metrics-backup/victoria-metrics-0-20251030152500)")
+  cmd.Flags().BoolVar(&useLatest, "latest", false, "Restore from the most recent backup")
+  cmd.Flags().BoolVar(&background, "background", false, "Run restore job in background without waiting for completion")
+  cmd.Flags().BoolVarP(&skipConfirmation, "yes", "y", false, "Skip confirmation prompt")
+  cmd.MarkFlagsMutuallyExclusive("archive", "latest")
+  cmd.MarkFlagsOneRequired("archive", "latest")
+
+  return cmd
+}
+
+func runRestore(appCtx *app.Context) error {
+  // Determine which archive to restore
+  backupFile := archiveName
+  if useLatest {
+    appCtx.Logger.Infof("Finding latest backup...")
+    latest, err := getLatestBackup(appCtx.K8sClient, appCtx.Namespace, appCtx.Config, appCtx.Logger)
+    if err != nil {
+      return err
+    }
+    backupFile = latest
+    appCtx.Logger.Infof("Using latest backup: %s", backupFile)
+  }
+
+  // Warn user and ask for confirmation
+  if !skipConfirmation {
+    appCtx.Logger.Println()
+    appCtx.Logger.Warningf("WARNING: Restoring from backup will PURGE all existing VictoriaMetrics data!")
+    appCtx.Logger.Warningf("This operation cannot be undone.")
+    appCtx.Logger.Println()
+    appCtx.Logger.Infof("Backup to restore: %s", backupFile)
+    appCtx.Logger.Infof("Namespace: %s", appCtx.Namespace)
+    appCtx.Logger.Println()
+
+    if !restore.PromptForConfirmation() {
+      return fmt.Errorf("restore operation cancelled by user")
+    }
+  }
+
+  // Scale down workloads before restore
+  appCtx.Logger.Println()
+  scaleDownLabelSelector := appCtx.Config.VictoriaMetrics.Restore.ScaleDownLabelSelector
+  scaledStatefulSets, err := scale.ScaleDown(appCtx.K8sClient, appCtx.Namespace, scaleDownLabelSelector, appCtx.Logger)
+  if err != nil {
+    return err
+  }
+
+  // Ensure workloads are scaled back up on exit (even if restore fails)
+  defer func() {
+    if len(scaledStatefulSets) > 0 && !background {
+      appCtx.Logger.Println()
+      if err := scale.ScaleUpFromAnnotations(appCtx.K8sClient, appCtx.Namespace, scaleDownLabelSelector, appCtx.Logger); err != nil {
+        appCtx.Logger.Warningf("Failed to scale up workloads: %v", err)
+      }
+    }
+  }()
+
+  // Setup Kubernetes resources for restore job
+  appCtx.Logger.Println()
+  if err := restore.EnsureRestoreResources(appCtx.K8sClient, appCtx.Namespace, appCtx.Config, appCtx.Logger); err != nil {
+    return err
+  }
+
+  // Create restore job
+  appCtx.Logger.Println()
+  appCtx.Logger.Infof("Creating restore job for backup: %s", backupFile)
+
+  jobName := fmt.Sprintf("%s-%s", jobNameTemplate, time.Now().Format("20060102t150405"))
+
+  if err = createRestoreJob(appCtx.K8sClient, appCtx.Namespace, jobName, backupFile, appCtx.Config); err != nil {
+    return fmt.Errorf("failed to create restore job: %w", err)
+  }
+
+  appCtx.Logger.Successf("Restore job created: %s", jobName)
+
+  if background {
+    restore.PrintRunningJobStatus(appCtx.Logger, "victoria-metrics", jobName, appCtx.Namespace, 0)
+    return nil
+  }
+
+  return waitAndCleanupRestoreJob(appCtx.K8sClient, appCtx.Namespace, jobName, appCtx.Logger)
+}
+
+// waitAndCleanupRestoreJob waits for job completion and cleans up
resources +func waitAndCleanupRestoreJob(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger) error { + restore.PrintWaitingMessage(log, "victoria-metrics", jobName, namespace) + return restore.WaitAndCleanup(k8sClient, namespace, jobName, log, false) +} + +// getLatestBackup retrieves the most recent backup from S3 +func getLatestBackup(k8sClient *k8s.Client, namespace string, config *config.Config, log *logger.Logger) (string, error) { + // Setup port-forward to Minio + serviceName := config.Minio.Service.Name + localPort := config.Minio.Service.LocalPortForwardPort + remotePort := config.Minio.Service.Port + + pf, err := portforward.SetupPortForward(k8sClient, namespace, serviceName, localPort, remotePort, log) + if err != nil { + return "", err + } + defer close(pf.StopChan) + + // Create S3 client + endpoint := fmt.Sprintf("http://localhost:%d", pf.LocalPort) + s3Client, err := s3client.NewClient(endpoint, config.Minio.AccessKey, config.Minio.SecretKey) + if err != nil { + return "", err + } + + var vmBackups []s3client.Object + // List objects in bucket + log.Infof("Listing VictoriaMetrics backups in bucket ...") + for _, s3Location := range config.VictoriaMetrics.S3Locations { + bucket := s3Location.Bucket + prefix := s3Location.Prefix + + input := &s3.ListObjectsV2Input{ + Bucket: aws.String(bucket), + Prefix: aws.String(prefix), + Delimiter: aws.String("/"), + } + + result, err := s3Client.ListObjectsV2(context.Background(), input) + if err != nil { + return "", fmt.Errorf("failed to list S3 objects: %w", err) + } + + for _, key := range s3client.FilterByCommonPrefix(result.CommonPrefixes) { + vmBackups = append(vmBackups, s3client.Object{ + Key: fmt.Sprintf("%s/%s", bucket, key.Key), + LastModified: getVMBackupTime(s3Client, bucket, key.Key), + }) + } + } + + if len(vmBackups) == 0 { + return "", fmt.Errorf("no backups found") + } + + sort.Slice(vmBackups, func(i, j int) bool { + return vmBackups[i].LastModified.After(vmBackups[j].LastModified) + }) + + return vmBackups[0].Key, nil +} + +// createRestoreJob creates a Kubernetes Job for restoring from backup +func createRestoreJob(k8sClient *k8s.Client, namespace, jobName, backupFile string, config *config.Config) error { + defaultMode := int32(configMapDefaultFileMode) + + // Merge common labels with resource-specific labels + jobLabels := k8s.MergeLabels(config.Kubernetes.CommonLabels, config.VictoriaMetrics.Restore.Job.Labels) + + // Build job spec using configuration + spec := k8s.BackupJobSpec{ + Name: jobName, + Labels: jobLabels, + ImagePullSecrets: k8s.ConvertImagePullSecrets(config.VictoriaMetrics.Restore.Job.ImagePullSecrets), + SecurityContext: k8s.ConvertPodSecurityContext(&config.VictoriaMetrics.Restore.Job.SecurityContext), + NodeSelector: config.VictoriaMetrics.Restore.Job.NodeSelector, + Tolerations: k8s.ConvertTolerations(config.VictoriaMetrics.Restore.Job.Tolerations), + Affinity: k8s.ConvertAffinity(config.VictoriaMetrics.Restore.Job.Affinity), + Containers: buildRestoreContainers(backupFile, config), + InitContainers: buildRestoreInitContainers(config), + Volumes: buildRestoreVolumes(config, defaultMode), + } + + // Create job + _, err := k8sClient.CreateBackupJob(namespace, spec) + if err != nil { + return fmt.Errorf("failed to create job: %w", err) + } + + return nil +} + +// buildRestoreEnvVars constructs environment variables for the restore job +func buildRestoreEnvVars(config *config.Config) []corev1.EnvVar { + return []corev1.EnvVar{ + {Name: "MINIO_ENDPOINT", Value: fmt.Sprintf("%s:%d", 
config.Minio.Service.Name, config.Minio.Service.Port)}, + } +} + +// buildRestoreVolumeMounts constructs volume mounts for the restore job container +func buildRestoreVolumeMounts(vmPvc string) []corev1.VolumeMount { + return []corev1.VolumeMount{ + {Name: "backup-restore-scripts", MountPath: "/backup-restore-scripts"}, + {Name: "minio-keys", MountPath: "/aws-keys"}, + {Name: vmPvc, MountPath: "/storage"}, + } +} + +// buildRestoreInitContainers constructs init containers for the restore job +func buildRestoreInitContainers(config *config.Config) []corev1.Container { + return []corev1.Container{ + { + Name: "wait", + Image: config.VictoriaMetrics.Restore.Job.WaitImage, + ImagePullPolicy: corev1.PullIfNotPresent, + Command: []string{ + "sh", + "-c", + fmt.Sprintf("/entrypoint -c %s:%d -t 300", config.Minio.Service.Name, config.Minio.Service.Port), + }, + }, + } +} + +// buildRestoreVolumes constructs volumes for the restore job pod +func buildRestoreVolumes(config *config.Config, defaultMode int32) []corev1.Volume { + volumes := []corev1.Volume{ + { + Name: "backup-restore-scripts", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: restore.RestoreScriptsConfigMap, + }, + DefaultMode: &defaultMode, + }, + }, + }, + { + Name: "minio-keys", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: restore.MinioKeysSecretName, + }, + }, + }, + { + Name: vmPvcName(config.VictoriaMetrics.Restore.PersistentVolumeClaimPrefix, 0), + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: vmPvcName(config.VictoriaMetrics.Restore.PersistentVolumeClaimPrefix, 0), + }, + }, + }, + } + + if config.VictoriaMetrics.Restore.HaMode == vmHaMirrorMode { + v := corev1.Volume{ + Name: vmPvcName(config.VictoriaMetrics.Restore.PersistentVolumeClaimPrefix, 1), + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: vmPvcName(config.VictoriaMetrics.Restore.PersistentVolumeClaimPrefix, 1), + }, + }, + } + volumes = append(volumes, []corev1.Volume{v}...) 
+ } + + return volumes +} + +// buildRestoreContainers constructs containers for the restore job +func buildRestoreContainers(backupFile string, config *config.Config) []corev1.Container { + containers := []corev1.Container{ + { + Name: "restore", + Image: config.VictoriaMetrics.Restore.Job.Image, + ImagePullPolicy: corev1.PullIfNotPresent, + SecurityContext: k8s.ConvertSecurityContext(config.VictoriaMetrics.Restore.Job.ContainerSecurityContext), + Command: []string{ + "sh", + "/backup-restore-scripts/restore-victoria-metrics-backup.sh", + backupFile, + "127.0.0.1:8420", + }, + Env: buildRestoreEnvVars(config), + Resources: k8s.ConvertResources(config.VictoriaMetrics.Restore.Job.Resources), + VolumeMounts: buildRestoreVolumeMounts(vmPvcName(config.VictoriaMetrics.Restore.PersistentVolumeClaimPrefix, 0)), + }, + } + + if config.VictoriaMetrics.Restore.HaMode == vmHaMirrorMode { + containers = append(containers, []corev1.Container{ + { + Name: "restore-1", + Image: config.VictoriaMetrics.Restore.Job.Image, + ImagePullPolicy: corev1.PullIfNotPresent, + SecurityContext: k8s.ConvertSecurityContext(config.VictoriaMetrics.Restore.Job.ContainerSecurityContext), + Command: []string{ + "sh", + "/backup-restore-scripts/restore-victoria-metrics-backup.sh", + backupFile, + "127.0.0.1:8421", + }, + Env: buildRestoreEnvVars(config), + Resources: k8s.ConvertResources(config.VictoriaMetrics.Restore.Job.Resources), + VolumeMounts: buildRestoreVolumeMounts(vmPvcName(config.VictoriaMetrics.Restore.PersistentVolumeClaimPrefix, 1)), + }, + }...) + } + return containers +} + +func vmPvcName(prefix string, instance int) string { + return fmt.Sprintf("%svictoria-metrics-%d-0", prefix, instance) +} diff --git a/cmd/victoriametrics/victoriametrics.go b/cmd/victoriametrics/victoriametrics.go new file mode 100644 index 0000000..0b7b23b --- /dev/null +++ b/cmd/victoriametrics/victoriametrics.go @@ -0,0 +1,19 @@ +package victoriametrics + +import ( + "github.com/spf13/cobra" + "github.com/stackvista/stackstate-backup-cli/internal/foundation/config" +) + +func Cmd(globalFlags *config.CLIGlobalFlags) *cobra.Command { + cmd := &cobra.Command{ + Use: "victoria-metrics", + Short: "VictoriaMetrics backup and restore operations", + } + + cmd.AddCommand(listCmd(globalFlags)) + cmd.AddCommand(restoreCmd(globalFlags)) + cmd.AddCommand(checkAndFinalizeCmd(globalFlags)) + + return cmd +} diff --git a/internal/clients/k8s/client.go b/internal/clients/k8s/client.go index 7eaf677..5a5a597 100644 --- a/internal/clients/k8s/client.go +++ b/internal/clients/k8s/client.go @@ -11,6 +11,7 @@ import ( "os" "path/filepath" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -151,119 +152,256 @@ const ( PreRestoreReplicasAnnotation = "stackstate.com/pre-restore-replicas" ) -// DeploymentScale holds the name and original replica count of a deployment -type DeploymentScale struct { +// AppsScale holds the name and original replica count of a scalable resource +type AppsScale struct { Name string Replicas int32 } -// ScaleDownDeployments scales down deployments matching a label selector to 0 replicas -// Returns a map of deployment names to their original replica counts -func (c *Client) ScaleDownDeployments(namespace, labelSelector string) ([]DeploymentScale, error) { - ctx := context.Background() +// ScalableResource abstracts operations on resources that can be scaled +type ScalableResource interface { + GetName() string + GetReplicas() int32 + 
SetReplicas(replicas int32) + GetAnnotations() map[string]string + SetAnnotations(annotations map[string]string) + Update(ctx context.Context, client kubernetes.Interface, namespace string) error +} - // List deployments matching the label selector - deployments, err := c.clientset.AppsV1().Deployments(namespace).List(ctx, metav1.ListOptions{ - LabelSelector: labelSelector, - }) - if err != nil { - return nil, fmt.Errorf("failed to list deployments: %w", err) +// deploymentAdapter adapts appsv1.Deployment to ScalableResource interface +type deploymentAdapter struct { + deployment *appsv1.Deployment +} + +func (d *deploymentAdapter) GetName() string { + return d.deployment.Name +} + +func (d *deploymentAdapter) GetReplicas() int32 { + if d.deployment.Spec.Replicas == nil { + return 0 } + return *d.deployment.Spec.Replicas +} - if len(deployments.Items) == 0 { - return []DeploymentScale{}, nil +func (d *deploymentAdapter) SetReplicas(replicas int32) { + d.deployment.Spec.Replicas = &replicas +} + +func (d *deploymentAdapter) GetAnnotations() map[string]string { + return d.deployment.Annotations +} + +func (d *deploymentAdapter) SetAnnotations(annotations map[string]string) { + d.deployment.Annotations = annotations +} + +func (d *deploymentAdapter) Update(ctx context.Context, client kubernetes.Interface, namespace string) error { + _, err := client.AppsV1().Deployments(namespace).Update(ctx, d.deployment, metav1.UpdateOptions{}) + return err +} + +// statefulSetAdapter adapts appsv1.StatefulSet to ScalableResource interface +type statefulSetAdapter struct { + statefulSet *appsv1.StatefulSet +} + +func (s *statefulSetAdapter) GetName() string { + return s.statefulSet.Name +} + +func (s *statefulSetAdapter) GetReplicas() int32 { + if s.statefulSet.Spec.Replicas == nil { + return 0 } + return *s.statefulSet.Spec.Replicas +} - var scaledDeployments []DeploymentScale +func (s *statefulSetAdapter) SetReplicas(replicas int32) { + s.statefulSet.Spec.Replicas = &replicas +} - // Scale down each deployment - for _, deployment := range deployments.Items { - originalReplicas := int32(0) - if deployment.Spec.Replicas != nil { - originalReplicas = *deployment.Spec.Replicas - } +func (s *statefulSetAdapter) GetAnnotations() map[string]string { + return s.statefulSet.Annotations +} + +func (s *statefulSetAdapter) SetAnnotations(annotations map[string]string) { + s.statefulSet.Annotations = annotations +} + +func (s *statefulSetAdapter) Update(ctx context.Context, client kubernetes.Interface, namespace string) error { + _, err := client.AppsV1().StatefulSets(namespace).Update(ctx, s.statefulSet, metav1.UpdateOptions{}) + return err +} + +// scaleDownResources is a generic function that scales down resources to 0 replicas +func scaleDownResources(ctx context.Context, client kubernetes.Interface, namespace string, resources []ScalableResource) ([]AppsScale, error) { + if len(resources) == 0 { + return []AppsScale{}, nil + } + + var scaledResources []AppsScale + + for _, resource := range resources { + originalReplicas := resource.GetReplicas() // Store original replica count - scaledDeployments = append(scaledDeployments, DeploymentScale{ - Name: deployment.Name, + scaledResources = append(scaledResources, AppsScale{ + Name: resource.GetName(), Replicas: originalReplicas, }) // Scale to 0 if not already at 0 if originalReplicas > 0 { // Add annotation with original replica count - if deployment.Annotations == nil { - deployment.Annotations = make(map[string]string) + annotations := resource.GetAnnotations() + 
if annotations == nil { + annotations = make(map[string]string) } - deployment.Annotations[PreRestoreReplicasAnnotation] = fmt.Sprintf("%d", originalReplicas) - - replicas := int32(0) - deployment.Spec.Replicas = &replicas + annotations[PreRestoreReplicasAnnotation] = fmt.Sprintf("%d", originalReplicas) + resource.SetAnnotations(annotations) + resource.SetReplicas(0) - _, err := c.clientset.AppsV1().Deployments(namespace).Update(ctx, &deployment, metav1.UpdateOptions{}) - if err != nil { - return scaledDeployments, fmt.Errorf("failed to scale down deployment %s: %w", deployment.Name, err) + if err := resource.Update(ctx, client, namespace); err != nil { + return scaledResources, fmt.Errorf("failed to scale down resource %s: %w", resource.GetName(), err) } } } - return scaledDeployments, nil + return scaledResources, nil } -// ScaleUpDeploymentsFromAnnotations scales up deployments that have the pre-restore-replicas annotation -// Returns a list of deployments that were scaled up with their replica counts -func (c *Client) ScaleUpDeploymentsFromAnnotations(namespace, labelSelector string) ([]DeploymentScale, error) { - ctx := context.Background() - - // List deployments matching the label selector - deployments, err := c.clientset.AppsV1().Deployments(namespace).List(ctx, metav1.ListOptions{ - LabelSelector: labelSelector, - }) - if err != nil { - return nil, fmt.Errorf("failed to list deployments: %w", err) - } - - if len(deployments.Items) == 0 { - return []DeploymentScale{}, nil +// scaleUpResourcesFromAnnotations is a generic function that scales up resources based on annotations +func scaleUpResourcesFromAnnotations(ctx context.Context, client kubernetes.Interface, namespace string, resources []ScalableResource) ([]AppsScale, error) { + if len(resources) == 0 { + return []AppsScale{}, nil } - var scaledDeployments []DeploymentScale + var scaledResources []AppsScale - // Scale up each deployment that has the annotation - for _, deployment := range deployments.Items { - if deployment.Annotations == nil { + for _, resource := range resources { + annotations := resource.GetAnnotations() + if annotations == nil { continue } - replicasStr, exists := deployment.Annotations[PreRestoreReplicasAnnotation] + replicasStr, exists := annotations[PreRestoreReplicasAnnotation] if !exists { continue } var originalReplicas int32 if _, err := fmt.Sscanf(replicasStr, "%d", &originalReplicas); err != nil { - return scaledDeployments, fmt.Errorf("failed to parse replicas annotation for deployment %s: %w", deployment.Name, err) + return scaledResources, fmt.Errorf("failed to parse replicas annotation for resource %s: %w", resource.GetName(), err) } // Scale up to original replica count - deployment.Spec.Replicas = &originalReplicas + resource.SetReplicas(originalReplicas) // Remove the annotation - delete(deployment.Annotations, PreRestoreReplicasAnnotation) + delete(annotations, PreRestoreReplicasAnnotation) + resource.SetAnnotations(annotations) - _, err := c.clientset.AppsV1().Deployments(namespace).Update(ctx, &deployment, metav1.UpdateOptions{}) - if err != nil { - return scaledDeployments, fmt.Errorf("failed to scale up deployment %s: %w", deployment.Name, err) + if err := resource.Update(ctx, client, namespace); err != nil { + return scaledResources, fmt.Errorf("failed to scale up resource %s: %w", resource.GetName(), err) } - // Record scaled deployment - scaledDeployments = append(scaledDeployments, DeploymentScale{ - Name: deployment.Name, + // Record scaled resource + scaledResources = 
append(scaledResources, AppsScale{ + Name: resource.GetName(), Replicas: originalReplicas, }) } - return scaledDeployments, nil + return scaledResources, nil +} + +// ScaleDownDeployments scales down deployments matching a label selector to 0 replicas +// Returns a list of deployment names and their original replica counts +func (c *Client) ScaleDownDeployments(namespace, labelSelector string) ([]AppsScale, error) { + ctx := context.Background() + + // List deployments matching the label selector + deployments, err := c.clientset.AppsV1().Deployments(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: labelSelector, + }) + if err != nil { + return nil, fmt.Errorf("failed to list deployments: %w", err) + } + + // Convert to ScalableResource slice + resources := make([]ScalableResource, len(deployments.Items)) + for i := range deployments.Items { + resources[i] = &deploymentAdapter{deployment: &deployments.Items[i]} + } + + return scaleDownResources(ctx, c.clientset, namespace, resources) +} + +// ScaleUpDeploymentsFromAnnotations scales up deployments that have the pre-restore-replicas annotation +// Returns a list of deployments that were scaled up with their replica counts +func (c *Client) ScaleUpDeploymentsFromAnnotations(namespace, labelSelector string) ([]AppsScale, error) { + ctx := context.Background() + + // List deployments matching the label selector + deployments, err := c.clientset.AppsV1().Deployments(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: labelSelector, + }) + if err != nil { + return nil, fmt.Errorf("failed to list deployments: %w", err) + } + + // Convert to ScalableResource slice + resources := make([]ScalableResource, len(deployments.Items)) + for i := range deployments.Items { + resources[i] = &deploymentAdapter{deployment: &deployments.Items[i]} + } + + return scaleUpResourcesFromAnnotations(ctx, c.clientset, namespace, resources) +} + +// ScaleDownStatefulSets scales down statefulsets matching a label selector to 0 replicas +// Returns a list of statefulset names and their original replica counts +func (c *Client) ScaleDownStatefulSets(namespace, labelSelector string) ([]AppsScale, error) { + ctx := context.Background() + + // List statefulsets matching the label selector + statefulSets, err := c.clientset.AppsV1().StatefulSets(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: labelSelector, + }) + if err != nil { + return nil, fmt.Errorf("failed to list statefulsets: %w", err) + } + + // Convert to ScalableResource slice + resources := make([]ScalableResource, len(statefulSets.Items)) + for i := range statefulSets.Items { + resources[i] = &statefulSetAdapter{statefulSet: &statefulSets.Items[i]} + } + + return scaleDownResources(ctx, c.clientset, namespace, resources) +} + +// ScaleUpStatefulSetsFromAnnotations scales up statefulsets that have the pre-restore-replicas annotation +// Returns a list of statefulsets that were scaled up with their replica counts +func (c *Client) ScaleUpStatefulSetsFromAnnotations(namespace, labelSelector string) ([]AppsScale, error) { + ctx := context.Background() + + // List statefulsets matching the label selector + statefulSets, err := c.clientset.AppsV1().StatefulSets(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: labelSelector, + }) + if err != nil { + return nil, fmt.Errorf("failed to list statefulsets: %w", err) + } + + // Convert to ScalableResource slice + resources := make([]ScalableResource, len(statefulSets.Items)) + for i := range statefulSets.Items { + resources[i] = 
&statefulSetAdapter{statefulSet: &statefulSets.Items[i]} + } + + return scaleUpResourcesFromAnnotations(ctx, c.clientset, namespace, resources) } // NewTestClient creates a k8s Client for testing with a fake clientset. diff --git a/internal/clients/k8s/client_test.go b/internal/clients/k8s/client_test.go index f9f66f4..eb4a5a7 100644 --- a/internal/clients/k8s/client_test.go +++ b/internal/clients/k8s/client_test.go @@ -19,7 +19,7 @@ func TestClient_ScaleDownDeployments(t *testing.T) { namespace string labelSelector string deployments []appsv1.Deployment - expectedScales []DeploymentScale + expectedScales []AppsScale expectError bool }{ { @@ -30,7 +30,7 @@ func TestClient_ScaleDownDeployments(t *testing.T) { createDeployment("deploy1", "test-ns", map[string]string{"app": "test"}, 3), createDeployment("deploy2", "test-ns", map[string]string{"app": "test"}, 5), }, - expectedScales: []DeploymentScale{ + expectedScales: []AppsScale{ {Name: "deploy1", Replicas: 3}, {Name: "deploy2", Replicas: 5}, }, @@ -43,7 +43,7 @@ func TestClient_ScaleDownDeployments(t *testing.T) { deployments: []appsv1.Deployment{ createDeployment("deploy1", "test-ns", map[string]string{"app": "test"}, 0), }, - expectedScales: []DeploymentScale{ + expectedScales: []AppsScale{ {Name: "deploy1", Replicas: 0}, }, expectError: false, @@ -53,7 +53,7 @@ func TestClient_ScaleDownDeployments(t *testing.T) { namespace: "test-ns", labelSelector: "app=nonexistent", deployments: []appsv1.Deployment{}, - expectedScales: []DeploymentScale{}, + expectedScales: []AppsScale{}, expectError: false, }, { @@ -64,7 +64,7 @@ func TestClient_ScaleDownDeployments(t *testing.T) { createDeployment("deploy1", "test-ns", map[string]string{"app": "test"}, 3), createDeployment("deploy2", "test-ns", map[string]string{"app": "other"}, 2), }, - expectedScales: []DeploymentScale{ + expectedScales: []AppsScale{ {Name: "deploy1", Replicas: 3}, }, expectError: false, @@ -126,7 +126,7 @@ func TestClient_ScaleUpDeploymentsFromAnnotations(t *testing.T) { namespace string labelSelector string deployments []appsv1.Deployment - expectedScales []DeploymentScale + expectedScales []AppsScale expectError bool errorContains string }{ @@ -146,7 +146,7 @@ func TestClient_ScaleUpDeploymentsFromAnnotations(t *testing.T) { return d }(), }, - expectedScales: []DeploymentScale{ + expectedScales: []AppsScale{ {Name: "deploy1", Replicas: 3}, {Name: "deploy2", Replicas: 5}, }, @@ -159,7 +159,7 @@ func TestClient_ScaleUpDeploymentsFromAnnotations(t *testing.T) { deployments: []appsv1.Deployment{ createDeployment("deploy1", "test-ns", map[string]string{"app": "test"}, 0), }, - expectedScales: []DeploymentScale{}, + expectedScales: []AppsScale{}, expectError: false, }, { @@ -174,7 +174,7 @@ func TestClient_ScaleUpDeploymentsFromAnnotations(t *testing.T) { }(), createDeployment("deploy2", "test-ns", map[string]string{"app": "test"}, 0), }, - expectedScales: []DeploymentScale{ + expectedScales: []AppsScale{ {Name: "deploy1", Replicas: 3}, }, expectError: false, @@ -190,7 +190,7 @@ func TestClient_ScaleUpDeploymentsFromAnnotations(t *testing.T) { return d }(), }, - expectedScales: []DeploymentScale{}, + expectedScales: []AppsScale{}, expectError: true, errorContains: "failed to parse replicas annotation", }, @@ -205,7 +205,7 @@ func TestClient_ScaleUpDeploymentsFromAnnotations(t *testing.T) { return d }(), }, - expectedScales: []DeploymentScale{ + expectedScales: []AppsScale{ {Name: "deploy1", Replicas: 0}, }, expectError: false, @@ -215,7 +215,7 @@ func 
TestClient_ScaleUpDeploymentsFromAnnotations(t *testing.T) { namespace: "test-ns", labelSelector: "app=test", deployments: []appsv1.Deployment{}, - expectedScales: []DeploymentScale{}, + expectedScales: []AppsScale{}, expectError: false, }, } diff --git a/internal/clients/k8s/interface.go b/internal/clients/k8s/interface.go index 13cfee8..2ddc1a4 100644 --- a/internal/clients/k8s/interface.go +++ b/internal/clients/k8s/interface.go @@ -13,8 +13,12 @@ type Interface interface { PortForwardService(namespace, serviceName string, localPort, remotePort int) (stopChan chan struct{}, readyChan chan struct{}, err error) // Deployment scaling operations - ScaleDownDeployments(namespace, labelSelector string) ([]DeploymentScale, error) - ScaleUpDeploymentsFromAnnotations(namespace, labelSelector string) ([]DeploymentScale, error) + ScaleDownDeployments(namespace, labelSelector string) ([]AppsScale, error) + ScaleUpDeploymentsFromAnnotations(namespace, labelSelector string) ([]AppsScale, error) + + // StatefulSet scaling operations + ScaleDownStatefulSets(namespace, labelSelector string) ([]AppsScale, error) + ScaleUpStatefulSetsFromAnnotations(namespace, labelSelector string) ([]AppsScale, error) } // Ensure *Client implements Interface diff --git a/internal/clients/k8s/job.go b/internal/clients/k8s/job.go index e976f1a..4e8b0c6 100644 --- a/internal/clients/k8s/job.go +++ b/internal/clients/k8s/job.go @@ -11,8 +11,8 @@ import ( ) const ( - // defaultJobTTLSeconds is the time-to-live for completed/failed jobs (10 minutes) - defaultJobTTLSeconds = 600 + // defaultJobTTLSeconds is the time-to-live for completed/failed jobs (1 day) + defaultJobTTLSeconds = 86400 ) // BackupJobSpec contains all parameters needed to create a backup/restore job @@ -30,11 +30,7 @@ type BackupJobSpec struct { ContainerSecurityContext *corev1.SecurityContext // Container spec - Image string - Command []string - Env []corev1.EnvVar - Resources corev1.ResourceRequirements - VolumeMounts []corev1.VolumeMount + Containers []corev1.Container InitContainers []corev1.Container // Volumes @@ -110,18 +106,8 @@ func (c *Client) CreateBackupJob(namespace string, spec BackupJobSpec) (*batchv1 Spec: corev1.PodSpec{ RestartPolicy: corev1.RestartPolicyNever, InitContainers: spec.InitContainers, - Containers: []corev1.Container{ - { - Name: "restore", - Image: spec.Image, - ImagePullPolicy: corev1.PullIfNotPresent, - Command: spec.Command, - Env: spec.Env, - Resources: spec.Resources, - VolumeMounts: spec.VolumeMounts, - }, - }, - Volumes: spec.Volumes, + Containers: spec.Containers, + Volumes: spec.Volumes, }, }, }, @@ -132,11 +118,6 @@ func (c *Client) CreateBackupJob(namespace string, spec BackupJobSpec) (*batchv1 job.Spec.Template.Spec.SecurityContext = spec.SecurityContext } - // Apply container security context if provided - if spec.ContainerSecurityContext != nil { - job.Spec.Template.Spec.Containers[0].SecurityContext = spec.ContainerSecurityContext - } - // Apply node selector if provided if len(spec.NodeSelector) > 0 { job.Spec.Template.Spec.NodeSelector = spec.NodeSelector diff --git a/internal/clients/k8s/job_test.go b/internal/clients/k8s/job_test.go index 7c9c1bf..0dbde62 100644 --- a/internal/clients/k8s/job_test.go +++ b/internal/clients/k8s/job_test.go @@ -119,9 +119,11 @@ func TestClient_CreateBackupJob(t *testing.T) { spec: BackupJobSpec{ Name: "backup-job", Labels: map[string]string{"app": "backup"}, - Image: "backup:latest", - Command: []string{ - "/bin/sh", "-c", "echo 'backup complete'", + Containers: 
[]corev1.Container{ + { + Name: "backup", + Image: "backup:latest", + }, }, }, expectError: false, @@ -129,7 +131,7 @@ func TestClient_CreateBackupJob(t *testing.T) { assert.Equal(t, "backup-job", job.Name) assert.Equal(t, map[string]string{"app": "backup"}, job.Labels) assert.Equal(t, int32(1), *job.Spec.BackoffLimit) - assert.Equal(t, int32(600), *job.Spec.TTLSecondsAfterFinished) + assert.Equal(t, int32(defaultJobTTLSeconds), *job.Spec.TTLSecondsAfterFinished) assert.Equal(t, corev1.RestartPolicyNever, job.Spec.Template.Spec.RestartPolicy) }, }, @@ -137,12 +139,16 @@ func TestClient_CreateBackupJob(t *testing.T) { name: "create job with environment variables", namespace: "test-ns", spec: BackupJobSpec{ - Name: "restore-job", - Image: "restore:v1", - Command: []string{"/restore.sh"}, - Env: []corev1.EnvVar{ - {Name: "BACKUP_NAME", Value: "snapshot-123"}, - {Name: "LOG_LEVEL", Value: "debug"}, + Name: "restore-job", + Containers: []corev1.Container{ + { + Name: "restore", + Image: "restore:v1", + Env: []corev1.EnvVar{ + {Name: "BACKUP_NAME", Value: "snapshot-123"}, + {Name: "LOG_LEVEL", Value: "debug"}, + }, + }, }, }, expectError: false, @@ -156,17 +162,21 @@ func TestClient_CreateBackupJob(t *testing.T) { name: "create job with resource requirements", namespace: "test-ns", spec: BackupJobSpec{ - Name: "resource-job", - Image: "backup:latest", - Command: []string{"/backup.sh"}, - Resources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1"), - corev1.ResourceMemory: resource.MustParse("2Gi"), - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("2"), - corev1.ResourceMemory: resource.MustParse("4Gi"), + Name: "resource-job", + Containers: []corev1.Container{ + { + Name: "backup", + Image: "backup:latest", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("4Gi"), + }, + }, }, }, }, @@ -181,9 +191,13 @@ func TestClient_CreateBackupJob(t *testing.T) { name: "create job with init containers", namespace: "test-ns", spec: BackupJobSpec{ - Name: "init-job", - Image: "main:latest", - Command: []string{"/main.sh"}, + Name: "init-job", + Containers: []corev1.Container{ + { + Name: "main", + Image: "main:latest", + }, + }, InitContainers: []corev1.Container{ { Name: "wait-for-deps", @@ -202,12 +216,16 @@ func TestClient_CreateBackupJob(t *testing.T) { name: "create job with volumes and mounts", namespace: "test-ns", spec: BackupJobSpec{ - Name: "volume-job", - Image: "backup:latest", - Command: []string{"/backup.sh"}, - VolumeMounts: []corev1.VolumeMount{ - {Name: "data", MountPath: "/data"}, - {Name: "config", MountPath: "/config"}, + Name: "volume-job", + Containers: []corev1.Container{ + { + Name: "backup", + Image: "backup:latest", + VolumeMounts: []corev1.VolumeMount{ + {Name: "data", MountPath: "/data"}, + {Name: "config", MountPath: "/config"}, + }, + }, }, Volumes: []corev1.Volume{ { @@ -240,18 +258,22 @@ func TestClient_CreateBackupJob(t *testing.T) { name: "create job with security context", namespace: "test-ns", spec: BackupJobSpec{ - Name: "secure-job", - Image: "backup:latest", - Command: []string{"/backup.sh"}, + Name: "secure-job", + Containers: []corev1.Container{ + { + Name: "backup", + Image: "backup:latest", + SecurityContext: &corev1.SecurityContext{ + 
AllowPrivilegeEscalation: ptr(false), + ReadOnlyRootFilesystem: ptr(true), + }, + }, + }, SecurityContext: &corev1.PodSecurityContext{ RunAsUser: ptr(int64(1000)), RunAsGroup: ptr(int64(2000)), FSGroup: ptr(int64(3000)), }, - ContainerSecurityContext: &corev1.SecurityContext{ - AllowPrivilegeEscalation: ptr(false), - ReadOnlyRootFilesystem: ptr(true), - }, }, expectError: false, validateFunc: func(t *testing.T, job *batchv1.Job) { @@ -265,9 +287,13 @@ func TestClient_CreateBackupJob(t *testing.T) { name: "create job with node selector and tolerations", namespace: "test-ns", spec: BackupJobSpec{ - Name: "scheduled-job", - Image: "backup:latest", - Command: []string{"/backup.sh"}, + Name: "scheduled-job", + Containers: []corev1.Container{ + { + Name: "backup", + Image: "backup:latest", + }, + }, NodeSelector: map[string]string{ "disktype": "ssd", "zone": "us-west-1a", @@ -292,9 +318,13 @@ func TestClient_CreateBackupJob(t *testing.T) { name: "create job with image pull secrets", namespace: "test-ns", spec: BackupJobSpec{ - Name: "private-image-job", - Image: "private-registry.com/backup:latest", - Command: []string{"/backup.sh"}, + Name: "private-image-job", + Containers: []corev1.Container{ + { + Name: "backup", + Image: "private-registry.com/backup:latest", + }, + }, ImagePullSecrets: []corev1.LocalObjectReference{ {Name: "registry-secret"}, }, @@ -428,5 +458,5 @@ func TestClient_GetJob_NotFound(t *testing.T) { // TestClient_DefaultJobTTL tests the default TTL constant func TestClient_DefaultJobTTL(t *testing.T) { - assert.Equal(t, int32(600), int32(defaultJobTTLSeconds)) + assert.Equal(t, int32(86400), int32(defaultJobTTLSeconds)) } diff --git a/internal/clients/s3/filter.go b/internal/clients/s3/filter.go index 32cafac..5787352 100644 --- a/internal/clients/s3/filter.go +++ b/internal/clients/s3/filter.go @@ -140,3 +140,19 @@ func getBaseName(key string) (string, bool) { return key, false } + +func FilterByCommonPrefix(objects []s3types.CommonPrefix) []Object { + var filteredObjects []Object + + for _, obj := range objects { + key := aws.ToString(obj.Prefix) + key = strings.TrimSuffix(key, "/") + filteredObjects = append(filteredObjects, Object{ + Key: key, + LastModified: aws.ToTime(nil), + Size: 0, + }) + } + + return filteredObjects +} diff --git a/internal/foundation/config/config.go b/internal/foundation/config/config.go index d083340..c1d6bc5 100644 --- a/internal/foundation/config/config.go +++ b/internal/foundation/config/config.go @@ -16,10 +16,11 @@ import ( // Config represents the merged configuration from ConfigMap and Secret type Config struct { - Kubernetes KubernetesConfig `yaml:"kubernetes"` - Elasticsearch ElasticsearchConfig `yaml:"elasticsearch" validate:"required"` - Minio MinioConfig `yaml:"minio" validate:"required"` - Stackgraph StackgraphConfig `yaml:"stackgraph" validate:"required"` + Kubernetes KubernetesConfig `yaml:"kubernetes"` + Elasticsearch ElasticsearchConfig `yaml:"elasticsearch" validate:"required"` + Minio MinioConfig `yaml:"minio" validate:"required"` + Stackgraph StackgraphConfig `yaml:"stackgraph" validate:"required"` + VictoriaMetrics VictoriaMetricsConfig `yaml:"victoriaMetrics" validate:"required"` } // KubernetesConfig holds Kubernetes-wide configuration @@ -89,6 +90,24 @@ type StackgraphConfig struct { Restore StackgraphRestoreConfig `yaml:"restore" validate:"required"` } +type VictoriaMetricsConfig struct { + S3Locations []S3Location `yaml:"S3Locations" validate:"required"` + Restore VictoriaMetricsRestoreConfig `yaml:"restore" 
validate:"required"` +} + +// VictoriaMetricsRestoreConfig holds VictoriaMetrics restore-specific configuration +type VictoriaMetricsRestoreConfig struct { + HaMode string `yaml:"haMode" validate:"required"` + PersistentVolumeClaimPrefix string `yaml:"persistentVolumeClaimPrefix" validate:"required"` + ScaleDownLabelSelector string `yaml:"scaleDownLabelSelector" validate:"required"` + Job JobConfig `yaml:"job" validate:"required"` +} + +type S3Location struct { + Bucket string `yaml:"bucket" validate:"required"` + Prefix string `yaml:"prefix"` +} + // StackgraphRestoreConfig holds Stackgraph restore-specific configuration type StackgraphRestoreConfig struct { ScaleDownLabelSelector string `yaml:"scaleDownLabelSelector" validate:"required"` diff --git a/internal/foundation/config/config_test.go b/internal/foundation/config/config_test.go index 2cd91c8..af5086b 100644 --- a/internal/foundation/config/config_test.go +++ b/internal/foundation/config/config_test.go @@ -394,6 +394,37 @@ func TestConfig_StructValidation(t *testing.T) { }, }, }, + VictoriaMetrics: VictoriaMetricsConfig{ + S3Locations: []S3Location{ + { + Bucket: "vm-backup", + Prefix: "victoria-metrics-0", + }, + { + Bucket: "vm-backup", + Prefix: "victoria-metrics-1", + }, + }, + Restore: VictoriaMetricsRestoreConfig{ + HaMode: "mirror", + PersistentVolumeClaimPrefix: "database-victoria-metrics-", + ScaleDownLabelSelector: "app=victoria-metrics", + Job: JobConfig{ + Image: "vm-backup:latest", + WaitImage: "wait:latest", + Resources: ResourceRequirements{ + Limits: ResourceList{ + CPU: "1", + Memory: "2Gi", + }, + Requests: ResourceList{ + CPU: "500m", + Memory: "1Gi", + }, + }, + }, + }, + }, }, expectError: false, }, diff --git a/internal/foundation/config/testdata/validConfigMapConfig.yaml b/internal/foundation/config/testdata/validConfigMapConfig.yaml index ebff615..66115c3 100644 --- a/internal/foundation/config/testdata/validConfigMapConfig.yaml +++ b/internal/foundation/config/testdata/validConfigMapConfig.yaml @@ -106,3 +106,33 @@ stackgraph: size: "10Gi" accessModes: - ReadWriteOnce + +# VictoriaMetrics backup configuration +victoriaMetrics: + # S3 locations for VictoriaMetrics backups (one per instance) + S3Locations: + - bucket: sts-victoria-metrics-backup + prefix: victoria-metrics-0 + - bucket: sts-victoria-metrics-backup + prefix: victoria-metrics-1 + # Restore configuration + restore: + # HA mode for VictoriaMetrics (mirror = two independent instances) + haMode: "mirror" + # PVC prefix for VictoriaMetrics StatefulSet PVCs + persistentVolumeClaimPrefix: "database-victoria-metrics-" + # Label selector for deployments to scale down during restore + scaleDownLabelSelector: "observability.suse.com/scalable-during-vm-restore=true" + # Job configuration + job: + labels: + app: victoria-metrics-restore + image: quay.io/stackstate/victoria-metrics-backup:latest + waitImage: quay.io/stackstate/wait:latest + resources: + limits: + cpu: "1" + memory: "2Gi" + requests: + cpu: "500m" + memory: "1Gi" diff --git a/internal/foundation/config/testdata/validConfigMapOnly.yaml b/internal/foundation/config/testdata/validConfigMapOnly.yaml index df61a1a..9aa76d6 100644 --- a/internal/foundation/config/testdata/validConfigMapOnly.yaml +++ b/internal/foundation/config/testdata/validConfigMapOnly.yaml @@ -102,3 +102,27 @@ stackgraph: size: "10Gi" accessModes: - ReadWriteOnce + +# VictoriaMetrics backup configuration +victoriaMetrics: + S3Locations: + - bucket: sts-victoria-metrics-backup + prefix: victoria-metrics-0 + - bucket: 
sts-victoria-metrics-backup + prefix: victoria-metrics-1 + restore: + haMode: "mirror" + persistentVolumeClaimPrefix: "database-victoria-metrics-" + scaleDownLabelSelector: "observability.suse.com/scalable-during-vm-restore=true" + job: + labels: + app: victoria-metrics-restore + image: quay.io/stackstate/victoria-metrics-backup:latest + waitImage: quay.io/stackstate/wait:latest + resources: + limits: + cpu: "1" + memory: "2Gi" + requests: + cpu: "500m" + memory: "1Gi" diff --git a/internal/orchestration/restore/confirmation.go b/internal/orchestration/restore/confirmation.go new file mode 100644 index 0000000..aca6624 --- /dev/null +++ b/internal/orchestration/restore/confirmation.go @@ -0,0 +1,22 @@ +package restore + +import ( + "bufio" + "fmt" + "os" + "strings" +) + +// PromptForConfirmation prompts the user for confirmation and returns true if they confirm +func PromptForConfirmation() bool { + reader := bufio.NewReader(os.Stdin) + fmt.Print("Do you want to continue? (yes/no): ") + + response, err := reader.ReadString('\n') + if err != nil { + return false + } + + response = strings.TrimSpace(strings.ToLower(response)) + return response == "yes" || response == "y" +} diff --git a/internal/orchestration/restore/finalize.go b/internal/orchestration/restore/finalize.go new file mode 100644 index 0000000..832791b --- /dev/null +++ b/internal/orchestration/restore/finalize.go @@ -0,0 +1,157 @@ +package restore + +import ( + "fmt" + + "github.com/stackvista/stackstate-backup-cli/internal/clients/k8s" + "github.com/stackvista/stackstate-backup-cli/internal/foundation/logger" + "github.com/stackvista/stackstate-backup-cli/internal/orchestration/scale" + batchv1 "k8s.io/api/batch/v1" +) + +// IsJobComplete checks if a job is in a terminal state +// Returns (completed, succeeded) where: +// - completed: true if job is in a terminal state (succeeded or failed) +// - succeeded: true if job completed successfully +func IsJobComplete(job *batchv1.Job) (completed bool, succeeded bool) { + if job.Status.Succeeded > 0 { + return true, true + } + if job.Status.Failed > 0 { + return true, false + } + return false, false +} + +// HandleCompletedJobParams contains parameters for HandleCompletedJob +type HandleCompletedJobParams struct { + K8sClient *k8s.Client + Namespace string + JobName string + ServiceName string + ScaleSelector string + CleanupPVC bool + Log *logger.Logger + JobSucceeded bool +} + +// HandleCompletedJob handles a job that's already complete +// This includes printing status, fetching logs on failure, scaling up, and cleanup +func HandleCompletedJob(params HandleCompletedJobParams) error { + params.Log.Println() + if params.JobSucceeded { + params.Log.Successf("Job completed successfully: %s", params.JobName) + params.Log.Println() + + // Scale up deployments that were scaled down before restore + if err := scale.ScaleUpFromAnnotations(params.K8sClient, params.Namespace, params.ScaleSelector, params.Log); err != nil { + params.Log.Warningf("Failed to scale up workload: %v", err) + } + } else { + params.Log.Errorf("Job failed: %s", params.JobName) + params.Log.Println() + params.Log.Infof("Fetching logs...") + params.Log.Println() + if err := PrintJobLogs(params.K8sClient, params.Namespace, params.JobName, params.Log); err != nil { + params.Log.Warningf("Failed to fetch logs: %v", err) + } + } + + // Cleanup resources + params.Log.Println() + return CleanupResources(params.K8sClient, params.Namespace, params.JobName, "", params.Log, params.CleanupPVC) +} + +// WaitAndFinalizeParams 
contains parameters for WaitAndFinalize +type WaitAndFinalizeParams struct { + K8sClient *k8s.Client + Namespace string + JobName string + ServiceName string + ScaleSelector string + CleanupPVC bool + Log *logger.Logger +} + +// WaitAndFinalize waits for job completion and then cleans up +func WaitAndFinalize(params WaitAndFinalizeParams) error { + PrintWaitingMessage(params.Log, params.ServiceName, params.JobName, params.Namespace) + + if err := WaitForJobCompletion(params.K8sClient, params.Namespace, params.JobName, params.Log); err != nil { + params.Log.Errorf("Job failed: %v", err) + // Still cleanup even if failed + params.Log.Println() + _ = CleanupResources(params.K8sClient, params.Namespace, params.JobName, "", params.Log, params.CleanupPVC) + return err + } + + params.Log.Println() + params.Log.Successf("Job completed successfully: %s", params.JobName) + params.Log.Println() + + // Scale up deployments that were scaled down before restore + if err := scale.ScaleUpFromAnnotations(params.K8sClient, params.Namespace, params.ScaleSelector, params.Log); err != nil { + params.Log.Warningf("Failed to scale up workload: %v", err) + } + + params.Log.Println() + return CleanupResources(params.K8sClient, params.Namespace, params.JobName, "", params.Log, params.CleanupPVC) +} + +// CheckAndFinalizeParams contains parameters for CheckAndFinalize +type CheckAndFinalizeParams struct { + K8sClient *k8s.Client + Namespace string + JobName string + ServiceName string + ScaleSelector string + CleanupPVC bool + WaitForJob bool + Log *logger.Logger +} + +// CheckAndFinalize checks the status of a background restore job and cleans up resources +// This is useful when a restore job was started with --background flag or was interrupted (Ctrl+C) +func CheckAndFinalize(params CheckAndFinalizeParams) error { + // Get job + params.Log.Infof("Checking status of job: %s", params.JobName) + job, err := params.K8sClient.GetJob(params.Namespace, params.JobName) + if err != nil { + return fmt.Errorf("failed to get job '%s': %w (job may not exist or has been deleted)", params.JobName, err) + } + + // Check if job is already complete + completed, succeeded := IsJobComplete(job) + + if completed { + // Job already finished - print status and cleanup + return HandleCompletedJob(HandleCompletedJobParams{ + K8sClient: params.K8sClient, + Namespace: params.Namespace, + JobName: params.JobName, + ServiceName: params.ServiceName, + ScaleSelector: params.ScaleSelector, + CleanupPVC: params.CleanupPVC, + Log: params.Log, + JobSucceeded: succeeded, + }) + } + + // Job still running + if params.WaitForJob { + // Wait for completion, then cleanup + return WaitAndFinalize(WaitAndFinalizeParams{ + K8sClient: params.K8sClient, + Namespace: params.Namespace, + JobName: params.JobName, + ServiceName: params.ServiceName, + ScaleSelector: params.ScaleSelector, + CleanupPVC: params.CleanupPVC, + Log: params.Log, + }) + } + + // Not waiting - just print status + PrintRunningJobStatus(params.Log, params.ServiceName, params.JobName, params.Namespace, job.Status.Active) + return nil +} diff --git a/internal/orchestration/restore/job.go b/internal/orchestration/restore/job.go new file mode 100644 index 0000000..023f84b --- /dev/null +++ b/internal/orchestration/restore/job.go @@ -0,0 +1,134 @@ +package restore + +import ( + "fmt" + "time" + + "github.com/stackvista/stackstate-backup-cli/internal/clients/k8s" + "github.com/stackvista/stackstate-backup-cli/internal/foundation/logger" +) + +const ( + defaultJobCompletionTimeout = 30 * 
time.Minute
+	defaultJobStatusCheckInterval = 10 * time.Second
+)
+
+// WaitForJobCompletion waits for a Kubernetes job to complete
+func WaitForJobCompletion(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger) error {
+	timeout := time.After(defaultJobCompletionTimeout)
+	ticker := time.NewTicker(defaultJobStatusCheckInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-timeout:
+			return fmt.Errorf("timeout waiting for job to complete")
+		case <-ticker.C:
+			job, err := k8sClient.GetJob(namespace, jobName)
+			if err != nil {
+				return fmt.Errorf("failed to get job status: %w", err)
+			}
+
+			if job.Status.Succeeded > 0 {
+				return nil
+			}
+
+			if job.Status.Failed > 0 {
+				// Get and print logs from failed job
+				log.Println()
+				log.Errorf("Job failed. Fetching logs...")
+				log.Println()
+				if err := PrintJobLogs(k8sClient, namespace, jobName, log); err != nil {
+					log.Warningf("Failed to fetch job logs: %v", err)
+				}
+				return fmt.Errorf("job failed")
+			}
+
+			log.Debugf("Job status: Active=%d, Succeeded=%d, Failed=%d",
+				job.Status.Active, job.Status.Succeeded, job.Status.Failed)
+		}
+	}
+}
+
+// PrintJobLogs retrieves and prints logs from all containers in a job's pods
+func PrintJobLogs(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger) error {
+	// Get logs from all pods in the job
+	allPodLogs, err := k8sClient.GetJobLogs(namespace, jobName)
+	if err != nil {
+		return err
+	}
+
+	// Print logs from each pod
+	for _, podLogs := range allPodLogs {
+		log.Infof("=== Logs from pod: %s ===", podLogs.PodName)
+		log.Println()
+
+		// Print logs from each container
+		for _, containerLog := range podLogs.ContainerLogs {
+			containerType := "container"
+			if containerLog.IsInit {
+				containerType = "init container"
+			}
+
+			log.Infof("--- Logs from %s: %s ---", containerType, containerLog.Name)
+
+			// Print the actual logs
+			if containerLog.Logs != "" {
+				fmt.Println(containerLog.Logs)
+			} else {
+				log.Infof("(no logs)")
+			}
+			log.Println()
+		}
+	}
+
+	return nil
+}
+
+// PrintWaitingMessage prints a waiting message with instructions for interrupting the wait
+func PrintWaitingMessage(log *logger.Logger, serviceName, jobName, namespace string) {
+	log.Println()
+	log.Infof("Waiting for restore job to complete (this may take a significant amount of time depending on the archive size)...")
+	log.Println()
+	log.Infof("You can safely interrupt this command with Ctrl+C.")
+	log.Infof("To check the status, scale up the required deployments, and clean up later, run:")
+	log.Infof("  sts-backup %s check-and-finalize --job %s --wait -n %s", serviceName, jobName, namespace)
+}
+
+// PrintRunningJobStatus prints the status of a running job and instructions for following up on it
+func PrintRunningJobStatus(log *logger.Logger, serviceName, jobName, namespace string, activePods int32) {
+	log.Println()
+	log.Infof("Job is running in background: %s", jobName)
+	if activePods > 0 {
+		log.Infof("  Active pods: %d", activePods)
+	}
+	log.Println()
+	log.Infof("Monitoring commands:")
+	log.Infof("  kubectl logs --follow job/%s -n %s", jobName, namespace)
+	log.Infof("  kubectl get job %s -n %s", jobName, namespace)
+	log.Println()
+	log.Infof("To wait for completion, scale up the necessary deployments, and clean up, run:")
+	log.Infof("  sts-backup %s check-and-finalize --job %s --wait -n %s", serviceName, jobName, namespace)
+}
+
+// WaitAndCleanup waits for job completion and cleans up resources
+func WaitAndCleanup(k8sClient *k8s.Client, namespace, jobName string, log *logger.Logger, cleanupPVC bool) error {
+	if err := 
WaitForJobCompletion(k8sClient, namespace, jobName, log); err != nil { + log.Errorf("Job failed: %v", err) + log.Println() + log.Infof("Cleanup commands:") + if cleanupPVC { + log.Infof(" kubectl delete job,pvc %s -n %s", jobName, namespace) + } else { + log.Infof(" kubectl delete job %s -n %s", jobName, namespace) + } + return err + } + + log.Println() + log.Successf("Restore completed successfully") + + // Cleanup resources + log.Println() + return CleanupResources(k8sClient, namespace, jobName, "", log, cleanupPVC) +} diff --git a/internal/orchestration/restore/resources.go b/internal/orchestration/restore/resources.go new file mode 100644 index 0000000..74f56a5 --- /dev/null +++ b/internal/orchestration/restore/resources.go @@ -0,0 +1,87 @@ +package restore + +import ( + "fmt" + + "github.com/stackvista/stackstate-backup-cli/internal/clients/k8s" + "github.com/stackvista/stackstate-backup-cli/internal/foundation/config" + "github.com/stackvista/stackstate-backup-cli/internal/foundation/logger" + "github.com/stackvista/stackstate-backup-cli/internal/scripts" +) + +const ( + // MinioKeysSecretName is the name of the secret containing Minio access/secret keys + MinioKeysSecretName = "suse-observability-backup-cli-minio-keys" //nolint:gosec // This is a Kubernetes secret name, not a credential + // RestoreScriptsConfigMap is the name of the ConfigMap containing restore scripts + RestoreScriptsConfigMap = "suse-observability-backup-cli-restore-scripts" +) + +// EnsureRestoreResources ensures that required Kubernetes resources exist for the restore job +func EnsureRestoreResources(k8sClient *k8s.Client, namespace string, config *config.Config, log *logger.Logger) error { + // Ensure backup scripts ConfigMap exists + log.Infof("Ensuring backup scripts ConfigMap exists...") + + scriptNames, err := scripts.ListScripts() + if err != nil { + return fmt.Errorf("failed to list embedded scripts: %w", err) + } + + scriptsData := make(map[string]string) + for _, scriptName := range scriptNames { + scriptContent, err := scripts.GetScript(scriptName) + if err != nil { + return fmt.Errorf("failed to get script %s: %w", scriptName, err) + } + scriptsData[scriptName] = string(scriptContent) + } + + configMapLabels := k8s.MergeLabels(config.Kubernetes.CommonLabels, map[string]string{}) + if _, err := k8sClient.EnsureConfigMap(namespace, RestoreScriptsConfigMap, scriptsData, configMapLabels); err != nil { + return fmt.Errorf("failed to ensure backup scripts ConfigMap: %w", err) + } + log.Successf("Backup scripts ConfigMap ready") + + // Ensure Minio keys secret exists + log.Infof("Ensuring Minio keys secret exists...") + + secretData := map[string][]byte{ + "accesskey": []byte(config.Minio.AccessKey), + "secretkey": []byte(config.Minio.SecretKey), + } + + secretLabels := k8s.MergeLabels(config.Kubernetes.CommonLabels, map[string]string{}) + if _, err := k8sClient.EnsureSecret(namespace, MinioKeysSecretName, secretData, secretLabels); err != nil { + return fmt.Errorf("failed to ensure Minio keys secret: %w", err) + } + log.Successf("Minio keys secret ready") + + return nil +} + +// CleanupResources cleans up job and optionally PVC resources +// If pvcName is empty, no PVC cleanup is attempted +func CleanupResources(k8sClient *k8s.Client, namespace, jobName, pvcName string, log *logger.Logger, cleanupPVC bool) error { + log.Infof("Cleaning up resources...") + + // Delete job + if err := k8sClient.DeleteJob(namespace, jobName); err != nil { + log.Warningf("Failed to delete job: %v", err) + } else { + 
log.Successf("Job deleted: %s", jobName) + } + + // Delete PVC if requested + if cleanupPVC { + // Use jobName as PVC name if pvcName not specified + if pvcName == "" { + pvcName = jobName + } + if err := k8sClient.DeletePVC(namespace, pvcName); err != nil { + log.Warningf("Failed to delete PVC: %v", err) + } else { + log.Successf("PVC deleted: %s", pvcName) + } + } + + return nil +} diff --git a/internal/orchestration/scale/scale.go b/internal/orchestration/scale/scale.go index 4a07ae4..4a111cf 100644 --- a/internal/orchestration/scale/scale.go +++ b/internal/orchestration/scale/scale.go @@ -21,7 +21,7 @@ const ( // It waits for all pods to terminate before returning. // //nolint:revive // Package name "scale" with function "ScaleDown" is intentionally verbose for clarity -func ScaleDown(k8sClient *k8s.Client, namespace, labelSelector string, log *logger.Logger) ([]k8s.DeploymentScale, error) { +func ScaleDown(k8sClient *k8s.Client, namespace, labelSelector string, log *logger.Logger) ([]k8s.AppsScale, error) { log.Infof("Scaling down deployments (selector: %s)...", labelSelector) scaledDeployments, err := k8sClient.ScaleDownDeployments(namespace, labelSelector) @@ -29,9 +29,18 @@ func ScaleDown(k8sClient *k8s.Client, namespace, labelSelector string, log *logg return nil, fmt.Errorf("failed to scale down deployments: %w", err) } - if len(scaledDeployments) == 0 { - log.Infof("No deployments found to scale down") - return scaledDeployments, nil + scaledApps := scaledDeployments + + scaledStatefulSets, err := k8sClient.ScaleDownStatefulSets(namespace, labelSelector) + if err != nil { + return nil, fmt.Errorf("failed to scale down statefulsets: %w", err) + } + + scaledApps = append(scaledApps, scaledStatefulSets...) + + if len(scaledApps) == 0 { + log.Infof("No deployments or statefulsets found to scale down") + return scaledApps, nil } log.Successf("Scaled down %d deployment(s):", len(scaledDeployments)) @@ -39,12 +48,17 @@ func ScaleDown(k8sClient *k8s.Client, namespace, labelSelector string, log *logg log.Infof(" - %s (replicas: %d -> 0)", dep.Name, dep.Replicas) } + log.Successf("Scaled down %d statefulsets(s):", len(scaledStatefulSets)) + for _, dep := range scaledStatefulSets { + log.Infof(" - %s (replicas: %d -> 0)", dep.Name, dep.Replicas) + } + // Wait for pods to terminate if err := waitForPodsToTerminate(k8sClient, namespace, labelSelector, log); err != nil { - return scaledDeployments, fmt.Errorf("failed waiting for pods to terminate: %w", err) + return scaledApps, fmt.Errorf("failed waiting for pods to terminate: %w", err) } - return scaledDeployments, nil + return scaledApps, nil } // waitForPodsToTerminate polls for pod termination until all pods matching the label selector are gone @@ -101,8 +115,13 @@ func ScaleUpFromAnnotations(k8sClient *k8s.Client, namespace, labelSelector stri return fmt.Errorf("failed to scale up deployments from annotations: %w", err) } - if len(scaledDeployments) == 0 { - log.Infof("No deployments found with pre-restore annotations to scale up") + scaledStatefulSets, err := k8sClient.ScaleUpStatefulSetsFromAnnotations(namespace, labelSelector) + if err != nil { + return fmt.Errorf("failed to scale up statefulsets from annotations: %w", err) + } + + if len(scaledDeployments) == 0 && len(scaledStatefulSets) == 0 { + log.Infof("No statefulsets found with pre-restore annotations to scale up") return nil } @@ -111,5 +130,10 @@ func ScaleUpFromAnnotations(k8sClient *k8s.Client, namespace, labelSelector stri log.Infof(" - %s (replicas: 0 -> %d)", dep.Name, 
dep.Replicas) } + log.Successf("Scaled up %d statefulset(s) successfully:", len(scaledStatefulSets)) + for _, dep := range scaledStatefulSets { + log.Infof(" - %s (replicas: 0 -> %d)", dep.Name, dep.Replicas) + } + return nil } diff --git a/internal/scripts/scripts/restore-victoria-metrics-backup.sh b/internal/scripts/scripts/restore-victoria-metrics-backup.sh new file mode 100644 index 0000000..ee2e4f1 --- /dev/null +++ b/internal/scripts/scripts/restore-victoria-metrics-backup.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env sh +set -eo + +if [ "$#" -lt 2 ]; then + echo "Usage: $0 " + echo "Example: $0 sts-victoria-metrics-backup/victoria-metrics-1-20251030143500 127.0.0.1:8421" + exit 1 +fi + +S3_LOCATION=$1 +METRICS_ADDR=$2 +export AWS_ACCESS_KEY_ID +AWS_ACCESS_KEY_ID="$(cat /aws-keys/accesskey)" +export AWS_SECRET_ACCESS_KEY +AWS_SECRET_ACCESS_KEY="$(cat /aws-keys/secretkey)" + +/vmrestore-prod -storageDataPath=/storage -src="s3://$S3_LOCATION" -customS3Endpoint="http://$MINIO_ENDPOINT" -httpListenAddr "$METRICS_ADDR"
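
For orientation, the sketch below shows how the helpers introduced above might be composed by a restore command. It is illustrative only and not part of this patch: the `runRestore` function, its parameters, the package name, and the hard-coded service name are assumptions; the calls it makes (`restore.PromptForConfirmation`, `scale.ScaleDown`, `restore.EnsureRestoreResources`, `Client.CreateBackupJob`, `restore.WaitAndFinalize`) are the APIs added or changed in this diff.

```go
// Hypothetical wiring of the new restore orchestration helpers; a sketch only,
// not code from this change. Assumes a command package similar to cmd/victoriametrics.
package victoriametrics

import (
	"fmt"

	"github.com/stackvista/stackstate-backup-cli/internal/clients/k8s"
	"github.com/stackvista/stackstate-backup-cli/internal/foundation/config"
	"github.com/stackvista/stackstate-backup-cli/internal/foundation/logger"
	"github.com/stackvista/stackstate-backup-cli/internal/orchestration/restore"
	"github.com/stackvista/stackstate-backup-cli/internal/orchestration/scale"
)

func runRestore(k8sClient *k8s.Client, cfg *config.Config, log *logger.Logger, namespace, jobName string, spec k8s.BackupJobSpec) error {
	// Ask for confirmation before touching any workloads.
	if !restore.PromptForConfirmation() {
		return fmt.Errorf("operation cancelled")
	}

	selector := cfg.VictoriaMetrics.Restore.ScaleDownLabelSelector

	// Scale down Deployments and StatefulSets; original replica counts are
	// stored in the pre-restore-replicas annotation for the later scale-up.
	if _, err := scale.ScaleDown(k8sClient, namespace, selector, log); err != nil {
		return err
	}

	// Ensure the restore scripts ConfigMap and the Minio keys Secret exist.
	if err := restore.EnsureRestoreResources(k8sClient, namespace, cfg, log); err != nil {
		return err
	}

	// Create the restore Job from a container-based BackupJobSpec.
	if _, err := k8sClient.CreateBackupJob(namespace, spec); err != nil {
		return err
	}

	// Wait for the Job, scale the workloads back up, and clean up the Job and PVC.
	return restore.WaitAndFinalize(restore.WaitAndFinalizeParams{
		K8sClient:     k8sClient,
		Namespace:     namespace,
		JobName:       jobName,
		ServiceName:   "victoriametrics",
		ScaleSelector: selector,
		CleanupPVC:    true,
		Log:           log,
	})
}
```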