Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sdk/core/azure_core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Features Added

- Added `ItemIterator::continuation_token()` and `with_continuation_token()` to resume paging items. The current page is restarted until _after_ all items have been iterated.
- Added `Response::to_raw_response()` function to create a `RawResponse` from cloned data.
- Added `UrlExt::append_path()`.
- Implemented `IntoFuture` for a `Poller`. Call `await` on a Poller to get the final model, or `into_stream()` to get a `futures::Stream` to poll the operation manually.
Expand All @@ -25,6 +26,8 @@

### Bugs Fixed

- `ItemIterator::into_pages()` now properly supports resuming from the current page until _after_ all items have been iterated.

### Other Changes

## 0.29.1 (2025-10-06)
Expand Down
907 changes: 715 additions & 192 deletions sdk/core/azure_core/src/http/pager.rs

Large diffs are not rendered by default.

46 changes: 29 additions & 17 deletions sdk/core/azure_core/src/http/poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,10 +444,10 @@ where
/// }, None);
/// ```
pub fn from_callback<
#[cfg(not(target_arch = "wasm32"))] N: Send + 'static,
#[cfg(not(target_arch = "wasm32"))] N: AsRef<str> + Send + 'static,
#[cfg(not(target_arch = "wasm32"))] Fun: Fn(PollerState<N>) -> Fut + Send + 'static,
#[cfg(not(target_arch = "wasm32"))] Fut: Future<Output = crate::Result<PollerResult<M, N, F>>> + Send + 'static,
#[cfg(target_arch = "wasm32")] N: 'static,
#[cfg(target_arch = "wasm32")] N: AsRef<str> + 'static,
#[cfg(target_arch = "wasm32")] Fun: Fn(PollerState<N>) -> Fut + 'static,
#[cfg(target_arch = "wasm32")] Fut: Future<Output = crate::Result<PollerResult<M, N, F>>> + 'static,
>(
Expand Down Expand Up @@ -582,10 +582,10 @@ enum State<N> {
fn create_poller_stream<
M,
F: Format,
#[cfg(not(target_arch = "wasm32"))] N: Send + 'static,
#[cfg(not(target_arch = "wasm32"))] N: AsRef<str> + Send + 'static,
#[cfg(not(target_arch = "wasm32"))] Fun: Fn(PollerState<N>) -> Fut + Send + 'static,
#[cfg(not(target_arch = "wasm32"))] Fut: Future<Output = crate::Result<PollerResult<M, N, F>>> + Send + 'static,
#[cfg(target_arch = "wasm32")] N: 'static,
#[cfg(target_arch = "wasm32")] N: AsRef<str> + 'static,
#[cfg(target_arch = "wasm32")] Fun: Fn(PollerState<N>) -> Fut + 'static,
#[cfg(target_arch = "wasm32")] Fut: Future<Output = crate::Result<PollerResult<M, N, F>>> + 'static,
>(
Expand Down Expand Up @@ -617,9 +617,21 @@ where
(State::Init, make_request, Some(target_tx)),
move |(state, make_request, target_tx)| async move {
let result = match state {
State::Init => make_request(PollerState::Initial).await,
State::InProgress(n) => make_request(PollerState::More(n)).await,
State::Done => return None,
State::Init => {
tracing::debug!("initial operation request");
make_request(PollerState::Initial).await
}
State::InProgress(n) => {
tracing::debug!(
"subsequent operation request to {:?}",
AsRef::<str>::as_ref(&n)
);
make_request(PollerState::More(n)).await
}
State::Done => {
tracing::debug!("done");
return None;
}
};
let (item, next_state) = match result {
Err(e) => return Some((Err(e), (State::Done, make_request, target_tx))),
Expand Down Expand Up @@ -830,7 +842,7 @@ mod tests {
PollerStatus::InProgress => Ok(PollerResult::InProgress {
response,
retry_after: Some(Duration::ZERO),
next: (),
next: "",
}),
_ => Ok(PollerResult::Done { response }),
}
Expand Down Expand Up @@ -915,7 +927,7 @@ mod tests {
PollerStatus::InProgress => Ok(PollerResult::InProgress {
response,
retry_after: Some(Duration::ZERO),
next: (),
next: "",
}),
_ => Ok(PollerResult::Done { response }),
}
Expand Down Expand Up @@ -1001,7 +1013,7 @@ mod tests {
PollerStatus::InProgress => Ok(PollerResult::InProgress {
response,
retry_after: Some(Duration::ZERO),
next: (),
next: "",
}),
_ => Ok(PollerResult::Done { response }),
}
Expand Down Expand Up @@ -1087,7 +1099,7 @@ mod tests {
PollerStatus::InProgress => Ok(PollerResult::InProgress {
response,
retry_after: Some(Duration::ZERO),
next: (),
next: "",
}),
PollerStatus::Succeeded => {
// Return the status response with a callback to fetch the final resource
Expand Down Expand Up @@ -1192,7 +1204,7 @@ mod tests {
PollerStatus::InProgress => Ok(PollerResult::InProgress {
response,
retry_after: Some(Duration::ZERO),
next: (),
next: "",
}),
PollerStatus::Succeeded => {
// Return the status response with a callback to fetch the final resource
Expand Down Expand Up @@ -1311,7 +1323,7 @@ mod tests {
PollerStatus::InProgress => Ok(PollerResult::InProgress {
response,
retry_after: Some(Duration::ZERO),
next: (),
next: "",
}),
PollerStatus::Succeeded => {
// Return the status response with a callback
Expand Down Expand Up @@ -1397,7 +1409,7 @@ mod tests {
PollerStatus::InProgress => Ok(PollerResult::InProgress {
response,
retry_after: Some(Duration::ZERO),
next: (),
next: "",
}),
_ => Ok(PollerResult::Done { response }),
}
Expand Down Expand Up @@ -1483,7 +1495,7 @@ mod tests {
PollerStatus::InProgress => Ok(PollerResult::InProgress {
response,
retry_after: Some(Duration::ZERO),
next: (),
next: "",
}),
PollerStatus::Succeeded => {
// Return the status response with a callback
Expand Down Expand Up @@ -1591,7 +1603,7 @@ mod tests {
PollerStatus::InProgress => Ok(PollerResult::InProgress {
response,
retry_after: Some(Duration::ZERO),
next: (),
next: "",
}),
PollerStatus::Succeeded => {
// The final result is already in the status response itself
Expand Down Expand Up @@ -1703,7 +1715,7 @@ mod tests {
PollerStatus::InProgress => Ok(PollerResult::InProgress {
response,
retry_after: Some(Duration::ZERO),
next: (),
next: "",
}),
PollerStatus::Succeeded => {
// The final result is already in the status response itself
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/azure_core_test/src/recording.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ impl Recording {
/// ```
/// # let recording = azure_core_test::Recording::with_seed();
/// let dek: [u8; 32] = recording.random();
/// # assert_eq!(typespec_client_core::base64::encode(dek), "HumPRAN6RqKWf0YhFV2CAFWu/8L/pwh0LRzeam5VlGo=");
/// # assert_eq!(azure_core::base64::encode(dek), "HumPRAN6RqKWf0YhFV2CAFWu/8L/pwh0LRzeam5VlGo=");
/// ```
///
/// Generate a UUID.
Expand Down
43 changes: 24 additions & 19 deletions sdk/cosmos/azure_data_cosmos/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod signature_target;

pub use authorization_policy::AuthorizationPolicy;
use azure_core::http::{
pager::PagerState,
pager::{PagerOptions, PagerState},
request::{options::ContentType, Request},
response::Response,
ClientOptions, Context, Method, RawResponse, RetryOptions,
Expand Down Expand Up @@ -115,25 +115,30 @@ impl CosmosPipeline {
// We have to double-clone here.
// First we clone the pipeline to pass it in to the closure
let pipeline = self.pipeline.clone();
let ctx = ctx.with_value(resource_link).into_owned();
Ok(FeedPager::from_callback(move |continuation| {
// Then we have to clone it again to pass it in to the async block.
// This is because Pageable can't borrow any data, it has to own it all.
// That's probably good, because it means a Pageable can outlive the client that produced it, but it requires some extra cloning.
let pipeline = pipeline.clone();
let mut req = base_request.clone();
let ctx = ctx.clone();
async move {
if let PagerState::More(continuation) = continuation {
req.insert_header(constants::CONTINUATION, continuation);
let options = PagerOptions {
context: ctx.with_value(resource_link).into_owned(),
};
Ok(FeedPager::from_callback(
move |continuation, ctx| {
// Then we have to clone it again to pass it in to the async block.
// This is because Pageable can't borrow any data, it has to own it all.
// That's probably good, because it means a Pageable can outlive the client that produced it, but it requires some extra cloning.
let pipeline = pipeline.clone();
let mut req = base_request.clone();
let ctx = ctx.clone();
async move {
if let PagerState::More(continuation) = continuation {
req.insert_header(constants::CONTINUATION, continuation);
}

let resp = pipeline.send(&ctx, &mut req, None).await?;
let page = FeedPage::<T>::from_response(resp).await?;

Ok(page.into())
}

let resp = pipeline.send(&ctx, &mut req, None).await?;
let page = FeedPage::<T>::from_response(resp).await?;

Ok(page.into())
}
}))
},
Some(options),
))
}

/// Helper function to read a throughput offer given a resource ID.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
### Breaking Changes

- `CertificateClient::create_certificate()` now returns a `Poller<CertificateOperation>`.
- `CertificateClientListCertificatePropertiesOptions::method_options` is now `PagerOptions`.
- `CertificateClientListCertificatePropertiesVersionsOptions::method_options` is now `PagerOptions`.
- `CertificateClientListDeletedCertificatePropertiesOptions::method_options` is now `PagerOptions`.
- `CertificateClientListIssuerPropertiesOptions::method_options` is now `PagerOptions`.
- Removed `CertificateClient::begin_create_certificate()`.
- Removed `CertificateClient::resume_create_certificate()`.
- Removed `wait()` function from `Poller<CertificateOperation>`.
Expand Down
Loading