Skip to content

Commit cdddbf5

Browse files
authored
Implement Pager resumption even through into_pages (#3286)
By design, we didn't support resuming `Pager` (`ItemIterator`) but, upon further discussion, decided we can but will resume from the current page until all items on the current page are processed. This also fixes a problem that, because of the design, we didn't pass through resumption to the `PageIterator` at all - it couldn't even resume as a `PageIterator` for subsequent pages.
1 parent c2e29f0 commit cdddbf5

File tree

21 files changed

+1313
-718
lines changed

21 files changed

+1313
-718
lines changed

sdk/core/azure_core/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### Features Added
66

7+
- Added `ItemIterator::continuation_token()` and `with_continuation_token()` to resume paging items. The current page is restarted until _after_ all items have been iterated.
78
- Added `Response::to_raw_response()` function to create a `RawResponse` from cloned data.
89
- Added `UrlExt::append_path()`.
910
- Implemented `IntoFuture` for a `Poller`. Call `await` on a Poller to get the final model, or `into_stream()` to get a `futures::Stream` to poll the operation manually.
@@ -27,6 +28,8 @@
2728

2829
### Bugs Fixed
2930

31+
- `ItemIterator::into_pages()` now properly supports resuming from the current page until _after_ all items have been iterated.
32+
3033
### Other Changes
3134

3235
## 0.29.1 (2025-10-06)

sdk/core/azure_core/src/http/pager.rs

Lines changed: 715 additions & 192 deletions
Large diffs are not rendered by default.

sdk/core/azure_core/src/http/poller.rs

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -444,10 +444,10 @@ where
444444
/// }, None);
445445
/// ```
446446
pub fn from_callback<
447-
#[cfg(not(target_arch = "wasm32"))] N: Send + 'static,
447+
#[cfg(not(target_arch = "wasm32"))] N: AsRef<str> + Send + 'static,
448448
#[cfg(not(target_arch = "wasm32"))] Fun: Fn(PollerState<N>) -> Fut + Send + 'static,
449449
#[cfg(not(target_arch = "wasm32"))] Fut: Future<Output = crate::Result<PollerResult<M, N, F>>> + Send + 'static,
450-
#[cfg(target_arch = "wasm32")] N: 'static,
450+
#[cfg(target_arch = "wasm32")] N: AsRef<str> + 'static,
451451
#[cfg(target_arch = "wasm32")] Fun: Fn(PollerState<N>) -> Fut + 'static,
452452
#[cfg(target_arch = "wasm32")] Fut: Future<Output = crate::Result<PollerResult<M, N, F>>> + 'static,
453453
>(
@@ -582,10 +582,10 @@ enum State<N> {
582582
fn create_poller_stream<
583583
M,
584584
F: Format,
585-
#[cfg(not(target_arch = "wasm32"))] N: Send + 'static,
585+
#[cfg(not(target_arch = "wasm32"))] N: AsRef<str> + Send + 'static,
586586
#[cfg(not(target_arch = "wasm32"))] Fun: Fn(PollerState<N>) -> Fut + Send + 'static,
587587
#[cfg(not(target_arch = "wasm32"))] Fut: Future<Output = crate::Result<PollerResult<M, N, F>>> + Send + 'static,
588-
#[cfg(target_arch = "wasm32")] N: 'static,
588+
#[cfg(target_arch = "wasm32")] N: AsRef<str> + 'static,
589589
#[cfg(target_arch = "wasm32")] Fun: Fn(PollerState<N>) -> Fut + 'static,
590590
#[cfg(target_arch = "wasm32")] Fut: Future<Output = crate::Result<PollerResult<M, N, F>>> + 'static,
591591
>(
@@ -617,9 +617,21 @@ where
617617
(State::Init, make_request, Some(target_tx)),
618618
move |(state, make_request, target_tx)| async move {
619619
let result = match state {
620-
State::Init => make_request(PollerState::Initial).await,
621-
State::InProgress(n) => make_request(PollerState::More(n)).await,
622-
State::Done => return None,
620+
State::Init => {
621+
tracing::debug!("initial operation request");
622+
make_request(PollerState::Initial).await
623+
}
624+
State::InProgress(n) => {
625+
tracing::debug!(
626+
"subsequent operation request to {:?}",
627+
AsRef::<str>::as_ref(&n)
628+
);
629+
make_request(PollerState::More(n)).await
630+
}
631+
State::Done => {
632+
tracing::debug!("done");
633+
return None;
634+
}
623635
};
624636
let (item, next_state) = match result {
625637
Err(e) => return Some((Err(e), (State::Done, make_request, target_tx))),
@@ -830,7 +842,7 @@ mod tests {
830842
PollerStatus::InProgress => Ok(PollerResult::InProgress {
831843
response,
832844
retry_after: Some(Duration::ZERO),
833-
next: (),
845+
next: "",
834846
}),
835847
_ => Ok(PollerResult::Done { response }),
836848
}
@@ -915,7 +927,7 @@ mod tests {
915927
PollerStatus::InProgress => Ok(PollerResult::InProgress {
916928
response,
917929
retry_after: Some(Duration::ZERO),
918-
next: (),
930+
next: "",
919931
}),
920932
_ => Ok(PollerResult::Done { response }),
921933
}
@@ -1001,7 +1013,7 @@ mod tests {
10011013
PollerStatus::InProgress => Ok(PollerResult::InProgress {
10021014
response,
10031015
retry_after: Some(Duration::ZERO),
1004-
next: (),
1016+
next: "",
10051017
}),
10061018
_ => Ok(PollerResult::Done { response }),
10071019
}
@@ -1087,7 +1099,7 @@ mod tests {
10871099
PollerStatus::InProgress => Ok(PollerResult::InProgress {
10881100
response,
10891101
retry_after: Some(Duration::ZERO),
1090-
next: (),
1102+
next: "",
10911103
}),
10921104
PollerStatus::Succeeded => {
10931105
// Return the status response with a callback to fetch the final resource
@@ -1192,7 +1204,7 @@ mod tests {
11921204
PollerStatus::InProgress => Ok(PollerResult::InProgress {
11931205
response,
11941206
retry_after: Some(Duration::ZERO),
1195-
next: (),
1207+
next: "",
11961208
}),
11971209
PollerStatus::Succeeded => {
11981210
// Return the status response with a callback to fetch the final resource
@@ -1311,7 +1323,7 @@ mod tests {
13111323
PollerStatus::InProgress => Ok(PollerResult::InProgress {
13121324
response,
13131325
retry_after: Some(Duration::ZERO),
1314-
next: (),
1326+
next: "",
13151327
}),
13161328
PollerStatus::Succeeded => {
13171329
// Return the status response with a callback
@@ -1397,7 +1409,7 @@ mod tests {
13971409
PollerStatus::InProgress => Ok(PollerResult::InProgress {
13981410
response,
13991411
retry_after: Some(Duration::ZERO),
1400-
next: (),
1412+
next: "",
14011413
}),
14021414
_ => Ok(PollerResult::Done { response }),
14031415
}
@@ -1483,7 +1495,7 @@ mod tests {
14831495
PollerStatus::InProgress => Ok(PollerResult::InProgress {
14841496
response,
14851497
retry_after: Some(Duration::ZERO),
1486-
next: (),
1498+
next: "",
14871499
}),
14881500
PollerStatus::Succeeded => {
14891501
// Return the status response with a callback
@@ -1591,7 +1603,7 @@ mod tests {
15911603
PollerStatus::InProgress => Ok(PollerResult::InProgress {
15921604
response,
15931605
retry_after: Some(Duration::ZERO),
1594-
next: (),
1606+
next: "",
15951607
}),
15961608
PollerStatus::Succeeded => {
15971609
// The final result is already in the status response itself
@@ -1703,7 +1715,7 @@ mod tests {
17031715
PollerStatus::InProgress => Ok(PollerResult::InProgress {
17041716
response,
17051717
retry_after: Some(Duration::ZERO),
1706-
next: (),
1718+
next: "",
17071719
}),
17081720
PollerStatus::Succeeded => {
17091721
// The final result is already in the status response itself

sdk/core/azure_core_test/src/recording.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ impl Recording {
220220
/// ```
221221
/// # let recording = azure_core_test::Recording::with_seed();
222222
/// let dek: [u8; 32] = recording.random();
223-
/// # assert_eq!(typespec_client_core::base64::encode(dek), "HumPRAN6RqKWf0YhFV2CAFWu/8L/pwh0LRzeam5VlGo=");
223+
/// # assert_eq!(azure_core::base64::encode(dek), "HumPRAN6RqKWf0YhFV2CAFWu/8L/pwh0LRzeam5VlGo=");
224224
/// ```
225225
///
226226
/// Generate a UUID.

sdk/cosmos/azure_data_cosmos/src/pipeline/mod.rs

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ mod signature_target;
66

77
pub use authorization_policy::AuthorizationPolicy;
88
use azure_core::http::{
9-
pager::PagerState,
9+
pager::{PagerOptions, PagerState},
1010
request::{options::ContentType, Request},
1111
response::Response,
1212
ClientOptions, Context, Method, RawResponse, RetryOptions,
@@ -115,25 +115,30 @@ impl CosmosPipeline {
115115
// We have to double-clone here.
116116
// First we clone the pipeline to pass it in to the closure
117117
let pipeline = self.pipeline.clone();
118-
let ctx = ctx.with_value(resource_link).into_owned();
119-
Ok(FeedPager::from_callback(move |continuation| {
120-
// Then we have to clone it again to pass it in to the async block.
121-
// This is because Pageable can't borrow any data, it has to own it all.
122-
// That's probably good, because it means a Pageable can outlive the client that produced it, but it requires some extra cloning.
123-
let pipeline = pipeline.clone();
124-
let mut req = base_request.clone();
125-
let ctx = ctx.clone();
126-
async move {
127-
if let PagerState::More(continuation) = continuation {
128-
req.insert_header(constants::CONTINUATION, continuation);
118+
let options = PagerOptions {
119+
context: ctx.with_value(resource_link).into_owned(),
120+
};
121+
Ok(FeedPager::from_callback(
122+
move |continuation, ctx| {
123+
// Then we have to clone it again to pass it in to the async block.
124+
// This is because Pageable can't borrow any data, it has to own it all.
125+
// That's probably good, because it means a Pageable can outlive the client that produced it, but it requires some extra cloning.
126+
let pipeline = pipeline.clone();
127+
let mut req = base_request.clone();
128+
let ctx = ctx.clone();
129+
async move {
130+
if let PagerState::More(continuation) = continuation {
131+
req.insert_header(constants::CONTINUATION, continuation);
132+
}
133+
134+
let resp = pipeline.send(&ctx, &mut req, None).await?;
135+
let page = FeedPage::<T>::from_response(resp).await?;
136+
137+
Ok(page.into())
129138
}
130-
131-
let resp = pipeline.send(&ctx, &mut req, None).await?;
132-
let page = FeedPage::<T>::from_response(resp).await?;
133-
134-
Ok(page.into())
135-
}
136-
}))
139+
},
140+
Some(options),
141+
))
137142
}
138143

139144
/// Helper function to read a throughput offer given a resource ID.

sdk/keyvault/azure_security_keyvault_certificates/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77
### Breaking Changes
88

99
- `CertificateClient::create_certificate()` now returns a `Poller<CertificateOperation>`.
10+
- `CertificateClientListCertificatePropertiesOptions::method_options` is now `PagerOptions`.
11+
- `CertificateClientListCertificatePropertiesVersionsOptions::method_options` is now `PagerOptions`.
12+
- `CertificateClientListDeletedCertificatePropertiesOptions::method_options` is now `PagerOptions`.
13+
- `CertificateClientListIssuerPropertiesOptions::method_options` is now `PagerOptions`.
1014
- Removed `CertificateClient::begin_create_certificate()`.
1115
- Removed `CertificateClient::resume_create_certificate()`.
1216
- Removed `wait()` function from `Poller<CertificateOperation>`.

0 commit comments

Comments
 (0)