diff --git a/Cargo.lock b/Cargo.lock index 98dd66447a..7c5e73b86d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -456,6 +456,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "tracing", ] [[package]] @@ -478,6 +479,7 @@ dependencies = [ "serde_json", "sha2", "tokio", + "tracing", ] [[package]] @@ -497,6 +499,7 @@ dependencies = [ "serde_json", "time", "tokio", + "tracing", ] [[package]] diff --git a/sdk/core/azure_core/src/http/pager.rs b/sdk/core/azure_core/src/http/pager.rs index 5e858ecbd9..3937672cc3 100644 --- a/sdk/core/azure_core/src/http/pager.rs +++ b/sdk/core/azure_core/src/http/pager.rs @@ -3,25 +3,21 @@ //! Types and methods for pageable responses. +// TODO: Remove once tests re-enabled! +#![allow(missing_docs, unexpected_cfgs)] + use crate::{ - error::ErrorKind, + conditional_send::ConditionalSend, http::{ headers::HeaderName, policies::create_public_api_span, response::Response, Context, - DeserializeWith, Format, JsonFormat, + DeserializeWith, Format, JsonFormat, Url, }, tracing::{Span, SpanStatus}, }; use async_trait::async_trait; -use futures::{stream::unfold, FutureExt, Stream}; -use std::{ - fmt, - future::Future, - ops::Deref, - pin::Pin, - str::FromStr, - sync::{Arc, Mutex}, - task, -}; +use futures::{stream::FusedStream, FutureExt, Stream}; +use pin_project::pin_project; +use std::{fmt, future::Future, ops::Deref, pin::Pin, sync::Arc, task}; /// Represents the state of a [`Pager`] or [`PageIterator`]. #[derive(Debug, Default, PartialEq, Eq)] @@ -70,7 +66,7 @@ impl> PagerState { } } -impl> Clone for PagerState { +impl + Clone> Clone for PagerState { #[inline] fn clone(&self) -> Self { match self { @@ -97,7 +93,7 @@ pub enum PagerResult> { } impl PagerResult, String> { - /// Creates a [`PagerResult`] from the provided response, extracting the continuation value from the provided header. + /// Creates a [`PagerResult`] from the provided response, extracting the continuation value from the provided header. /// /// If the provided response has a header with the matching name, this returns [`PagerResult::More`], using the value from the header as the continuation. /// If the provided response does not have a header with the matching name, this returns [`PagerResult::Done`]. @@ -112,12 +108,12 @@ impl PagerResult, String> { } } -impl> fmt::Debug for PagerResult { +impl> fmt::Debug for PagerResult { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::More { continuation, .. } => f .debug_struct("More") - .field("continuation", &continuation) + .field("continuation", &continuation.as_ref()) .finish_non_exhaustive(), Self::Done { .. } => f.debug_struct("Done").finish_non_exhaustive(), } @@ -154,7 +150,7 @@ where /// Represents a paginated stream of items returned by a collection request to a service. /// -/// Specifically, this is a [`ItemIterator`] that yields [`Response`] items. +/// Specifically, this is a [`ItemIterator`] that yields [`Response`] items. /// /// # Examples /// @@ -204,11 +200,27 @@ where /// } /// # Ok(()) } /// ``` -pub type Pager = ItemIterator>; +pub type Pager = ItemIterator, C>; + +/// A pinned boxed [`Future`] that can be stored and called dynamically. +/// +/// Intended only for [`ItemIterator`] and [`PageIterator`]. +#[cfg(not(target_arch = "wasm32"))] +pub type BoxedFuture = + Pin>> + Send + 'static>>; + +/// A pinned boxed [`Future`] that can be stored and called dynamically. +/// +/// Intended only for [`ItemIterator`] and [`PageIterator`]. +#[cfg(target_arch = "wasm32")] +pub type BoxedFuture = + Pin>> + 'static>>; + +type PagerFn = Box, PagerOptions<'static, C>) -> BoxedFuture>; /// Options for configuring the behavior of a [`Pager`]. -#[derive(Clone, Debug, Default)] -pub struct PagerOptions<'a> { +#[derive(Clone)] +pub struct PagerOptions<'a, C: AsRef> { /// Context for HTTP requests made by the [`Pager`]. pub context: Context<'a>, @@ -236,7 +248,7 @@ pub struct PagerOptions<'a> { /// // which is the first page in this example. /// let options = SecretClientListSecretPropertiesOptions { /// method_options: PagerOptions { - /// continuation_token: pager.continuation_token(), + /// continuation_token: pager.continuation_token().map(Into::into), /// ..Default::default() /// }, /// ..Default::default() @@ -248,14 +260,29 @@ pub struct PagerOptions<'a> { /// # Ok(()) /// # } /// ``` - pub continuation_token: Option, + pub continuation_token: Option, } -#[cfg(not(target_arch = "wasm32"))] -type BoxedStream

= Box> + Send>; +impl<'a, C: AsRef> fmt::Debug for PagerOptions<'a, C> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PagerOptions") + .field("context", &self.context) + .field( + "continuation_token", + &self.continuation_token.as_ref().map(AsRef::as_ref), + ) + .finish() + } +} -#[cfg(target_arch = "wasm32")] -type BoxedStream

= Box>>; +impl<'a, C: AsRef> Default for PagerOptions<'a, C> { + fn default() -> Self { + PagerOptions { + context: Context::new(), + continuation_token: None, + } + } +} /// Iterates over a collection of items or individual pages of items from a service. /// @@ -310,21 +337,28 @@ type BoxedStream

= Box>>; /// } /// # Ok(()) } /// ``` -#[pin_project::pin_project] -pub struct ItemIterator { +#[pin_project(project = ItemIteratorProjection, project_replace = ItemIteratorProjectionOwned)] +pub struct ItemIterator +where + P: Page, + C: AsRef, +{ #[pin] - stream: Pin>, - continuation_token: Option, - next_token: Arc>>, + iter: PageIterator, + continuation_token: Option, current: Option, } -impl ItemIterator

