Skip to content

Commit

Permalink
test(rpc-alt): Introducing FullCluster
Browse files Browse the repository at this point in the history
## Description

Add a new type to the library supporting E2E tests that combines an
off-chain cluster with an instance of simulacrum. This can be used to
drive E2E tests that need to configure indexing and RPC to a greater
degree than is possible with transactional tests.

This type will be used to write tests for SuiNS-related JSONRPC-methods,
which require us to publish some code on-chain and extract some IDs from
that process before we set-up the indexer.

## Test plan

Will be used for writing tests in a follow-up change.
  • Loading branch information
amnn committed Feb 10, 2025
1 parent c1b5b38 commit 7ba0aa9
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 1 deletion.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/sui-indexer-alt-e2e-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ anyhow.workspace = true
diesel = { workspace = true, features = ["chrono"] }
diesel-async = { workspace = true, features = ["bb8", "postgres", "async-connection-wrapper"] }
prometheus.workspace = true
tempfile.workspace = true
tokio.workspace = true
tokio-util.workspace = true
url.workspace = true

simulacrum.workspace = true
sui-indexer-alt.workspace = true
sui-indexer-alt-framework.workspace = true
sui-indexer-alt-jsonrpc.workspace = true
sui-pg-db.workspace = true
sui-types.workspace = true

[target.'cfg(msim)'.dependencies]
msim.workspace = true
Expand Down
194 changes: 193 additions & 1 deletion crates/sui-indexer-alt-e2e-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ use std::{
time::Duration,
};

