Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions cmd/elasticsearch/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,7 @@ func (m *mockESClientForRestore) ConfigureSLMPolicy(_, _, _, _, _, _ string, _,
}

func (m *mockESClientForRestore) GetRestoreStatus(_, _ string) (string, bool, error) {
return "NOT_FOUND", true, nil
}

func (m *mockESClientForRestore) IsRestoreInProgress(_, _ string) (bool, error) {
return false, nil
return "SUCCESS", true, nil
}

// TestRestoreCmd_Unit tests the command structure
Expand Down
92 changes: 30 additions & 62 deletions internal/clients/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,7 @@ import (
// Restore status constants
const (
StatusSuccess = "SUCCESS"
StatusFailed = "FAILED"
StatusInProgress = "IN_PROGRESS"
StatusNotFound = "NOT_FOUND"
StatusPartial = "PARTIAL"
StatusStarted = "STARTED"
StatusInit = "INIT"
)

// Client represents an Elasticsearch client
Expand Down Expand Up @@ -362,79 +357,52 @@ func (c *Client) RestoreSnapshot(repository, snapshotName, indicesPattern string
return nil
}

// RestoreStatusResponse represents the response from Elasticsearch restore status API
type RestoreStatusResponse struct {
Snapshots []struct {
Snapshot string `json:"snapshot"`
State string `json:"state"`
Shards struct {
Total int `json:"total"`
Failed int `json:"failed"`
Successful int `json:"successful"`
} `json:"shards_stats"`
} `json:"snapshots"`
// RecoveryInfo represents the recovery status of a shard from _cat/recovery API
type RecoveryInfo struct {
Index string `json:"index"`
Shard string `json:"shard"`
Type string `json:"type"`
Stage string `json:"stage"`
Repository string `json:"repository"`
Snapshot string `json:"snapshot"`
}

// GetRestoreStatus checks the status of a restore operation
// GetRestoreStatus checks the status of a restore operation by examining active shard recoveries.
// When a snapshot is being restored, shards are recovered with type "snapshot".
// Returns: (statusMessage, isComplete, error)
// Status can be: "IN_PROGRESS", "SUCCESS", "FAILED", "NOT_FOUND"
// Status can be: "IN_PROGRESS", "SUCCESS"
func (c *Client) GetRestoreStatus(repository, snapshotName string) (string, bool, error) {
res, err := c.es.Snapshot.Status(
c.es.Snapshot.Status.WithContext(context.Background()),
c.es.Snapshot.Status.WithRepository(repository),
c.es.Snapshot.Status.WithSnapshot(snapshotName),
// Use _cat/recovery API to check for active snapshot recoveries.
// This shows shards that are currently being recovered from a snapshot.
res, err := c.es.Cat.Recovery(
c.es.Cat.Recovery.WithContext(context.Background()),
c.es.Cat.Recovery.WithFormat("json"),
c.es.Cat.Recovery.WithActiveOnly(true),
c.es.Cat.Recovery.WithH("index,shard,type,stage,repository,snapshot"),
)
if err != nil {
return "", false, fmt.Errorf("failed to get restore status: %w", err)
return "", false, fmt.Errorf("failed to get recovery status: %w", err)
}
defer res.Body.Close()

// 404 means no restore is in progress
if res.StatusCode == http.StatusNotFound {
return StatusNotFound, true, nil
}

if res.IsError() {
return "", false, fmt.Errorf("elasticsearch returned error: %s", res.String())
}

var statusResp RestoreStatusResponse
if err := json.NewDecoder(res.Body).Decode(&statusResp); err != nil {
var recoveries []RecoveryInfo
if err := json.NewDecoder(res.Body).Decode(&recoveries); err != nil {
return "", false, fmt.Errorf("failed to decode response: %w", err)
}

// If no snapshots are being restored, it's complete
if len(statusResp.Snapshots) == 0 {
return StatusSuccess, true, nil
}

// Check the state of the snapshot
snapshotStatus := statusResp.Snapshots[0]
state := snapshotStatus.State

switch state {
case StatusSuccess, StatusPartial:
return StatusSuccess, true, nil
case StatusFailed:
return StatusFailed, true, nil
case StatusInProgress, StatusStarted, StatusInit:
return StatusInProgress, false, nil
default:
return state, false, nil
}
}

// IsRestoreInProgress checks if a restore operation is currently in progress
func (c *Client) IsRestoreInProgress(repository, snapshotName string) (bool, error) {
status, isComplete, err := c.GetRestoreStatus(repository, snapshotName)
if err != nil {
return false, err
// Check if any active recovery is from the specified snapshot
for _, recovery := range recoveries {
if recovery.Type == "snapshot" &&
recovery.Repository == repository &&
recovery.Snapshot == snapshotName {
return StatusInProgress, false, nil
}
}

// If status is NOT_FOUND or complete, no restore in progress
if status == "NOT_FOUND" || isComplete {
return false, nil
}

return true, nil
// No active recoveries from this snapshot - restore is complete
return StatusSuccess, true, nil
}
1 change: 0 additions & 1 deletion internal/clients/elasticsearch/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ type Interface interface {
GetSnapshot(repository, snapshotName string) (*Snapshot, error)
RestoreSnapshot(repository, snapshotName, indicesPattern string) error
GetRestoreStatus(repository, snapshotName string) (string, bool, error)
IsRestoreInProgress(repository, snapshotName string) (bool, error)

// Index operations
ListIndices(pattern string) ([]string, error)
Expand Down