diff --git a/build/Dockerfile b/build/Dockerfile index 4b928875f..1187e8c3a 100644 --- a/build/Dockerfile +++ b/build/Dockerfile @@ -422,8 +422,6 @@ RUN < Result; -} - -// ------------------------------------------------------------------------------------------------ -// DefaultBrokerListener -// ------------------------------------------------------------------------------------------------ - -#[derive(Debug)] -pub struct DefaultBrokerListener { - broker: Broker, - stream: RollupsClaimsStream, - last_claim_id: String, -} - -#[derive(Debug, snafu::Snafu)] -pub enum BrokerListenerError { - #[snafu(display("broker error"))] - BrokerError { source: BrokerError }, -} - -impl DefaultBrokerListener { - pub async fn new( - broker_config: BrokerConfig, - chain_id: u64, - ) -> Result { - tracing::trace!("Connecting to the broker ({:?})", broker_config); - let broker = Broker::new(broker_config).await?; - let stream = RollupsClaimsStream::new(chain_id); - let last_claim_id = INITIAL_ID.to_string(); - Ok(Self { - broker, - stream, - last_claim_id, - }) - } -} - -#[async_trait] -impl BrokerListener for DefaultBrokerListener { - type Error = BrokerListenerError; - - async fn listen(&mut self) -> Result { - tracing::trace!("Waiting for claim with id {}", self.last_claim_id); - let event = self - .broker - .consume_blocking(&self.stream, &self.last_claim_id) - .await - .context(BrokerSnafu)?; - - self.last_claim_id = event.id; - - Ok(event.payload) - } -} - -#[cfg(test)] -mod tests { - use crate::listener::{BrokerListener, DefaultBrokerListener}; - use crate::redacted::{RedactedUrl, Url}; - use crate::rollups_events::{ - broker::BrokerEndpoint, BrokerConfig, BrokerError, RollupsClaim, - }; - use crate::test_fixtures::BrokerFixture; - use backoff::ExponentialBackoffBuilder; - use std::time::Duration; - use testcontainers::clients::Cli; - - // ------------------------------------------------------------------------------------------------ - // Broker Mock - // ------------------------------------------------------------------------------------------------ - - pub async fn setup_broker( - docker: &Cli, - should_fail: bool, - ) -> Result<(BrokerFixture, DefaultBrokerListener), BrokerError> { - let fixture = BrokerFixture::setup(docker).await; - - let redis_endpoint = if should_fail { - BrokerEndpoint::Single(RedactedUrl::new( - Url::parse("https://invalid.com").unwrap(), - )) - } else { - fixture.redis_endpoint().clone() - }; - - let config = BrokerConfig { - redis_endpoint, - consume_timeout: 300000, - backoff: ExponentialBackoffBuilder::new() - .with_initial_interval(Duration::from_millis(1000)) - .with_max_elapsed_time(Some(Duration::from_millis(3000))) - .build(), - }; - let broker = - DefaultBrokerListener::new(config, fixture.chain_id()).await?; - Ok((fixture, broker)) - } - - pub async fn produce_rollups_claims( - fixture: &BrokerFixture<'_>, - n: usize, - epoch_index_start: usize, - ) -> Vec { - let mut rollups_claims = Vec::new(); - for i in 0..n { - let mut rollups_claim = RollupsClaim::default(); - rollups_claim.epoch_index = (i + epoch_index_start) as u64; - fixture.produce_rollups_claim(rollups_claim.clone()).await; - rollups_claims.push(rollups_claim); - } - rollups_claims - } - - /// The last claim should trigger an `EndError` error. - pub async fn produce_last_claim( - fixture: &BrokerFixture<'_>, - epoch_index: usize, - ) -> Vec { - produce_rollups_claims(fixture, 1, epoch_index).await - } - - // ------------------------------------------------------------------------------------------------ - // Listener Unit Tests - // ------------------------------------------------------------------------------------------------ - - #[tokio::test] - async fn instantiate_new_broker_listener_ok() { - let docker = Cli::default(); - let _ = setup_broker(&docker, false).await; - } - - #[tokio::test] - async fn instantiate_new_broker_listener_error() { - let docker = Cli::default(); - let result = setup_broker(&docker, true).await; - assert!(result.is_err(), "setup_broker didn't fail as it should"); - let error = result.err().unwrap().to_string(); - assert_eq!(error, "error connecting to Redis"); - } - - #[tokio::test] - async fn start_broker_listener_with_one_claim_enqueued() { - let docker = Cli::default(); - let (fixture, mut broker_listener) = - setup_broker(&docker, false).await.unwrap(); - let n = 5; - produce_rollups_claims(&fixture, n, 0).await; - produce_last_claim(&fixture, n).await; - let result = broker_listener.listen().await; - assert!(result.is_ok()); - } - - #[tokio::test] - async fn start_broker_listener_with_claims_enqueued() { - let docker = Cli::default(); - let (fixture, mut broker_listener) = - setup_broker(&docker, false).await.unwrap(); - produce_last_claim(&fixture, 0).await; - let claim = broker_listener.listen().await; - assert!(claim.is_ok()); - } - - #[tokio::test] - async fn start_broker_listener_listener_with_no_claims_enqueued() { - let docker = Cli::default(); - let (fixture, mut broker_listener) = - setup_broker(&docker, false).await.unwrap(); - let n = 7; - - let broker_listener_thread = tokio::spawn(async move { - println!("Spawned the broker-listener thread."); - let claim = broker_listener.listen().await; - assert!(claim.is_ok()); - }); - - println!("Going to sleep for 1 second."); - tokio::time::sleep(Duration::from_secs(1)).await; - - let x = 2; - println!("Creating {} claims.", x); - produce_rollups_claims(&fixture, x, 0).await; - - println!("Going to sleep for 2 seconds."); - tokio::time::sleep(Duration::from_secs(2)).await; - - let y = 5; - println!("Creating {} claims.", y); - produce_rollups_claims(&fixture, y, x).await; - - assert_eq!(x + y, n); - produce_last_claim(&fixture, n).await; - - broker_listener_thread.await.unwrap(); - } -} diff --git a/cmd/authority-claimer/src/redacted.rs b/cmd/authority-claimer/src/redacted.rs index 00158e8f4..476ac619b 100644 --- a/cmd/authority-claimer/src/redacted.rs +++ b/cmd/authority-claimer/src/redacted.rs @@ -38,6 +38,7 @@ fn redacts_debug_fmt() { #[derive(Clone)] pub struct RedactedUrl(Url); +#[allow(dead_code)] impl RedactedUrl { pub fn new(url: Url) -> Self { Self(url) diff --git a/cmd/authority-claimer/src/rollups_events/broker.rs b/cmd/authority-claimer/src/rollups_events/broker.rs deleted file mode 100644 index 51f31bd17..000000000 --- a/cmd/authority-claimer/src/rollups_events/broker.rs +++ /dev/null @@ -1,411 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use crate::redacted::{RedactedUrl, Url}; -use backoff::{future::retry, ExponentialBackoff, ExponentialBackoffBuilder}; -use clap::Parser; -use redis::{ - aio::{ConnectionLike, ConnectionManager}, - cluster::ClusterClient, - cluster_async::ClusterConnection, - streams::{StreamId, StreamRangeReply, StreamReadOptions, StreamReadReply}, - AsyncCommands, Client, Cmd, Pipeline, RedisError, RedisFuture, Value, -}; -use serde::{de::DeserializeOwned, Serialize}; -use snafu::{ResultExt, Snafu}; -use std::{fmt, time::Duration}; - -pub const INITIAL_ID: &str = "0"; - -/// The `BrokerConnection` enum implements the `ConnectionLike` trait -/// to satisfy the `AsyncCommands` trait bounds. -/// As `AsyncCommands` requires its implementors to be `Sized`, we couldn't -/// use a trait object instead. -#[derive(Clone)] -enum BrokerConnection { - ConnectionManager(ConnectionManager), - ClusterConnection(ClusterConnection), -} - -impl ConnectionLike for BrokerConnection { - fn req_packed_command<'a>( - &'a mut self, - cmd: &'a Cmd, - ) -> RedisFuture<'a, Value> { - match self { - Self::ConnectionManager(connection) => { - connection.req_packed_command(cmd) - } - Self::ClusterConnection(connection) => { - connection.req_packed_command(cmd) - } - } - } - - fn req_packed_commands<'a>( - &'a mut self, - cmd: &'a Pipeline, - offset: usize, - count: usize, - ) -> RedisFuture<'a, Vec> { - match self { - Self::ConnectionManager(connection) => { - connection.req_packed_commands(cmd, offset, count) - } - Self::ClusterConnection(connection) => { - connection.req_packed_commands(cmd, offset, count) - } - } - } - - fn get_db(&self) -> i64 { - match self { - Self::ConnectionManager(connection) => connection.get_db(), - Self::ClusterConnection(connection) => connection.get_db(), - } - } -} - -/// Client that connects to the broker -#[derive(Clone)] -pub struct Broker { - connection: BrokerConnection, - backoff: ExponentialBackoff, - consume_timeout: usize, -} - -impl Broker { - /// Create a new client - /// The broker_address should be in the format redis://host:port/db. - #[tracing::instrument(level = "trace", skip_all)] - pub async fn new(config: BrokerConfig) -> Result { - tracing::trace!(?config, "connecting to broker"); - - let connection = retry(config.backoff.clone(), || async { - match config.redis_endpoint.clone() { - BrokerEndpoint::Single(endpoint) => { - tracing::trace!("creating Redis Client"); - let client = Client::open(endpoint.inner().as_str())?; - - tracing::trace!("creating Redis ConnectionManager"); - let connection = ConnectionManager::new(client).await?; - - Ok(BrokerConnection::ConnectionManager(connection)) - } - BrokerEndpoint::Cluster(endpoints) => { - tracing::trace!("creating Redis Cluster Client"); - let client = ClusterClient::new( - endpoints - .iter() - .map(|endpoint| endpoint.inner().as_str()) - .collect::>(), - )?; - tracing::trace!("connecting to Redis Cluster"); - let connection = client.get_async_connection().await?; - Ok(BrokerConnection::ClusterConnection(connection)) - } - } - }) - .await - .context(ConnectionSnafu)?; - - tracing::trace!("returning successful connection"); - Ok(Self { - connection, - backoff: config.backoff, - consume_timeout: config.consume_timeout, - }) - } - - /// Produce an event and return its id - #[tracing::instrument(level = "trace", skip_all)] - pub async fn produce( - &mut self, - stream: &S, - payload: S::Payload, - ) -> Result { - tracing::trace!("converting payload to JSON string"); - let payload = - serde_json::to_string(&payload).context(InvalidPayloadSnafu)?; - - let event_id = retry(self.backoff.clone(), || async { - tracing::trace!( - stream_key = stream.key(), - payload, - "producing event" - ); - let event_id = self - .connection - .clone() - .xadd(stream.key(), "*", &[("payload", &payload)]) - .await?; - - Ok(event_id) - }) - .await - .context(ConnectionSnafu)?; - - tracing::trace!(event_id, "returning event id"); - Ok(event_id) - } - - /// Peek at the end of the stream - /// This function doesn't block; if there is no event in the stream it returns None. - #[tracing::instrument(level = "trace", skip_all)] - pub async fn peek_latest( - &mut self, - stream: &S, - ) -> Result>, BrokerError> { - let mut reply = retry(self.backoff.clone(), || async { - tracing::trace!(stream_key = stream.key(), "peeking at the stream"); - let reply: StreamRangeReply = self - .connection - .clone() - .xrevrange_count(stream.key(), "+", "-", 1) - .await?; - - Ok(reply) - }) - .await - .context(ConnectionSnafu)?; - - if let Some(event) = reply.ids.pop() { - tracing::trace!("parsing received event"); - Some(event.try_into()).transpose() - } else { - tracing::trace!("stream is empty"); - Ok(None) - } - } - - #[tracing::instrument(level = "trace", skip_all)] - async fn _consume_blocking( - &mut self, - stream: &S, - last_consumed_id: &str, - ) -> Result, BrokerError> { - let mut reply = retry(self.backoff.clone(), || async { - tracing::trace!( - stream_key = stream.key(), - last_consumed_id, - "consuming event" - ); - let opts = StreamReadOptions::default() - .count(1) - .block(self.consume_timeout); - let reply: StreamReadReply = self - .connection - .clone() - .xread_options(&[stream.key()], &[last_consumed_id], &opts) - .await?; - - Ok(reply) - }) - .await - .context(ConnectionSnafu)?; - - tracing::trace!("checking for timeout"); - let mut events = reply.keys.pop().ok_or(BrokerError::ConsumeTimeout)?; - - tracing::trace!("checking if event was received"); - let event = events.ids.pop().ok_or(BrokerError::FailedToConsume)?; - - tracing::trace!("parsing received event"); - event.try_into() - } - - /// Consume the next event in stream - /// - /// This function blocks until a new event is available - /// and retries whenever a timeout happens instead of returning an error. - /// - /// To consume the first event in the stream, `last_consumed_id` should be `INITIAL_ID`. - #[tracing::instrument(level = "trace", skip_all)] - pub async fn consume_blocking( - &mut self, - stream: &S, - last_consumed_id: &str, - ) -> Result, BrokerError> { - loop { - let result = self._consume_blocking(stream, last_consumed_id).await; - - if let Err(BrokerError::ConsumeTimeout) = result { - tracing::trace!("consume timed out, retrying"); - } else { - return result; - } - } - } - - /// Consume the next event in stream without blocking - /// This function returns None if there are no more remaining events. - /// To consume the first event in the stream, `last_consumed_id` should be `INITIAL_ID`. - #[tracing::instrument(level = "trace", skip_all)] - pub async fn consume_nonblocking( - &mut self, - stream: &S, - last_consumed_id: &str, - ) -> Result>, BrokerError> { - let mut reply = retry(self.backoff.clone(), || async { - tracing::trace!( - stream_key = stream.key(), - last_consumed_id, - "consuming event (non-blocking)" - ); - let opts = StreamReadOptions::default().count(1); - let reply: StreamReadReply = self - .connection - .clone() - .xread_options(&[stream.key()], &[last_consumed_id], &opts) - .await?; - - Ok(reply) - }) - .await - .context(ConnectionSnafu)?; - - tracing::trace!("checking if event was received"); - if let Some(mut events) = reply.keys.pop() { - let event = events.ids.pop().ok_or(BrokerError::FailedToConsume)?; - tracing::trace!("parsing received event"); - Some(event.try_into()).transpose() - } else { - tracing::trace!("stream is empty"); - Ok(None) - } - } -} - -/// Custom implementation of Debug because ConnectionManager doesn't implement debug -impl fmt::Debug for Broker { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Broker") - .field("consume_timeout", &self.consume_timeout) - .finish() - } -} - -/// Trait that defines the type of a stream -pub trait BrokerStream { - type Payload: Serialize + DeserializeOwned + Clone + Eq + PartialEq; - fn key(&self) -> &str; -} - -/// Event that goes through the broker -#[derive(Debug, Clone, Eq, PartialEq)] -pub struct Event { - pub id: String, - pub payload: P, -} - -impl TryFrom - for Event

