Skip to content

Commit 5902d42

Browse files
authored
RUST-2166 Update convenient transactions API to use async closures (#1372)
1 parent 34ce1e5 commit 5902d42

File tree

8 files changed

+164
-140
lines changed

8 files changed

+164
-140
lines changed

.evergreen/check-clippy.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ set -o errexit
55
source ./.evergreen/env.sh
66

77
# Pin clippy to the latest version. This should be updated when new versions of Rust are released.
8-
CLIPPY_VERSION=1.84.0
8+
CLIPPY_VERSION=1.85.0
99

1010
rustup install $CLIPPY_VERSION
1111

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ typed-builder = "0.20.0"
118118
webpki-roots = "0.26"
119119
zstd = { version = "0.11.2", optional = true }
120120
macro_magic = "0.5.1"
121+
rustversion = "1.0.20"
121122

122123
[dependencies.pbkdf2]
123124
version = "0.11.0"

src/client/session/action.rs

Lines changed: 138 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,67 @@ impl<'a> Action for StartTransaction<&'a mut ClientSession> {
9999
}
100100
}
101101

102+
macro_rules! convenient_run {
103+
(
104+
$session:expr,
105+
$start_transaction:expr,
106+
$callback:expr,
107+
$abort_transaction:expr,
108+
$commit_transaction:expr,
109+
) => {{
110+
let timeout = Duration::from_secs(120);
111+
#[cfg(test)]
112+
let timeout = $session.convenient_transaction_timeout.unwrap_or(timeout);
113+
let start = Instant::now();
114+
115+
use crate::error::{TRANSIENT_TRANSACTION_ERROR, UNKNOWN_TRANSACTION_COMMIT_RESULT};
116+
117+
'transaction: loop {
118+
$start_transaction?;
119+
let ret = match $callback {
120+
Ok(v) => v,
121+
Err(e) => {
122+
if matches!(
123+
$session.transaction.state,
124+
TransactionState::Starting | TransactionState::InProgress
125+
) {
126+
$abort_transaction?;
127+
}
128+
if e.contains_label(TRANSIENT_TRANSACTION_ERROR) && start.elapsed() < timeout {
129+
continue 'transaction;
130+
}
131+
return Err(e);
132+
}
133+
};
134+
if matches!(
135+
$session.transaction.state,
136+
TransactionState::None
137+
| TransactionState::Aborted
138+
| TransactionState::Committed { .. }
139+
) {
140+
return Ok(ret);
141+
}
142+
'commit: loop {
143+
match $commit_transaction {
144+
Ok(()) => return Ok(ret),
145+
Err(e) => {
146+
if e.is_max_time_ms_expired_error() || start.elapsed() >= timeout {
147+
return Err(e);
148+
}
149+
if e.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT) {
150+
continue 'commit;
151+
}
152+
if e.contains_label(TRANSIENT_TRANSACTION_ERROR) {
153+
continue 'transaction;
154+
}
155+
return Err(e);
156+
}
157+
}
158+
}
159+
}
160+
}};
161+
}
162+
102163
impl StartTransaction<&mut ClientSession> {
103164
/// Starts a transaction, runs the given callback, and commits or aborts the transaction.
104165
/// Transient transaction errors will cause the callback or the commit to be retried;
@@ -146,66 +207,84 @@ impl StartTransaction<&mut ClientSession> {
146207
/// # Ok(())
147208
/// # }
148209
/// ```
210+
#[rustversion::attr(since(1.85), deprecated = "use and_run2")]
149211
pub async fn and_run<R, C, F>(self, mut context: C, mut callback: F) -> Result<R>
150212
where
151213
F: for<'b> FnMut(&'b mut ClientSession, &'b mut C) -> BoxFuture<'b, Result<R>>,
152214
{
153-
let timeout = Duration::from_secs(120);
154-
#[cfg(test)]
155-
let timeout = self
156-
.session
157-
.convenient_transaction_timeout
158-
.unwrap_or(timeout);
159-
let start = Instant::now();
160-
161-
use crate::error::{TRANSIENT_TRANSACTION_ERROR, UNKNOWN_TRANSACTION_COMMIT_RESULT};
215+
convenient_run!(
216+
self.session,
217+
self.session
218+
.start_transaction()
219+
.with_options(self.options.clone())
220+
.await,
221+
callback(self.session, &mut context).await,
222+
self.session.abort_transaction().await,
223+
self.session.commit_transaction().await,
224+
)
225+
}
162226

163-
'transaction: loop {
227+
/// Starts a transaction, runs the given callback, and commits or aborts the transaction.
228+
/// Transient transaction errors will cause the callback or the commit to be retried;
229+
/// other errors will cause the transaction to be aborted and the error returned to the
230+
/// caller. If the callback needs to provide its own error information, the
231+
/// [`Error::custom`](crate::error::Error::custom) method can accept an arbitrary payload that
232+
/// can be retrieved via [`Error::get_custom`](crate::error::Error::get_custom).
233+
///
234+
/// If a command inside the callback fails, it may cause the transaction on the server to be
235+
/// aborted. This situation is normally handled transparently by the driver. However, if the
236+
/// application does not return that error from the callback, the driver will not be able to
237+
/// determine whether the transaction was aborted or not. The driver will then retry the
238+
/// callback indefinitely. To avoid this situation, the application MUST NOT silently handle
239+
/// errors within the callback. If the application needs to handle errors within the
240+
/// callback, it MUST return them after doing so.
241+
///
242+
/// This version of the method uses an async closure, which means it's both more convenient and
243+
/// avoids the lifetime issues of `and_run`, but is only available in Rust versions 1.85 and
244+
/// above.
245+
///
246+
/// Because the callback can be repeatedly executed, code within the callback cannot consume
247+
/// owned values, even values owned by the callback itself:
248+
///
249+
/// ```no_run
250+
/// # use mongodb::{bson::{doc, Document}, error::Result, Client};
251+
/// # use futures::FutureExt;
252+
/// # async fn wrapper() -> Result<()> {
253+
/// # let client = Client::with_uri_str("mongodb://example.com").await?;
254+
/// # let mut session = client.start_session().await?;
255+
/// let coll = client.database("mydb").collection::<Document>("mycoll");
256+
/// let my_data = "my data".to_string();
257+
/// // This works:
258+
/// session.start_transaction().and_run2(
259+
/// async move |session| {
260+
/// coll.insert_one(doc! { "data": my_data.clone() }).session(session).await
261+
/// }
262+
/// ).await?;
263+
/// /* This will not compile:
264+
/// session.start_transaction().and_run2(
265+
/// async move |session| {
266+
/// coll.insert_one(doc! { "data": my_data }).session(session).await
267+
/// }
268+
/// ).await?;
269+
/// */
270+
/// # Ok(())
271+
/// # }
272+
/// ```
273+
#[rustversion::since(1.85)]
274+
pub async fn and_run2<R, F>(self, mut callback: F) -> Result<R>
275+
where
276+
F: for<'b> AsyncFnMut(&'b mut ClientSession) -> Result<R>,
277+
{
278+
convenient_run!(
279+
self.session,
164280
self.session
165281
.start_transaction()
166282
.with_options(self.options.clone())
167-
.await?;
168-
let ret = match callback(self.session, &mut context).await {
169-
Ok(v) => v,
170-
Err(e) => {
171-
if matches!(
172-
self.session.transaction.state,
173-
TransactionState::Starting | TransactionState::InProgress
174-
) {
175-
self.session.abort_transaction().await?;
176-
}
177-
if e.contains_label(TRANSIENT_TRANSACTION_ERROR) && start.elapsed() < timeout {
178-
continue 'transaction;
179-
}
180-
return Err(e);
181-
}
182-
};
183-
if matches!(
184-
self.session.transaction.state,
185-
TransactionState::None
186-
| TransactionState::Aborted
187-
| TransactionState::Committed { .. }
188-
) {
189-
return Ok(ret);
190-
}
191-
'commit: loop {
192-
match self.session.commit_transaction().await {
193-
Ok(()) => return Ok(ret),
194-
Err(e) => {
195-
if e.is_max_time_ms_expired_error() || start.elapsed() >= timeout {
196-
return Err(e);
197-
}
198-
if e.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT) {
199-
continue 'commit;
200-
}
201-
if e.contains_label(TRANSIENT_TRANSACTION_ERROR) {
202-
continue 'transaction;
203-
}
204-
return Err(e);
205-
}
206-
}
207-
}
208-
}
283+
.await,
284+
callback(self.session).await,
285+
self.session.abort_transaction().await,
286+
self.session.commit_transaction().await,
287+
)
209288
}
210289
}
211290