{ - /// Creates a [`ItemIterator

`] from a callback that will be called repeatedly to request each page. +impl ItemIterator +where + P: Page, + C: AsRef + Clone + ConditionalSend + 'static, +{ + /// Creates a [`ItemIterator`] from a callback that will be called repeatedly to request each page. /// - /// This method expect a callback that accepts a single [`PagerState`] parameter, and returns a [`PagerResult`] value asynchronously. + /// This method expect a callback that accepts a single [`PagerState`] parameter, and returns a [`PagerResult`] value asynchronously. /// The `C` type parameter is the type of the next link/continuation token. It may be any [`Send`]able type. - /// The result will be an asynchronous stream of [`Result`](crate::Result) values. + /// The result will be an asynchronous stream of [`Result`](crate::Result) values. /// /// The first time your callback is called, it will be called with [`Option::None`], indicating no next link/continuation token is present. /// @@ -356,7 +390,7 @@ impl ItemIterator

{ /// } /// let url = "https://example.com/my_paginated_api".parse().unwrap(); /// let mut base_req = Request::new(url, Method::Get); - /// let pager = ItemIterator::from_callback(move |next_link: PagerState, options: PagerOptions| { + /// let pager = ItemIterator::new(move |next_link: PagerState, options: PagerOptions| { /// // The callback must be 'static, so you have to clone and move any values you want to use. /// let pipeline = pipeline.clone(); /// let api_version = api_version.clone(); @@ -411,7 +445,7 @@ impl ItemIterator

{ /// } /// let url = "https://example.com/my_paginated_api".parse().unwrap(); /// let mut base_req = Request::new(url, Method::Get); - /// let pager = ItemIterator::from_callback(move |continuation, options| { + /// let pager = ItemIterator::new(move |continuation, options| { /// // The callback must be 'static, so you have to clone and move any values you want to use. /// let pipeline = pipeline.clone(); /// let mut req = base_req.clone(); @@ -427,121 +461,81 @@ impl ItemIterator

{ /// } /// }, None); /// ``` - pub fn from_callback< - // This is a bit gnarly, but the only thing that differs between the WASM/non-WASM configs is the presence of Send bounds. - #[cfg(not(target_arch = "wasm32"))] C: AsRef + FromStr + Send + 'static, - #[cfg(not(target_arch = "wasm32"))] F: Fn(PagerState, PagerOptions<'static>) -> Fut + Send + 'static, - #[cfg(not(target_arch = "wasm32"))] Fut: Future>> + Send + 'static, - #[cfg(target_arch = "wasm32")] C: AsRef + FromStr + 'static, - #[cfg(target_arch = "wasm32")] F: Fn(PagerState, PagerOptions<'static>) -> Fut + 'static, - #[cfg(target_arch = "wasm32")] Fut: Future>> + 'static, + pub fn new< + F: Fn(PagerState, PagerOptions<'static, C>) -> BoxedFuture + + ConditionalSend + + 'static, >( make_request: F, - options: Option>, - ) -> Self - where - ::Err: std::error::Error, - { + options: Option>, + ) -> Self { let options = options.unwrap_or_default(); // Start from the optional `PagerOptions::continuation_token`. let continuation_token = options.continuation_token.clone(); - let next_token = Arc::new(Mutex::new(continuation_token.clone())); - let stream = iter_from_callback(make_request, options, next_token.clone()); Self { - stream: Box::pin(stream), + iter: PageIterator::new(make_request, Some(options)), continuation_token, - next_token, current: None, } } - /// Creates a [`ItemIterator

`] from a raw stream of [`Result

`](crate::Result

) values. - /// - /// This constructor is used when you are implementing a completely custom stream and want to use it as a pager. - pub fn from_stream< - // This is a bit gnarly, but the only thing that differs between the WASM/non-WASM configs is the presence of Send bounds. - #[cfg(not(target_arch = "wasm32"))] S: Stream> + Send + 'static, - #[cfg(target_arch = "wasm32")] S: Stream> + 'static, - >( - stream: S, - ) -> Self { - Self { - stream: Box::pin(stream), - continuation_token: None, - next_token: Default::default(), - current: None, - } + pub fn continuation_token(&self) -> Option<&C> { + self.continuation_token.as_ref() } - /// Gets a [`PageIterator

`] to iterate over a collection of pages from a service. - /// - /// You can use this to asynchronously iterate pages returned by a collection request to a service. - /// This allows you to get the individual pages' [`Response

`], from which you can iterate items in each page - /// or deserialize the raw response as appropriate. - /// - /// The returned `PageIterator` resumes from the current page until _after_ all items are processed. - /// It does not continue on the next page until you call `next()` after the last item in the current page - /// because of how iterators are implemented. This may yield duplicates but will reduce the likelihood of skipping items instead. - pub fn into_pages(self) -> PageIterator

{ - // Attempt to start paging from the current page so that we don't skip items, - // assuming the service collection hasn't changed (most services don't create ephemeral snapshots). - if let Ok(mut token) = self.next_token.lock() { - *token = self.continuation_token; - } - - PageIterator { - stream: self.stream, - continuation_token: self.next_token, - } + pub fn into_continuation_token(self) -> Option { + self.continuation_token } - /// Gets the continuation token for the current page. - /// - /// Pass this in [`PagerOptions::continuation_token`] to create a `ItemIterator` that, when first iterated, - /// will return the current page until _after_ all items are iterated. - /// It does not continue on the next page until you call `next()` after the last item in the current page - /// because of how iterators are implemented. This may yield duplicates but will reduce the likelihood of skipping items instead. - pub fn continuation_token(&self) -> Option { - // Get the continuation_token because that will be used to start over with the current page. - self.continuation_token.clone() + pub fn into_pages(self) -> PageIterator { + let mut iter = self.iter; + + // Start with the current page until after all items are iterated. + iter.continuation_token = self.continuation_token.clone(); + iter.reset(); + + iter } } -impl futures::Stream for ItemIterator

{ +impl Stream for ItemIterator +where + P: Page, + C: AsRef + Clone + ConditionalSend + 'static, +{ type Item = crate::Result; fn poll_next( self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> task::Poll> { - let mut projected_self = self.project(); + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let mut this = self.project(); + let mut iter = this.iter.as_mut(); loop { - if let Some(current) = projected_self.current.as_mut() { + if let Some(current) = this.current.as_mut() { if let Some(item) = current.next() { return task::Poll::Ready(Some(Ok(item))); } // Reset the iterator and poll for the next page. - *projected_self.current = None; + *this.current = None; } // Set the current_token to the next page only after iterating through all items. - if let Ok(token) = projected_self.next_token.lock() { - tracing::trace!( - "updating continuation_token from {:?} to {:?}", - projected_self.continuation_token, - token - ); - *projected_self.continuation_token = token.clone(); - } - - match projected_self.stream.as_mut().poll_next(cx) { + tracing::trace!( + "updating continuation_token from {:?} to {:?}", + this.continuation_token.as_ref().map(AsRef::as_ref), + iter.continuation_token.as_ref().map(AsRef::as_ref), + ); + *this.continuation_token = iter.continuation_token.clone(); + + match iter.as_mut().poll_next(cx) { task::Poll::Ready(page) => match page { Some(Ok(page)) => match page.into_items().poll_unpin(cx) { task::Poll::Ready(Ok(iter)) => { - *projected_self.current = Some(iter); + *this.current = Some(iter); continue; } task::Poll::Ready(Err(err)) => return task::Poll::Ready(Some(Err(err))), @@ -556,14 +550,32 @@ impl futures::Stream for ItemIterator

{ } } -impl fmt::Debug for ItemIterator

{ +impl fmt::Debug for ItemIterator +where + P: Page + 'static, + C: AsRef + Clone + ConditionalSend + 'static, +{ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ItemIterator") - .field("continuation_token", &self.continuation_token) + .field("iter", &self.iter) + .field( + "next_token", + &self.continuation_token.as_ref().map(AsRef::as_ref), + ) .finish_non_exhaustive() } } +impl FusedStream for ItemIterator +where + P: Page + 'static, + C: AsRef + Clone + ConditionalSend + 'static, +{ + fn is_terminated(&self) -> bool { + self.iter.is_terminated() + } +} + /// Iterates over a collection pages of items from a service. /// /// # Examples @@ -594,19 +606,29 @@ impl fmt::Debug for ItemIterator

{ /// } /// # Ok(()) } /// ``` -#[pin_project::pin_project] -pub struct PageIterator

{ +#[must_use = "streams do nothing unless polled"] +#[pin_project(project = PageIteratorProjection, project_replace = PageIteratorProjectionOwned)] +pub struct PageIterator +where + C: AsRef, +{ #[pin] - stream: Pin>, - continuation_token: Arc>>, + make_request: PagerFn, + continuation_token: Option, + options: PagerOptions<'static, C>, + state: State, + added_span: bool, } -impl

PageIterator

{ - /// Creates a [`PageIterator

`] from a callback that will be called repeatedly to request each page. +impl PageIterator +where + C: AsRef + Clone + ConditionalSend + 'static, +{ + /// Creates a [`PageIterator`] from a callback that will be called repeatedly to request each page. /// - /// This method expect a callback that accepts a single [`PagerState`] parameter, and returns a [`PagerResult`] value asynchronously. + /// This method expect a callback that accepts a single [`PagerState`] parameter, and returns a [`PagerResult`] value asynchronously. /// The `C` type parameter is the type of the next link/continuation token. It may be any [`Send`]able type. - /// The result will be an asynchronous stream of [`Result`](crate::Result) values. + /// The result will be an asynchronous stream of [`Result`](crate::Result) values. /// /// The first time your callback is called, it will be called with [`PagerState::Initial`], indicating no next link/continuation token is present. /// @@ -629,7 +651,7 @@ impl

PageIterator

{ /// } /// let url = "https://example.com/my_paginated_api".parse().unwrap(); /// let mut base_req = Request::new(url, Method::Get); - /// let pager = PageIterator::from_callback(move |next_link: PagerState, options: PagerOptions<'static>| { + /// let pager = PageIterator::new(move |next_link: PagerState, options: PagerOptions<'static>| { /// // The callback must be 'static, so you have to clone and move any values you want to use. /// let pipeline = pipeline.clone(); /// let api_version = api_version.clone(); @@ -675,7 +697,7 @@ impl

PageIterator

{ /// } /// let url = "https://example.com/my_paginated_api".parse().unwrap(); /// let mut base_req = Request::new(url, Method::Get); - /// let pager = PageIterator::from_callback(move |continuation, options| { + /// let pager = PageIterator::new(move |continuation, options| { /// // The callback must be 'static, so you have to clone and move any values you want to use. /// let pipeline = pipeline.clone(); /// let mut req = base_req.clone(); @@ -691,106 +713,204 @@ impl

PageIterator

{ /// } /// }, None); /// ``` - pub fn from_callback< - // This is a bit gnarly, but the only thing that differs between the WASM/non-WASM configs is the presence of Send bounds. - #[cfg(not(target_arch = "wasm32"))] C: AsRef + FromStr + Send + 'static, - #[cfg(not(target_arch = "wasm32"))] F: Fn(PagerState, PagerOptions<'static>) -> Fut + Send + 'static, - #[cfg(not(target_arch = "wasm32"))] Fut: Future>> + Send + 'static, - #[cfg(target_arch = "wasm32")] C: AsRef + FromStr + 'static, - #[cfg(target_arch = "wasm32")] F: Fn(PagerState, PagerOptions<'static>) -> Fut + 'static, - #[cfg(target_arch = "wasm32")] Fut: Future>> + 'static, + pub fn new< + F: Fn(PagerState, PagerOptions<'static, C>) -> BoxedFuture + + ConditionalSend + + 'static, >( make_request: F, - options: Option>, - ) -> Self - where - ::Err: std::error::Error, - { + options: Option>, + ) -> Self { let options = options.unwrap_or_default(); // Start from the optional `PagerOptions::continuation_token`. - let continuation_token = Arc::new(Mutex::new(options.continuation_token.clone())); - let stream = iter_from_callback(make_request, options, continuation_token.clone()); + let continuation_token = options.continuation_token.clone(); - Self { - stream: Box::pin(stream), + let mut this = Self { + make_request: Box::new(make_request), continuation_token, - } + options, + state: State::Init, + added_span: false, + }; + this.reset(); + + this } - /// Creates a [`PageIterator

`] from a raw stream of [`Result

`](crate::Result

) values. - /// - /// This constructor is used when you are implementing a completely custom stream and want to use it as a pager. - pub fn from_stream< - // This is a bit gnarly, but the only thing that differs between the WASM/non-WASM configs is the presence of Send bounds. - #[cfg(not(target_arch = "wasm32"))] S: Stream> + Send + 'static, - #[cfg(target_arch = "wasm32")] S: Stream> + 'static, - >( - stream: S, - ) -> Self { - Self { - stream: Box::pin(stream), - continuation_token: Default::default(), - } + pub fn continuation_token(&self) -> Option<&C> { + self.continuation_token.as_ref() } - /// Gets the continuation token for the current page. - /// - /// Pass this to [`PagerOptions::continuation_token`] to create a `PageIterator` that, when first iterated, - /// will return the next page. You can use this to page results across separate processes. - pub fn continuation_token(&self) -> Option { - if let Ok(token) = self.continuation_token.lock() { - return token.clone(); - } + pub fn into_continuation_token(self) -> Option { + self.continuation_token + } - None + fn reset(&mut self) { + self.state = match self.continuation_token { + Some(ref n) => State::More(n.clone()), + None => State::Init, + }; } } -impl

futures::Stream for PageIterator

{ +impl Stream for PageIterator +where + C: AsRef + Clone + ConditionalSend + 'static, +{ type Item = crate::Result

; fn poll_next( self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - self.project().stream.poll_next(cx) + let this = self.project(); + + // When in the "Init" state, we are either starting fresh or resuming from a continuation token. + // In either case, attach a span to the context for the entire paging operation. + if *this.state == State::Init { + tracing::debug!("establish a public API span for new pager."); + + // At the very start of polling, create a span for the entire request, and attach it to the context + let span = create_public_api_span(&this.options.context, None, None); + if let Some(s) = span { + *this.added_span = true; + let previous_options = std::mem::take(this.options); + *this.options = PagerOptions { + context: previous_options.context.with_value(s), + continuation_token: previous_options.continuation_token, + }; + } + } + + let result = match *this.state { + State::Init => { + tracing::debug!("initial page request"); + let options = this.options.clone(); + let mut fut = (this.make_request)(PagerState::Initial, options); + + match fut.poll_unpin(cx) { + task::Poll::Ready(result) => result, + task::Poll::Pending => { + *this.state = State::Pending(fut); + return task::Poll::Pending; + } + } + } + State::Pending(ref mut fut) => task::ready!(fut.poll_unpin(cx)), + State::More(ref n) => { + tracing::debug!("subsequent page request to {:?}", AsRef::::as_ref(n)); + let options = this.options.clone(); + let mut fut = (this.make_request)(PagerState::More(n.clone()), options); + + match fut.poll_unpin(cx) { + task::Poll::Ready(result) => result, + task::Poll::Pending => { + *this.state = State::Pending(fut); + return task::Poll::Pending; + } + } + } + State::Done => { + tracing::debug!("done"); + // Set the `continuation_token` to None now that we are done. + *this.continuation_token = None; + return task::Poll::Ready(None); + } + }; + + // Update continuation token and instrumentation. + match result { + Err(e) => { + if *this.added_span { + if let Some(span) = this.options.context.value::>() { + // Mark the span as an error with an appropriate description. + span.set_status(SpanStatus::Error { + description: e.to_string(), + }); + span.set_attribute("error.type", e.kind().to_string().into()); + span.end(); + } + } + + *this.state = State::Done; + task::Poll::Ready(Some(Err(e))) + } + + Ok(PagerResult::More { + response, + continuation: continuation_token, + }) => { + // Set the `continuation_token` to the next page. + *this.continuation_token = Some(continuation_token.clone()); + *this.state = State::More(continuation_token); + task::Poll::Ready(Some(Ok(response))) + } + + Ok(PagerResult::Done { response }) => { + // Set the `continuation_token` to None now that we are done. + *this.continuation_token = None; + *this.state = State::Done; + + // When the result is done, finalize the span. Note that we only do that if we created the span in the first place; + // otherwise, it is the responsibility of the caller to end their span. + if *this.added_span { + if let Some(span) = this.options.context.value::>() { + span.end(); + } + } + + task::Poll::Ready(Some(Ok(response))) + } + } } } -impl

fmt::Debug for PageIterator

{ +impl fmt::Debug for PageIterator +where + C: AsRef, +{ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("PageIterator").finish_non_exhaustive() + f.debug_struct("PageIterator") + .field( + "continuation_token", + &self.continuation_token.as_ref().map(AsRef::as_ref), + ) + .field("options", &self.options) + .field("state", &self.state) + .field("added_span", &self.added_span) + .finish_non_exhaustive() } } -#[derive(Clone, Eq)] -enum State +impl FusedStream for PageIterator where - C: AsRef, + C: AsRef + Clone + ConditionalSend + 'static, { + fn is_terminated(&self) -> bool { + self.state == State::Done + } +} + +enum State> { Init, + Pending(BoxedFuture), More(C), Done, } -impl fmt::Debug for State -where - C: AsRef, -{ +impl> fmt::Debug for State { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - State::Init => write!(f, "Init"), + State::Init => f.write_str("Init"), + State::Pending(..) => f.debug_tuple("Pending").finish_non_exhaustive(), State::More(c) => f.debug_tuple("More").field(&c.as_ref()).finish(), - State::Done => write!(f, "Done"), + State::Done => f.write_str("Done"), } } } -impl PartialEq for State -where - C: AsRef, -{ +impl> PartialEq for State { fn eq(&self, other: &Self) -> bool { // Only needs to compare if both states are Init or Done; internally, we don't care about any other states. matches!( @@ -800,165 +920,12 @@ where } } -#[derive(Debug)] -struct StreamState<'a, C, F> -where - C: AsRef, -{ - state: State, - make_request: F, - continuation_token: Arc>>, - options: PagerOptions<'a>, - added_span: bool, -} - -fn iter_from_callback< - P, - // This is a bit gnarly, but the only thing that differs between the WASM/non-WASM configs is the presence of Send bounds. - #[cfg(not(target_arch = "wasm32"))] C: AsRef + FromStr + Send + 'static, - #[cfg(not(target_arch = "wasm32"))] F: Fn(PagerState, PagerOptions<'static>) -> Fut + Send + 'static, - #[cfg(not(target_arch = "wasm32"))] Fut: Future>> + Send + 'static, - #[cfg(target_arch = "wasm32")] C: AsRef + FromStr + 'static, - #[cfg(target_arch = "wasm32")] F: Fn(PagerState, PagerOptions<'static>) -> Fut + 'static, - #[cfg(target_arch = "wasm32")] Fut: Future>> + 'static, ->( - make_request: F, - options: PagerOptions<'static>, - continuation_token: Arc>>, -) -> impl Stream> + 'static -where - ::Err: std::error::Error, -{ - unfold( - StreamState { - state: State::Init, - make_request, - continuation_token, - options, - added_span: false, - }, - |mut stream_state| async move { - // When in the "Init" state, we are either starting fresh or resuming from a continuation token. In either case, - // attach a span to the context for the entire paging operation. - if stream_state.state == State::Init { - tracing::debug!("establish a public API span for new pager."); - - // At the very start of polling, create a span for the entire request, and attach it to the context - let span = create_public_api_span(&stream_state.options.context, None, None); - if let Some(ref s) = span { - stream_state.added_span = true; - stream_state.options.context = - stream_state.options.context.with_value(s.clone()); - } - } - - // Get the `continuation_token` to pick up where we left off, or None for the initial page, - // but don't override the terminal `State::Done`. - if stream_state.state != State::Done { - let result = match stream_state.continuation_token.lock() { - Ok(next_token) => match next_token.as_deref() { - Some(n) => match n.parse() { - Ok(s) => Ok(State::More(s)), - Err(err) => Err(crate::Error::with_message_fn( - ErrorKind::DataConversion, - || format!("invalid continuation token: {err}"), - )), - }, - // Restart the pager if `next_token` is None indicating we resumed from before or within the first page. - None => Ok(State::Init), - }, - Err(err) => Err(crate::Error::with_message_fn(ErrorKind::Other, || { - format!("continuation token lock: {err}") - })), - }; - - match result { - Ok(state) => stream_state.state = state, - Err(err) => { - stream_state.state = State::Done; - return Some((Err(err), stream_state)); - } - } - } - let result = match stream_state.state { - State::Init => { - tracing::debug!("initial page request"); - (stream_state.make_request)(PagerState::Initial, stream_state.options.clone()) - .await - } - State::More(n) => { - tracing::debug!("subsequent page request to {:?}", AsRef::::as_ref(&n)); - (stream_state.make_request)(PagerState::More(n), stream_state.options.clone()) - .await - } - State::Done => { - tracing::debug!("done"); - // Set the `continuation_token` to None now that we are done. - if let Ok(mut token) = stream_state.continuation_token.lock() { - *token = None; - } - return None; - } - }; - let (item, next_state) = match result { - Err(e) => { - if stream_state.added_span { - if let Some(span) = stream_state.options.context.value::>() { - // Mark the span as an error with an appropriate description. - span.set_status(SpanStatus::Error { - description: e.to_string(), - }); - span.set_attribute("error.type", e.kind().to_string().into()); - span.end(); - } - } - - stream_state.state = State::Done; - return Some((Err(e), stream_state)); - } - Ok(PagerResult::More { - response, - continuation: next_token, - }) => { - // Set the `continuation_token` to the next page. - if let Ok(mut token) = stream_state.continuation_token.lock() { - *token = Some(next_token.as_ref().into()); - } - (Ok(response), State::More(next_token)) - } - Ok(PagerResult::Done { response }) => { - // Set the `continuation_token` to None now that we are done. - if let Ok(mut token) = stream_state.continuation_token.lock() { - *token = None; - } - // When the result is done, finalize the span. Note that we only do that if we created the span in the first place, - // otherwise it is the responsibility of the caller to end their span. - if stream_state.added_span { - if let Some(span) = stream_state.options.context.value::>() { - // P is unconstrained, so it's not possible to retrieve the status code for now. - - span.end(); - } - } - (Ok(response), State::Done) - } - }; - - stream_state.state = next_state; - Some((item, stream_state)) - }, - ) -} - #[cfg(test)] mod tests { - use crate::{ - error::ErrorKind, - http::{ - headers::{HeaderName, HeaderValue}, - pager::{PageIterator, Pager, PagerOptions, PagerResult, PagerState}, - RawResponse, Response, StatusCode, - }, + use super::{ItemIterator, PageIterator, Pager, PagerOptions, PagerResult, PagerState}; + use crate::http::{ + headers::{HeaderName, HeaderValue}, + JsonFormat, RawResponse, Response, StatusCode, }; use async_trait::async_trait; use futures::{StreamExt as _, TryStreamExt as _}; @@ -984,51 +951,53 @@ mod tests { #[tokio::test] async fn callback_item_pagination() { - let pager: Pager = Pager::from_callback( - |continuation: PagerState, _ctx| async move { - match continuation { - PagerState::Initial => Ok(PagerResult::More { - response: RawResponse::from_bytes( - StatusCode::Ok, - HashMap::from([( - HeaderName::from_static("x-test-header"), - HeaderValue::from_static("page-1"), - )]) + let pager: Pager = Pager::new( + |continuation: PagerState, _ctx| { + Box::pin(async move { + match continuation { + PagerState::Initial => Ok(PagerResult::More { + response: RawResponse::from_bytes( + StatusCode::Ok, + HashMap::from([( + HeaderName::from_static("x-test-header"), + HeaderValue::from_static("page-1"), + )]) + .into(), + r#"{"items":[1],"page":1}"#, + ) .into(), - r#"{"items":[1],"page":1}"#, - ) - .into(), - continuation: "1".into(), - }), - PagerState::More(ref i) if i == "1" => Ok(PagerResult::More { - response: RawResponse::from_bytes( - StatusCode::Ok, - HashMap::from([( - HeaderName::from_static("x-test-header"), - HeaderValue::from_static("page-2"), - )]) + continuation: "1".into(), + }), + PagerState::More(ref i) if i == "1" => Ok(PagerResult::More { + response: RawResponse::from_bytes( + StatusCode::Ok, + HashMap::from([( + HeaderName::from_static("x-test-header"), + HeaderValue::from_static("page-2"), + )]) + .into(), + r#"{"items":[2],"page":2}"#, + ) .into(), - r#"{"items":[2],"page":2}"#, - ) - .into(), - continuation: "2".into(), - }), - PagerState::More(ref i) if i == "2" => Ok(PagerResult::Done { - response: RawResponse::from_bytes( - StatusCode::Ok, - HashMap::from([( - HeaderName::from_static("x-test-header"), - HeaderValue::from_static("page-3"), - )]) + continuation: "2".into(), + }), + PagerState::More(ref i) if i == "2" => Ok(PagerResult::Done { + response: RawResponse::from_bytes( + StatusCode::Ok, + HashMap::from([( + HeaderName::from_static("x-test-header"), + HeaderValue::from_static("page-3"), + )]) + .into(), + r#"{"items":[3],"page":3}"#, + ) .into(), - r#"{"items":[3],"page":3}"#, - ) - .into(), - }), - _ => { - panic!("Unexpected continuation value") + }), + _ => { + panic!("Unexpected continuation value") + } } - } + }) }, None, ); @@ -1038,30 +1007,32 @@ mod tests { #[tokio::test] async fn callback_item_pagination_error() { - let pager: Pager = Pager::from_callback( - |continuation: PagerState, _options| async move { - match continuation { - PagerState::Initial => Ok(PagerResult::More { - response: RawResponse::from_bytes( - StatusCode::Ok, - HashMap::from([( - HeaderName::from_static("x-test-header"), - HeaderValue::from_static("page-1"), - )]) + let pager: Pager = ItemIterator::new( + |continuation: PagerState, _options| { + Box::pin(async move { + match continuation { + PagerState::Initial => Ok(PagerResult::More { + response: RawResponse::from_bytes( + StatusCode::Ok, + HashMap::from([( + HeaderName::from_static("x-test-header"), + HeaderValue::from_static("page-1"), + )]) + .into(), + r#"{"items":[1],"page":1}"#, + ) .into(), - r#"{"items":[1],"page":1}"#, - ) - .into(), - continuation: "1".into(), - }), - PagerState::More(ref i) if i == "1" => Err(crate::Error::with_message( - crate::error::ErrorKind::Other, - "yon request didst fail", - )), - _ => { - panic!("Unexpected continuation value") + continuation: "1".into(), + }), + PagerState::More(ref i) if i == "1" => Err(crate::Error::with_message( + crate::error::ErrorKind::Other, + "yon request didst fail", + )), + _ => { + panic!("Unexpected continuation value") + } } - } + }) }, None, ); @@ -1095,11 +1066,70 @@ mod tests { assert_eq!("yon request didst fail", format!("{}", err)); } + #[tokio::test] + async fn page_iterator_iterate_all_pages() { + // Create a PageIterator and iterate through all three pages. + let mut pager = PageIterator::new(make_three_page_callback(), None); + + // Should start with no continuation_token. + assert_eq!(pager.continuation_token(), None); + + // Get first page. + let first_page = pager + .next() + .await + .expect("expected first page") + .expect("expected successful first page") + .into_model() + .expect("expected page"); + assert_eq!(first_page.page, Some(1)); + assert_eq!(first_page.items, vec![1, 2, 3]); + + // continuation_token should now point to second page. + assert_eq!( + pager.continuation_token().map(AsRef::as_ref), + Some("next-token-1") + ); + + // Get second page. + let second_page = pager + .next() + .await + .expect("expected second page") + .expect("expected successful second page") + .into_model() + .expect("expected page"); + assert_eq!(second_page.page, Some(2)); + assert_eq!(second_page.items, vec![4, 5, 6]); + + // continuation_token should now point to third page. + assert_eq!( + pager.continuation_token().map(AsRef::as_ref), + Some("next-token-2") + ); + + // Get third page. + let third_page = pager + .next() + .await + .expect("expected third page") + .expect("expected successful third page") + .into_model() + .expect("expected page"); + assert_eq!(third_page.page, None); + assert_eq!(third_page.items, vec![7, 8, 9]); + + // continuation_token should now be None (done). + assert_eq!(pager.continuation_token(), None); + + // Verify stream is exhausted. + assert!(pager.next().await.is_none()); + } + #[tokio::test] async fn page_iterator_with_continuation_token() { // Create the first PageIterator. - let mut first_pager: PageIterator> = - PageIterator::from_callback(make_three_page_callback(), None); + let mut first_pager = PageIterator::new(make_three_page_callback(), None); // Should start with no continuation_token. assert_eq!(first_pager.continuation_token(), None); @@ -1122,18 +1152,18 @@ mod tests { assert_eq!(continuation_token, "next-token-1"); // Create the second PageIterator. - let mut second_pager: PageIterator> = PageIterator::from_callback( + let mut second_pager = PageIterator::new( make_three_page_callback(), Some(PagerOptions { - continuation_token: Some(continuation_token), + continuation_token: Some(continuation_token.into()), ..Default::default() }), ); // Should start with link to second page. assert_eq!( - second_pager.continuation_token(), - Some("next-token-1".into()) + second_pager.continuation_token().map(AsRef::as_ref), + Some("next-token-1"), ); // Advance to second page. @@ -1147,8 +1177,8 @@ mod tests { assert_eq!(second_page.page, Some(2)); assert_eq!(second_page.items, vec![4, 5, 6]); assert_eq!( - second_pager.continuation_token(), - Some("next-token-2".into()) + second_pager.continuation_token().map(AsRef::as_ref), + Some("next-token-2") ); // Advance to last page. @@ -1167,7 +1197,7 @@ mod tests { #[tokio::test] async fn page_iterator_from_item_iterator_after_first_page() { // Create an ItemIterator and consume all items from first page. - let mut item_pager: Pager = Pager::from_callback(make_three_page_callback(), None); + let mut item_pager = ItemIterator::new(make_three_page_callback(), None); // Should start with no continuation_token. assert_eq!(item_pager.continuation_token(), None); @@ -1221,7 +1251,7 @@ mod tests { #[tokio::test] async fn page_iterator_from_item_iterator_second_page_first_item() { // Create an ItemIterator and consume items up to first item of second page. - let mut item_pager: Pager = Pager::from_callback(make_three_page_callback(), None); + let mut item_pager = ItemIterator::new(make_three_page_callback(), None); // Should start with no continuation_token. assert_eq!(item_pager.continuation_token(), None); @@ -1260,7 +1290,10 @@ mod tests { let mut page_pager = item_pager.into_pages(); // Should start with second page since that's where we were. - assert_eq!(page_pager.continuation_token(), Some("next-token-1".into())); + assert_eq!( + page_pager.continuation_token().map(AsRef::as_ref), + Some("next-token-1") + ); // Get second page - should be the second page. let second_page = page_pager @@ -1283,7 +1316,7 @@ mod tests { #[tokio::test] async fn item_iterator_with_continuation_token() { // Create the first ItemIterator. - let mut first_pager: Pager = Pager::from_callback(make_three_page_callback(), None); + let mut first_pager = ItemIterator::new(make_three_page_callback(), None); // Should start with no continuation_token. assert_eq!(first_pager.continuation_token(), None); @@ -1309,10 +1342,10 @@ mod tests { assert_eq!(continuation_token, None); // Create the second ItemIterator with continuation token. - let mut second_pager: Pager = Pager::from_callback( + let mut second_pager = ItemIterator::new( make_three_page_callback(), Some(PagerOptions { - continuation_token, + continuation_token: continuation_token.map(Into::into), ..Default::default() }), ); @@ -1339,7 +1372,7 @@ mod tests { #[tokio::test] async fn item_iterator_continuation_second_page_second_item() { // Create the first ItemIterator. - let mut first_pager: Pager = Pager::from_callback(make_three_page_callback(), None); + let mut first_pager = ItemIterator::new(make_three_page_callback(), None); // Should start with no continuation_token. assert_eq!(first_pager.continuation_token(), None); @@ -1384,11 +1417,11 @@ mod tests { assert_eq!(fifth_item, 5); // Get continuation token - should point to current page (second page). - let continuation_token = first_pager.continuation_token(); + let continuation_token = first_pager.into_continuation_token(); assert_eq!(continuation_token.as_deref(), Some("next-token-1")); // Create the second ItemIterator with continuation token. - let mut second_pager: Pager = Pager::from_callback( + let mut second_pager = ItemIterator::new( make_three_page_callback(), Some(PagerOptions { continuation_token, @@ -1414,7 +1447,7 @@ mod tests { #[tokio::test] async fn item_iterator_continuation_after_first_page() { // Create the first ItemIterator. - let mut first_pager: Pager = Pager::from_callback(make_three_page_callback(), None); + let mut first_pager = ItemIterator::new(make_three_page_callback(), None); // Should start with no continuation_token. assert_eq!(first_pager.continuation_token(), None); @@ -1446,10 +1479,10 @@ mod tests { assert_eq!(continuation_token, None); // Create the second ItemIterator with continuation token. - let mut second_pager: Pager = Pager::from_callback( + let mut second_pager = ItemIterator::new( make_three_page_callback(), Some(PagerOptions { - continuation_token, + continuation_token: continuation_token.map(Into::into), ..Default::default() }), ); @@ -1469,69 +1502,10 @@ mod tests { assert_eq!(items.as_slice(), vec![2, 3, 4, 5, 6, 7, 8, 9]); } - /// A continuation token type that always fails to parse, used to test FromStr constraint. - #[derive(Debug, Clone, PartialEq, Eq)] - struct ContinuationToken(String); - - impl AsRef for ContinuationToken { - fn as_ref(&self) -> &str { - &self.0 - } - } - - impl std::str::FromStr for ContinuationToken { - type Err = std::io::Error; - - fn from_str(_s: &str) -> Result { - // Always fail to parse - Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "ContinuationToken parsing always fails", - )) - } - } - - #[tokio::test] - async fn callback_item_pagination_from_str_error() { - let mut pager: Pager = Pager::from_callback( - |continuation: PagerState, _ctx| async move { - match continuation { - PagerState::Initial => Ok(PagerResult::More { - response: RawResponse::from_bytes( - StatusCode::Ok, - HashMap::from([( - HeaderName::from_static("x-test-header"), - HeaderValue::from_static("page-1"), - )]) - .into(), - r#"{"items":[1],"page":1}"#, - ) - .into(), - // cspell:ignore unparseable - continuation: ContinuationToken("unparseable-token".to_string()), - }), - _ => { - panic!("Unexpected continuation value: {:?}", continuation) - } - } - }, - None, - ); - - // Get the first item from the first page. - let first_item = pager.try_next().await.expect("expected first page"); - assert_eq!(first_item, Some(1)); - - // Attempt to get the second page, which will attempt to parse the continuation token that should fail. - assert!( - matches!(pager.try_next().await, Err(err) if err.kind() == &ErrorKind::DataConversion) - ); - } - #[allow(clippy::type_complexity)] fn make_three_page_callback() -> impl Fn( PagerState, - PagerOptions<'_>, + PagerOptions<'_, String>, ) -> Pin< Box, String>>> + Send>, > + Send diff --git a/sdk/core/azure_core/src/lib.rs b/sdk/core/azure_core/src/lib.rs index 213bbec228..e1a7deff55 100644 --- a/sdk/core/azure_core/src/lib.rs +++ b/sdk/core/azure_core/src/lib.rs @@ -36,6 +36,26 @@ pub mod tracing { #[cfg(feature = "xml")] pub use typespec_client_core::xml; +#[cfg(not(target_arch = "wasm32"))] +mod conditional_send { + /// Conditionally implements [`Send`] based on the `target_arch`. + /// + /// This implementation requires `Send`. + pub trait ConditionalSend: Send {} + + impl ConditionalSend for T where T: Send {} +} + +#[cfg(target_arch = "wasm32")] +mod conditional_send { + /// Conditionally implements [`Send`] based on the `target_arch`. + /// + /// This implementation does not require `Send`. + pub trait ConditionalSend {} + + impl ConditionalSend for T {} +} + mod private { pub trait Sealed {} } diff --git a/sdk/keyvault/azure_security_keyvault_certificates/Cargo.toml b/sdk/keyvault/azure_security_keyvault_certificates/Cargo.toml index 50a5094256..ea9e2fd76a 100644 --- a/sdk/keyvault/azure_security_keyvault_certificates/Cargo.toml +++ b/sdk/keyvault/azure_security_keyvault_certificates/Cargo.toml @@ -35,6 +35,7 @@ include-file.workspace = true openssl.workspace = true rand.workspace = true tokio.workspace = true +tracing.workspace = true [build-dependencies] rustc_version.workspace = true diff --git a/sdk/keyvault/azure_security_keyvault_certificates/src/generated/clients/certificate_client.rs b/sdk/keyvault/azure_security_keyvault_certificates/src/generated/clients/certificate_client.rs index 9ab368ed1c..53f97fe784 100644 --- a/sdk/keyvault/azure_security_keyvault_certificates/src/generated/clients/certificate_client.rs +++ b/sdk/keyvault/azure_security_keyvault_certificates/src/generated/clients/certificate_client.rs @@ -689,7 +689,7 @@ impl CertificateClient { .append_pair("maxresults", &maxresults.to_string()); } let api_version = self.api_version.clone(); - Ok(Pager::from_callback( + Ok(Pager::new( move |next_link: PagerState, pager_options| { let url = match next_link { PagerState::More(next_link) => { @@ -709,7 +709,7 @@ impl CertificateClient { let mut request = Request::new(url, Method::Get); request.insert_header("accept", "application/json"); let pipeline = pipeline.clone(); - async move { + Box::pin(async move { let rsp = pipeline .send( &pager_options.context, @@ -732,7 +732,7 @@ impl CertificateClient { }, _ => PagerResult::Done { response: rsp }, }) - } + }) }, Some(options.method_options), )) @@ -774,7 +774,7 @@ impl CertificateClient { .append_pair("maxresults", &maxresults.to_string()); } let api_version = self.api_version.clone(); - Ok(Pager::from_callback( + Ok(Pager::new( move |next_link: PagerState, pager_options| { let url = match next_link { PagerState::More(next_link) => { @@ -794,7 +794,7 @@ impl CertificateClient { let mut request = Request::new(url, Method::Get); request.insert_header("accept", "application/json"); let pipeline = pipeline.clone(); - async move { + Box::pin(async move { let rsp = pipeline .send( &pager_options.context, @@ -817,7 +817,7 @@ impl CertificateClient { }, _ => PagerResult::Done { response: rsp }, }) - } + }) }, Some(options.method_options), )) @@ -855,7 +855,7 @@ impl CertificateClient { .append_pair("maxresults", &maxresults.to_string()); } let api_version = self.api_version.clone(); - Ok(Pager::from_callback( + Ok(Pager::new( move |next_link: PagerState, pager_options| { let url = match next_link { PagerState::More(next_link) => { @@ -875,7 +875,7 @@ impl CertificateClient { let mut request = Request::new(url, Method::Get); request.insert_header("accept", "application/json"); let pipeline = pipeline.clone(); - async move { + Box::pin(async move { let rsp = pipeline .send( &pager_options.context, @@ -898,7 +898,7 @@ impl CertificateClient { }, _ => PagerResult::Done { response: rsp }, }) - } + }) }, Some(options.method_options), )) @@ -930,7 +930,7 @@ impl CertificateClient { .append_pair("maxresults", &maxresults.to_string()); } let api_version = self.api_version.clone(); - Ok(Pager::from_callback( + Ok(Pager::new( move |next_link: PagerState, pager_options| { let url = match next_link { PagerState::More(next_link) => { @@ -950,7 +950,7 @@ impl CertificateClient { let mut request = Request::new(url, Method::Get); request.insert_header("accept", "application/json"); let pipeline = pipeline.clone(); - async move { + Box::pin(async move { let rsp = pipeline .send( &pager_options.context, @@ -973,7 +973,7 @@ impl CertificateClient { }, _ => PagerResult::Done { response: rsp }, }) - } + }) }, Some(options.method_options), )) diff --git a/sdk/keyvault/azure_security_keyvault_certificates/src/generated/models/method_options.rs b/sdk/keyvault/azure_security_keyvault_certificates/src/generated/models/method_options.rs index 5a2ec6c4e0..b54ff6ec5d 100644 --- a/sdk/keyvault/azure_security_keyvault_certificates/src/generated/models/method_options.rs +++ b/sdk/keyvault/azure_security_keyvault_certificates/src/generated/models/method_options.rs @@ -5,7 +5,7 @@ use azure_core::{ fmt::SafeDebug, - http::{pager::PagerOptions, ClientMethodOptions}, + http::{pager::PagerOptions, ClientMethodOptions, Url}, }; /// Options to be passed to [`CertificateClient::backup_certificate()`](crate::generated::clients::CertificateClient::backup_certificate()) @@ -106,7 +106,7 @@ pub struct CertificateClientListCertificatePropertiesOptions<'a> { pub maxresults: Option, /// Allows customization of the method call. - pub method_options: PagerOptions<'a>, + pub method_options: PagerOptions<'a, Url>, } impl CertificateClientListCertificatePropertiesOptions<'_> { @@ -130,7 +130,7 @@ pub struct CertificateClientListCertificatePropertiesVersionsOptions<'a> { pub maxresults: Option, /// Allows customization of the method call. - pub method_options: PagerOptions<'a>, + pub method_options: PagerOptions<'a, Url>, } impl CertificateClientListCertificatePropertiesVersionsOptions<'_> { @@ -156,7 +156,7 @@ pub struct CertificateClientListDeletedCertificatePropertiesOptions<'a> { pub maxresults: Option, /// Allows customization of the method call. - pub method_options: PagerOptions<'a>, + pub method_options: PagerOptions<'a, Url>, } impl CertificateClientListDeletedCertificatePropertiesOptions<'_> { @@ -180,7 +180,7 @@ pub struct CertificateClientListIssuerPropertiesOptions<'a> { pub maxresults: Option, /// Allows customization of the method call. - pub method_options: PagerOptions<'a>, + pub method_options: PagerOptions<'a, Url>, } impl CertificateClientListIssuerPropertiesOptions<'_> { diff --git a/sdk/keyvault/azure_security_keyvault_certificates/tests/certificate_client.rs b/sdk/keyvault/azure_security_keyvault_certificates/tests/certificate_client.rs index 44fcaf338b..26521ff7b8 100644 --- a/sdk/keyvault/azure_security_keyvault_certificates/tests/certificate_client.rs +++ b/sdk/keyvault/azure_security_keyvault_certificates/tests/certificate_client.rs @@ -219,7 +219,7 @@ async fn list_certificates(ctx: TestContext) -> Result<()> { .await?; // List certificates. - let mut pager = client.list_certificate_properties(None)?.into_stream(); + let mut pager = client.list_certificate_properties(None)?; while let Some(certificate) = pager.try_next().await? { // Get the certificate name from the ID. let name = certificate.resource_id()?.name; @@ -268,11 +268,11 @@ async fn purge_certificate(ctx: TestContext) -> Result<()> { loop { match client.purge_deleted_certificate(NAME.as_ref(), None).await { Ok(_) => { - println!("{NAME} has been purged"); + tracing::debug!("{NAME} has been purged"); break; } Err(err) if matches!(err.http_status(), Some(StatusCode::Conflict)) => { - println!( + tracing::debug!( "Retrying in {} seconds", retry.duration().unwrap_or_default().as_secs_f32() ); diff --git a/sdk/keyvault/azure_security_keyvault_keys/Cargo.toml b/sdk/keyvault/azure_security_keyvault_keys/Cargo.toml index b03250337c..ee0e55e39e 100644 --- a/sdk/keyvault/azure_security_keyvault_keys/Cargo.toml +++ b/sdk/keyvault/azure_security_keyvault_keys/Cargo.toml @@ -36,6 +36,7 @@ rand_chacha.workspace = true reqwest.workspace = true sha2.workspace = true tokio.workspace = true +tracing.workspace = true [build-dependencies] rustc_version.workspace = true diff --git a/sdk/keyvault/azure_security_keyvault_keys/tests/key_client.rs b/sdk/keyvault/azure_security_keyvault_keys/tests/key_client.rs index 389ee304f4..b730fc8d8b 100644 --- a/sdk/keyvault/azure_security_keyvault_keys/tests/key_client.rs +++ b/sdk/keyvault/azure_security_keyvault_keys/tests/key_client.rs @@ -196,11 +196,11 @@ async fn purge_key(ctx: TestContext) -> Result<()> { loop { match client.purge_deleted_key(name.as_ref(), None).await { Ok(_) => { - println!("{name} has been purged"); + tracing::debug!("{name} has been purged"); break; } Err(err) if matches!(err.http_status(), Some(StatusCode::Conflict)) => { - println!( + tracing::debug!( "Retrying in {} seconds", retry.duration().unwrap_or_default().as_secs_f32() ); diff --git a/sdk/keyvault/azure_security_keyvault_secrets/Cargo.toml b/sdk/keyvault/azure_security_keyvault_secrets/Cargo.toml index ddcc2804f6..900c280342 100644 --- a/sdk/keyvault/azure_security_keyvault_secrets/Cargo.toml +++ b/sdk/keyvault/azure_security_keyvault_secrets/Cargo.toml @@ -34,6 +34,7 @@ azure_security_keyvault_test = { path = "../azure_security_keyvault_test" } include-file.workspace = true rand.workspace = true tokio.workspace = true +tracing.workspace = true [build-dependencies] rustc_version.workspace = true diff --git a/sdk/keyvault/azure_security_keyvault_secrets/src/generated/clients/secret_client.rs b/sdk/keyvault/azure_security_keyvault_secrets/src/generated/clients/secret_client.rs index 7cfd1d9571..a323278517 100644 --- a/sdk/keyvault/azure_security_keyvault_secrets/src/generated/clients/secret_client.rs +++ b/sdk/keyvault/azure_security_keyvault_secrets/src/generated/clients/secret_client.rs @@ -312,7 +312,7 @@ impl SecretClient { .append_pair("maxresults", &maxresults.to_string()); } let api_version = self.api_version.clone(); - Ok(Pager::from_callback( + Ok(Pager::new( move |next_link: PagerState, pager_options| { let url = match next_link { PagerState::More(next_link) => { @@ -332,7 +332,7 @@ impl SecretClient { let mut request = Request::new(url, Method::Get); request.insert_header("accept", "application/json"); let pipeline = pipeline.clone(); - async move { + Box::pin(async move { let rsp = pipeline .send( &pager_options.context, @@ -355,7 +355,7 @@ impl SecretClient { }, _ => PagerResult::Done { response: rsp }, }) - } + }) }, Some(options.method_options), )) @@ -388,7 +388,7 @@ impl SecretClient { .append_pair("maxresults", &maxresults.to_string()); } let api_version = self.api_version.clone(); - Ok(Pager::from_callback( + Ok(Pager::new( move |next_link: PagerState, pager_options| { let url = match next_link { PagerState::More(next_link) => { @@ -408,7 +408,7 @@ impl SecretClient { let mut request = Request::new(url, Method::Get); request.insert_header("accept", "application/json"); let pipeline = pipeline.clone(); - async move { + Box::pin(async move { let rsp = pipeline .send( &pager_options.context, @@ -431,7 +431,7 @@ impl SecretClient { }, _ => PagerResult::Done { response: rsp }, }) - } + }) }, Some(options.method_options), )) @@ -473,7 +473,7 @@ impl SecretClient { .append_pair("maxresults", &maxresults.to_string()); } let api_version = self.api_version.clone(); - Ok(Pager::from_callback( + Ok(Pager::new( move |next_link: PagerState, pager_options| { let url = match next_link { PagerState::More(next_link) => { @@ -493,7 +493,7 @@ impl SecretClient { let mut request = Request::new(url, Method::Get); request.insert_header("accept", "application/json"); let pipeline = pipeline.clone(); - async move { + Box::pin(async move { let rsp = pipeline .send( &pager_options.context, @@ -516,7 +516,7 @@ impl SecretClient { }, _ => PagerResult::Done { response: rsp }, }) - } + }) }, Some(options.method_options), )) diff --git a/sdk/keyvault/azure_security_keyvault_secrets/src/generated/models/method_options.rs b/sdk/keyvault/azure_security_keyvault_secrets/src/generated/models/method_options.rs index f3b02b42a5..519a8c7ae6 100644 --- a/sdk/keyvault/azure_security_keyvault_secrets/src/generated/models/method_options.rs +++ b/sdk/keyvault/azure_security_keyvault_secrets/src/generated/models/method_options.rs @@ -6,7 +6,7 @@ use super::ContentType; use azure_core::{ fmt::SafeDebug, - http::{pager::PagerOptions, ClientMethodOptions}, + http::{pager::PagerOptions, ClientMethodOptions, Url}, }; /// Options to be passed to [`SecretClient::backup_secret()`](crate::generated::clients::SecretClient::backup_secret()) @@ -52,7 +52,7 @@ pub struct SecretClientListDeletedSecretPropertiesOptions<'a> { pub maxresults: Option, /// Allows customization of the method call. - pub method_options: PagerOptions<'a>, + pub method_options: PagerOptions<'a, Url>, } impl SecretClientListDeletedSecretPropertiesOptions<'_> { @@ -75,7 +75,7 @@ pub struct SecretClientListSecretPropertiesOptions<'a> { pub maxresults: Option, /// Allows customization of the method call. - pub method_options: PagerOptions<'a>, + pub method_options: PagerOptions<'a, Url>, } impl SecretClientListSecretPropertiesOptions<'_> { @@ -98,7 +98,7 @@ pub struct SecretClientListSecretPropertiesVersionsOptions<'a> { pub maxresults: Option, /// Allows customization of the method call. - pub method_options: PagerOptions<'a>, + pub method_options: PagerOptions<'a, Url>, } impl SecretClientListSecretPropertiesVersionsOptions<'_> { diff --git a/sdk/keyvault/azure_security_keyvault_secrets/tests/secret_client.rs b/sdk/keyvault/azure_security_keyvault_secrets/tests/secret_client.rs index dd0c0a36b0..71e879005c 100644 --- a/sdk/keyvault/azure_security_keyvault_secrets/tests/secret_client.rs +++ b/sdk/keyvault/azure_security_keyvault_secrets/tests/secret_client.rs @@ -148,7 +148,7 @@ async fn list_secrets(ctx: TestContext) -> Result<()> { assert_eq!(secret2.value, Some("secret-value-2".into())); // List secrets. - let mut pager = client.list_secret_properties(None)?.into_stream(); + let mut pager = client.list_secret_properties(None)?; while let Some(secret) = pager.try_next().await? { // Get the secret name from the ID. let name = secret.resource_id()?.name; @@ -201,11 +201,11 @@ async fn purge_secret(ctx: TestContext) -> Result<()> { loop { match client.purge_deleted_secret(name.as_ref(), None).await { Ok(_) => { - println!("{name} has been purged"); + tracing::debug!("{name} has been purged"); break; } Err(err) if matches!(err.http_status(), Some(StatusCode::Conflict)) => { - println!( + tracing::debug!( "Retrying in {} seconds", retry.duration().unwrap_or_default().as_secs_f32() ); @@ -527,7 +527,7 @@ async fn list_secrets_verify_telemetry_rehydrated(ctx: TestContext) -> Result<() } first_pager - .continuation_token() + .into_continuation_token() .expect("expected continuation token to be created after first page") }; let options = SecretClientListSecretPropertiesOptions {