From a893989ebefd1146bc030967fb220fa1c1354892 Mon Sep 17 00:00:00 2001
From: Gustavo Madeira Krieger
Date: Tue, 2 Jul 2024 15:02:09 -0300
Subject: [PATCH] feat(claimer): remove broker and redis

The authority-claimer no longer consumes claims from a Redis-backed
broker. Remove the `BrokerListener` trait and its default
implementation, the `Broker` client (single-node and cluster), the
broker CLI/config options, the `RollupsClaimsStream` key, and the
testcontainers-based broker fixtures, along with the now-unused
`redis` and `backoff` dependencies.
---
 cmd/authority-claimer/Cargo.lock            |  88 ----
 cmd/authority-claimer/Cargo.toml            |   2 -
 cmd/authority-claimer/src/config.rs         |   9 +-
 cmd/authority-claimer/src/lib.rs            |   4 -
 cmd/authority-claimer/src/listener.rs       | 212 ---------
 cmd/authority-claimer/src/redacted.rs       |  76 ++--
 .../src/rollups_events/broker.rs            | 411 ------------------
 .../src/rollups_events/mod.rs               |   7 +-
 .../src/rollups_events/rollups_claims.rs    |  23 +-
 cmd/authority-claimer/src/test_fixtures.rs  | 154 -------
 10 files changed, 41 insertions(+), 945 deletions(-)
 delete mode 100644 cmd/authority-claimer/src/listener.rs
 delete mode 100644 cmd/authority-claimer/src/rollups_events/broker.rs
 delete mode 100644 cmd/authority-claimer/src/test_fixtures.rs

diff --git a/cmd/authority-claimer/Cargo.lock b/cmd/authority-claimer/Cargo.lock
index 59f1fbcbd..f731ad749 100644
--- a/cmd/authority-claimer/Cargo.lock
+++ b/cmd/authority-claimer/Cargo.lock
@@ -135,12 +135,6 @@ version = "1.0.81"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0952808a6c2afd1aa8947271f3a60f1a6763c7b912d210184c5149b5cf147247"
 
-[[package]]
-name = "arc-swap"
-version = "1.7.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
-
 [[package]]
 name = "arrayvec"
 version = "0.7.4"