use anyhow::Context;
use anyhow::{bail, Context};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use simulacrum::Simulacrum;
use sui_indexer_alt::{config::IndexerConfig, setup_indexer};
use sui_indexer_alt_framework::{ingestion::ClientArgs, schema::watermarks, IndexerArgs};
use sui_indexer_alt_jsonrpc::{
Expand All @@ -19,10 +20,35 @@ use sui_pg_db::{
temp::{get_available_port, TempDb},
Db, DbArgs,
};
use sui_types::{
base_types::{ObjectRef, SuiAddress},
effects::{TransactionEffects, TransactionEffectsAPI},
error::ExecutionError,
execution_status::ExecutionStatus,
messages_checkpoint::VerifiedCheckpoint,
object::Owner,
transaction::Transaction,
};
use tempfile::TempDir;
use tokio::{task::JoinHandle, time::error::Elapsed};
use tokio_util::sync::CancellationToken;
use url::Url;

/// A simulation of the network, accompanied by off-chain services (database, indexer, RPC),
/// connected by local data ingestion.
pub struct FullCluster {
/// A simulation of the network, executing transactions and producing checkpoints.
executor: Simulacrum,

/// The off-chain services (database, indexer, RPC) that are ingesting data from the
/// simulation.
offchain: OffchainCluster,

/// Temporary directory to store checkpoint information in, so that the indexer can pick it up.
#[allow(unused)]
temp_dir: TempDir,
}

/// A collection of the off-chain services (an indexer, a database and a JSON-RPC server that reads
/// from that database), grouped together to simplify set-up and tear-down for tests.
///
Expand Down Expand Up @@ -55,6 +81,133 @@ pub struct OffchainCluster {
cancel: CancellationToken,
}

impl FullCluster {
/// Creates a cluster with a fresh executor where the off-chain services are set up with a
/// default configuration.
pub async fn new() -> anyhow::Result<Self> {
Self::new_with_configs(
Simulacrum::new(),
IndexerArgs::default(),
SystemPackageTaskArgs::default(),
IndexerConfig::example(),
RpcConfig::default(),
&prometheus::Registry::new(),
CancellationToken::new(),
)
.await
}

/// Creates a new cluster executing transactions using `executor`. The indexer is configured
/// using `indexer_args` and `indexer_config, and the JSON-RPC server is configured using
/// `system_package_task_args` and `rpc_config`.
pub async fn new_with_configs(
mut executor: Simulacrum,
indexer_args: IndexerArgs,
system_package_task_args: SystemPackageTaskArgs,
indexer_config: IndexerConfig,
rpc_config: RpcConfig,
registry: &prometheus::Registry,
cancel: CancellationToken,
) -> anyhow::Result<Self> {
let temp_dir = tempfile::tempdir().context("Failed to create data ingestion path")?;
executor.set_data_ingestion_path(temp_dir.path().to_owned());

let client_args = ClientArgs {
local_ingestion_path: Some(temp_dir.path().to_owned()),
remote_store_url: None,
};

let offchain = OffchainCluster::new(
indexer_args,
client_args,
system_package_task_args,
indexer_config,
rpc_config,
registry,
cancel,
)
.await
.context("Failed to create off-chain cluster")?;

Ok(Self {
executor,
offchain,
temp_dir,
})
}

/// Return the reference gas price for the current epoch
pub fn reference_gas_price(&self) -> u64 {
self.executor.reference_gas_price()
}

/// Request gas from the faucet, sent to `address`. Return the object reference of the gas
/// object that was sent.
pub fn request_gas(
&mut self,
address: SuiAddress,
amount: u64,
) -> anyhow::Result<TransactionEffects> {
self.executor.request_gas(address, amount)
}

/// Execute a signed transaction, returning its effects.
pub fn execute_transaction(
&mut self,
tx: Transaction,
) -> anyhow::Result<(TransactionEffects, Option<ExecutionError>)> {
self.executor.execute_transaction(tx)
}

/// Execute a system transaction advancing the lock by the given `duration`.
pub fn advance_clock(&mut self, duration: Duration) -> TransactionEffects {
self.executor.advance_clock(duration)
}

/// Create a new checkpoint containing the transactions executed since the last checkpoint that
/// was created, and wait for the off-chain services to ingest it. Returns the checkpoint
/// contents.
pub async fn create_checkpoint(&mut self) -> anyhow::Result<VerifiedCheckpoint> {
let checkpoint = self.executor.create_checkpoint();
self.offchain
.wait_for_checkpoint(checkpoint.sequence_number, Duration::from_secs(10))
.await?;

Ok(checkpoint)
}

/// The URL to talk to the database on.
pub fn db_url(&self) -> Url {
self.offchain.db_url()
}

/// The URL to send JSON-RPC requests to.
pub fn rpc_url(&self) -> Url {
self.offchain.rpc_url()
}

/// Returns the latest checkpoint that we have all data for in the database, according to the
/// watermarks table. Returns `None` if any of the expected pipelines are missing data.
pub async fn latest_checkpoint(&self) -> anyhow::Result<Option<u64>> {
self.offchain.latest_checkpoint().await
}

/// Waits until the indexer has caught up to the given checkpoint, or the timeout is reached.
pub async fn wait_for_checkpoint(
&self,
checkpoint: u64,
timeout: Duration,
) -> Result<(), Elapsed> {
self.offchain.wait_for_checkpoint(checkpoint, timeout).await
}

/// Triggers cancellation of all downstream services, waits for them to stop, cleans up the
/// temporary database, and the temporary directory used for ingestion.
pub async fn stopped(self) {
self.offchain.stopped().await;
}
}

impl OffchainCluster {
/// Construct a new off-chain cluster and spin up its constituent services.
///
Expand Down Expand Up @@ -193,3 +346,42 @@ impl OffchainCluster {
let _ = self.jsonrpc.await;
}
}

/// Returns the reference for the first address-owned object created in the effects, or an error if
/// there is none.
pub fn find_address_owned(fx: &TransactionEffects) -> anyhow::Result<ObjectRef> {
if let ExecutionStatus::Failure { error, command } = fx.status() {
bail!("Transaction failed: {error} (command {command:?})");
}

fx.created()
.into_iter()
.find_map(|(oref, owner)| matches!(owner, Owner::AddressOwner(_)).then_some(oref))
.context("Could not find created object")
}

/// Returns the reference for the first immutable object created in the effects, or an error if
/// there is none.
pub fn find_immutable(fx: &TransactionEffects) -> anyhow::Result<ObjectRef> {
if let ExecutionStatus::Failure { error, command } = fx.status() {
bail!("Transaction failed: {error} (command {command:?})");
}

fx.created()
.into_iter()
.find_map(|(oref, owner)| matches!(owner, Owner::Immutable).then_some(oref))
.context("Could not find created object")
}

/// Returns the reference for the first shared object created in the effects, or an error if there
/// is none.
pub fn find_shared(fx: &TransactionEffects) -> anyhow::Result<ObjectRef> {
if let ExecutionStatus::Failure { error, command } = fx.status() {
bail!("Transaction failed: {error} (command {command:?})");
}

fx.created()
.into_iter()
.find_map(|(oref, owner)| matches!(owner, Owner::Shared { .. }).then_some(oref))
.context("Could not find created object")
}

0 comments on commit 7ba0aa9

Please sign in to comment.