Skip to content

Commit 2d1e391

Browse files
committed
PR feedback
1 parent 3a8d3b5 commit 2d1e391

File tree

7 files changed

+96
-43
lines changed

7 files changed

+96
-43
lines changed

internal/diagutil/context.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package diagutil
2+
3+
import (
4+
"context"
5+
6+
fwdiag "github.com/hashicorp/terraform-plugin-framework/diag"
7+
"github.com/hashicorp/terraform-plugin-log/tflog"
8+
)
9+
10+
var contextDeadlineExceededDiags = FrameworkDiagFromError(context.DeadlineExceeded)
11+
12+
func ContainsContextDeadlineExceeded(ctx context.Context, diags fwdiag.Diagnostics) bool {
13+
if len(contextDeadlineExceededDiags) == 0 {
14+
tflog.Error(ctx, "Expected context deadline exceeded diagnostics to contain at least one error")
15+
return false
16+
}
17+
18+
return diags.Contains(contextDeadlineExceededDiags[0])
19+
}

internal/elasticsearch/ml/datafeed_state/create.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func (r *mlDatafeedStateResource) Create(ctx context.Context, req resource.Creat
2525
}
2626

2727
diags = r.update(ctx, req.Plan, &resp.State, createTimeout)
28-
if diags.Contains(diagutil.FrameworkDiagFromError(context.DeadlineExceeded)[0]) {
28+
if diagutil.ContainsContextDeadlineExceeded(ctx, diags) {
2929
diags.AddError("Operation timed out", fmt.Sprintf("The operation to create the ML datafeed state timed out after %s. You may need to allocate more free memory within ML nodes by either closing other jobs, or increasing the overall ML memory. You may retry the operation.", createTimeout))
3030
}
3131

internal/elasticsearch/ml/datafeed_state/models.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package datafeed_state
22

33
import (
44
"strconv"
5+
"time"
56

7+
"github.com/elastic/terraform-provider-elasticstack/internal/models"
68
"github.com/elastic/terraform-provider-elasticstack/internal/utils"
79
"github.com/elastic/terraform-provider-elasticstack/internal/utils/customtypes"
810
"github.com/hashicorp/terraform-plugin-framework-timeouts/resource/timeouts"
@@ -23,15 +25,15 @@ type MLDatafeedStateData struct {
2325
Timeouts timeouts.Value `tfsdk:"timeouts"`
2426
}
2527

26-
func (d MLDatafeedStateData) GetStartAsString() (string, diag.Diagnostics) {
28+
func (d *MLDatafeedStateData) GetStartAsString() (string, diag.Diagnostics) {
2729
return d.getTimeAttributeAsString(d.Start)
2830
}
2931

30-
func (d MLDatafeedStateData) GetEndAsString() (string, diag.Diagnostics) {
32+
func (d *MLDatafeedStateData) GetEndAsString() (string, diag.Diagnostics) {
3133
return d.getTimeAttributeAsString(d.End)
3234
}
3335

34-
func (d MLDatafeedStateData) getTimeAttributeAsString(val timetypes.RFC3339) (string, diag.Diagnostics) {
36+
func (d *MLDatafeedStateData) getTimeAttributeAsString(val timetypes.RFC3339) (string, diag.Diagnostics) {
3537
if !utils.IsKnown(val) {
3638
return "", nil
3739
}
@@ -42,3 +44,20 @@ func (d MLDatafeedStateData) getTimeAttributeAsString(val timetypes.RFC3339) (st
4244
}
4345
return strconv.FormatInt(valTime.Unix(), 10), nil
4446
}
47+
48+
func (d *MLDatafeedStateData) SetStartAndEndFromAPI(datafeedStats *models.DatafeedStats) diag.Diagnostics {
49+
var diags diag.Diagnostics
50+
if datafeedStats.RunningState == nil {
51+
diags.AddWarning("Running state was empty for a started datafeed", "The Elasticsearch API returned an empty running state for a Datafeed which was successfully started. Ignoring start and end response values.")
52+
return diags
53+
}
54+
55+
d.Start = timetypes.NewRFC3339TimeValue(time.UnixMilli(datafeedStats.RunningState.SearchInterval.StartMS))
56+
if datafeedStats.RunningState.RealTimeConfigured {
57+
d.End = timetypes.NewRFC3339Null()
58+
} else {
59+
d.End = timetypes.NewRFC3339TimeValue(time.UnixMilli(datafeedStats.RunningState.SearchInterval.EndMS))
60+
}
61+
62+
return diags
63+
}

internal/elasticsearch/ml/datafeed_state/read.go

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,11 @@ package datafeed_state
22

33
import (
44
"context"
5-
"time"
65

76
"github.com/elastic/terraform-provider-elasticstack/internal/clients"
87
"github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch"
8+
"github.com/elastic/terraform-provider-elasticstack/internal/diagutil"
99
"github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ml/datafeed"
10-
"github.com/hashicorp/terraform-plugin-framework-timetypes/timetypes"
1110
"github.com/hashicorp/terraform-plugin-framework/diag"
1211
"github.com/hashicorp/terraform-plugin-framework/resource"
1312
"github.com/hashicorp/terraform-plugin-framework/types"
@@ -56,17 +55,17 @@ func (r *mlDatafeedStateResource) read(ctx context.Context, data MLDatafeedState
5655
// Update the data with current information
5756
data.State = types.StringValue(datafeedStats.State)
5857

59-
if datafeed.State(datafeedStats.State) == datafeed.StateStarted {
60-
if datafeedStats.RunningState == nil {
61-
diags.AddWarning("Running state was empty for a started datafeed", "The Elasticsearch API returned an empty running state for a Datafeed which was successfully started. Ignoring start and end response values.")
62-
}
58+
// Regenerate composite ID to ensure it's current
59+
compId, sdkDiags := client.ID(ctx, datafeedId)
60+
diags.Append(diagutil.FrameworkDiagsFromSDK(sdkDiags)...)
61+
if diags.HasError() {
62+
return nil, diags
63+
}
64+
65+
data.Id = types.StringValue(compId.String())
6366

64-
data.Start = timetypes.NewRFC3339TimeValue(time.UnixMilli(datafeedStats.RunningState.SearchInterval.StartMS))
65-
if datafeedStats.RunningState.RealTimeConfigured {
66-
data.End = timetypes.NewRFC3339Null()
67-
} else {
68-
data.End = timetypes.NewRFC3339TimeValue(time.UnixMilli(datafeedStats.RunningState.SearchInterval.EndMS))
69-
}
67+
if datafeed.State(datafeedStats.State) == datafeed.StateStarted {
68+
diags.Append(data.SetStartAndEndFromAPI(datafeedStats)...)
7069
}
7170

7271
return &data, diags

internal/elasticsearch/ml/datafeed_state/update.go

Lines changed: 41 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch"
1010
"github.com/elastic/terraform-provider-elasticstack/internal/diagutil"
1111
"github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ml/datafeed"
12+
"github.com/elastic/terraform-provider-elasticstack/internal/models"
1213
"github.com/hashicorp/terraform-plugin-framework-timetypes/timetypes"
1314
"github.com/hashicorp/terraform-plugin-framework/diag"
1415
"github.com/hashicorp/terraform-plugin-framework/resource"
@@ -33,7 +34,7 @@ func (r *mlDatafeedStateResource) Update(ctx context.Context, req resource.Updat
3334
}
3435

3536
diags = r.update(ctx, req.Plan, &resp.State, updateTimeout)
36-
if diags.Contains(diagutil.FrameworkDiagFromError(context.DeadlineExceeded)[0]) {
37+
if diagutil.ContainsContextDeadlineExceeded(ctx, diags) {
3738
diags.AddError("Operation timed out", fmt.Sprintf("The operation to update the ML datafeed state timed out after %s. You may need to allocate more free memory within ML nodes by either closing other jobs, or increasing the overall ML memory. You may retry the operation.", updateTimeout))
3839
}
3940

@@ -106,33 +107,12 @@ func (r *mlDatafeedStateResource) update(ctx context.Context, plan tfsdk.Plan, s
106107
return diags
107108
}
108109
} else {
109-
statsAfterUpdate, fwDiags := elasticsearch.GetDatafeedStats(ctx, client, datafeedId)
110-
diags.Append(fwDiags...)
110+
var updateDiags diag.Diagnostics
111+
finalData, updateDiags = r.updateAfterMissedTransition(ctx, client, data, datafeedStats)
112+
diags.Append(updateDiags...)
111113
if diags.HasError() {
112114
return diags
113115
}
114-
115-
if datafeedStats == nil {
116-
diags.AddError(
117-
"ML Datafeed not found",
118-
fmt.Sprintf("ML datafeed %s does not exist after successful update", datafeedId),
119-
)
120-
return diags
121-
}
122-
123-
if statsAfterUpdate.TimingStats.SearchCount < datafeedStats.TimingStats.SearchCount {
124-
diags.AddError(
125-
"Datafeed did not successfully transition to the desired state",
126-
fmt.Sprintf("[%s] datafeed did not settle into the [%s] state. The current state is [%s]", datafeedId, desiredState, statsAfterUpdate.State),
127-
)
128-
return diags
129-
}
130-
131-
if data.Start.IsUnknown() {
132-
data.Start = timetypes.NewRFC3339Null()
133-
}
134-
135-
finalData = &data
136116
}
137117

138118
if finalData == nil {
@@ -144,6 +124,42 @@ func (r *mlDatafeedStateResource) update(ctx context.Context, plan tfsdk.Plan, s
144124
return diags
145125
}
146126

127+
func (r *mlDatafeedStateResource) updateAfterMissedTransition(ctx context.Context, client *clients.ApiClient, data MLDatafeedStateData, datafeedStats *models.DatafeedStats) (*MLDatafeedStateData, diag.Diagnostics) {
128+
datafeedId := data.DatafeedId.ValueString()
129+
statsAfterUpdate, diags := elasticsearch.GetDatafeedStats(ctx, client, datafeedId)
130+
if diags.HasError() {
131+
return nil, diags
132+
}
133+
134+
if statsAfterUpdate == nil {
135+
diags.AddError(
136+
"ML Datafeed not found",
137+
fmt.Sprintf("ML datafeed %s does not exist after successful update", datafeedId),
138+
)
139+
return nil, diags
140+
}
141+
142+
// It's possible that the datafeed starts, and then immediately stops if there is no (or very little) data to process.
143+
// In this case, the state transition may occur too quickly to be detected by the wait function.
144+
// To handle this, we check if the search count has increased to determine if the datafeed actually started since the update.
145+
if statsAfterUpdate.TimingStats == nil || datafeedStats.TimingStats == nil {
146+
diags.AddWarning("Expected Datafeed to contain timing stats",
147+
fmt.Sprintf("Stats for datafeed %s did not contain timing stats either before or after the update. Before %v - After %v", datafeedId, datafeedStats, statsAfterUpdate))
148+
} else if statsAfterUpdate.TimingStats.SearchCount <= datafeedStats.TimingStats.SearchCount {
149+
diags.AddError(
150+
"Datafeed did not successfully transition to the desired state",
151+
fmt.Sprintf("[%s] datafeed did not settle into the [%s] state. The current state is [%s]", datafeedId, data.State.ValueString(), statsAfterUpdate.State),
152+
)
153+
return nil, diags
154+
}
155+
156+
if data.Start.IsUnknown() {
157+
data.Start = timetypes.NewRFC3339Null()
158+
}
159+
160+
return &data, nil
161+
}
162+
147163
// performStateTransition handles the ML datafeed state transition process
148164
func (r *mlDatafeedStateResource) performStateTransition(ctx context.Context, client *clients.ApiClient, data MLDatafeedStateData, currentState datafeed.State) (bool, diag.Diagnostics) {
149165
datafeedId := data.DatafeedId.ValueString()

internal/elasticsearch/ml/job_state/create.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func (r *mlJobStateResource) Create(ctx context.Context, req resource.CreateRequ
2525
}
2626

2727
diags = r.update(ctx, req.Plan, &resp.State, createTimeout)
28-
if diags.Contains(diagutil.FrameworkDiagFromError(context.DeadlineExceeded)[0]) {
28+
if diagutil.ContainsContextDeadlineExceeded(ctx, diags) {
2929
diags.AddError("Operation timed out", fmt.Sprintf("The operation to create the ML job state timed out after %s. You may need to allocate more free memory within ML nodes by either closing other jobs, or increasing the overall ML memory. You may retry the operation.", createTimeout))
3030
}
3131

internal/elasticsearch/ml/job_state/update.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func (r *mlJobStateResource) Update(ctx context.Context, req resource.UpdateRequ
3131
}
3232

3333
diags = r.update(ctx, req.Plan, &resp.State, updateTimeout)
34-
if diags.Contains(diagutil.FrameworkDiagFromError(context.DeadlineExceeded)[0]) {
34+
if diagutil.ContainsContextDeadlineExceeded(ctx, diags) {
3535
diags.AddError("Operation timed out", fmt.Sprintf("The operation to update the ML job state timed out after %s. You may need to allocate more free memory within ML nodes by either closing other jobs, or increasing the overall ML memory. You may retry the operation.", updateTimeout))
3636
}
3737

0 commit comments

Comments
 (0)