@@ -238,57 +317,16 @@ impl StartTransaction<&mut crate::sync::ClientSession> {
238317
where
239318
F: for<'b> FnMut(&'b mut crate::sync::ClientSession) -> Result<R>,
240319
{
241-
let timeout = std::time::Duration::from_secs(120);
242-
let start = std::time::Instant::now();
243-
244-
use crate::error::{TRANSIENT_TRANSACTION_ERROR, UNKNOWN_TRANSACTION_COMMIT_RESULT};
245-
246-
'transaction: loop {
320+
convenient_run!(
321+
self.session.async_client_session,
247322
self.session
248323
.start_transaction()
249324
.with_options(self.options.clone())
250-
.run()?;
251-
let ret = match callback(self.session) {
252-
Ok(v) => v,
253-
Err(e) => {
254-
if matches!(
255-
self.session.async_client_session.transaction.state,
256-
TransactionState::Starting | TransactionState::InProgress
257-
) {
258-
self.session.abort_transaction().run()?;
259-
}
260-
if e.contains_label(TRANSIENT_TRANSACTION_ERROR) && start.elapsed() < timeout {
261-
continue 'transaction;
262-
}
263-
return Err(e);
264-
}
265-
};
266-
if matches!(
267-
self.session.async_client_session.transaction.state,
268-
TransactionState::None
269-
| TransactionState::Aborted
270-
| TransactionState::Committed { .. }
271-
) {
272-
return Ok(ret);
273-
}
274-
'commit: loop {
275-
match self.session.commit_transaction().run() {
276-
Ok(()) => return Ok(ret),
277-
Err(e) => {
278-
if e.is_max_time_ms_expired_error() || start.elapsed() >= timeout {
279-
return Err(e);
280-
}
281-
if e.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT) {
282-
continue 'commit;
283-
}
284-
if e.contains_label(TRANSIENT_TRANSACTION_ERROR) {
285-
continue 'transaction;
286-
}
287-
return Err(e);
288-
}
289-
}
290-
}
291-
}
325+
.run(),
326+
callback(self.session),
327+
self.session.abort_transaction().run(),
328+
self.session.commit_transaction().run(),
329+
)
292330
}
293331
}
294332

src/test/documentation_examples.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1675,7 +1675,6 @@ async fn change_streams_examples() -> Result<()> {
16751675

16761676
async fn convenient_transaction_examples() -> Result<()> {
16771677
use crate::ClientSession;
1678-
use futures::FutureExt;
16791678
if !transactions_supported().await {
16801679
log_uncaptured(
16811680
"skipping convenient transaction API examples due to no transaction support",
@@ -1734,12 +1733,9 @@ async fn convenient_transaction_examples() -> Result<()> {
17341733
// Step 2: Start a client session.
17351734
let mut session = client.start_session().await?;
17361735

1737-
// Step 3: Use and_run to start a transaction, execute the callback, and commit (or
1736+
// Step 3: Use and_run2 to start a transaction, execute the callback, and commit (or
17381737
// abort on error).
1739-
session
1740-
.start_transaction()
1741-
.and_run((), |session, _| callback(session).boxed())
1742-
.await?;
1738+
session.start_transaction().and_run2(callback).await?;
17431739

17441740
// End Transactions withTxn API Example 1
17451741

0 commit comments

Comments
 (0)