Skip to content

Commit 2f83c27

Browse files
authored
drain and batching
1 parent bf4364e commit 2f83c27

File tree

3 files changed

+114
-57
lines changed

3 files changed

+114
-57
lines changed

sdk/cosmos/azure_data_cosmos/src/query/engine.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@ pub struct QueryRequest {
33
/// The ID of the partition key range to query.
44
pub partition_key_range_id: String,
55

6+
/// The index of this request, within the partition key range.
7+
///
8+
/// This value will always increase for subsequent requests for the same partition key range.
9+
/// It must be provided back to the pipeline when providing data, so that the pipeline can ensure that data is provided in order.
10+
pub index: usize,
11+
612
/// The continuation to use, if any.
713
pub continuation: Option<String>,
814

@@ -13,12 +19,26 @@ pub struct QueryRequest {
1319
///
1420
/// Sometimes, when an override query is specified, it differs in structure from the original query, and the original parameters are not valid.
1521
pub include_parameters: bool,
22+
23+
/// If specified, indicates that the SDK should IMMEDIATELY drain all remaining results from this partition key range, following continuation tokens, until no more results are available.
24+
/// All the data from this partition key range should be provided BEFORE any new items will be made available.
25+
///
26+
/// This allows engines to optimize for non-streaming scenarios, where the entire result set must be provided to the engine before it can make progress.
27+
pub drain: bool,
1628
}
1729

1830
/// The request of a single-partition query for a specific partition key range.
1931
pub struct QueryResult<'a> {
32+
/// The ID of the partition key range that was queried.
2033
pub partition_key_range_id: &'a str,
34+
35+
/// The index of the [`QueryRequest`] that generated this result.
36+
pub request_index: usize,
37+
38+
/// The continuation token to be used for the next request, if any.
2139
pub next_continuation: Option<String>,
40+
41+
/// The raw body of the response from the query.
2242
pub result: &'a [u8],
2343
}
2444

@@ -46,7 +66,16 @@ pub trait QueryPipeline: Send {
4666
fn run(&mut self) -> azure_core::Result<PipelineResult>;
4767

4868
/// Provides additional single-partition data to the pipeline.
49-
fn provide_data(&mut self, data: QueryResult) -> azure_core::Result<()>;
69+
///
70+
/// Data from multiple partition ranges may be provided at once.
71+
/// However, each page of data must be provided in order.
72+
/// So, for any given partition key range, page n's results must be earlier in the `data` vector than page n+1's results.
73+
/// Data from different partition key ranges may be interleaved, as long as each partition key range's pages are in order.
74+
///
75+
/// The pipeline will use the [`QueryResult::request_index`] field to validate this.
76+
///
77+
/// When providing data from a draining request (i.e. a request with `drain = true`), all pages for that draining request can share the same [`QueryResult::request_index`].
78+
fn provide_data(&mut self, data: Vec<QueryResult>) -> azure_core::Result<()>;
5079
}
5180

5281
/// Provides an interface to a query engine, which constructs query pipelines.

sdk/cosmos/azure_data_cosmos/src/query/executor.rs

Lines changed: 36 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ impl<T: DeserializeOwned + Send + 'static> QueryExecutor<T> {
128128
}
129129

