Skip to content

Commit 1156ff6

Browse files
authored
Fixes to runtime.Poller (Azure#17837)
* Fixes to runtime.Poller Fixed the behavior of Poll and Result methods and updated their doc comments explaining the behavior. Clarified some other Poller doc comments and improved error messages. * clean-up
1 parent 2499c06 commit 1156ff6

File tree

3 files changed

+131
-16
lines changed

3 files changed

+131
-16
lines changed

sdk/azcore/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
### Bugs Fixed
2323
* When per-try timeouts are enabled, only cancel the context after the body has been read and closed.
2424
* The `Operation-Location` poller now properly handles `final-state-via` values.
25+
* Improvements in `runtime.Poller[T]`
26+
* `Poll()` shouldn't cache errors, allowing for additional retries when in a non-terminal state.
27+
* `Result()` will cache the terminal result or error but not transient errors, allowing for additional retries.
2528

2629
### Other Changes
2730
* The internal poller implementation has been refactored.

sdk/azcore/runtime/poller.go

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ type Poller[T any] struct {
183183
resp *http.Response
184184
err error
185185
result *T
186+
done bool
186187
}
187188

188189
// PollUntilDoneOptions contains the optional values for the Poller[T].PollUntilDone() method.
@@ -193,6 +194,7 @@ type PollUntilDoneOptions struct {
193194
}
194195

195196
// PollUntilDone will poll the service endpoint until a terminal state is reached, an error is received, or the context expires.
197+
// It internally uses Poll(), Done(), and Result() in its polling loop, sleeping for the specified duration between intervals.
196198
// options: pass nil to accept the default values.
197199
// NOTE: the default polling frequency is 30 seconds which works well for most operations. However, some operations might
198200
// benefit from a shorter or longer duration.
@@ -250,46 +252,69 @@ func (p *Poller[T]) PollUntilDone(ctx context.Context, options *PollUntilDoneOpt
250252
}
251253

252254
// Poll fetches the latest state of the LRO. It returns an HTTP response or error.
253-
// If the LRO has completed successfully, the poller's state is updated and the HTTP
254-
// response is returned.
255-
// If the LRO has completed with failure or was cancelled, the poller's state is
256-
// updated and the error is returned.
257-
// If the LRO has not reached a terminal state, the poller's state is updated and
258-
// the latest HTTP response is returned.
255+
// If Poll succeeds, the poller's state is updated and the HTTP response is returned.
259256
// If Poll fails, the poller's state is unmodified and the error is returned.
260-
// Calling Poll on an LRO that has reached a terminal state will return the final
261-
// HTTP response or error.
257+
// Calling Poll on an LRO that has reached a terminal state will return the last HTTP response.
262258
func (p *Poller[T]) Poll(ctx context.Context) (*http.Response, error) {
263259
if p.Done() {
264260
// the LRO has reached a terminal state, don't poll again
265-
if p.err != nil {
266-
return nil, p.err
267-
}
268261
return p.resp, nil
269262
}
270-
p.resp, p.err = p.op.Poll(ctx)
271-
return p.resp, p.err
263+
resp, err := p.op.Poll(ctx)
264+
if err != nil {
265+
return nil, err
266+
}
267+
p.resp = resp
268+
return p.resp, nil
272269
}
273270

274271
// Done returns true if the LRO has reached a terminal state.
272+
// Once a terminal state is reached, call Result().
275273
func (p *Poller[T]) Done() bool {
276274
return p.op.Done()
277275
}
278276

279277
// Result returns the result of the LRO and is meant to be used in conjunction with Poll and Done.
278+
// If the LRO completed successfully, a populated instance of T is returned.
279+
// If the LRO failed or was canceled, an *azcore.ResponseError error is returned.
280280
// Calling this on an LRO in a non-terminal state will return an error.
281281
func (p *Poller[T]) Result(ctx context.Context) (T, error) {
282282
if !p.Done() {
283-
return *new(T), errors.New("cannot return a final response from a poller in a non-terminal state")
283+
return *new(T), errors.New("poller is in a non-terminal state")
284+
}
285+
if p.done {
286+
// the result has already been retrieved, return the cached value
287+
if p.err != nil {
288+
return *new(T), p.err
289+
}
290+
return *p.result, nil
291+
}
292+
res, err := p.op.Result(ctx, p.result)
293+
var respErr *exported.ResponseError
294+
if errors.As(err, &respErr) {
295+
// the LRO failed. record the error
296+
p.err = err
297+
} else if err != nil {
298+
// the call to Result failed, don't cache anything in this case
299+
return *new(T), err
300+
} else {
301+
// the LRO succeeded. record the result
302+
p.result = &res
303+
}
304+
p.done = true
305+
if p.err != nil {
306+
return *new(T), p.err
284307
}
285-
return p.op.Result(ctx, p.result)
308+
return *p.result, nil
286309
}
287310

288311
// ResumeToken returns a value representing the poller that can be used to resume
289312
// the LRO at a later time. ResumeTokens are unique per service operation.
313+
// The token's format should be considered opaque and is subject to change.
314+
// Calling this on an LRO in a terminal state will return an error.
290315
func (p *Poller[T]) ResumeToken() (string, error) {
291316
if p.Done() {
292-
return "", errors.New("cannot create a ResumeToken from a poller in a terminal state")
317+
return "", errors.New("poller is in a terminal state")
293318
}
294319
tk, err := pollers.NewResumeToken[T](p.op)
295320
if err != nil {

sdk/azcore/runtime/poller_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/Azure/azure-sdk-for-go/sdk/azcore/internal/shared"
2727
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
2828
"github.com/Azure/azure-sdk-for-go/sdk/internal/mock"
29+
"github.com/stretchr/testify/require"
2930
)
3031

3132
type none struct{}
@@ -178,6 +179,16 @@ func TestLocPollerCancelled(t *testing.T) {
178179
if w.Size != 0 {
179180
t.Fatalf("unexpected widget size %d", w.Size)
180181
}
182+
w, err = lro.Result(context.Background())
183+
if err == nil {
184+
t.Fatal("unexpected nil error")
185+
}
186+
if _, ok := err.(*exported.ResponseError); !ok {
187+
t.Fatal("expected pollerError")
188+
}
189+
if w.Size != 0 {
190+
t.Fatalf("unexpected widget size %d", w.Size)
191+
}
181192
}
182193

183194
func TestLocPollerWithError(t *testing.T) {
@@ -380,6 +391,68 @@ func TestOpPollerWithWidgetPUT(t *testing.T) {
380391
}
381392
}
382393

394+
type nonRetriableError struct {
395+
Msg string
396+
}
397+
398+
func (n *nonRetriableError) Error() string {
399+
return n.Msg
400+
}
401+
402+
func (*nonRetriableError) NonRetriable() {
403+
// prevent the retry policy from masking this transient error
404+
}
405+
406+
func TestOpPollerWithWidgetFinalGetError(t *testing.T) {
407+
srv, close := mock.NewServer()
408+
srv.AppendResponse(mock.WithStatusCode(http.StatusAccepted), mock.WithBody([]byte(`{"status": "InProgress"}`)))
409+
srv.AppendResponse(mock.WithStatusCode(http.StatusOK), mock.WithBody([]byte(`{"status": "Succeeded"}`)))
410+
// PUT and PATCH state that a final GET will happen
411+
// the first attempt at a final GET returns an error
412+
srv.AppendError(&nonRetriableError{Msg: "failed attempt"})
413+
srv.AppendResponse(mock.WithStatusCode(http.StatusOK), mock.WithBody([]byte(`{"size": 2}`)))
414+
defer close()
415+
416+
reqURL, err := url.Parse(srv.URL())
417+
if err != nil {
418+
t.Fatal(err)
419+
}
420+
body, closed := mock.NewTrackedCloser(http.NoBody)
421+
firstResp := &http.Response{
422+
Body: body,
423+
StatusCode: http.StatusAccepted,
424+
Header: http.Header{
425+
"Operation-Location": []string{srv.URL()},
426+
},
427+
Request: &http.Request{
428+
Method: http.MethodPut,
429+
URL: reqURL,
430+
},
431+
}
432+
pl := newTestPipeline(&policy.ClientOptions{Transport: srv})
433+
lro, err := NewPoller[widget](firstResp, pl, nil)
434+
require.Nil(t, err)
435+
require.True(t, closed(), "initial response body wasn't closed")
436+
437+
resp, err := lro.Poll(context.Background())
438+
require.NoError(t, err)
439+
require.Equal(t, http.StatusAccepted, resp.StatusCode)
440+
require.False(t, lro.Done())
441+
442+
resp, err = lro.Poll(context.Background())
443+
require.NoError(t, err)
444+
require.Equal(t, http.StatusOK, resp.StatusCode)
445+
require.True(t, lro.Done())
446+
447+
w, err := lro.Result(context.Background())
448+
require.Error(t, err)
449+
require.Empty(t, w)
450+
451+
w, err = lro.Result(context.Background())
452+
require.NoError(t, err)
453+
require.Equal(t, w.Size, 2)
454+
}
455+
383456
func TestOpPollerWithWidgetPOSTLocation(t *testing.T) {
384457
srv, close := mock.NewServer()
385458
srv.AppendResponse(mock.WithStatusCode(http.StatusAccepted), mock.WithBody([]byte(`{"status": "InProgress"}`)))
@@ -751,6 +824,13 @@ func TestNewPollerAsync(t *testing.T) {
751824
if v := *result.Field; v != "value" {
752825
t.Fatalf("unexpected value %s", v)
753826
}
827+
result, err = poller.Result(context.Background())
828+
if err != nil {
829+
t.Fatal(err)
830+
}
831+
if v := *result.Field; v != "value" {
832+
t.Fatalf("unexpected value %s", v)
833+
}
754834
}
755835

756836
func TestNewPollerBody(t *testing.T) {
@@ -831,6 +911,13 @@ func TestNewPollerARMLoc(t *testing.T) {
831911
if v := *result.Field; v != "value" {
832912
t.Fatalf("unexpected value %s", v)
833913
}
914+
result, err = poller.Result(context.Background())
915+
if err != nil {
916+
t.Fatal(err)
917+
}
918+
if v := *result.Field; v != "value" {
919+
t.Fatalf("unexpected value %s", v)
920+
}
834921
}
835922

836923
func TestNewPollerInitialRetryAfter(t *testing.T) {

0 commit comments

Comments
 (0)