diff --git a/.github/actions/ts-e2e/action.yml b/.github/actions/ts-e2e/action.yml
index 89a9a3a97b531..83d37dd1c0ee7 100644
--- a/.github/actions/ts-e2e/action.yml
+++ b/.github/actions/ts-e2e/action.yml
@@ -65,7 +65,7 @@ runs:
     - name: Set env
       run: |
-        echo "E2E_RUN_LOCAL_NET_CMD=(RUST_LOG=\"consensus=off\" $(echo $PWD/target/debug/sui-test-validator) --with-indexer --use-indexer-v2 --pg-port 5432 --pg-db-name sui_indexer_v2 --graphql-host 127.0.0.1 --graphql-port 9125)" >> $GITHUB_ENV
+        echo "E2E_RUN_LOCAL_NET_CMD=(RUST_LOG=\"consensus=off\" $(echo $PWD/target/debug/sui-test-validator) --with-indexer --pg-port 5432 --pg-db-name sui_indexer_v2 --graphql-host 127.0.0.1 --graphql-port 9125)" >> $GITHUB_ENV
         echo "VITE_SUI_BIN=$PWD/target/debug/sui" >> $GITHUB_ENV
       shell: bash
diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
index 240576aeb9134..f64d04c973da5 100644
--- a/.github/workflows/e2e.yml
+++ b/.github/workflows/e2e.yml
@@ -63,7 +63,7 @@ jobs:
       - name: Set env
         run: |
           echo "VITE_SUI_BIN=$PWD/target/debug/sui" >> $GITHUB_ENV
-          echo "E2E_RUN_LOCAL_NET_CMD=(RUST_LOG=info RUST_BACKTRACE=1 $(echo $PWD/target/debug/sui-test-validator) --with-indexer --use-indexer-v2 --pg-port 5432 --pg-db-name sui_indexer_v2 --graphql-host 127.0.0.1 --graphql-port 9125)" >> $GITHUB_ENV
+          echo "E2E_RUN_LOCAL_NET_CMD=(RUST_LOG=info RUST_BACKTRACE=1 $(echo $PWD/target/debug/sui-test-validator) --with-indexer --pg-port 5432 --pg-db-name sui_indexer_v2 --graphql-host 127.0.0.1 --graphql-port 9125)" >> $GITHUB_ENV

       - name: Run TS SDK e2e tests
         if: ${{ needs.diff.outputs.isTypescriptSDK == 'true' || needs.diff.outputs.isRust == 'true'}}
diff --git a/crates/data-transform/src/main.rs b/crates/data-transform/src/main.rs
index a7d99a74ae4a8..2ceec9026a530 100644
--- a/crates/data-transform/src/main.rs
+++ b/crates/data-transform/src/main.rs
@@ -16,8 +16,8 @@ use sui_types::object::MoveObject;
 use self::models::*;
 use std::env;

+use sui_indexer::db::new_pg_connection_pool;
 use sui_indexer::errors::IndexerError;
-use sui_indexer::new_pg_connection_pool;
 use sui_indexer::store::module_resolver_v2::IndexerStorePackageModuleResolver;

 use move_core_types::language_storage::ModuleId;
diff --git a/crates/sui-cluster-test/src/cluster.rs b/crates/sui-cluster-test/src/cluster.rs
index 7b9225ce4beb9..668cca7d55b8a 100644
--- a/crates/sui-cluster-test/src/cluster.rs
+++ b/crates/sui-cluster-test/src/cluster.rs
@@ -9,7 +9,7 @@ use sui_config::Config;
 use sui_config::{PersistedConfig, SUI_KEYSTORE_FILENAME, SUI_NETWORK_CONFIG};
 use sui_graphql_rpc::config::ConnectionConfig;
 use sui_graphql_rpc::test_infra::cluster::start_graphql_server_with_fn_rpc;
-use sui_indexer::test_utils::{start_test_indexer_v2, ReaderWriterConfig};
+use sui_indexer::test_utils::{start_test_indexer, ReaderWriterConfig};
 use sui_keys::keystore::{AccountKeystore, FileBasedKeystore, Keystore};
 use sui_sdk::sui_client_config::{SuiClientConfig, SuiEnv};
 use sui_sdk::wallet_context::WalletContext;
@@ -223,19 +223,17 @@ impl Cluster for LocalNewCluster {
             (options.pg_address.clone(), indexer_address)
         {
             // Start in writer mode
-            start_test_indexer_v2(
+            start_test_indexer(
                 Some(pg_address.clone()),
                 fullnode_url.clone(),
-                options.use_indexer_experimental_methods,
                 ReaderWriterConfig::writer_mode(None),
             )
             .await;

             // Start in reader mode
-            start_test_indexer_v2(
+            start_test_indexer(
                 Some(pg_address),
                 fullnode_url.clone(),
-                options.use_indexer_experimental_methods,
                 ReaderWriterConfig::reader_mode(indexer_address.to_string()),
             )
             .await;
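Note for out-of-tree callers: the test-utils migration is mechanical. Drop the `use_indexer_experimental_methods` argument and the `_v2` suffix. A minimal sketch of an updated call site follows; the helper name and addresses are placeholders, not values from this PR:

```rust
use sui_indexer::test_utils::{start_test_indexer, ReaderWriterConfig};

// Hypothetical helper: spin up a writer/reader pair against one database.
async fn spin_up_indexer_pair(pg_address: String, fullnode_url: String, rpc_address: String) {
    // Writer mode ingests checkpoints from the fullnode into Postgres.
    start_test_indexer(
        Some(pg_address.clone()),
        fullnode_url.clone(),
        ReaderWriterConfig::writer_mode(None),
    )
    .await;

    // Reader mode serves JSON-RPC on `rpc_address` from the same database.
    start_test_indexer(
        Some(pg_address),
        fullnode_url,
        ReaderWriterConfig::reader_mode(rpc_address),
    )
    .await;
}
```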
diff --git a/crates/sui-cluster-test/src/config.rs b/crates/sui-cluster-test/src/config.rs
index 733b8f3e15dba..a40c72ff2af0c 100644
--- a/crates/sui-cluster-test/src/config.rs
+++ b/crates/sui-cluster-test/src/config.rs
@@ -31,16 +31,10 @@ pub struct ClusterTestOpt {
     /// URL for the indexer RPC server
     #[clap(long)]
     pub indexer_address: Option<String>,
-    /// Use new version of indexer or not
-    #[clap(long)]
-    pub use_indexer_v2: bool,
     /// URL for the Indexer Postgres DB
     #[clap(long)]
    #[derivative(Debug(format_with = "obfuscated_pg_address"))]
     pub pg_address: Option<String>,
-    /// TODO(gegao): remove this after indexer migration is complete.
-    #[clap(long)]
-    pub use_indexer_experimental_methods: bool,
     #[clap(long)]
     pub config_dir: Option<PathBuf>,
     /// URL for the indexer RPC server
@@ -72,10 +66,8 @@ impl ClusterTestOpt {
             epoch_duration_ms: None,
             indexer_address: None,
             pg_address: None,
-            use_indexer_experimental_methods: false,
             config_dir: None,
             graphql_address: None,
-            use_indexer_v2: false,
         }
     }
 }
diff --git a/crates/sui-cluster-test/tests/local_cluster_test.rs b/crates/sui-cluster-test/tests/local_cluster_test.rs
index 9a46ce50564e8..129d14d45340c 100644
--- a/crates/sui-cluster-test/tests/local_cluster_test.rs
+++ b/crates/sui-cluster-test/tests/local_cluster_test.rs
@@ -31,10 +31,8 @@ async fn test_sui_cluster() {
         epoch_duration_ms: Some(60000),
         indexer_address: Some(format!("127.0.0.1:{}", indexer_rpc_port)),
         pg_address: Some(pg_address),
-        use_indexer_experimental_methods: false,
         config_dir: None,
         graphql_address: Some(graphql_address),
-        use_indexer_v2: true,
     };

     let _cluster = LocalNewCluster::start(&opts).await.unwrap();
diff --git a/crates/sui-graphql-rpc/src/context_data/db_data_provider.rs b/crates/sui-graphql-rpc/src/context_data/db_data_provider.rs
index b1bfa3915d4e4..858e02eca352b 100644
--- a/crates/sui-graphql-rpc/src/context_data/db_data_provider.rs
+++ b/crates/sui-graphql-rpc/src/context_data/db_data_provider.rs
@@ -7,9 +7,8 @@ use crate::{
     types::{address::Address, sui_address::SuiAddress, validator::Validator},
 };
 use std::{collections::BTreeMap, time::Duration};
-use sui_indexer::{
-    apis::GovernanceReadApiV2, indexer_reader::IndexerReader, PgConnectionPoolConfig,
-};
+use sui_indexer::db::PgConnectionPoolConfig;
+use sui_indexer::{apis::GovernanceReadApiV2, indexer_reader::IndexerReader};
 use sui_json_rpc_types::Stake as RpcStakedSui;
 use sui_types::{
     base_types::SuiAddress as NativeSuiAddress,
diff --git a/crates/sui-graphql-rpc/src/data/pg.rs b/crates/sui-graphql-rpc/src/data/pg.rs
index a59d1ba2a9fe2..a042767c783ef 100644
--- a/crates/sui-graphql-rpc/src/data/pg.rs
+++ b/crates/sui-graphql-rpc/src/data/pg.rs
@@ -188,8 +188,10 @@ mod tests {
     use diesel::QueryDsl;
     use sui_framework::BuiltInFramework;
     use sui_indexer::{
-        get_pg_pool_connection, models_v2::objects::StoredObject, new_pg_connection_pool,
-        schema::objects, types::IndexedObject, utils::reset_database,
+        db::{get_pg_pool_connection, new_pg_connection_pool, reset_database},
+        models_v2::objects::StoredObject,
+        schema::objects,
+        types::IndexedObject,
     };

     #[test]
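The same import reshuffle applies to any downstream crate that pulled the pool helpers from the `sui_indexer` crate root or from `utils`; a before/after sketch of the change applied across the call sites above:

```rust
// Before this PR (crate-root and utils paths):
// use sui_indexer::{get_pg_pool_connection, new_pg_connection_pool, PgConnectionPoolConfig};
// use sui_indexer::utils::reset_database;

// After this PR, everything lives under the new `db` module:
use sui_indexer::db::{
    get_pg_pool_connection, new_pg_connection_pool, reset_database, PgConnectionPoolConfig,
};
```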
diff --git a/crates/sui-graphql-rpc/src/test_infra/cluster.rs b/crates/sui-graphql-rpc/src/test_infra/cluster.rs
index 797f29a3d249d..cc940802a6110 100644
--- a/crates/sui-graphql-rpc/src/test_infra/cluster.rs
+++ b/crates/sui-graphql-rpc/src/test_infra/cluster.rs
@@ -15,8 +15,8 @@ pub use sui_indexer::processors_v2::objects_snapshot_processor::SnapshotLagConfig;
 use sui_indexer::store::indexer_store_v2::IndexerStoreV2;
 use sui_indexer::store::PgIndexerStoreV2;
 use sui_indexer::test_utils::force_delete_database;
-use sui_indexer::test_utils::start_test_indexer_v2;
-use sui_indexer::test_utils::start_test_indexer_v2_impl;
+use sui_indexer::test_utils::start_test_indexer;
+use sui_indexer::test_utils::start_test_indexer_impl;
 use sui_indexer::test_utils::ReaderWriterConfig;
 use sui_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT};
 use sui_types::storage::ReadStore;
@@ -60,10 +60,9 @@ pub async fn start_cluster(
     let val_fn = start_validator_with_fullnode(internal_data_source_rpc_port).await;

     // Starts indexer
-    let (pg_store, pg_handle) = start_test_indexer_v2(
+    let (pg_store, pg_handle) = start_test_indexer(
         Some(db_url),
         val_fn.rpc_url().to_string(),
-        true,
         ReaderWriterConfig::writer_mode(None),
     )
     .await;
@@ -118,10 +117,9 @@ pub async fn serve_executor(
         .await;
     });

-    let (pg_store, pg_handle) = start_test_indexer_v2_impl(
+    let (pg_store, pg_handle) = start_test_indexer_impl(
         Some(db_url),
         format!("http://{}", executor_server_url),
-        true,
         ReaderWriterConfig::writer_mode(snapshot_config.clone()),
         Some(graphql_connection_config.db_name()),
     )
diff --git a/crates/sui-indexer/src/db.rs b/crates/sui-indexer/src/db.rs
new file mode 100644
index 0000000000000..a9e7a8632f42c
--- /dev/null
+++ b/crates/sui-indexer/src/db.rs
@@ -0,0 +1,177 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use std::time::Duration;
+
+use anyhow::anyhow;
+use diesel::migration::MigrationSource;
+use diesel::{r2d2::ConnectionManager, PgConnection, RunQueryDsl};
+use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
+use tracing::info;
+
+use crate::errors::IndexerError;
+
+pub type PgConnectionPool = diesel::r2d2::Pool<ConnectionManager<PgConnection>>;
+pub type PgPoolConnection = diesel::r2d2::PooledConnection<ConnectionManager<PgConnection>>;
+
+#[derive(Debug, Clone, Copy)]
+pub struct PgConnectionPoolConfig {
+    pub pool_size: u32,
+    pub connection_timeout: Duration,
+    pub statement_timeout: Duration,
+}
+
+impl PgConnectionPoolConfig {
+    const DEFAULT_POOL_SIZE: u32 = 100;
+    const DEFAULT_CONNECTION_TIMEOUT: u64 = 30;
+    const DEFAULT_STATEMENT_TIMEOUT: u64 = 30;
+
+    fn connection_config(&self) -> PgConnectionConfig {
+        PgConnectionConfig {
+            statement_timeout: self.statement_timeout,
+            read_only: false,
+        }
+    }
+
+    pub fn set_pool_size(&mut self, size: u32) {
+        self.pool_size = size;
+    }
+
+    pub fn set_connection_timeout(&mut self, timeout: Duration) {
+        self.connection_timeout = timeout;
+    }
+
+    pub fn set_statement_timeout(&mut self, timeout: Duration) {
+        self.statement_timeout = timeout;
+    }
+}
+
+impl Default for PgConnectionPoolConfig {
+    fn default() -> Self {
+        let db_pool_size = std::env::var("DB_POOL_SIZE")
+            .ok()
+            .and_then(|s| s.parse::<u32>().ok())
+            .unwrap_or(Self::DEFAULT_POOL_SIZE);
+        let conn_timeout_secs = std::env::var("DB_CONNECTION_TIMEOUT")
+            .ok()
+            .and_then(|s| s.parse::<u64>().ok())
+            .unwrap_or(Self::DEFAULT_CONNECTION_TIMEOUT);
+        let statement_timeout_secs = std::env::var("DB_STATEMENT_TIMEOUT")
+            .ok()
+            .and_then(|s| s.parse::<u64>().ok())
+            .unwrap_or(Self::DEFAULT_STATEMENT_TIMEOUT);
+
+        Self {
+            pool_size: db_pool_size,
+            connection_timeout: Duration::from_secs(conn_timeout_secs),
+            statement_timeout: Duration::from_secs(statement_timeout_secs),
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy)]
+pub struct PgConnectionConfig {
+    pub statement_timeout: Duration,
+    pub read_only: bool,
+}
+
+impl diesel::r2d2::CustomizeConnection<PgConnection, diesel::r2d2::Error> for PgConnectionConfig {
+    fn on_acquire(&self, conn: &mut PgConnection) -> std::result::Result<(), diesel::r2d2::Error> {
+        use diesel::sql_query;
+
+        sql_query(format!(
+            "SET statement_timeout = {}",
+            self.statement_timeout.as_millis(),
+        ))
+        .execute(conn)
+        .map_err(diesel::r2d2::Error::QueryError)?;
+
+        if self.read_only {
+            sql_query("SET default_transaction_read_only = 't'")
+                .execute(conn)
+                .map_err(diesel::r2d2::Error::QueryError)?;
+        }
+
+        Ok(())
+    }
+}
+
+pub fn new_pg_connection_pool(
+    db_url: &str,
+    pool_size: Option<u32>,
+) -> Result<PgConnectionPool, IndexerError> {
+    let pool_config = PgConnectionPoolConfig::default();
+    let manager = ConnectionManager::<PgConnection>::new(db_url);
+
+    let pool_size = pool_size.unwrap_or(pool_config.pool_size);
+    diesel::r2d2::Pool::builder()
+        .max_size(pool_size)
+        .connection_timeout(pool_config.connection_timeout)
+        .connection_customizer(Box::new(pool_config.connection_config()))
+        .build(manager)
+        .map_err(|e| {
+            IndexerError::PgConnectionPoolInitError(format!(
+                "Failed to initialize connection pool with error: {:?}",
+                e
+            ))
+        })
+}
+
+pub fn get_pg_pool_connection(pool: &PgConnectionPool) -> Result<PgPoolConnection, IndexerError> {
+    pool.get().map_err(|e| {
+        IndexerError::PgPoolConnectionError(format!(
+            "Failed to get connection from PG connection pool with error: {:?}",
+            e
+        ))
+    })
+}
+
+const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");
+
+/// Resets the database by reverting all migrations and reapplying them.
+///
+/// If `drop_all` is set to `true`, the function will drop all tables in the database before
+/// resetting the migrations. This option is destructive and will result in the loss of all
+/// data in the tables. Use with caution, especially in production environments.
+pub fn reset_database(conn: &mut PgPoolConnection, drop_all: bool) -> Result<(), anyhow::Error> {
+    info!("Resetting database ...");
+    if drop_all {
+        drop_all_tables(conn)
+            .map_err(|e| anyhow!("Encountering error when dropping all tables {e}"))?;
+    } else {
+        conn.revert_all_migrations(MIGRATIONS)
+            .map_err(|e| anyhow!("Error reverting all migrations {e}"))?;
+    }
+    conn.run_migrations(&MIGRATIONS.migrations().unwrap())
+        .map_err(|e| anyhow!("Failed to run migrations {e}"))?;
+    info!("Reset database complete.");
+    Ok(())
+}
+
+fn drop_all_tables(conn: &mut PgConnection) -> Result<(), diesel::result::Error> {
+    info!("Dropping all tables in the database");
+    let table_names: Vec<String> = diesel::dsl::sql::<diesel::sql_types::Text>(
+        "
+        SELECT tablename FROM pg_tables WHERE schemaname = 'public'
+        ",
+    )
+    .load(conn)?;
+
+    for table_name in table_names {
+        let drop_table_query = format!("DROP TABLE IF EXISTS {} CASCADE", table_name);
+        diesel::sql_query(drop_table_query).execute(conn)?;
+    }
+
+    // Recreate the __diesel_schema_migrations table
+    diesel::sql_query(
+        "
+        CREATE TABLE __diesel_schema_migrations (
+            version VARCHAR(50) PRIMARY KEY,
+            run_on TIMESTAMP NOT NULL DEFAULT NOW()
+        )
+        ",
+    )
+    .execute(conn)?;
+    info!("Dropped all tables in the database");
+    Ok(())
+}
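Putting the new `db` module together, a test-setup sketch (the connection string is a placeholder; `None` keeps the `DB_POOL_SIZE`-derived pool size). Note that `new_pg_connection_pool` always starts from `PgConnectionPoolConfig::default()`, so the `DB_POOL_SIZE`, `DB_CONNECTION_TIMEOUT`, and `DB_STATEMENT_TIMEOUT` environment variables remain the tuning knobs; the `pool_size` argument only overrides the size.

```rust
use sui_indexer::db::{get_pg_pool_connection, new_pg_connection_pool, reset_database};

// Hypothetical test setup against a throwaway local database.
fn prepare_test_db() -> Result<(), anyhow::Error> {
    let pool = new_pg_connection_pool("postgres://postgres:pw@localhost:5432/sui_indexer_v2", None)?;
    let mut conn = get_pg_pool_connection(&pool)?;
    // `drop_all = true` drops every public table first: destructive, test databases only.
    reset_database(&mut conn, true)?;
    Ok(())
}
```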
diff --git a/crates/sui-indexer/src/handlers/checkpoint_handler_v2.rs b/crates/sui-indexer/src/handlers/checkpoint_handler_v2.rs
index 161d3c21cd1bc..d0fc3c41056d8 100644
--- a/crates/sui-indexer/src/handlers/checkpoint_handler_v2.rs
+++ b/crates/sui-indexer/src/handlers/checkpoint_handler_v2.rs
@@ -42,14 +42,13 @@ use crate::errors::IndexerError;
 use crate::framework::interface::Handler;
 use crate::metrics::IndexerMetrics;

+use crate::db::PgConnectionPool;
 use crate::store::module_resolver_v2::{IndexerStorePackageModuleResolver, InterimPackageResolver};
 use crate::store::{IndexerStoreV2, PgIndexerStoreV2};
-use crate::types::IndexedEpochInfo;
 use crate::types::{
-    IndexedCheckpoint, IndexedEvent, IndexedTransaction, IndexerResult, TransactionKind, TxIndex,
+    IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, IndexedEvent, IndexedObject,
+    IndexedPackage, IndexedTransaction, IndexerResult, TransactionKind, TxIndex,
 };
-use crate::types::{IndexedDeletedObject, IndexedObject, IndexedPackage};
-use crate::{IndexerConfig, PgConnectionPool};

 use super::tx_processor::EpochEndIndexingObjectStore;
 use super::tx_processor::TxChangesProcessor;
@@ -62,7 +61,6 @@ const CHECKPOINT_QUEUE_SIZE: usize = 1000;

 pub async fn new_handlers<S>(
     state: S,
     metrics: IndexerMetrics,
-    config: &IndexerConfig,
 ) -> Result<CheckpointHandler<S>, IndexerError>
 where
     S: IndexerStoreV2 + Clone + Sync + Send + 'static,
@@ -82,12 +80,10 @@ where
     let state_clone = state.clone();
     let metrics_clone = metrics.clone();
-    let config_clone = config.clone();
     let (tx, package_tx) = watch::channel(None);
     spawn_monitored_task!(start_tx_checkpoint_commit_task(
         state_clone,
         metrics_clone,
-        config_clone,
         indexed_checkpoint_receiver,
         tx,
     ));
diff --git a/crates/sui-indexer/src/handlers/committer.rs b/crates/sui-indexer/src/handlers/committer.rs
index 4162b51368d19..e75e0154b6d9a 100644
--- a/crates/sui-indexer/src/handlers/committer.rs
+++ b/crates/sui-indexer/src/handlers/committer.rs
@@ -12,17 +12,14 @@ use tracing::{error, info};
 use sui_types::messages_checkpoint::CheckpointSequenceNumber;

 use crate::metrics::IndexerMetrics;
-
 use crate::store::IndexerStoreV2;
 use crate::types::IndexerResult;
-use crate::IndexerConfig;

 use super::{CheckpointDataToCommit, EpochToCommit};

 pub async fn start_tx_checkpoint_commit_task<S>(
     state: S,
     metrics: IndexerMetrics,
-    config: IndexerConfig,
     tx_indexing_receiver: mysten_metrics::metered_channel::Receiver<CheckpointDataToCommit>,
     commit_notifier: watch::Sender<Option<CheckpointSequenceNumber>>,
 ) where
@@ -44,15 +41,6 @@ pub async fn start_tx_checkpoint_commit_task(
             if indexed_checkpoint_batch.is_empty() {
                 continue;
             }
-            if config.skip_db_commit {
-                info!(
-                    "[Checkpoint/Tx] Downloaded and indexed checkpoint {:?} - {:?} successfully, skipping DB commit...",
-                    indexed_checkpoint_batch.first().map(|c| c.checkpoint.sequence_number),
-                    indexed_checkpoint_batch.last().map(|c| c.checkpoint.sequence_number),
-                );
-                continue;
-            }
-
             // split the batch into smaller batches per epoch to handle partitioning
             let mut indexed_checkpoint_batch_per_epoch = vec![];
             for indexed_checkpoint in indexed_checkpoint_batch {
diff --git a/crates/sui-indexer/src/indexer_v2.rs b/crates/sui-indexer/src/indexer.rs
similarity index 67%
rename from crates/sui-indexer/src/indexer_v2.rs
rename to crates/sui-indexer/src/indexer.rs
index dd2e15ac4b7fc..9db82247e12f5 100644
--- a/crates/sui-indexer/src/indexer_v2.rs
+++ b/crates/sui-indexer/src/indexer.rs
@@ -1,44 +1,39 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0

-use crate::apis::{
-    CoinReadApiV2, ExtendedApiV2, GovernanceReadApiV2, IndexerApiV2, MoveUtilsApiV2, ReadApiV2,
-    TransactionBuilderApiV2, WriteApi,
-};
-use crate::errors::IndexerError;
-use crate::indexer_reader::IndexerReader;
-use crate::metrics::IndexerMetrics;
-use crate::IndexerConfig;
+use std::env;
+
 use anyhow::Result;
-use mysten_metrics::spawn_monitored_task;
 use prometheus::Registry;
-use std::env;
-use std::net::SocketAddr;
-use sui_json_rpc::ServerType;
-use sui_json_rpc::{JsonRpcServerBuilder, ServerHandle};
-use tokio::runtime::Handle;
 use tracing::info;

+use mysten_metrics::spawn_monitored_task;
+
+use crate::build_json_rpc_server;
+use crate::errors::IndexerError;
 use crate::framework::fetcher::CheckpointFetcher;
 use crate::handlers::checkpoint_handler_v2::new_handlers;
+use crate::indexer_reader::IndexerReader;
+use crate::metrics::IndexerMetrics;
 use crate::processors_v2::objects_snapshot_processor::{
     ObjectsSnapshotProcessor, SnapshotLagConfig,
 };
 use crate::processors_v2::processor_orchestrator_v2::ProcessorOrchestratorV2;
 use crate::store::{IndexerStoreV2, PgIndexerAnalyticalStore};
-
-pub struct IndexerV2;
+use crate::IndexerConfig;

 const DOWNLOAD_QUEUE_SIZE: usize = 1000;

-impl IndexerV2 {
+pub struct Indexer;
+
+impl Indexer {
     pub async fn start_writer<S: IndexerStoreV2 + Sync + Send + Clone + 'static>(
         config: &IndexerConfig,
         store: S,
         metrics: IndexerMetrics,
     ) -> Result<(), IndexerError> {
         let snapshot_config = SnapshotLagConfig::default();
-        IndexerV2::start_writer_with_config(config, store, metrics, snapshot_config).await
+        Indexer::start_writer_with_config(config, store, metrics, snapshot_config).await
     }

     pub async fn start_writer_with_config<S: IndexerStoreV2 + Sync + Send + Clone + 'static>(
@@ -48,7 +43,7 @@ impl IndexerV2 {
         snapshot_config: SnapshotLagConfig,
     ) -> Result<(), IndexerError> {
         info!(
-            "Sui indexerV2 Writer (version {:?}) started...",
+            "Sui Indexer Writer (version {:?}) started...",
             env!("CARGO_PKG_VERSION")
         );
@@ -87,7 +82,7 @@ impl IndexerV2 {
         );
         spawn_monitored_task!(objects_snapshot_processor.start());

-        let checkpoint_handler = new_handlers(store, metrics, config).await?;
+        let checkpoint_handler = new_handlers(store, metrics).await?;
         crate::framework::runner::run(
             mysten_metrics::metered_channel::ReceiverStream::new(
                 downloaded_checkpoint_data_receiver,
@@ -105,7 +100,7 @@ impl IndexerV2 {
         db_url: String,
     ) -> Result<(), IndexerError> {
         info!(
-            "Sui indexerV2 Reader (version {:?}) started...",
+            "Sui Indexer Reader (version {:?}) started...",
             env!("CARGO_PKG_VERSION")
         );
         let indexer_reader = IndexerReader::new(db_url)?;
@@ -124,7 +119,7 @@ impl IndexerV2 {
         metrics: IndexerMetrics,
     ) -> Result<(), IndexerError> {
         info!(
-            "Sui indexerV2 Analytical Worker (version {:?}) started...",
+            "Sui Indexer Analytical Worker (version {:?}) started...",
             env!("CARGO_PKG_VERSION")
         );
         let mut processor_orchestrator_v2 = ProcessorOrchestratorV2::new(store, metrics);
@@ -132,31 +127,3 @@ impl IndexerV2 {
         Ok(())
     }
 }
-
-pub async fn build_json_rpc_server(
-    prometheus_registry: &Registry,
-    reader: IndexerReader,
-    config: &IndexerConfig,
-    custom_runtime: Option<Handle>,
-) -> Result<ServerHandle, IndexerError> {
-    let mut builder = JsonRpcServerBuilder::new(env!("CARGO_PKG_VERSION"), prometheus_registry);
-    let http_client = crate::get_http_client(config.rpc_client_url.as_str())?;
-
-    builder.register_module(WriteApi::new(http_client.clone()))?;
-    builder.register_module(IndexerApiV2::new(reader.clone()))?;
-    builder.register_module(TransactionBuilderApiV2::new(reader.clone()))?;
-    builder.register_module(MoveUtilsApiV2::new(reader.clone()))?;
-    builder.register_module(GovernanceReadApiV2::new(reader.clone()))?;
-    builder.register_module(ReadApiV2::new(reader.clone()))?;
-    builder.register_module(CoinReadApiV2::new(reader.clone()))?;
-    builder.register_module(ExtendedApiV2::new(reader.clone()))?;
-
-    let default_socket_addr: SocketAddr = SocketAddr::new(
-        // unwrap() here is safe b/c the address is a static config.
-        config.rpc_server_url.as_str().parse().unwrap(),
-        config.rpc_server_port,
-    );
-    Ok(builder
-        .start(default_socket_addr, custom_runtime, Some(ServerType::Http))
-        .await?)
-}
diff --git a/crates/sui-indexer/src/indexer_reader.rs b/crates/sui-indexer/src/indexer_reader.rs
index 8ffdfb2441c57..c4fc3d7b57677 100644
--- a/crates/sui-indexer/src/indexer_reader.rs
+++ b/crates/sui-indexer/src/indexer_reader.rs
@@ -2,6 +2,7 @@
 // SPDX-License-Identifier: Apache-2.0

 use crate::{
+    db::{PgConnectionConfig, PgConnectionPoolConfig, PgPoolConnection},
     errors::IndexerError,
     models_v2::{
         address_metrics::StoredAddressMetrics,
@@ -21,7 +22,6 @@ use crate::{
         objects_snapshot, packages, transactions,
     },
     types::{IndexerResult, OwnerType},
-    PgConnectionConfig, PgConnectionPoolConfig, PgPoolConnection,
 };
 use anyhow::{anyhow, Result};
 use cached::proc_macro::cached;
@@ -69,7 +69,7 @@ pub const EVENT_SEQUENCE_NUMBER_STR: &str = "event_sequence_number";

 #[derive(Clone)]
 pub struct IndexerReader {
-    pool: crate::PgConnectionPool,
+    pool: crate::db::PgConnectionPool,
     package_cache: PackageCache,
 }
diff --git a/crates/sui-indexer/src/lib.rs b/crates/sui-indexer/src/lib.rs
index 2730c135b3b9c..20abc0f337f5e 100644
--- a/crates/sui-indexer/src/lib.rs
+++ b/crates/sui-indexer/src/lib.rs
@@ -3,30 +3,34 @@
 #![recursion_limit = "256"]

 use std::net::SocketAddr;
-use std::{collections::HashMap, time::Duration};

 use anyhow::{anyhow, Result};
-use axum::{extract::Extension, http::StatusCode, routing::get, Router};
 use clap::Parser;
-use diesel::pg::PgConnection;
-use diesel::r2d2::ConnectionManager;
 use jsonrpsee::http_client::{HeaderMap, HeaderValue, HttpClient, HttpClientBuilder};
 use metrics::IndexerMetrics;
-use prometheus::{Registry, TextEncoder};
-use regex::Regex;
-use tracing::{info, warn};
+use prometheus::Registry;
+use tokio::runtime::Handle;
+use tracing::warn;
 use url::Url;

-use errors::IndexerError;
-use mysten_metrics::RegistryService;
+use sui_json_rpc::ServerType;
+use sui_json_rpc::{JsonRpcServerBuilder, ServerHandle};
 use sui_json_rpc_api::CLIENT_SDK_TYPE_HEADER;

+use crate::apis::{
+    CoinReadApiV2, ExtendedApiV2, GovernanceReadApiV2, IndexerApiV2, MoveUtilsApiV2, ReadApiV2,
+    TransactionBuilderApiV2, WriteApi,
+};
+use crate::indexer_reader::IndexerReader;
+use errors::IndexerError;
+
 pub mod apis;
+pub mod db;
 pub mod errors;
 pub mod framework;
 mod handlers;
+pub mod indexer;
 pub mod indexer_reader;
-pub mod indexer_v2;
 pub mod metrics;
 pub mod models_v2;
 pub mod processors_v2;
@@ -34,28 +38,6 @@ pub mod schema;
 pub mod store;
 pub mod test_utils;
 pub mod types;
-pub mod utils;
-
-pub type PgConnectionPool = diesel::r2d2::Pool<ConnectionManager<PgConnection>>;
-pub type PgPoolConnection = diesel::r2d2::PooledConnection<ConnectionManager<PgConnection>>;
-
-const METRICS_ROUTE: &str = "/metrics";
-/// Returns all endpoints for which we have implemented on the indexer,
-/// some of them are not validated yet.
-/// NOTE: we only use this for integration testing
-const IMPLEMENTED_METHODS: [&str; 9] = [
-    // read apis
-    "get_checkpoint",
-    "get_latest_checkpoint_sequence_number",
-    "get_object",
-    "get_owned_objects",
-    "get_total_transaction_blocks",
-    "get_transaction_block",
-    "multi_get_transaction_blocks",
-    // indexer apis
-    "query_events",
-    "query_transaction_blocks",
-];

 #[derive(Parser, Clone, Debug)]
 #[clap(
@@ -86,8 +68,6 @@ pub struct IndexerConfig {
     pub rpc_server_url: String,
     #[clap(long, default_value = "9000", global = true)]
     pub rpc_server_port: u16,
-    #[clap(long, num_args(1..))]
-    pub migrated_methods: Vec<String>,
     #[clap(long)]
     pub reset_db: bool,
     #[clap(long)]
@@ -96,11 +76,6 @@ pub struct IndexerConfig {
     pub rpc_server_worker: bool,
     #[clap(long)]
     pub analytical_worker: bool,
-    // NOTE: experimental only, do not use in production.
-    #[clap(long)]
-    pub skip_db_commit: bool,
-    #[clap(long)]
-    pub use_v2: bool,
 }

 impl IndexerConfig {
@@ -118,10 +93,6 @@ impl IndexerConfig {
         ))
     }

-    pub fn all_implemented_methods() -> Vec<String> {
-        IMPLEMENTED_METHODS.iter().map(|&s| s.to_string()).collect()
-    }
-
     pub fn get_db_url(&self) -> Result<String, anyhow::Error> {
         match (&self.db_url, &self.db_user_name, &self.db_password, &self.db_host, &self.db_port, &self.db_name) {
             (Some(db_url), _, _, _, _, _) => Ok(db_url.clone()),
@@ -150,17 +121,42 @@ impl Default for IndexerConfig {
             client_metric_port: 9184,
             rpc_server_url: "0.0.0.0".to_string(),
             rpc_server_port: 9000,
-            migrated_methods: vec![],
             reset_db: false,
             fullnode_sync_worker: true,
             rpc_server_worker: true,
             analytical_worker: false,
-            skip_db_commit: false,
-            use_v2: false,
         }
     }
 }

+pub async fn build_json_rpc_server(
+    prometheus_registry: &Registry,
+    reader: IndexerReader,
+    config: &IndexerConfig,
+    custom_runtime: Option<Handle>,
+) -> Result<ServerHandle, IndexerError> {
+    let mut builder = JsonRpcServerBuilder::new(env!("CARGO_PKG_VERSION"), prometheus_registry);
+    let http_client = crate::get_http_client(config.rpc_client_url.as_str())?;
+
+    builder.register_module(WriteApi::new(http_client.clone()))?;
+    builder.register_module(IndexerApiV2::new(reader.clone()))?;
+    builder.register_module(TransactionBuilderApiV2::new(reader.clone()))?;
+    builder.register_module(MoveUtilsApiV2::new(reader.clone()))?;
+    builder.register_module(GovernanceReadApiV2::new(reader.clone()))?;
+    builder.register_module(ReadApiV2::new(reader.clone()))?;
+    builder.register_module(CoinReadApiV2::new(reader.clone()))?;
+    builder.register_module(ExtendedApiV2::new(reader.clone()))?;
+
+    let default_socket_addr: SocketAddr = SocketAddr::new(
+        // unwrap() here is safe b/c the address is a static config.
+        config.rpc_server_url.as_str().parse().unwrap(),
+        config.rpc_server_port,
+    );
+    Ok(builder
+        .start(default_socket_addr, custom_runtime, Some(ServerType::Http))
+        .await?)
+}
+
 fn get_http_client(rpc_client_url: &str) -> Result<HttpClient, IndexerError> {
     let mut headers = HeaderMap::new();
     headers.insert(CLIENT_SDK_TYPE_HEADER, HeaderValue::from_static("indexer"));
@@ -178,165 +174,3 @@ fn get_http_client(rpc_client_url: &str) -> Result<HttpClient, IndexerError> {
         ))
     })
 }
-
-pub fn new_pg_connection_pool(
-    db_url: &str,
-    pool_size: Option<u32>,
-) -> Result<PgConnectionPool, IndexerError> {
-    let pool_config = PgConnectionPoolConfig::default();
-    let manager = ConnectionManager::<PgConnection>::new(db_url);
-
-    let pool_size = pool_size.unwrap_or(pool_config.pool_size);
-    diesel::r2d2::Pool::builder()
-        .max_size(pool_size)
-        .connection_timeout(pool_config.connection_timeout)
-        .connection_customizer(Box::new(pool_config.connection_config()))
-        .build(manager)
-        .map_err(|e| {
-            IndexerError::PgConnectionPoolInitError(format!(
-                "Failed to initialize connection pool with error: {:?}",
-                e
-            ))
-        })
-}
-
-#[derive(Debug, Clone, Copy)]
-pub struct PgConnectionPoolConfig {
-    pool_size: u32,
-    connection_timeout: Duration,
-    statement_timeout: Duration,
-}
-
-impl PgConnectionPoolConfig {
-    const DEFAULT_POOL_SIZE: u32 = 100;
-    const DEFAULT_CONNECTION_TIMEOUT: u64 = 30;
-    const DEFAULT_STATEMENT_TIMEOUT: u64 = 30;
-
-    fn connection_config(&self) -> PgConnectionConfig {
-        PgConnectionConfig {
-            statement_timeout: self.statement_timeout,
-            read_only: false,
-        }
-    }
-
-    pub fn set_pool_size(&mut self, size: u32) {
-        self.pool_size = size;
-    }
-
-    pub fn set_connection_timeout(&mut self, timeout: Duration) {
-        self.connection_timeout = timeout;
-    }
-
-    pub fn set_statement_timeout(&mut self, timeout: Duration) {
-        self.statement_timeout = timeout;
-    }
-}
-
-impl Default for PgConnectionPoolConfig {
-    fn default() -> Self {
-        let db_pool_size = std::env::var("DB_POOL_SIZE")
-            .ok()
-            .and_then(|s| s.parse::<u32>().ok())
-            .unwrap_or(Self::DEFAULT_POOL_SIZE);
-        let conn_timeout_secs = std::env::var("DB_CONNECTION_TIMEOUT")
-            .ok()
-            .and_then(|s| s.parse::<u64>().ok())
-            .unwrap_or(Self::DEFAULT_CONNECTION_TIMEOUT);
-        let statement_timeout_secs = std::env::var("DB_STATEMENT_TIMEOUT")
-            .ok()
-            .and_then(|s| s.parse::<u64>().ok())
-            .unwrap_or(Self::DEFAULT_STATEMENT_TIMEOUT);
-
-        Self {
-            pool_size: db_pool_size,
-            connection_timeout: Duration::from_secs(conn_timeout_secs),
-            statement_timeout: Duration::from_secs(statement_timeout_secs),
-        }
-    }
-}
-
-#[derive(Debug, Clone, Copy)]
-struct PgConnectionConfig {
-    statement_timeout: Duration,
-    read_only: bool,
-}
-
-impl diesel::r2d2::CustomizeConnection<PgConnection, diesel::r2d2::Error> for PgConnectionConfig {
-    fn on_acquire(&self, conn: &mut PgConnection) -> std::result::Result<(), diesel::r2d2::Error> {
-        use diesel::{sql_query, RunQueryDsl};
-
-        sql_query(format!(
-            "SET statement_timeout = {}",
-            self.statement_timeout.as_millis(),
-        ))
-        .execute(conn)
-        .map_err(diesel::r2d2::Error::QueryError)?;
-
-        if self.read_only {
-            sql_query("SET default_transaction_read_only = 't'")
-                .execute(conn)
-                .map_err(diesel::r2d2::Error::QueryError)?;
-        }
-
-        Ok(())
-    }
-}
-
-pub fn get_pg_pool_connection(pool: &PgConnectionPool) -> Result<PgPoolConnection, IndexerError> {
-    pool.get().map_err(|e| {
-        IndexerError::PgPoolConnectionError(format!(
-            "Failed to get connection from PG connection pool with error: {:?}",
-            e
-        ))
-    })
-}
-
-fn convert_url(url_str: &str) -> Option<String> {
-    // NOTE: unwrap here is safe because the regex is a constant.
-    let re = Regex::new(r"https?://([a-z0-9-]+\.[a-z0-9-]+\.[a-z]+)").unwrap();
-    let captures = re.captures(url_str)?;
-
-    captures.get(1).map(|m| m.as_str().to_string())
-}
-
-pub fn start_prometheus_server(
-    addr: SocketAddr,
-    fn_url: &str,
-) -> Result<(RegistryService, Registry), anyhow::Error> {
-    let converted_fn_url = convert_url(fn_url);
-    if converted_fn_url.is_none() {
-        warn!(
-            "Failed to convert full node url {} to a shorter version",
-            fn_url
-        );
-    }
-    let fn_url_str = converted_fn_url.unwrap_or_else(|| "unknown_url".to_string());
-
-    let labels = HashMap::from([("indexer_fullnode".to_string(), fn_url_str)]);
-    info!("Starting prometheus server with labels: {:?}", labels);
-    let registry = Registry::new_custom(Some("indexer".to_string()), Some(labels))?;
-    let registry_service = RegistryService::new(registry.clone());
-
-    let app = Router::new()
-        .route(METRICS_ROUTE, get(metrics))
-        .layer(Extension(registry_service.clone()));
-
-    tokio::spawn(async move {
-        axum::Server::bind(&addr)
-            .serve(app.into_make_service())
-            .await
-            .unwrap();
-    });
-    Ok((registry_service, registry))
-}
-
-async fn metrics(Extension(registry_service): Extension<RegistryService>) -> (StatusCode, String) {
-    let metrics_families = registry_service.gather_all();
-    match TextEncoder.encode_to_string(&metrics_families) {
-        Ok(metrics) => (StatusCode::OK, metrics),
-        Err(error) => (
-            StatusCode::INTERNAL_SERVER_ERROR,
-            format!("unable to encode metrics: {error}"),
-        ),
-    }
-}
diff --git a/crates/sui-indexer/src/main.rs b/crates/sui-indexer/src/main.rs
index 74fcbf097769b..87f1cbfe9c781 100644
--- a/crates/sui-indexer/src/main.rs
+++ b/crates/sui-indexer/src/main.rs
@@ -4,14 +4,14 @@
 use clap::Parser;
 use tracing::{error, info};

+use sui_indexer::db::{get_pg_pool_connection, new_pg_connection_pool, reset_database};
 use sui_indexer::errors::IndexerError;
-use sui_indexer::indexer_v2::IndexerV2;
+use sui_indexer::indexer::Indexer;
+use sui_indexer::metrics::start_prometheus_server;
 use sui_indexer::metrics::IndexerMetrics;
-use sui_indexer::start_prometheus_server;
 use sui_indexer::store::PgIndexerAnalyticalStore;
 use sui_indexer::store::PgIndexerStoreV2;
-use sui_indexer::utils::reset_database;
-use sui_indexer::{get_pg_pool_connection, new_pg_connection_pool, IndexerConfig};
+use sui_indexer::IndexerConfig;

 #[tokio::main]
 async fn main() -> Result<(), IndexerError> {
@@ -66,7 +66,6 @@ async fn main() -> Result<(), IndexerError> {
         indexer_config.rpc_client_url.as_str(),
     )?;
     let indexer_metrics = IndexerMetrics::new(&registry);
-
     mysten_metrics::init_metrics(&registry);

     let report_cp = blocking_cp.clone();
@@ -88,17 +87,14 @@ async fn main() -> Result<(), IndexerError> {
         }
     });

-    if indexer_config.use_v2 {
-        info!("Use v2");
-        if indexer_config.fullnode_sync_worker {
-            let store = PgIndexerStoreV2::new(blocking_cp, indexer_metrics.clone());
-            return IndexerV2::start_writer(&indexer_config, store, indexer_metrics).await;
-        } else if indexer_config.rpc_server_worker {
-            return IndexerV2::start_reader(&indexer_config, &registry, db_url).await;
-        } else if indexer_config.analytical_worker {
-            let store = PgIndexerAnalyticalStore::new(blocking_cp);
-            return IndexerV2::start_analytical_worker(store, indexer_metrics.clone()).await;
-        }
+    if indexer_config.fullnode_sync_worker {
+        let store = PgIndexerStoreV2::new(blocking_cp, indexer_metrics.clone());
+        return Indexer::start_writer(&indexer_config, store, indexer_metrics).await;
+    } else if indexer_config.rpc_server_worker {
+        return Indexer::start_reader(&indexer_config, &registry, db_url).await;
+    } else if indexer_config.analytical_worker {
+        let store = PgIndexerAnalyticalStore::new(blocking_cp);
+        return Indexer::start_analytical_worker(store, indexer_metrics.clone()).await;
     }
     Ok(())
 }
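With the `use_v2` gate gone, the worker split is the whole story; a sketch of embedding a reader-only instance through the renamed facade (registry wiring simplified relative to main.rs above, and the helper name is hypothetical):

```rust
use prometheus::Registry;
use sui_indexer::errors::IndexerError;
use sui_indexer::indexer::Indexer;
use sui_indexer::IndexerConfig;

// Hypothetical embedding: serve JSON-RPC from an already-populated database.
async fn run_reader(db_url: String) -> Result<(), IndexerError> {
    let config = IndexerConfig {
        db_url: Some(db_url.clone()),
        rpc_server_worker: true,
        fullnode_sync_worker: false,
        ..Default::default()
    };
    // main.rs uses the registry returned by start_prometheus_server;
    // a bare registry is enough for a sketch.
    let registry = Registry::new();
    Indexer::start_reader(&config, &registry, db_url).await
}
```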
diff --git a/crates/sui-indexer/src/metrics.rs b/crates/sui-indexer/src/metrics.rs
index 938b2c2f76ffe..2d301b459b510 100644
--- a/crates/sui-indexer/src/metrics.rs
+++ b/crates/sui-indexer/src/metrics.rs
@@ -1,10 +1,71 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0

+use std::collections::HashMap;
+use std::net::SocketAddr;
+
+use axum::{extract::Extension, http::StatusCode, routing::get, Router};
 use prometheus::{
     register_histogram_with_registry, register_int_counter_with_registry,
-    register_int_gauge_with_registry, Histogram, IntCounter, IntGauge, Registry,
+    register_int_gauge_with_registry, Histogram, IntCounter, IntGauge,
 };
+use prometheus::{Registry, TextEncoder};
+use regex::Regex;
+use tracing::{info, warn};
+
+use mysten_metrics::RegistryService;
+
+const METRICS_ROUTE: &str = "/metrics";
+
+pub fn start_prometheus_server(
+    addr: SocketAddr,
+    fn_url: &str,
+) -> Result<(RegistryService, Registry), anyhow::Error> {
+    let converted_fn_url = convert_url(fn_url);
+    if converted_fn_url.is_none() {
+        warn!(
+            "Failed to convert full node url {} to a shorter version",
+            fn_url
+        );
+    }
+    let fn_url_str = converted_fn_url.unwrap_or_else(|| "unknown_url".to_string());
+
+    let labels = HashMap::from([("indexer_fullnode".to_string(), fn_url_str)]);
+    info!("Starting prometheus server with labels: {:?}", labels);
+    let registry = Registry::new_custom(Some("indexer".to_string()), Some(labels))?;
+    let registry_service = RegistryService::new(registry.clone());
+
+    let app = Router::new()
+        .route(METRICS_ROUTE, get(metrics))
+        .layer(Extension(registry_service.clone()));
+
+    tokio::spawn(async move {
+        axum::Server::bind(&addr)
+            .serve(app.into_make_service())
+            .await
+            .unwrap();
+    });
+    Ok((registry_service, registry))
+}
+
+async fn metrics(Extension(registry_service): Extension<RegistryService>) -> (StatusCode, String) {
+    let metrics_families = registry_service.gather_all();
+    match TextEncoder.encode_to_string(&metrics_families) {
+        Ok(metrics) => (StatusCode::OK, metrics),
+        Err(error) => (
+            StatusCode::INTERNAL_SERVER_ERROR,
+            format!("unable to encode metrics: {error}"),
+        ),
+    }
+}
+
+fn convert_url(url_str: &str) -> Option<String> {
+    // NOTE: unwrap here is safe because the regex is a constant.
+    let re = Regex::new(r"https?://([a-z0-9-]+\.[a-z0-9-]+\.[a-z]+)").unwrap();
+    let captures = re.captures(url_str)?;
+
+    captures.get(1).map(|m| m.as_str().to_string())
+}

 /// Prometheus metrics for sui-indexer.
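Since `start_prometheus_server` moved into `metrics`, callers outside main.rs pick it up as below. The bind address and fullnode URL are placeholders, and the function spawns the axum server as a tokio task, so it must run inside a runtime:

```rust
use std::net::SocketAddr;
use sui_indexer::metrics::{start_prometheus_server, IndexerMetrics};

// Hypothetical metrics bootstrap.
fn init_metrics() -> Result<IndexerMetrics, anyhow::Error> {
    let addr: SocketAddr = "0.0.0.0:9184".parse()?;
    // The second argument is only used to derive the `indexer_fullnode` label.
    let (_registry_service, registry) =
        start_prometheus_server(addr, "https://fullnode.mainnet.sui.io:443")?;
    Ok(IndexerMetrics::new(&registry))
}
```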
 // buckets defined in seconds
@@ -623,28 +684,3 @@ impl IndexerMetrics {
         }
     }
 }
-
-#[derive(Clone, Debug)]
-pub struct IndexerObjectProcessorMetrics {
-    pub total_object_batch_processed: IntCounter,
-    pub total_object_processor_error: IntCounter,
-}
-
-impl IndexerObjectProcessorMetrics {
-    pub fn new(registry: &Registry) -> Self {
-        Self {
-            total_object_batch_processed: register_int_counter_with_registry!(
-                "total_object_batch_processed",
-                "Total number of object batches processed",
-                registry,
-            )
-            .unwrap(),
-            total_object_processor_error: register_int_counter_with_registry!(
-                "total_object_processor_error",
-                "Total number of object processor error",
-                registry,
-            )
-            .unwrap(),
-        }
-    }
-}
diff --git a/crates/sui-indexer/src/store/mod.rs b/crates/sui-indexer/src/store/mod.rs
index f8fbc0c879d59..301b46fd98626 100644
--- a/crates/sui-indexer/src/store/mod.rs
+++ b/crates/sui-indexer/src/store/mod.rs
@@ -17,7 +17,7 @@ mod query;
 pub(crate) mod diesel_macro {
     macro_rules! read_only_blocking {
         ($pool:expr, $query:expr) => {{
-            let mut pg_pool_conn = crate::get_pg_pool_connection($pool)?;
+            let mut pg_pool_conn = crate::db::get_pg_pool_connection($pool)?;
             pg_pool_conn
                 .build_transaction()
                 .read_only()
@@ -32,7 +32,7 @@ pub(crate) mod diesel_macro {
             backoff.max_elapsed_time = Some($max_elapsed);
             let result = match backoff::retry(backoff, || {
-                let mut pg_pool_conn = crate::get_pg_pool_connection($pool).map_err(|e| {
+                let mut pg_pool_conn = crate::db::get_pg_pool_connection($pool).map_err(|e| {
                     backoff::Error::Transient {
                         err: IndexerError::PostgresWriteError(e.to_string()),
                         retry_after: None,
diff --git a/crates/sui-indexer/src/store/module_resolver_v2.rs b/crates/sui-indexer/src/store/module_resolver_v2.rs
index afe7f38facb15..7b7c74588c720 100644
--- a/crates/sui-indexer/src/store/module_resolver_v2.rs
+++ b/crates/sui-indexer/src/store/module_resolver_v2.rs
@@ -15,6 +15,7 @@ use sui_types::base_types::{ObjectID, SequenceNumber};
 use sui_types::move_package::MovePackage;
 use sui_types::object::Object;

+use crate::db::PgConnectionPool;
 use crate::errors::{Context, IndexerError};
 use crate::handlers::tx_processor::IndexingPackageBuffer;
 use crate::metrics::IndexerMetrics;
@@ -22,7 +23,6 @@ use crate::models_v2::packages::StoredPackage;
 use crate::schema::{objects, packages};
 use crate::store::diesel_macro::read_only_blocking;
 use crate::types::IndexedPackage;
-use crate::PgConnectionPool;

 /// A package resolver that reads packages from the database.
 pub struct IndexerStorePackageModuleResolver {
diff --git a/crates/sui-indexer/src/store/pg_indexer_analytical_store.rs b/crates/sui-indexer/src/store/pg_indexer_analytical_store.rs
index 20e1294f30b60..0111f436d4252 100644
--- a/crates/sui-indexer/src/store/pg_indexer_analytical_store.rs
+++ b/crates/sui-indexer/src/store/pg_indexer_analytical_store.rs
@@ -12,6 +12,7 @@ use diesel::{ExpressionMethods, OptionalExtension};
 use diesel::{QueryDsl, RunQueryDsl};
 use sui_types::base_types::ObjectID;

+use crate::db::PgConnectionPool;
 use crate::errors::{Context, IndexerError};
 use crate::models_v2::address_metrics::StoredAddressMetrics;
 use crate::models_v2::checkpoints::StoredCheckpoint;
@@ -30,7 +31,6 @@ use crate::schema::{
 };
 use crate::store::diesel_macro::{read_only_blocking, transactional_blocking_with_retry};
 use crate::types::IndexerResult;
-use crate::PgConnectionPool;

 use super::IndexerAnalyticalStore;
diff --git a/crates/sui-indexer/src/store/pg_indexer_store_v2.rs b/crates/sui-indexer/src/store/pg_indexer_store_v2.rs
index 521a43d2d0e3d..6ff78e29d2f1d 100644
--- a/crates/sui-indexer/src/store/pg_indexer_store_v2.rs
+++ b/crates/sui-indexer/src/store/pg_indexer_store_v2.rs
@@ -29,6 +29,7 @@ use crate::handlers::EpochToCommit;
 use crate::handlers::TransactionObjectChangesToCommit;
 use crate::metrics::IndexerMetrics;

+use crate::db::PgConnectionPool;
 use crate::models_v2::checkpoints::StoredCheckpoint;
 use crate::models_v2::display::StoredDisplay;
 use crate::models_v2::epoch::StoredEpochInfo;
@@ -45,7 +46,6 @@ use crate::schema::{
 };
 use crate::store::diesel_macro::{read_only_blocking, transactional_blocking_with_retry};
 use crate::store::module_resolver_v2::IndexerStorePackageModuleResolver;
 use crate::types::{IndexedCheckpoint, IndexedEvent, IndexedPackage, IndexedTransaction, TxIndex};
-use crate::PgConnectionPool;

 use super::pg_partition_manager::{EpochPartitionData, PgPartitionManager};
 use super::IndexerStoreV2;
diff --git a/crates/sui-indexer/src/store/pg_partition_manager.rs b/crates/sui-indexer/src/store/pg_partition_manager.rs
index 182596882cfd0..17d5a23b710ef 100644
--- a/crates/sui-indexer/src/store/pg_partition_manager.rs
+++ b/crates/sui-indexer/src/store/pg_partition_manager.rs
@@ -7,11 +7,11 @@ use std::collections::BTreeMap;
 use std::time::Duration;
 use tracing::{error, info};

+use crate::db::PgConnectionPool;
 use crate::handlers::EpochToCommit;
 use crate::models_v2::epoch::StoredEpochInfo;
 use crate::store::diesel_macro::{read_only_blocking, transactional_blocking_with_retry};
 use crate::IndexerError;
-use crate::PgConnectionPool;

 const GET_PARTITION_SQL: &str = r"
 SELECT parent.relname AS table_name,
diff --git a/crates/sui-indexer/src/test_utils.rs b/crates/sui-indexer/src/test_utils.rs
index 256078a5ba5e9..fcf2548516f68 100644
--- a/crates/sui-indexer/src/test_utils.rs
+++ b/crates/sui-indexer/src/test_utils.rs
@@ -10,13 +10,12 @@ use std::net::SocketAddr;
 use sui_json_rpc_types::SuiTransactionBlockResponse;
 use tracing::info;

+use crate::db::{new_pg_connection_pool, reset_database};
 use crate::errors::IndexerError;
-use crate::indexer_v2::IndexerV2;
+use crate::indexer::Indexer;
 use crate::processors_v2::objects_snapshot_processor::SnapshotLagConfig;
 use crate::store::PgIndexerStoreV2;
-use crate::utils::reset_database;
-use crate::IndexerConfig;
-use crate::{new_pg_connection_pool, IndexerMetrics};
+use crate::{IndexerConfig, IndexerMetrics};

 pub enum ReaderWriterConfig {
     Reader { reader_mode_rpc_url: String },
@@ -37,26 +36,17 @@ impl ReaderWriterConfig {
     }
 }
-pub async fn start_test_indexer_v2(
+pub async fn start_test_indexer(
     db_url: Option<String>,
     rpc_url: String,
-    use_indexer_experimental_methods: bool,
     reader_writer_config: ReaderWriterConfig,
 ) -> (PgIndexerStoreV2, JoinHandle<Result<(), IndexerError>>) {
-    start_test_indexer_v2_impl(
-        db_url,
-        rpc_url,
-        use_indexer_experimental_methods,
-        reader_writer_config,
-        None,
-    )
-    .await
+    start_test_indexer_impl(db_url, rpc_url, reader_writer_config, None).await
 }

-pub async fn start_test_indexer_v2_impl(
+pub async fn start_test_indexer_impl(
     db_url: Option<String>,
     rpc_url: String,
-    use_indexer_experimental_methods: bool,
     reader_writer_config: ReaderWriterConfig,
     new_database: Option<String>,
 ) -> (PgIndexerStoreV2, JoinHandle<Result<(), IndexerError>>) {
@@ -72,21 +62,13 @@ pub async fn start_test_indexer_impl(
         format!("postgres://postgres:{pw}@{pg_host}:{pg_port}")
     });

-    let migrated_methods = if use_indexer_experimental_methods {
-        IndexerConfig::all_implemented_methods()
-    } else {
-        vec![]
-    };
-
     // Default writer mode
     let mut config = IndexerConfig {
         db_url: Some(db_url.clone()),
         rpc_client_url: rpc_url,
-        migrated_methods,
         reset_db: true,
         fullnode_sync_worker: true,
         rpc_server_worker: false,
-        use_v2: true,
         ..Default::default()
     };

@@ -132,8 +114,7 @@ pub async fn start_test_indexer_impl(
             config.rpc_server_worker = true;
             config.rpc_server_url = reader_mode_rpc_url.ip().to_string();
             config.rpc_server_port = reader_mode_rpc_url.port();
-
-            tokio::spawn(async move { IndexerV2::start_reader(&config, &registry, db_url).await })
+            tokio::spawn(async move { Indexer::start_reader(&config, &registry, db_url).await })
         }
         ReaderWriterConfig::Writer { snapshot_config } => {
             if config.reset_db {
@@ -142,7 +123,7 @@ pub async fn start_test_indexer_impl(
                 let store_clone = store.clone();

                 tokio::spawn(async move {
-                    IndexerV2::start_writer_with_config(
+                    Indexer::start_writer_with_config(
                         &config,
                         store_clone,
                         indexer_metrics,
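For writer tests that need to exercise the snapshot processor, `start_test_indexer_impl` still takes the lag config and an optional fresh database name; a sketch, assuming `writer_mode` accepts an `Option<SnapshotLagConfig>` as `serve_executor` above suggests:

```rust
use sui_indexer::processors_v2::objects_snapshot_processor::SnapshotLagConfig;
use sui_indexer::test_utils::{start_test_indexer_impl, ReaderWriterConfig};

// Hypothetical: writer mode with an explicit snapshot lag configuration.
async fn writer_with_snapshot(db_url: String, rpc_url: String) {
    let snapshot = SnapshotLagConfig::default();
    let (_store, _handle) = start_test_indexer_impl(
        Some(db_url),
        rpc_url,
        ReaderWriterConfig::writer_mode(Some(snapshot)),
        None, // reuse the database named in `db_url` instead of creating one
    )
    .await;
}
```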
diff --git a/crates/sui-indexer/src/utils.rs b/crates/sui-indexer/src/utils.rs
deleted file mode 100644
index e50a70950dc87..0000000000000
--- a/crates/sui-indexer/src/utils.rs
+++ /dev/null
@@ -1,59 +0,0 @@
-// Copyright (c) Mysten Labs, Inc.
-// SPDX-License-Identifier: Apache-2.0
-
-use crate::PgPoolConnection;
-use anyhow::anyhow;
-use diesel::migration::MigrationSource;
-use diesel::{PgConnection, RunQueryDsl};
-use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
-use tracing::info;
-
-const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");
-
-/// Resets the database by reverting all migrations and reapplying them.
-///
-/// If `drop_all` is set to `true`, the function will drop all tables in the database before
-/// resetting the migrations. This option is destructive and will result in the loss of all
-/// data in the tables. Use with caution, especially in production environments.
-pub fn reset_database(conn: &mut PgPoolConnection, drop_all: bool) -> Result<(), anyhow::Error> {
-    info!("Resetting database ...");
-    if drop_all {
-        drop_all_tables(conn)
-            .map_err(|e| anyhow!("Encountering error when dropping all tables {e}"))?;
-    } else {
-        conn.revert_all_migrations(MIGRATIONS)
-            .map_err(|e| anyhow!("Error reverting all migrations {e}"))?;
-    }
-    conn.run_migrations(&MIGRATIONS.migrations().unwrap())
-        .map_err(|e| anyhow!("Failed to run migrations {e}"))?;
-    info!("Reset database complete.");
-    Ok(())
-}
-
-pub fn drop_all_tables(conn: &mut PgConnection) -> Result<(), diesel::result::Error> {
-    info!("Dropping all tables in the database");
-    let table_names: Vec<String> = diesel::dsl::sql::<diesel::sql_types::Text>(
-        "
-        SELECT tablename FROM pg_tables WHERE schemaname = 'public'
-        ",
-    )
-    .load(conn)?;
-
-    for table_name in table_names {
-        let drop_table_query = format!("DROP TABLE IF EXISTS {} CASCADE", table_name);
-        diesel::sql_query(drop_table_query).execute(conn)?;
-    }
-
-    // Recreate the __diesel_schema_migrations table
-    diesel::sql_query(
-        "
-        CREATE TABLE __diesel_schema_migrations (
-            version VARCHAR(50) PRIMARY KEY,
-            run_on TIMESTAMP NOT NULL DEFAULT NOW()
-        )
-        ",
-    )
-    .execute(conn)?;
-    info!("Dropped all tables in the database");
-    Ok(())
-}
diff --git a/crates/sui-indexer/tests/ingestion_tests.rs b/crates/sui-indexer/tests/ingestion_tests.rs
index b56c58d98b1fa..ec7bc06c6ed83 100644
--- a/crates/sui-indexer/tests/ingestion_tests.rs
+++ b/crates/sui-indexer/tests/ingestion_tests.rs
@@ -9,13 +9,13 @@ mod ingestion_tests {
     use std::net::SocketAddr;
     use std::sync::Arc;
     use std::time::Duration;
+    use sui_indexer::db::get_pg_pool_connection;
     use sui_indexer::errors::Context;
     use sui_indexer::errors::IndexerError;
-    use sui_indexer::get_pg_pool_connection;
     use sui_indexer::models_v2::transactions::StoredTransaction;
     use sui_indexer::schema::transactions;
     use sui_indexer::store::{indexer_store_v2::IndexerStoreV2, PgIndexerStoreV2};
-    use sui_indexer::test_utils::{start_test_indexer_v2, ReaderWriterConfig};
+    use sui_indexer::test_utils::{start_test_indexer, ReaderWriterConfig};
     use sui_types::base_types::SuiAddress;
     use sui_types::effects::TransactionEffectsAPI;
     use sui_types::storage::ReadStore;
@@ -60,10 +60,9 @@ mod ingestion_tests {
         .await;
     });
         // Starts indexer
-        let (pg_store, pg_handle) = start_test_indexer_v2(
+        let (pg_store, pg_handle) = start_test_indexer(
             Some(DEFAULT_DB_URL.to_owned()),
             format!("http://{}", server_url),
-            true,
             ReaderWriterConfig::writer_mode(None),
         )
         .await;
diff --git a/crates/sui-test-validator/src/main.rs b/crates/sui-test-validator/src/main.rs
index 4ee3660ee4477..2af9322691d4b 100644
--- a/crates/sui-test-validator/src/main.rs
+++ b/crates/sui-test-validator/src/main.rs
@@ -83,14 +83,6 @@ struct Args {
     /// if we should run indexer
     #[clap(long)]
     pub with_indexer: bool,
-
-    /// TODO(gegao): remove this after indexer migration is complete.
-    #[clap(long)]
-    pub use_indexer_experimental_methods: bool,
-
-    /// If we should use the new version of the indexer
-    #[clap(long)]
-    pub use_indexer_v2: bool,
 }

 #[tokio::main]
@@ -114,8 +106,6 @@ async fn main() -> Result<()> {
         epoch_duration_ms,
         faucet_port,
         with_indexer,
-        use_indexer_experimental_methods,
-        use_indexer_v2,
     } = args;

     // We don't pass epoch duration if we have a genesis config.
@@ -130,8 +120,6 @@ async fn main() -> Result<()> {
     }
     if !with_indexer {
         println!("`with_indexer` flag unset. Indexer service will not run.")
-    } else if !use_indexer_v2 {
-        println!("`with_indexer` flag unset. Indexer service will run unmaintained indexer.")
     }

     let cluster_config = ClusterTestOpt {
@@ -144,10 +132,8 @@ async fn main() -> Result<()> {
         )),
         faucet_address: Some(format!("127.0.0.1:{}", faucet_port)),
         epoch_duration_ms,
-        use_indexer_experimental_methods,
         config_dir,
         graphql_address: graphql_port.map(|p| format!("{}:{}", graphql_host, p)),
-        use_indexer_v2,
     };

     println!("Starting Sui validator with config: {:#?}", cluster_config);