Skip to content

Commit

Permalink
refactor(persistence): get rid of the StopFuture and return a BoxFutu…
Browse files Browse the repository at this point in the history
…re instead

The StopFuture does not introduce anything and is not needed. We now directly use a JoinHandle
and return a BoxFuture that awaits for it instead
  • Loading branch information
oktal committed Mar 11, 2024
1 parent 3fc39bf commit b21ff73
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 47 deletions.
33 changes: 0 additions & 33 deletions zebus/src/persistence/future.rs

This file was deleted.

1 change: 0 additions & 1 deletion zebus/src/persistence/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
pub(crate) mod command;
pub mod event;
pub mod future;
mod service;
pub mod transport;

Expand Down
12 changes: 6 additions & 6 deletions zebus/src/persistence/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ pub(super) fn spawn<T, S>(
) -> (
mpsc::Sender<PersistenceRequest>,
BoxStream<'static, PersistenceEvent>,
super::future::StopFuture,
tokio::task::JoinHandle<Result<(), PersistenceError>>,
)
where
S: Stream<Item = BusEvent> + Send + 'static,
Expand All @@ -849,8 +849,9 @@ where
shutdown,
};

let fut = super::future::StopFuture::spawn(service.run(bus_events_rx));
(tx, events_rx.boxed(), fut)
//let fut = super::future::StopFuture::spawn(service.run(bus_events_rx));
let handle = tokio::spawn(service.run(bus_events_rx));
(tx, events_rx.boxed(), handle)
} else {
let service = TransientService {
inner,
Expand All @@ -859,13 +860,12 @@ where
shutdown,
};

let fut = super::future::StopFuture::spawn(service.run());

let events_rx = futures_util::stream::iter([
PersistenceEvent::ReplayStarted,
PersistenceEvent::SafetyStarted,
PersistenceEvent::Normal,
]);
(tx, events_rx.boxed(), fut)
let handle = tokio::spawn(service.run());
(tx, events_rx.boxed(), handle)
}
}
21 changes: 14 additions & 7 deletions zebus/src/persistence/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ enum Inner<T> {
/// Token to cancel task
shutdown: CancellationToken,

/// Future that resolves when the task has been joined
stop: super::future::StopFuture,
/// Join handle for the persistence task
handle: tokio::task::JoinHandle<Result<(), PersistenceError>>,
},
}

Expand Down Expand Up @@ -96,7 +96,7 @@ where
type MessageStream = super::MessageStream;

type StartCompletionFuture = BoxFuture<'static, Result<(), Self::Err>>;
type StopCompletionFuture = super::future::StopFuture;
type StopCompletionFuture = BoxFuture<'static, Result<(), Self::Err>>;
type SendFuture = transport::future::SendFuture<PersistenceRequest, Self::Err>;

fn configure(
Expand Down Expand Up @@ -174,7 +174,7 @@ where
let shutdown = CancellationToken::new();

// Spawn a new task for the persistence service
let (requests_tx, mut events_rx, stop) = super::service::spawn(
let (requests_tx, mut events_rx, handle) = super::service::spawn(
&configuration,
directory,
event_rx.stream(),
Expand Down Expand Up @@ -210,7 +210,7 @@ where
messages_tx,
requests_tx,
shutdown,
stop,
handle,
}),
Ok(completion),
)
Expand All @@ -224,14 +224,21 @@ where

fn stop(&mut self) -> Result<Self::StopCompletionFuture, Self::Err> {
match self.inner.take() {
Some(Inner::Started { shutdown, stop, .. }) => {
Some(Inner::Started {
shutdown, handle, ..
}) => {
// Cancel the task
shutdown.cancel();

// TODO(oktal): we should go back to the `Configured` state but we need to retrieve
// the inner transport layer that was moved to an inner task

Ok(stop)
Ok(Box::pin(async move {
match handle.await {
Ok(res) => res,
Err(e) => Err(PersistenceError::Join(e)),
}
}))
}
_ => Err(PersistenceError::InvalidOperation),
}
Expand Down

0 comments on commit b21ff73

Please sign in to comment.