130130
// No items, so make any requests we need to make and provide them to the pipeline.
131-
for request in results.requests {
131+
// TODO: We can absolutely parallelize these requests.
132+
for mut request in results.requests {
132133
let mut query_request = if let Some(query) = request.query {
133134
let mut query = Query::from(query);
134135
if request.include_parameters {
@@ -146,30 +147,42 @@ impl<T: DeserializeOwned + Send + 'static> QueryExecutor<T> {
146147
constants::PARTITION_KEY_RANGE_ID,
147148
request.partition_key_range_id.clone(),
148149
);
149-
if let Some(continuation) = request.continuation {
150-
query_request.insert_header(constants::CONTINUATION, continuation);
151-
}
152150

153-
let resp = self
154-
.http_pipeline
155-
.send_raw(
156-
self.context.to_borrowed(),
157-
&mut query_request,
158-
self.items_link.clone(),
159-
)
160-
.await?;
161-
162-
let next_continuation =
163-
resp.headers().get_optional_string(&constants::CONTINUATION);
164-
let body = resp.into_body();
165-
166-
let result = QueryResult {
167-
partition_key_range_id: &request.partition_key_range_id,
168-
next_continuation,
169-
result: &body,
170-
};
151+
let mut draining = true;
152+
while draining {
153+
if let Some(c) = request.continuation.clone() {
154+
query_request.insert_header(constants::CONTINUATION, c);
155+
} else {
156+
// Make sure we don't send a continuation header if we don't have one, even if we did on a previous iteration.
157+
query_request.headers_mut().remove(constants::CONTINUATION);
158+
}
171159

172-
pipeline.provide_data(result)?;
160+
let resp = self
161+
.http_pipeline
162+
.send_raw(
163+
self.context.to_borrowed(),
164+
&mut query_request,
165+
self.items_link.clone(),
166+
)
167+
.await?;
168+
169+
let next_continuation =
170+
resp.headers().get_optional_string(&constants::CONTINUATION);
171+
172+
draining = request.drain && next_continuation.is_some();
173+
174+
let body = resp.into_body();
175+
let result = QueryResult {
176+
partition_key_range_id: &request.partition_key_range_id,
177+
request_index: request.index,
178+
next_continuation,
179+
result: &body,
180+
};
181+
182+
// For now, just provide a single result at a time.
183+
// When we parallelize requests, we can more easily provide multiple results at once.
184+
pipeline.provide_data(vec![result])?;
185+
}
173186
}
174187

175188
// No items, but we provided more data (probably), so continue the loop.

sdk/cosmos/azure_data_cosmos/tests/framework/query_engine.rs

Lines changed: 48 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ struct PartitionState {
114114
started: bool,
115115
queue: VecDeque<MockItem>,
116116
next_continuation: Option<String>,
117+
next_index: usize,
117118
}
118119

119120
impl PartitionState {
@@ -158,6 +159,7 @@ impl MockQueryPipeline {
158159
started: false,
159160
queue: VecDeque::new(),
160161
next_continuation: None,
162+
next_index: 0,
161163
})
162164
.collect();
163165

@@ -170,25 +172,26 @@ impl MockQueryPipeline {
170172
}
171173

172174
fn get_requests(&self) -> Vec<azure_data_cosmos::query::QueryRequest> {
175+
let (query, include_parameters) = if let Some(config) = &self.query_request_config {
176+
(config.query.clone(), config.include_parameters)
177+
} else {
178+
(None, false)
179+
};
180+
173181
self.partitions
174182
.iter()
175183
.filter(|state| !state.exhausted())
176-
.map(|state| azure_data_cosmos::query::QueryRequest {
184+
.map(move |state| azure_data_cosmos::query::QueryRequest {
177185
partition_key_range_id: state.range.id.clone(),
186+
index: state.next_index,
178187
continuation: if state.started {
179188
state.next_continuation.clone()
180189
} else {
181190
None
182191
},
183-
query: self
184-
.query_request_config
185-
.as_ref()
186-
.and_then(|config| config.query.clone()),
187-
include_parameters: self
188-
.query_request_config
189-
.as_ref()
190-
.map(|config| config.include_parameters)
191-
.unwrap_or(false),
192+
query: query.clone(),
193+
include_parameters,
194+
drain: false,
192195
})
193196
.collect()
194197
}
@@ -266,31 +269,43 @@ impl QueryPipeline for MockQueryPipeline {
266269

267270
fn provide_data(
268271
&mut self,
269-
data: azure_data_cosmos::query::QueryResult,
272+
data: std::vec::Vec<azure_data_cosmos::query::QueryResult<'_>>,
270273
) -> azure_core::Result<()> {
271-
let payload: DocumentPayload<MockItem> =
272-
serde_json::from_slice(data.result).map_err(|_| {
273-
azure_core::Error::with_message(
274+
for data in data {
275+
let payload: DocumentPayload<MockItem> =
276+
serde_json::from_slice(data.result).map_err(|_| {
277+
azure_core::Error::with_message(
278+
azure_core::error::ErrorKind::Other,
279+
"Failed to deserialize payload",
280+
)
281+
})?;
282+
283+
let partition_state = self
284+
.partitions
285+
.iter_mut()
286+
.find(|state| state.range.id == data.partition_key_range_id);
287+
if let Some(partition_state) = partition_state {
288+
if partition_state.next_index != data.request_index {
289+
return Err(azure_core::Error::with_message(
290+
azure_core::error::ErrorKind::Other,
291+
format!(
292+
"Out of order data provided for partition key range {}: expected index {}, got {}",
293+
data.partition_key_range_id, partition_state.next_index, data.request_index
294+
),
295+
));
296+
}
297+
partition_state.next_index += 1;
298+
partition_state.provide_data(payload.documents, data.next_continuation);
299+
} else {
300+
return Err(azure_core::Error::with_message(
274301
azure_core::error::ErrorKind::Other,
275-
"Failed to deserialize payload",
276-
)
277-
})?;
278-
279-
let partition_state = self
280-
.partitions
281-
.iter_mut()
282-
.find(|state| state.range.id == data.partition_key_range_id);
283-
if let Some(partition_state) = partition_state {
284-
partition_state.provide_data(payload.documents, data.next_continuation);
285-
Ok(())
286-
} else {
287-
Err(azure_core::Error::with_message(
288-
azure_core::error::ErrorKind::Other,
289-
format!(
290-
"Partition key range {} not found",
291-
data.partition_key_range_id
292-
),
293-
))
302+
format!(
303+
"Partition key range {} not found",
304+
data.partition_key_range_id
305+
),
306+
));
307+
}
294308
}
309+
Ok(())
295310
}
296311
}

0 commit comments

Comments
 (0)