@@ -226,7 +220,6 @@ version = "1.4.0"
 dependencies = [
  "async-trait",
  "axum",
- "backoff",
  "base64 0.22.0",
  "clap",
  "eth-state-fold",
@@ -236,7 +229,6 @@ dependencies = [
  "ethers-signers",
  "hex",
  "prometheus-client",
- "redis",
  "reqwest",
  "rusoto_core",
  "rusoto_kms",
@@ -341,20 +333,6 @@ dependencies = [
  "tracing",
 ]
 
-[[package]]
-name = "backoff"
-version = "0.4.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1"
-dependencies = [
- "futures-core",
- "getrandom",
- "instant",
- "pin-project-lite",
- "rand",
- "tokio",
-]
-
 [[package]]
 name = "backtrace"
 version = "0.3.71"
@@ -784,20 +762,6 @@ version = "1.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
 
-[[package]]
-name = "combine"
-version = "4.6.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4"
-dependencies = [
- "bytes",
- "futures-core",
- "memchr",
- "pin-project-lite",
- "tokio",
- "tokio-util",
-]
-
 [[package]]
 name = "const-oid"
 version = "0.9.6"
@@ -859,12 +823,6 @@ version = "2.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
 
-[[package]]
-name = "crc16"
-version = "0.4.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff"
-
 [[package]]
 name = "crc32fast"
 version = "1.4.0"
@@ -3184,35 +3142,6 @@ dependencies = [
  "crossbeam-utils",
 ]
 
-[[package]]
-name = "redis"
-version = "0.25.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6472825949c09872e8f2c50bde59fcefc17748b6be5c90fd67cd8b4daca73bfd"
-dependencies = [
- "arc-swap",
- "async-trait",
- "bytes",
- "combine",
- "crc16",
- "futures",
- "futures-util",
- "itoa",
"log", - "native-tls", - "percent-encoding", - "pin-project-lite", - "rand", - "ryu", - "sha1_smol", - "socket2", - "tokio", - "tokio-native-tls", - "tokio-retry", - "tokio-util", - "url", -] - [[package]] name = "redox_syscall" version = "0.2.16" @@ -3904,12 +3833,6 @@ dependencies = [ "digest 0.10.7", ] -[[package]] -name = "sha1_smol" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" - [[package]] name = "sha2" version = "0.8.2" @@ -4626,17 +4549,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-retry" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" -dependencies = [ - "pin-project", - "rand", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.23.4" diff --git a/cmd/authority-claimer/Cargo.toml b/cmd/authority-claimer/Cargo.toml index 355d22b02..1f78eacdf 100644 --- a/cmd/authority-claimer/Cargo.toml +++ b/cmd/authority-claimer/Cargo.toml @@ -13,7 +13,6 @@ test = false [dependencies] async-trait = "0.1" axum = "0.7" -backoff = {version = "0.4", features = ["tokio"]} base64 = "0.22" clap = {version = "4.5", features = ["string", "derive", "env"]} ethers = "1.0" @@ -23,7 +22,6 @@ eth-state-fold-types = {version = "0.9", features = ["ethers"]} eth-tx-manager = "0.10" hex = "0.4" prometheus-client = "0.22" -redis = {version = "0.25", features = ["streams", "tokio-comp", "connection-manager", "tls-native-tls", "tokio-native-tls-comp", "cluster", "cluster-async"]} reqwest = "=0.11.24" # Set specific reqwest version to fix the build rusoto_core = "0.48" rusoto_kms = "0.48" diff --git a/cmd/authority-claimer/src/config.rs b/cmd/authority-claimer/src/config.rs index 0edd348b4..53db32488 100644 --- a/cmd/authority-claimer/src/config.rs +++ b/cmd/authority-claimer/src/config.rs @@ -4,7 +4,7 @@ use crate::{ log::{LogConfig, LogEnvCliConfig}, redacted::Redacted, - rollups_events::{Address, BrokerCLIConfig, BrokerConfig, HexArrayError}, + rollups_events::{Address, HexArrayError}, }; use clap::{command, Parser}; use eth_tx_manager::{ @@ -48,7 +48,6 @@ pub struct Config { pub tx_manager_config: TxManagerConfig, pub tx_signing_config: TxSigningConfig, pub tx_manager_priority: Priority, - pub broker_config: BrokerConfig, pub log_config: LogConfig, pub iconsensus_address: Address, pub genesis_block: u64, @@ -83,8 +82,6 @@ impl Config { let tx_signing_config = TxSigningConfig::try_from(cli_config.tx_signing_config)?; - let broker_config = BrokerConfig::from(cli_config.broker_config); - let log_config = LogConfig::initialize(cli_config.log_config); let iconsensus_address = cli_config @@ -96,7 +93,6 @@ impl Config { tx_manager_config, tx_signing_config, tx_manager_priority: Priority::Normal, - broker_config, log_config, iconsensus_address, genesis_block: cli_config.genesis_block, @@ -115,9 +111,6 @@ struct AuthorityClaimerCLI { #[command(flatten)] pub tx_signing_config: TxSigningCLIConfig, - #[command(flatten)] - pub broker_config: BrokerCLIConfig, - #[command(flatten)] pub log_config: LogEnvCliConfig, diff --git a/cmd/authority-claimer/src/lib.rs b/cmd/authority-claimer/src/lib.rs index 12576d607..a0d6f3e2a 100644 --- a/cmd/authority-claimer/src/lib.rs +++ b/cmd/authority-claimer/src/lib.rs @@ -6,7 +6,6 @@ mod claimer; mod config; mod contracts; mod http_server; -mod listener; pub mod log; mod metrics; mod redacted; @@ -16,9 +15,6 @@ mod sender; mod signer; 
mod types; -#[cfg(test)] -mod test_fixtures; - use checker::DefaultDuplicateChecker; use claimer::{Claimer, DefaultClaimer}; pub use config::Config; diff --git a/cmd/authority-claimer/src/listener.rs b/cmd/authority-claimer/src/listener.rs deleted file mode 100644 index 358979250..000000000 --- a/cmd/authority-claimer/src/listener.rs +++ /dev/null @@ -1,212 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use crate::rollups_events::{ - Broker, BrokerConfig, BrokerError, RollupsClaim, RollupsClaimsStream, - INITIAL_ID, -}; -use async_trait::async_trait; -use snafu::ResultExt; -use std::fmt::Debug; - -/// The `BrokerListener` listens for new claims from the broker -#[async_trait] -pub trait BrokerListener: Debug { - type Error: snafu::Error + 'static; - - /// Listen to claims - async fn listen(&mut self) -> Result; -} - -// ------------------------------------------------------------------------------------------------ -// DefaultBrokerListener -// ------------------------------------------------------------------------------------------------ - -#[derive(Debug)] -pub struct DefaultBrokerListener { - broker: Broker, - stream: RollupsClaimsStream, - last_claim_id: String, -} - -#[derive(Debug, snafu::Snafu)] -pub enum BrokerListenerError { - #[snafu(display("broker error"))] - BrokerError { source: BrokerError }, -} - -impl DefaultBrokerListener { - pub async fn new( - broker_config: BrokerConfig, - chain_id: u64, - ) -> Result { - tracing::trace!("Connecting to the broker ({:?})", broker_config); - let broker = Broker::new(broker_config).await?; - let stream = RollupsClaimsStream::new(chain_id); - let last_claim_id = INITIAL_ID.to_string(); - Ok(Self { - broker, - stream, - last_claim_id, - }) - } -} - -#[async_trait] -impl BrokerListener for DefaultBrokerListener { - type Error = BrokerListenerError; - - async fn listen(&mut self) -> Result { - tracing::trace!("Waiting for claim with id {}", self.last_claim_id); - let event = self - .broker - .consume_blocking(&self.stream, &self.last_claim_id) - .await - .context(BrokerSnafu)?; - - self.last_claim_id = event.id; - - Ok(event.payload) - } -} - -#[cfg(test)] -mod tests { - use crate::listener::{BrokerListener, DefaultBrokerListener}; - use crate::redacted::{RedactedUrl, Url}; - use crate::rollups_events::{ - broker::BrokerEndpoint, BrokerConfig, BrokerError, RollupsClaim, - }; - use crate::test_fixtures::BrokerFixture; - use backoff::ExponentialBackoffBuilder; - use std::time::Duration; - use testcontainers::clients::Cli; - - // ------------------------------------------------------------------------------------------------ - // Broker Mock - // ------------------------------------------------------------------------------------------------ - - pub async fn setup_broker( - docker: &Cli, - should_fail: bool, - ) -> Result<(BrokerFixture, DefaultBrokerListener), BrokerError> { - let fixture = BrokerFixture::setup(docker).await; - - let redis_endpoint = if should_fail { - BrokerEndpoint::Single(RedactedUrl::new( - Url::parse("https://invalid.com").unwrap(), - )) - } else { - fixture.redis_endpoint().clone() - }; - - let config = BrokerConfig { - redis_endpoint, - consume_timeout: 300000, - backoff: ExponentialBackoffBuilder::new() - .with_initial_interval(Duration::from_millis(1000)) - .with_max_elapsed_time(Some(Duration::from_millis(3000))) - .build(), - }; - let broker = - DefaultBrokerListener::new(config, fixture.chain_id()).await?; - Ok((fixture, broker)) - } - - pub 
async fn produce_rollups_claims( - fixture: &BrokerFixture<'_>, - n: usize, - epoch_index_start: usize, - ) -> Vec { - let mut rollups_claims = Vec::new(); - for i in 0..n { - let mut rollups_claim = RollupsClaim::default(); - rollups_claim.epoch_index = (i + epoch_index_start) as u64; - fixture.produce_rollups_claim(rollups_claim.clone()).await; - rollups_claims.push(rollups_claim); - } - rollups_claims - } - - /// The last claim should trigger an `EndError` error. - pub async fn produce_last_claim( - fixture: &BrokerFixture<'_>, - epoch_index: usize, - ) -> Vec { - produce_rollups_claims(fixture, 1, epoch_index).await - } - - // ------------------------------------------------------------------------------------------------ - // Listener Unit Tests - // ------------------------------------------------------------------------------------------------ - - #[tokio::test] - async fn instantiate_new_broker_listener_ok() { - let docker = Cli::default(); - let _ = setup_broker(&docker, false).await; - } - - #[tokio::test] - async fn instantiate_new_broker_listener_error() { - let docker = Cli::default(); - let result = setup_broker(&docker, true).await; - assert!(result.is_err(), "setup_broker didn't fail as it should"); - let error = result.err().unwrap().to_string(); - assert_eq!(error, "error connecting to Redis"); - } - - #[tokio::test] - async fn start_broker_listener_with_one_claim_enqueued() { - let docker = Cli::default(); - let (fixture, mut broker_listener) = - setup_broker(&docker, false).await.unwrap(); - let n = 5; - produce_rollups_claims(&fixture, n, 0).await; - produce_last_claim(&fixture, n).await; - let result = broker_listener.listen().await; - assert!(result.is_ok()); - } - - #[tokio::test] - async fn start_broker_listener_with_claims_enqueued() { - let docker = Cli::default(); - let (fixture, mut broker_listener) = - setup_broker(&docker, false).await.unwrap(); - produce_last_claim(&fixture, 0).await; - let claim = broker_listener.listen().await; - assert!(claim.is_ok()); - } - - #[tokio::test] - async fn start_broker_listener_listener_with_no_claims_enqueued() { - let docker = Cli::default(); - let (fixture, mut broker_listener) = - setup_broker(&docker, false).await.unwrap(); - let n = 7; - - let broker_listener_thread = tokio::spawn(async move { - println!("Spawned the broker-listener thread."); - let claim = broker_listener.listen().await; - assert!(claim.is_ok()); - }); - - println!("Going to sleep for 1 second."); - tokio::time::sleep(Duration::from_secs(1)).await; - - let x = 2; - println!("Creating {} claims.", x); - produce_rollups_claims(&fixture, x, 0).await; - - println!("Going to sleep for 2 seconds."); - tokio::time::sleep(Duration::from_secs(2)).await; - - let y = 5; - println!("Creating {} claims.", y); - produce_rollups_claims(&fixture, y, x).await; - - assert_eq!(x + y, n); - produce_last_claim(&fixture, n).await; - - broker_listener_thread.await.unwrap(); - } -} diff --git a/cmd/authority-claimer/src/redacted.rs b/cmd/authority-claimer/src/redacted.rs index 00158e8f4..a86f22637 100644 --- a/cmd/authority-claimer/src/redacted.rs +++ b/cmd/authority-claimer/src/redacted.rs @@ -39,17 +39,17 @@ fn redacts_debug_fmt() { pub struct RedactedUrl(Url); impl RedactedUrl { - pub fn new(url: Url) -> Self { - Self(url) - } + // pub fn new(url: Url) -> Self { + // Self(url) + // } pub fn inner(&self) -> &Url { &self.0 } - pub fn into_inner(self) -> Url { - self.0 - } + // pub fn into_inner(self) -> Url { + // self.0 + // } } impl fmt::Debug for RedactedUrl { @@ -83,35 
+83,35 @@ impl fmt::Debug for RedactedUrl { } } -#[test] -fn redacts_valid_url_without_credentials() { - let url = RedactedUrl::new(Url::parse("http://example.com/").unwrap()); - assert_eq!(format!("{:?}", url), "http://example.com/"); -} - -#[test] -fn redacts_valid_url_with_username() { - let url = - RedactedUrl::new(Url::parse("http://james@example.com/").unwrap()); - assert_eq!(format!("{:?}", url), "http://***@example.com/"); -} - -#[test] -fn redacts_valid_url_with_password() { - let url = - RedactedUrl::new(Url::parse("http://:bond@example.com/").unwrap()); - assert_eq!(format!("{:?}", url), "http://:***@example.com/"); -} - -#[test] -fn redacts_valid_url_with_full_credentials() { - let url = - RedactedUrl::new(Url::parse("http://james:bond@example.com/").unwrap()); - assert_eq!(format!("{:?}", url), "http://***:***@example.com/"); -} - -#[test] -fn redacts_non_base_url() { - let url = RedactedUrl::new(Url::parse("james:bond@example.com").unwrap()); - assert_eq!(format!("{:?}", url), "[NON-BASE URL REDACTED]"); -} +// #[test] +// fn redacts_valid_url_without_credentials() { +// let url = RedactedUrl::new(Url::parse("http://example.com/").unwrap()); +// assert_eq!(format!("{:?}", url), "http://example.com/"); +// } + +// #[test] +// fn redacts_valid_url_with_username() { +// let url = +// RedactedUrl::new(Url::parse("http://james@example.com/").unwrap()); +// assert_eq!(format!("{:?}", url), "http://***@example.com/"); +// } + +// #[test] +// fn redacts_valid_url_with_password() { +// let url = +// RedactedUrl::new(Url::parse("http://:bond@example.com/").unwrap()); +// assert_eq!(format!("{:?}", url), "http://:***@example.com/"); +// } + +// #[test] +// fn redacts_valid_url_with_full_credentials() { +// let url = +// RedactedUrl::new(Url::parse("http://james:bond@example.com/").unwrap()); +// assert_eq!(format!("{:?}", url), "http://***:***@example.com/"); +// } + +// #[test] +// fn redacts_non_base_url() { +// let url = RedactedUrl::new(Url::parse("james:bond@example.com").unwrap()); +// assert_eq!(format!("{:?}", url), "[NON-BASE URL REDACTED]"); +// } diff --git a/cmd/authority-claimer/src/rollups_events/broker.rs b/cmd/authority-claimer/src/rollups_events/broker.rs deleted file mode 100644 index 51f31bd17..000000000 --- a/cmd/authority-claimer/src/rollups_events/broker.rs +++ /dev/null @@ -1,411 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use crate::redacted::{RedactedUrl, Url}; -use backoff::{future::retry, ExponentialBackoff, ExponentialBackoffBuilder}; -use clap::Parser; -use redis::{ - aio::{ConnectionLike, ConnectionManager}, - cluster::ClusterClient, - cluster_async::ClusterConnection, - streams::{StreamId, StreamRangeReply, StreamReadOptions, StreamReadReply}, - AsyncCommands, Client, Cmd, Pipeline, RedisError, RedisFuture, Value, -}; -use serde::{de::DeserializeOwned, Serialize}; -use snafu::{ResultExt, Snafu}; -use std::{fmt, time::Duration}; - -pub const INITIAL_ID: &str = "0"; - -/// The `BrokerConnection` enum implements the `ConnectionLike` trait -/// to satisfy the `AsyncCommands` trait bounds. -/// As `AsyncCommands` requires its implementors to be `Sized`, we couldn't -/// use a trait object instead. 
-#[derive(Clone)] -enum BrokerConnection { - ConnectionManager(ConnectionManager), - ClusterConnection(ClusterConnection), -} - -impl ConnectionLike for BrokerConnection { - fn req_packed_command<'a>( - &'a mut self, - cmd: &'a Cmd, - ) -> RedisFuture<'a, Value> { - match self { - Self::ConnectionManager(connection) => { - connection.req_packed_command(cmd) - } - Self::ClusterConnection(connection) => { - connection.req_packed_command(cmd) - } - } - } - - fn req_packed_commands<'a>( - &'a mut self, - cmd: &'a Pipeline, - offset: usize, - count: usize, - ) -> RedisFuture<'a, Vec> { - match self { - Self::ConnectionManager(connection) => { - connection.req_packed_commands(cmd, offset, count) - } - Self::ClusterConnection(connection) => { - connection.req_packed_commands(cmd, offset, count) - } - } - } - - fn get_db(&self) -> i64 { - match self { - Self::ConnectionManager(connection) => connection.get_db(), - Self::ClusterConnection(connection) => connection.get_db(), - } - } -} - -/// Client that connects to the broker -#[derive(Clone)] -pub struct Broker { - connection: BrokerConnection, - backoff: ExponentialBackoff, - consume_timeout: usize, -} - -impl Broker { - /// Create a new client - /// The broker_address should be in the format redis://host:port/db. - #[tracing::instrument(level = "trace", skip_all)] - pub async fn new(config: BrokerConfig) -> Result { - tracing::trace!(?config, "connecting to broker"); - - let connection = retry(config.backoff.clone(), || async { - match config.redis_endpoint.clone() { - BrokerEndpoint::Single(endpoint) => { - tracing::trace!("creating Redis Client"); - let client = Client::open(endpoint.inner().as_str())?; - - tracing::trace!("creating Redis ConnectionManager"); - let connection = ConnectionManager::new(client).await?; - - Ok(BrokerConnection::ConnectionManager(connection)) - } - BrokerEndpoint::Cluster(endpoints) => { - tracing::trace!("creating Redis Cluster Client"); - let client = ClusterClient::new( - endpoints - .iter() - .map(|endpoint| endpoint.inner().as_str()) - .collect::>(), - )?; - tracing::trace!("connecting to Redis Cluster"); - let connection = client.get_async_connection().await?; - Ok(BrokerConnection::ClusterConnection(connection)) - } - } - }) - .await - .context(ConnectionSnafu)?; - - tracing::trace!("returning successful connection"); - Ok(Self { - connection, - backoff: config.backoff, - consume_timeout: config.consume_timeout, - }) - } - - /// Produce an event and return its id - #[tracing::instrument(level = "trace", skip_all)] - pub async fn produce( - &mut self, - stream: &S, - payload: S::Payload, - ) -> Result { - tracing::trace!("converting payload to JSON string"); - let payload = - serde_json::to_string(&payload).context(InvalidPayloadSnafu)?; - - let event_id = retry(self.backoff.clone(), || async { - tracing::trace!( - stream_key = stream.key(), - payload, - "producing event" - ); - let event_id = self - .connection - .clone() - .xadd(stream.key(), "*", &[("payload", &payload)]) - .await?; - - Ok(event_id) - }) - .await - .context(ConnectionSnafu)?; - - tracing::trace!(event_id, "returning event id"); - Ok(event_id) - } - - /// Peek at the end of the stream - /// This function doesn't block; if there is no event in the stream it returns None. 
- #[tracing::instrument(level = "trace", skip_all)] - pub async fn peek_latest( - &mut self, - stream: &S, - ) -> Result>, BrokerError> { - let mut reply = retry(self.backoff.clone(), || async { - tracing::trace!(stream_key = stream.key(), "peeking at the stream"); - let reply: StreamRangeReply = self - .connection - .clone() - .xrevrange_count(stream.key(), "+", "-", 1) - .await?; - - Ok(reply) - }) - .await - .context(ConnectionSnafu)?; - - if let Some(event) = reply.ids.pop() { - tracing::trace!("parsing received event"); - Some(event.try_into()).transpose() - } else { - tracing::trace!("stream is empty"); - Ok(None) - } - } - - #[tracing::instrument(level = "trace", skip_all)] - async fn _consume_blocking( - &mut self, - stream: &S, - last_consumed_id: &str, - ) -> Result, BrokerError> { - let mut reply = retry(self.backoff.clone(), || async { - tracing::trace!( - stream_key = stream.key(), - last_consumed_id, - "consuming event" - ); - let opts = StreamReadOptions::default() - .count(1) - .block(self.consume_timeout); - let reply: StreamReadReply = self - .connection - .clone() - .xread_options(&[stream.key()], &[last_consumed_id], &opts) - .await?; - - Ok(reply) - }) - .await - .context(ConnectionSnafu)?; - - tracing::trace!("checking for timeout"); - let mut events = reply.keys.pop().ok_or(BrokerError::ConsumeTimeout)?; - - tracing::trace!("checking if event was received"); - let event = events.ids.pop().ok_or(BrokerError::FailedToConsume)?; - - tracing::trace!("parsing received event"); - event.try_into() - } - - /// Consume the next event in stream - /// - /// This function blocks until a new event is available - /// and retries whenever a timeout happens instead of returning an error. - /// - /// To consume the first event in the stream, `last_consumed_id` should be `INITIAL_ID`. - #[tracing::instrument(level = "trace", skip_all)] - pub async fn consume_blocking( - &mut self, - stream: &S, - last_consumed_id: &str, - ) -> Result, BrokerError> { - loop { - let result = self._consume_blocking(stream, last_consumed_id).await; - - if let Err(BrokerError::ConsumeTimeout) = result { - tracing::trace!("consume timed out, retrying"); - } else { - return result; - } - } - } - - /// Consume the next event in stream without blocking - /// This function returns None if there are no more remaining events. - /// To consume the first event in the stream, `last_consumed_id` should be `INITIAL_ID`. 
- #[tracing::instrument(level = "trace", skip_all)] - pub async fn consume_nonblocking( - &mut self, - stream: &S, - last_consumed_id: &str, - ) -> Result>, BrokerError> { - let mut reply = retry(self.backoff.clone(), || async { - tracing::trace!( - stream_key = stream.key(), - last_consumed_id, - "consuming event (non-blocking)" - ); - let opts = StreamReadOptions::default().count(1); - let reply: StreamReadReply = self - .connection - .clone() - .xread_options(&[stream.key()], &[last_consumed_id], &opts) - .await?; - - Ok(reply) - }) - .await - .context(ConnectionSnafu)?; - - tracing::trace!("checking if event was received"); - if let Some(mut events) = reply.keys.pop() { - let event = events.ids.pop().ok_or(BrokerError::FailedToConsume)?; - tracing::trace!("parsing received event"); - Some(event.try_into()).transpose() - } else { - tracing::trace!("stream is empty"); - Ok(None) - } - } -} - -/// Custom implementation of Debug because ConnectionManager doesn't implement debug -impl fmt::Debug for Broker { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Broker") - .field("consume_timeout", &self.consume_timeout) - .finish() - } -} - -/// Trait that defines the type of a stream -pub trait BrokerStream { - type Payload: Serialize + DeserializeOwned + Clone + Eq + PartialEq; - fn key(&self) -> &str; -} - -/// Event that goes through the broker -#[derive(Debug, Clone, Eq, PartialEq)] -pub struct Event { - pub id: String, - pub payload: P, -} - -impl TryFrom - for Event

-{ - type Error = BrokerError; - - #[tracing::instrument(level = "trace", skip_all)] - fn try_from(stream_id: StreamId) -> Result, BrokerError> { - tracing::trace!("getting event payload"); - let payload = stream_id - .get::("payload") - .ok_or(BrokerError::InvalidEvent)?; - let id = stream_id.id; - - tracing::trace!(id, payload, "received event"); - - tracing::trace!("parsing JSON payload"); - let payload = - serde_json::from_str(&payload).context(InvalidPayloadSnafu)?; - - tracing::trace!("returning event"); - Ok(Event { id, payload }) - } -} - -#[derive(Debug, Snafu)] -pub enum BrokerError { - #[snafu(display("error connecting to Redis"))] - ConnectionError { source: RedisError }, - - #[snafu(display("failed to consume event"))] - FailedToConsume, - - #[snafu(display("timed out when consuming event"))] - ConsumeTimeout, - - #[snafu(display("event in invalid format"))] - InvalidEvent, - - #[snafu(display("error parsing event payload"))] - InvalidPayload { source: serde_json::Error }, -} - -#[derive(Debug, Parser)] -#[command(name = "broker")] -pub struct BrokerCLIConfig { - /// Redis address - #[arg(long, env, default_value = "redis://127.0.0.1:6379")] - redis_endpoint: String, - - /// Address list of Redis cluster nodes, defined by a single string - /// separated by commas. If present, it supersedes `redis_endpoint`. - /// A single endpoint can be enough as the client will discover - /// other nodes automatically - #[arg(long, env, num_args = 1.., value_delimiter = ',')] - redis_cluster_endpoints: Option>, - - /// Timeout when consuming input events (in millis) - #[arg(long, env, default_value = "5000")] - broker_consume_timeout: usize, - - /// The max elapsed time for backoff in ms - #[arg(long, env, default_value = "120000")] - broker_backoff_max_elapsed_duration: u64, -} - -#[derive(Debug, Clone)] -pub enum BrokerEndpoint { - Single(RedactedUrl), - Cluster(Vec), -} - -#[derive(Debug, Clone)] -pub struct BrokerConfig { - pub redis_endpoint: BrokerEndpoint, - pub consume_timeout: usize, - pub backoff: ExponentialBackoff, -} - -impl From for BrokerConfig { - fn from(cli_config: BrokerCLIConfig) -> BrokerConfig { - let max_elapsed_time = Duration::from_millis( - cli_config.broker_backoff_max_elapsed_duration, - ); - let backoff = ExponentialBackoffBuilder::new() - .with_max_elapsed_time(Some(max_elapsed_time)) - .build(); - let redis_endpoint = - if let Some(endpoints) = cli_config.redis_cluster_endpoints { - let urls = endpoints - .iter() - .map(|endpoint| { - RedactedUrl::new( - Url::parse(endpoint) - .expect("failed to parse Redis URL"), - ) - }) - .collect(); - BrokerEndpoint::Cluster(urls) - } else { - let url = Url::parse(&cli_config.redis_endpoint) - .map(RedactedUrl::new) - .expect("failed to parse Redis URL"); - BrokerEndpoint::Single(url) - }; - BrokerConfig { - redis_endpoint, - consume_timeout: cli_config.broker_consume_timeout, - backoff, - } - } -} diff --git a/cmd/authority-claimer/src/rollups_events/mod.rs b/cmd/authority-claimer/src/rollups_events/mod.rs index bc4506b02..60ad85f27 100644 --- a/cmd/authority-claimer/src/rollups_events/mod.rs +++ b/cmd/authority-claimer/src/rollups_events/mod.rs @@ -1,15 +1,10 @@ // (c) Cartesi and individual authors (see AUTHORS) // SPDX-License-Identifier: Apache-2.0 (see LICENSE) -pub mod broker; pub mod common; pub mod rollups_claims; pub mod rollups_stream; -pub use broker::{ - Broker, BrokerCLIConfig, BrokerConfig, BrokerError, BrokerStream, - INITIAL_ID, -}; pub use common::{Address, Hash, HexArrayError}; -pub use 
rollups_claims::{RollupsClaim, RollupsClaimsStream}; +pub use rollups_claims::RollupsClaim; pub use rollups_stream::DAppMetadata; diff --git a/cmd/authority-claimer/src/rollups_events/rollups_claims.rs b/cmd/authority-claimer/src/rollups_events/rollups_claims.rs index 2d216ed61..3595b8cd5 100644 --- a/cmd/authority-claimer/src/rollups_events/rollups_claims.rs +++ b/cmd/authority-claimer/src/rollups_events/rollups_claims.rs @@ -1,30 +1,9 @@ // (c) Cartesi and individual authors (see AUTHORS) // SPDX-License-Identifier: Apache-2.0 (see LICENSE) -use super::{Address, BrokerStream, Hash}; +use super::{Address, Hash}; use serde::{Deserialize, Serialize}; -#[derive(Debug)] -pub struct RollupsClaimsStream { - key: String, -} - -impl BrokerStream for RollupsClaimsStream { - type Payload = RollupsClaim; - - fn key(&self) -> &str { - &self.key - } -} - -impl RollupsClaimsStream { - pub fn new(chain_id: u64) -> Self { - Self { - key: format!("{{chain-{}}}:rollups-claims", chain_id), - } - } -} - /// Event generated when the Cartesi Rollups epoch finishes #[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)] pub struct RollupsClaim { diff --git a/cmd/authority-claimer/src/test_fixtures.rs b/cmd/authority-claimer/src/test_fixtures.rs deleted file mode 100644 index 2e7dc0f69..000000000 --- a/cmd/authority-claimer/src/test_fixtures.rs +++ /dev/null @@ -1,154 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use crate::{ - redacted::{RedactedUrl, Url}, - rollups_events::{ - broker::BrokerEndpoint, common::ADDRESS_SIZE, Address, Broker, - BrokerConfig, DAppMetadata, RollupsClaim, RollupsClaimsStream, - INITIAL_ID, - }, -}; -use backoff::ExponentialBackoff; -use testcontainers::{ - clients::Cli, core::WaitFor, images::generic::GenericImage, Container, -}; -use tokio::sync::Mutex; - -const CHAIN_ID: u64 = 0; -const DAPP_ADDRESS: Address = Address::new([0xfa; ADDRESS_SIZE]); -const CONSUME_TIMEOUT: usize = 10_000; // ms - -pub struct BrokerFixture<'d> { - _node: Container<'d, GenericImage>, - client: Mutex, - claims_stream: RollupsClaimsStream, - redis_endpoint: BrokerEndpoint, - chain_id: u64, -} - -impl BrokerFixture<'_> { - #[tracing::instrument(level = "trace", skip_all)] - pub async fn setup(docker: &Cli) -> BrokerFixture<'_> { - tracing::info!("setting up redis fixture"); - - tracing::trace!("starting redis docker container"); - let image = GenericImage::new("redis", "6.2").with_wait_for( - WaitFor::message_on_stdout("Ready to accept connections"), - ); - let node = docker.run(image); - let port = node.get_host_port_ipv4(6379); - let redis_endpoint = BrokerEndpoint::Single( - Url::parse(&format!("redis://127.0.0.1:{}", port)) - .map(RedactedUrl::new) - .expect("failed to parse Redis Url"), - ); - let chain_id = CHAIN_ID; - let backoff = ExponentialBackoff::default(); - let metadata = DAppMetadata { - chain_id, - dapp_address: DAPP_ADDRESS.clone(), - }; - let claims_stream = RollupsClaimsStream::new(metadata.chain_id); - let config = BrokerConfig { - redis_endpoint: redis_endpoint.clone(), - consume_timeout: CONSUME_TIMEOUT, - backoff, - }; - - tracing::trace!( - ?redis_endpoint, - "connecting to redis with rollups_events crate" - ); - let client = Mutex::new( - Broker::new(config) - .await - .expect("failed to connect to broker"), - ); - BrokerFixture { - _node: node, - client, - claims_stream, - redis_endpoint, - chain_id, - } - } - - pub fn redis_endpoint(&self) -> &BrokerEndpoint { - &self.redis_endpoint - } - - pub 
fn chain_id(&self) -> u64 { - self.chain_id - } - - /// Produce the claim given the hash - #[tracing::instrument(level = "trace", skip_all)] - pub async fn produce_rollups_claim(&self, rollups_claim: RollupsClaim) { - tracing::trace!(?rollups_claim.epoch_hash, "producing rollups-claim event"); - { - let last_claim = self - .client - .lock() - .await - .peek_latest(&self.claims_stream) - .await - .expect("failed to get latest claim"); - let epoch_index = match last_claim { - Some(event) => event.payload.epoch_index + 1, - None => 0, - }; - assert_eq!( - rollups_claim.epoch_index, epoch_index, - "invalid epoch index", - ); - } - self.client - .lock() - .await - .produce(&self.claims_stream, rollups_claim) - .await - .expect("failed to produce claim"); - } - - /// Obtain all produced claims - #[tracing::instrument(level = "trace", skip_all)] - pub async fn consume_all_claims(&self) -> Vec { - tracing::trace!("consuming all rollups-claims events"); - let mut claims = vec![]; - let mut last_id = INITIAL_ID.to_owned(); - while let Some(event) = self - .client - .lock() - .await - .consume_nonblocking(&self.claims_stream, &last_id) - .await - .expect("failed to consume claim") - { - claims.push(event.payload); - last_id = event.id; - } - claims - } - - /// Obtain the first n produced claims - /// Panic in case of timeout - #[tracing::instrument(level = "trace", skip_all)] - pub async fn consume_n_claims(&self, n: usize) -> Vec { - tracing::trace!(n, "consuming n rollups-claims events"); - let mut claims = vec![]; - let mut last_id = INITIAL_ID.to_owned(); - for _ in 0..n { - let event = self - .client - .lock() - .await - .consume_blocking(&self.claims_stream, &last_id) - .await - .expect("failed to consume claim"); - claims.push(event.payload); - last_id = event.id - } - claims - } -}