-{ - type Error = BrokerError; - - #[tracing::instrument(level = "trace", skip_all)] - fn try_from(stream_id: StreamId) -> Result, BrokerError> { - tracing::trace!("getting event payload"); - let payload = stream_id - .get::("payload") - .ok_or(BrokerError::InvalidEvent)?; - let id = stream_id.id; - - tracing::trace!(id, payload, "received event"); - - tracing::trace!("parsing JSON payload"); - let payload = - serde_json::from_str(&payload).context(InvalidPayloadSnafu)?; - - tracing::trace!("returning event"); - Ok(Event { id, payload }) - } -} - -#[derive(Debug, Snafu)] -pub enum BrokerError { - #[snafu(display("error connecting to Redis"))] - ConnectionError { source: RedisError }, - - #[snafu(display("failed to consume event"))] - FailedToConsume, - - #[snafu(display("timed out when consuming event"))] - ConsumeTimeout, - - #[snafu(display("event in invalid format"))] - InvalidEvent, - - #[snafu(display("error parsing event payload"))] - InvalidPayload { source: serde_json::Error }, -} - -#[derive(Debug, Parser)] -#[command(name = "broker")] -pub struct BrokerCLIConfig { - /// Redis address - #[arg(long, env, default_value = "redis://127.0.0.1:6379")] - redis_endpoint: String, - - /// Address list of Redis cluster nodes, defined by a single string - /// separated by commas. If present, it supersedes `redis_endpoint`. - /// A single endpoint can be enough as the client will discover - /// other nodes automatically - #[arg(long, env, num_args = 1.., value_delimiter = ',')] - redis_cluster_endpoints: Option>, - - /// Timeout when consuming input events (in millis) - #[arg(long, env, default_value = "5000")] - broker_consume_timeout: usize, - - /// The max elapsed time for backoff in ms - #[arg(long, env, default_value = "120000")] - broker_backoff_max_elapsed_duration: u64, -} - -#[derive(Debug, Clone)] -pub enum BrokerEndpoint { - Single(RedactedUrl), - Cluster(Vec), -} - -#[derive(Debug, Clone)] -pub struct BrokerConfig { - pub redis_endpoint: BrokerEndpoint, - pub consume_timeout: usize, - pub backoff: ExponentialBackoff, -} - -impl From for BrokerConfig { - fn from(cli_config: BrokerCLIConfig) -> BrokerConfig { - let max_elapsed_time = Duration::from_millis( - cli_config.broker_backoff_max_elapsed_duration, - ); - let backoff = ExponentialBackoffBuilder::new() - .with_max_elapsed_time(Some(max_elapsed_time)) - .build(); - let redis_endpoint = - if let Some(endpoints) = cli_config.redis_cluster_endpoints { - let urls = endpoints - .iter() - .map(|endpoint| { - RedactedUrl::new( - Url::parse(endpoint) - .expect("failed to parse Redis URL"), - ) - }) - .collect(); - BrokerEndpoint::Cluster(urls) - } else { - let url = Url::parse(&cli_config.redis_endpoint) - .map(RedactedUrl::new) - .expect("failed to parse Redis URL"); - BrokerEndpoint::Single(url) - }; - BrokerConfig { - redis_endpoint, - consume_timeout: cli_config.broker_consume_timeout, - backoff, - } - } -} diff --git a/cmd/authority-claimer/src/rollups_events/mod.rs b/cmd/authority-claimer/src/rollups_events/mod.rs index bc4506b02..60ad85f27 100644 --- a/cmd/authority-claimer/src/rollups_events/mod.rs +++ b/cmd/authority-claimer/src/rollups_events/mod.rs @@ -1,15 +1,10 @@ // (c) Cartesi and individual authors (see AUTHORS) // SPDX-License-Identifier: Apache-2.0 (see LICENSE) -pub mod broker; pub mod common; pub mod rollups_claims; pub mod rollups_stream; -pub use broker::{ - Broker, BrokerCLIConfig, BrokerConfig, BrokerError, BrokerStream, - INITIAL_ID, -}; pub use common::{Address, Hash, HexArrayError}; -pub use rollups_claims::{RollupsClaim, RollupsClaimsStream}; +pub use rollups_claims::RollupsClaim; pub use rollups_stream::DAppMetadata; diff --git a/cmd/authority-claimer/src/rollups_events/rollups_claims.rs b/cmd/authority-claimer/src/rollups_events/rollups_claims.rs index 93d8ab359..e2de524c1 100644 --- a/cmd/authority-claimer/src/rollups_events/rollups_claims.rs +++ b/cmd/authority-claimer/src/rollups_events/rollups_claims.rs @@ -1,30 +1,9 @@ // (c) Cartesi and individual authors (see AUTHORS) // SPDX-License-Identifier: Apache-2.0 (see LICENSE) -use super::{Address, BrokerStream, Hash}; +use super::{Address, Hash}; use serde::{Deserialize, Serialize}; -#[derive(Debug)] -pub struct RollupsClaimsStream { - key: String, -} - -impl BrokerStream for RollupsClaimsStream { - type Payload = RollupsClaim; - - fn key(&self) -> &str { - &self.key - } -} - -impl RollupsClaimsStream { - pub fn new(chain_id: u64) -> Self { - Self { - key: format!("{{chain-{}}}:rollups-claims", chain_id), - } - } -} - /// Event generated when the Cartesi Rollups epoch finishes #[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)] pub struct RollupsClaim { diff --git a/cmd/authority-claimer/src/test_fixtures.rs b/cmd/authority-claimer/src/test_fixtures.rs deleted file mode 100644 index 2e7dc0f69..000000000 --- a/cmd/authority-claimer/src/test_fixtures.rs +++ /dev/null @@ -1,154 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use crate::{ - redacted::{RedactedUrl, Url}, - rollups_events::{ - broker::BrokerEndpoint, common::ADDRESS_SIZE, Address, Broker, - BrokerConfig, DAppMetadata, RollupsClaim, RollupsClaimsStream, - INITIAL_ID, - }, -}; -use backoff::ExponentialBackoff; -use testcontainers::{ - clients::Cli, core::WaitFor, images::generic::GenericImage, Container, -}; -use tokio::sync::Mutex; - -const CHAIN_ID: u64 = 0; -const DAPP_ADDRESS: Address = Address::new([0xfa; ADDRESS_SIZE]); -const CONSUME_TIMEOUT: usize = 10_000; // ms - -pub struct BrokerFixture<'d> { - _node: Container<'d, GenericImage>, - client: Mutex, - claims_stream: RollupsClaimsStream, - redis_endpoint: BrokerEndpoint, - chain_id: u64, -} - -impl BrokerFixture<'_> { - #[tracing::instrument(level = "trace", skip_all)] - pub async fn setup(docker: &Cli) -> BrokerFixture<'_> { - tracing::info!("setting up redis fixture"); - - tracing::trace!("starting redis docker container"); - let image = GenericImage::new("redis", "6.2").with_wait_for( - WaitFor::message_on_stdout("Ready to accept connections"), - ); - let node = docker.run(image); - let port = node.get_host_port_ipv4(6379); - let redis_endpoint = BrokerEndpoint::Single( - Url::parse(&format!("redis://127.0.0.1:{}", port)) - .map(RedactedUrl::new) - .expect("failed to parse Redis Url"), - ); - let chain_id = CHAIN_ID; - let backoff = ExponentialBackoff::default(); - let metadata = DAppMetadata { - chain_id, - dapp_address: DAPP_ADDRESS.clone(), - }; - let claims_stream = RollupsClaimsStream::new(metadata.chain_id); - let config = BrokerConfig { - redis_endpoint: redis_endpoint.clone(), - consume_timeout: CONSUME_TIMEOUT, - backoff, - }; - - tracing::trace!( - ?redis_endpoint, - "connecting to redis with rollups_events crate" - ); - let client = Mutex::new( - Broker::new(config) - .await - .expect("failed to connect to broker"), - ); - BrokerFixture { - _node: node, - client, - claims_stream, - redis_endpoint, - chain_id, - } - } - - pub fn redis_endpoint(&self) -> &BrokerEndpoint { - &self.redis_endpoint - } - - pub fn chain_id(&self) -> u64 { - self.chain_id - } - - /// Produce the claim given the hash - #[tracing::instrument(level = "trace", skip_all)] - pub async fn produce_rollups_claim(&self, rollups_claim: RollupsClaim) { - tracing::trace!(?rollups_claim.epoch_hash, "producing rollups-claim event"); - { - let last_claim = self - .client - .lock() - .await - .peek_latest(&self.claims_stream) - .await - .expect("failed to get latest claim"); - let epoch_index = match last_claim { - Some(event) => event.payload.epoch_index + 1, - None => 0, - }; - assert_eq!( - rollups_claim.epoch_index, epoch_index, - "invalid epoch index", - ); - } - self.client - .lock() - .await - .produce(&self.claims_stream, rollups_claim) - .await - .expect("failed to produce claim"); - } - - /// Obtain all produced claims - #[tracing::instrument(level = "trace", skip_all)] - pub async fn consume_all_claims(&self) -> Vec { - tracing::trace!("consuming all rollups-claims events"); - let mut claims = vec![]; - let mut last_id = INITIAL_ID.to_owned(); - while let Some(event) = self - .client - .lock() - .await - .consume_nonblocking(&self.claims_stream, &last_id) - .await - .expect("failed to consume claim") - { - claims.push(event.payload); - last_id = event.id; - } - claims - } - - /// Obtain the first n produced claims - /// Panic in case of timeout - #[tracing::instrument(level = "trace", skip_all)] - pub async fn consume_n_claims(&self, n: usize) -> Vec { - tracing::trace!(n, "consuming n rollups-claims events"); - let mut claims = vec![]; - let mut last_id = INITIAL_ID.to_owned(); - for _ in 0..n { - let event = self - .client - .lock() - .await - .consume_blocking(&self.claims_stream, &last_id) - .await - .expect("failed to consume claim"); - claims.push(event.payload); - last_id = event.id - } - claims - } -} diff --git a/internal/node/services.go b/internal/node/services.go index cda99cc56..0d856a7fd 100644 --- a/internal/node/services.go +++ b/internal/node/services.go @@ -20,26 +20,14 @@ type portOffset = int const ( portOffsetProxy = iota portOffsetAuthorityClaimer - portOffsetRedis portOffsetPostgraphile ) -const localhost = "127.0.0.1" - // Get the port of the given service. func getPort(c config.NodeConfig, offset portOffset) int { return c.HttpPort + int(offset) } -// Get the redis endpoint based on whether the experimental sunodo validator mode is enabled. -func getRedisEndpoint(c config.NodeConfig) string { - if c.ExperimentalSunodoValidatorEnabled { - return c.ExperimentalSunodoValidatorRedisEndpoint - } else { - return fmt.Sprintf("redis://%v:%v", localhost, getPort(c, portOffsetRedis)) - } -} - // Create the RUST_LOG variable using the config log level. // If the log level is set to debug, set tracing log for the given rust module. func getRustLog(c config.NodeConfig, rustModule string) string { @@ -98,20 +86,6 @@ func newAuthorityClaimer(c config.NodeConfig, workDir string) services.CommandSe return s } -func newRedis(c config.NodeConfig, workDir string) services.CommandService { - var s services.CommandService - s.Name = "redis" - s.HealthcheckPort = getPort(c, portOffsetRedis) - s.Path = "redis-server" - s.Args = append(s.Args, "--port", fmt.Sprint(getPort(c, portOffsetRedis))) - // Disable persistence with --save and --appendonly config - s.Args = append(s.Args, "--save", "") - s.Args = append(s.Args, "--appendonly", "no") - s.Env = append(s.Env, os.Environ()...) - s.WorkDir = workDir - return s -} - func newSupervisorService( c config.NodeConfig, workDir string, @@ -119,11 +93,6 @@ func newSupervisorService( ) services.SupervisorService { var s []services.Service - if !c.ExperimentalSunodoValidatorEnabled { - // add Redis first - s = append(s, newRedis(c, workDir)) - } - // enable claimer if reader mode and sunodo validator mode are not enabled if c.FeatureClaimerEnabled && !c.ExperimentalSunodoValidatorEnabled { s = append(s, newAuthorityClaimer(c, workDir))