Skip to content

Commit

Permalink
chore(conductor): implement clippy pedantic suggestions (#573)
Browse files Browse the repository at this point in the history
## Summary
Implement all clippy pedantic suggestions for conductor, activate in CI.

## Background
These changes were made for consistency. All repositories should be
checked with clippy pedantic.

## Changes
- Run `cargo clippy -- -W clippy::pedantic`, fix all nags
- Remove `astria-conductor` from the `--exclude` list in the clippy CI.
- Shuffle around some code (remove `types` module, move it under
`executor` and `data_availability`, where they are more appropriate)
-Slim down functions that were too big and unwieldy

## Testing
All tests still pass. No extra tests are necessary because no structural
refactoring took place.

## Related Issues
Closes #570
  • Loading branch information
SuperFluffy committed Nov 10, 2023
1 parent 29f8bfe commit 709c06c
Show file tree
Hide file tree
Showing 14 changed files with 241 additions and 261 deletions.
7 changes: 0 additions & 7 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,8 @@ jobs:
cargo clippy --workspace \
--all-targets \
--all-features \
--exclude astria-conductor \
-- --warn clippy::pedantic \
--deny warnings
- name: run default clippy
run: |
cargo clippy --workspace \
--all-targets \
--all-features \
-- --deny warnings
test:
if: ${{ always() && !cancelled() }}
Expand Down
4 changes: 2 additions & 2 deletions crates/astria-conductor/src/block_verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ mod test {
let chain_ids_commitment_inclusion_proof = tx_tree.prove_inclusion(1).unwrap();

let mut header = astria_sequencer_types::test_utils::default_header();
let height = header.height.value() as u32;
let height = header.height.value().try_into().unwrap();
header.data_hash = Some(Hash::try_from(data_hash.to_vec()).unwrap());

let (validator_set, proposer_address, commit) =
Expand Down Expand Up @@ -481,7 +481,7 @@ mod test {
let chain_ids_commitment_inclusion_proof = tx_tree.prove_inclusion(1).unwrap();

let mut header = astria_sequencer_types::test_utils::default_header();
let height = header.height.value() as u32;
let height = header.height.value().try_into().unwrap();
header.data_hash = Some(Hash::try_from(data_hash.to_vec()).unwrap());

let (validator_set, proposer_address, commit) =
Expand Down
95 changes: 54 additions & 41 deletions crates/astria-conductor/src/client_provider.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use async_trait::async_trait;
use color_eyre::eyre::{
self,
Expand All @@ -16,15 +18,21 @@ use tokio::{
},
task::JoinHandle,
};
use tracing::instrument::Instrumented;
use tracing::{
instrument::Instrumented,
warn,
};
use tryhard::{
backoff_strategies::ExponentialBackoff,
OnRetry,
RetryFutureConfig,
};

type ClientRx = mpsc::UnboundedReceiver<oneshot::Sender<Result<WebSocketClient, Error>>>;
type ClientTx = mpsc::UnboundedSender<oneshot::Sender<Result<WebSocketClient, Error>>>;

pub(super) async fn start_pool(url: &str) -> eyre::Result<Pool<ClientProvider>> {
let client_provider = ClientProvider::new(url)
.await
.wrap_err("failed initializing sequencer client provider")?;
pub(super) fn start_pool(url: &str) -> eyre::Result<Pool<ClientProvider>> {
let client_provider = ClientProvider::new(url);
Pool::builder(client_provider)
.max_size(50)
.build()
Expand All @@ -44,20 +52,47 @@ pub(crate) struct ClientProvider {
_provider_loop: Instrumented<JoinHandle<()>>,
}

fn make_retry_config(
attempts: u32,
) -> RetryFutureConfig<
ExponentialBackoff,
impl Copy + OnRetry<sequencer_client::tendermint_rpc::Error>,
> {
RetryFutureConfig::new(attempts)
.exponential_backoff(Duration::from_secs(5))
.max_delay(Duration::from_secs(60))
.on_retry(
|attempt,
next_delay: Option<Duration>,
error: &sequencer_client::tendermint_rpc::Error| {
let error = error.clone();
let wait_duration = next_delay
.map(humantime::format_duration)
.map(tracing::field::display);
async move {
let error = &error as &(dyn std::error::Error + 'static);
warn!(
attempt,
wait_duration,
error,
"attempt to connect to sequencer websocket failed; retrying after backoff",
);
}
},
)
}

impl ClientProvider {
const RECONNECTION_ATTEMPTS: u32 = 1024;

pub(crate) async fn new(url: &str) -> eyre::Result<Self> {
use std::time::Duration;

pub(crate) fn new(url: &str) -> Self {
use futures::{
future::FusedFuture as _,
FutureExt as _,
};
use tracing::{
info,
info_span,
warn,
Instrument as _,
};
let (client_tx, mut client_rx): (ClientTx, ClientRx) = mpsc::unbounded_channel();
Expand All @@ -67,32 +102,10 @@ impl ClientProvider {
strategy = "exponential backoff",
"connecting to sequencer websocket"
);
let retry_config = tryhard::RetryFutureConfig::new(Self::RECONNECTION_ATTEMPTS)
.exponential_backoff(Duration::from_secs(5))
.max_delay(Duration::from_secs(60))
.on_retry(
|attempt,
next_delay: Option<Duration>,
error: &sequencer_client::tendermint_rpc::Error| {
let error = error.clone();
let wait_duration = next_delay
.map(humantime::format_duration)
.map(tracing::field::display);
async move {
let error = &error as &(dyn std::error::Error + 'static);
warn!(
attempt,
wait_duration,
error,
"attempt to connect to sequencer websocket failed; retrying after \
backoff",
);
}
},
);
let retry_config = make_retry_config(Self::RECONNECTION_ATTEMPTS);

let url_ = url.to_string();
let _provider_loop = tokio::spawn(async move {
let provider_loop = tokio::spawn(async move {
let mut client = None;
let mut driver_task = futures::future::Fuse::terminated();
let mut reconnect = tryhard::retry_fn(|| {
Expand All @@ -114,7 +127,7 @@ impl ClientProvider {
Ok(Err(e)) => ("error", Some(eyre::Report::new(e).wrap_err("driver task exited with error"))),
Err(e) => ("panic", Some(eyre::Report::new(e).wrap_err("driver task failed"))),
};
let error: Option<&(dyn std::error::Error + 'static)> = err.as_ref().map(|e| e.as_ref());
let error: Option<&(dyn std::error::Error + 'static)> = err.as_ref().map(AsRef::as_ref);
warn!(
error,
reason,
Expand Down Expand Up @@ -158,14 +171,14 @@ impl ClientProvider {
pending_requests.push(tx);
}
}
)
);
}
}).instrument(info_span!("client provider loop", url));

Ok(Self {
Self {
client_tx,
_provider_loop,
})
_provider_loop: provider_loop,
}
}

async fn get(&self) -> Result<WebSocketClient, Error> {
Expand Down Expand Up @@ -227,10 +240,10 @@ pub(crate) mod mock {
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| "lo").unwrap();
let address = server.local_addr().unwrap();
let _handle = server.start(module);
let pool = start_pool(&format!("ws://{address}")).await.unwrap();
let handle = server.start(module);
let pool = start_pool(&format!("ws://{address}")).unwrap();
Self {
_handle,
_handle: handle,
pool,
}
}
Expand Down
79 changes: 35 additions & 44 deletions crates/astria-conductor/src/conductor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,32 +47,38 @@ use crate::{
};

pub struct Conductor {
/// Listens for several unix signals and notifies its subscribers.
signals: SignalReceiver,
/// The data availability reader that is spawned after sync is completed.
/// Constructed if constructed if `disable_finalization = false`.
data_availability_reader: Option<data_availability::Reader>,

/// The different long-running tasks that make up the conductor;
tasks: JoinMap<&'static str, eyre::Result<()>>,
/// The object pool of sequencer clients that restarts the websocket connection
/// on failure.
sequencer_client_pool: deadpool::managed::Pool<ClientProvider>,

/// Channels to the long-running tasks to shut them down gracefully
shutdown_channels: HashMap<&'static str, oneshot::Sender<()>>,

/// The object pool of sequencer clients that restarts the websocket connection
/// on failure.
sequencer_client_pool: deadpool::managed::Pool<ClientProvider>,
/// Listens for several unix signals and notifies its subscribers.
signals: SignalReceiver,

/// The channel over which the sequencer reader task notifies conductor that sync is completed.
sync_done: Fuse<oneshot::Receiver<()>>,

/// The data availability reader that is spawned after sync is completed.
/// Constructed if constructed if `disable_finalization = false`.
data_availability_reader: Option<data_availability::Reader>,
/// The different long-running tasks that make up the conductor;
tasks: JoinMap<&'static str, eyre::Result<()>>,
}

impl Conductor {
const DATA_AVAILABILITY: &'static str = "data_availability";
const EXECUTOR: &'static str = "executor";
const SEQUENCER: &'static str = "sequencer";

/// Create a new [`Conductor`] from a [`Config`].
///
/// # Errors
/// Returns an error in the following cases if one of its constituent
/// actors could not be spawned (executor, sequencer reader, or data availability reader).
/// This usually happens if the actors failed to connect to their respective endpoints.
pub async fn new(cfg: Config) -> eyre::Result<Self> {
use futures::FutureExt;

Expand Down Expand Up @@ -105,7 +111,6 @@ impl Conductor {
};

let sequencer_client_pool = client_provider::start_pool(&cfg.sequencer_url)
.await
.wrap_err("failed to create sequencer client pool")?;

// Spawn the sequencer task
Expand Down Expand Up @@ -165,68 +170,50 @@ impl Conductor {
};

Ok(Self {
signals,
tasks,
data_availability_reader,
sequencer_client_pool,
shutdown_channels,
signals,
sync_done,
data_availability_reader,
tasks,
})
}

pub async fn run_until_stopped(self) -> eyre::Result<()> {
use futures::future::{
FusedFuture as _,
FutureExt as _,
};

let Self {
signals:
SignalReceiver {
mut reload_rx,
mut stop_rx,
},
mut tasks,
shutdown_channels,
sequencer_client_pool,
sync_done,
mut data_availability_reader,
} = self;

let mut sync_done = sync_done.fuse();
pub async fn run_until_stopped(mut self) {
use futures::future::FusedFuture as _;

loop {
select! {
// FIXME: The bias should only be on the signal channels. The two handlers should have the same bias.
biased;

_ = stop_rx.changed() => {
_ = self.signals.stop_rx.changed() => {
info!("shutting down conductor");
break;
}

_ = reload_rx.changed() => {
_ = self.signals.reload_rx.changed() => {
info!("reloading is currently not implemented");
}

res = &mut sync_done, if !sync_done.is_terminated() => {
res = &mut self.sync_done, if !self.sync_done.is_terminated() => {
match res {
Ok(()) => info!("received sync-complete signal from sequencer reader"),
Err(e) => {
let error = &e as &(dyn std::error::Error + 'static);
warn!(error, "sync-complete channel failed prematurely");
}
}
if let Some(data_availability_reader) = data_availability_reader.take() {
if let Some(data_availability_reader) = self.data_availability_reader.take() {
info!("starting data availability reader");
tasks.spawn(
self.tasks.spawn(
Self::DATA_AVAILABILITY,
data_availability_reader.run_until_stopped(),
);
}
}

Some((name, res)) = tasks.join_next() => {
Some((name, res)) = self.tasks.join_next() => {
match res {
Ok(Ok(())) => error!(task.name = name, "task exited unexpectedly, shutting down"),
Ok(Err(e)) => {
Expand All @@ -242,16 +229,21 @@ impl Conductor {
}
}

info!("shutting down conductor");
self.shutdown().await;
}

async fn shutdown(self) {
info!("sending shutdown command to all tasks");
for (_, channel) in shutdown_channels {
for (_, channel) in self.shutdown_channels {
let _ = channel.send(());
}

sequencer_client_pool.close();
self.sequencer_client_pool.close();

info!("waiting 5 seconds for all tasks to shut down");
// put the tasks into an Rc to make them 'static so they can run on a local set
let mut tasks = Rc::new(tasks);
let mut tasks = Rc::new(self.tasks);
let local_set = LocalSet::new();
local_set
.run_until(async {
Expand Down Expand Up @@ -294,7 +286,6 @@ impl Conductor {
.shutdown()
.await;
}
Ok(())
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/astria-conductor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ pub enum CommitLevel {
}

impl CommitLevel {
pub fn is_soft_only(&self) -> bool {
pub(crate) fn is_soft_only(&self) -> bool {
matches!(self, Self::SoftOnly)
}

pub fn is_firm_only(&self) -> bool {
pub(crate) fn is_firm_only(&self) -> bool {
matches!(self, Self::FirmOnly)
}
}
Expand Down
Loading

0 comments on commit 709c06c

Please sign in to comment.