Skip to content

Commit

Permalink
indexer cleanup 3/N: lib, db and utils (MystenLabs#16264)
Browse files Browse the repository at this point in the history
## Description 

- group db related utils to db.rs
- group metrics related functions to metrics.rs
- indexer v2 -> indexer
- misc. cleanups

## Test Plan 

CI

---
If your changes are not user-facing and do not break anything, you can
skip the following section. Otherwise, please briefly describe what has
changed under the Release Notes section.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
gegaowp authored Feb 16, 2024
1 parent df41d5f commit 4f52e05
Show file tree
Hide file tree
Showing 26 changed files with 348 additions and 460 deletions.
2 changes: 1 addition & 1 deletion .github/actions/ts-e2e/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ runs:

- name: Set env
run: |
echo "E2E_RUN_LOCAL_NET_CMD=(RUST_LOG=\"consensus=off\" $(echo $PWD/target/debug/sui-test-validator) --with-indexer --use-indexer-v2 --pg-port 5432 --pg-db-name sui_indexer_v2 --graphql-host 127.0.0.1 --graphql-port 9125)" >> $GITHUB_ENV
echo "E2E_RUN_LOCAL_NET_CMD=(RUST_LOG=\"consensus=off\" $(echo $PWD/target/debug/sui-test-validator) --with-indexer --pg-port 5432 --pg-db-name sui_indexer_v2 --graphql-host 127.0.0.1 --graphql-port 9125)" >> $GITHUB_ENV
echo "VITE_SUI_BIN=$PWD/target/debug/sui" >> $GITHUB_ENV
shell: bash

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
- name: Set env
run: |
echo "VITE_SUI_BIN=$PWD/target/debug/sui" >> $GITHUB_ENV
echo "E2E_RUN_LOCAL_NET_CMD=(RUST_LOG=info RUST_BACKTRACE=1 $(echo $PWD/target/debug/sui-test-validator) --with-indexer --use-indexer-v2 --pg-port 5432 --pg-db-name sui_indexer_v2 --graphql-host 127.0.0.1 --graphql-port 9125)" >> $GITHUB_ENV
echo "E2E_RUN_LOCAL_NET_CMD=(RUST_LOG=info RUST_BACKTRACE=1 $(echo $PWD/target/debug/sui-test-validator) --with-indexer --pg-port 5432 --pg-db-name sui_indexer_v2 --graphql-host 127.0.0.1 --graphql-port 9125)" >> $GITHUB_ENV
- name: Run TS SDK e2e tests
if: ${{ needs.diff.outputs.isTypescriptSDK == 'true' || needs.diff.outputs.isRust == 'true'}}
Expand Down
2 changes: 1 addition & 1 deletion crates/data-transform/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use sui_types::object::MoveObject;

use self::models::*;
use std::env;
use sui_indexer::db::new_pg_connection_pool;
use sui_indexer::errors::IndexerError;
use sui_indexer::new_pg_connection_pool;
use sui_indexer::store::module_resolver_v2::IndexerStorePackageModuleResolver;

use move_core_types::language_storage::ModuleId;
Expand Down
8 changes: 3 additions & 5 deletions crates/sui-cluster-test/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use sui_config::Config;
use sui_config::{PersistedConfig, SUI_KEYSTORE_FILENAME, SUI_NETWORK_CONFIG};
use sui_graphql_rpc::config::ConnectionConfig;
use sui_graphql_rpc::test_infra::cluster::start_graphql_server_with_fn_rpc;
use sui_indexer::test_utils::{start_test_indexer_v2, ReaderWriterConfig};
use sui_indexer::test_utils::{start_test_indexer, ReaderWriterConfig};
use sui_keys::keystore::{AccountKeystore, FileBasedKeystore, Keystore};
use sui_sdk::sui_client_config::{SuiClientConfig, SuiEnv};
use sui_sdk::wallet_context::WalletContext;
Expand Down Expand Up @@ -223,19 +223,17 @@ impl Cluster for LocalNewCluster {
(options.pg_address.clone(), indexer_address)
{
// Start in writer mode
start_test_indexer_v2(
start_test_indexer(
Some(pg_address.clone()),
fullnode_url.clone(),
options.use_indexer_experimental_methods,
ReaderWriterConfig::writer_mode(None),
)
.await;

// Start in reader mode
start_test_indexer_v2(
start_test_indexer(
Some(pg_address),
fullnode_url.clone(),
options.use_indexer_experimental_methods,
ReaderWriterConfig::reader_mode(indexer_address.to_string()),
)
.await;
Expand Down
8 changes: 0 additions & 8 deletions crates/sui-cluster-test/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,10 @@ pub struct ClusterTestOpt {
/// URL for the indexer RPC server
#[clap(long)]
pub indexer_address: Option<String>,
/// Use new version of indexer or not
#[clap(long)]
pub use_indexer_v2: bool,
/// URL for the Indexer Postgres DB
#[clap(long)]
#[derivative(Debug(format_with = "obfuscated_pg_address"))]
pub pg_address: Option<String>,
/// TODO(gegao): remove this after indexer migration is complete.
#[clap(long)]
pub use_indexer_experimental_methods: bool,
#[clap(long)]
pub config_dir: Option<PathBuf>,
/// URL for the indexer RPC server
Expand Down Expand Up @@ -72,10 +66,8 @@ impl ClusterTestOpt {
epoch_duration_ms: None,
indexer_address: None,
pg_address: None,
use_indexer_experimental_methods: false,
config_dir: None,
graphql_address: None,
use_indexer_v2: false,
}
}
}
2 changes: 0 additions & 2 deletions crates/sui-cluster-test/tests/local_cluster_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@ async fn test_sui_cluster() {
epoch_duration_ms: Some(60000),
indexer_address: Some(format!("127.0.0.1:{}", indexer_rpc_port)),
pg_address: Some(pg_address),
use_indexer_experimental_methods: false,
config_dir: None,
graphql_address: Some(graphql_address),
use_indexer_v2: true,
};

let _cluster = LocalNewCluster::start(&opts).await.unwrap();
Expand Down
5 changes: 2 additions & 3 deletions crates/sui-graphql-rpc/src/context_data/db_data_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ use crate::{
types::{address::Address, sui_address::SuiAddress, validator::Validator},
};
use std::{collections::BTreeMap, time::Duration};
use sui_indexer::{
apis::GovernanceReadApiV2, indexer_reader::IndexerReader, PgConnectionPoolConfig,
};
use sui_indexer::db::PgConnectionPoolConfig;
use sui_indexer::{apis::GovernanceReadApiV2, indexer_reader::IndexerReader};
use sui_json_rpc_types::Stake as RpcStakedSui;
use sui_types::{
base_types::SuiAddress as NativeSuiAddress,
Expand Down
6 changes: 4 additions & 2 deletions crates/sui-graphql-rpc/src/data/pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,10 @@ mod tests {
use diesel::QueryDsl;
use sui_framework::BuiltInFramework;
use sui_indexer::{
get_pg_pool_connection, models_v2::objects::StoredObject, new_pg_connection_pool,
schema::objects, types::IndexedObject, utils::reset_database,
db::{get_pg_pool_connection, new_pg_connection_pool, reset_database},
models_v2::objects::StoredObject,
schema::objects,
types::IndexedObject,
};

#[test]
Expand Down
10 changes: 4 additions & 6 deletions crates/sui-graphql-rpc/src/test_infra/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ pub use sui_indexer::processors_v2::objects_snapshot_processor::SnapshotLagConfi
use sui_indexer::store::indexer_store_v2::IndexerStoreV2;
use sui_indexer::store::PgIndexerStoreV2;
use sui_indexer::test_utils::force_delete_database;
use sui_indexer::test_utils::start_test_indexer_v2;
use sui_indexer::test_utils::start_test_indexer_v2_impl;
use sui_indexer::test_utils::start_test_indexer;
use sui_indexer::test_utils::start_test_indexer_impl;
use sui_indexer::test_utils::ReaderWriterConfig;
use sui_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT};
use sui_types::storage::ReadStore;
Expand Down Expand Up @@ -60,10 +60,9 @@ pub async fn start_cluster(
let val_fn = start_validator_with_fullnode(internal_data_source_rpc_port).await;

// Starts indexer
let (pg_store, pg_handle) = start_test_indexer_v2(
let (pg_store, pg_handle) = start_test_indexer(
Some(db_url),
val_fn.rpc_url().to_string(),
true,
ReaderWriterConfig::writer_mode(None),
)
.await;
Expand Down Expand Up @@ -118,10 +117,9 @@ pub async fn serve_executor(
.await;
});

let (pg_store, pg_handle) = start_test_indexer_v2_impl(
let (pg_store, pg_handle) = start_test_indexer_impl(
Some(db_url),
format!("http://{}", executor_server_url),
true,
ReaderWriterConfig::writer_mode(snapshot_config.clone()),
Some(graphql_connection_config.db_name()),
)
Expand Down
177 changes: 177 additions & 0 deletions crates/sui-indexer/src/db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::time::Duration;

use anyhow::anyhow;
use diesel::migration::MigrationSource;
use diesel::{r2d2::ConnectionManager, PgConnection, RunQueryDsl};
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
use tracing::info;

use crate::errors::IndexerError;

/// An r2d2 pool of Postgres connections, shared across indexer tasks.
pub type PgConnectionPool = diesel::r2d2::Pool<ConnectionManager<PgConnection>>;
/// A single connection checked out of a [`PgConnectionPool`].
pub type PgPoolConnection = diesel::r2d2::PooledConnection<ConnectionManager<PgConnection>>;

/// Tunables for building a [`PgConnectionPool`].
///
/// Default values are read from the `DB_POOL_SIZE`, `DB_CONNECTION_TIMEOUT`
/// and `DB_STATEMENT_TIMEOUT` environment variables (see the `Default` impl).
#[derive(Debug, Clone, Copy)]
pub struct PgConnectionPoolConfig {
    /// Maximum number of connections held by the pool.
    pub pool_size: u32,
    /// How long to wait when checking a connection out of the pool.
    pub connection_timeout: Duration,
    /// Per-statement timeout applied to each connection on acquire.
    pub statement_timeout: Duration,
}

impl PgConnectionPoolConfig {
    // Fallbacks used when the corresponding DB_* environment variables are
    // absent or unparseable (timeouts are in seconds).
    const DEFAULT_POOL_SIZE: u32 = 100;
    const DEFAULT_CONNECTION_TIMEOUT: u64 = 30;
    const DEFAULT_STATEMENT_TIMEOUT: u64 = 30;

    /// Overrides the maximum number of pooled connections.
    pub fn set_pool_size(&mut self, size: u32) {
        self.pool_size = size;
    }

    /// Overrides how long to wait when checking a connection out of the pool.
    pub fn set_connection_timeout(&mut self, timeout: Duration) {
        self.connection_timeout = timeout;
    }

    /// Overrides the per-statement timeout applied to every connection.
    pub fn set_statement_timeout(&mut self, timeout: Duration) {
        self.statement_timeout = timeout;
    }

    /// Per-connection session settings derived from this pool configuration.
    fn connection_config(&self) -> PgConnectionConfig {
        PgConnectionConfig {
            read_only: false,
            statement_timeout: self.statement_timeout,
        }
    }
}

impl Default for PgConnectionPoolConfig {
    /// Builds a configuration from the `DB_POOL_SIZE`, `DB_CONNECTION_TIMEOUT`
    /// and `DB_STATEMENT_TIMEOUT` environment variables, falling back to the
    /// compiled-in defaults when a variable is unset or fails to parse.
    fn default() -> Self {
        // Parse an env var as `T`, silently falling back on absence or a
        // parse error — matching the lenient behavior callers rely on.
        fn env_or<T: std::str::FromStr>(key: &str, fallback: T) -> T {
            std::env::var(key)
                .ok()
                .and_then(|v| v.parse().ok())
                .unwrap_or(fallback)
        }

        Self {
            pool_size: env_or("DB_POOL_SIZE", Self::DEFAULT_POOL_SIZE),
            connection_timeout: Duration::from_secs(env_or(
                "DB_CONNECTION_TIMEOUT",
                Self::DEFAULT_CONNECTION_TIMEOUT,
            )),
            statement_timeout: Duration::from_secs(env_or(
                "DB_STATEMENT_TIMEOUT",
                Self::DEFAULT_STATEMENT_TIMEOUT,
            )),
        }
    }
}

/// Per-connection session settings, applied by the pool's connection
/// customizer each time a connection is acquired.
#[derive(Debug, Clone, Copy)]
pub struct PgConnectionConfig {
    /// Statement timeout set on the session (sent to Postgres in milliseconds).
    pub statement_timeout: Duration,
    /// When true, the session defaults to read-only transactions.
    pub read_only: bool,
}

impl diesel::r2d2::CustomizeConnection<PgConnection, diesel::r2d2::Error> for PgConnectionConfig {
    /// Applies session-level settings to every connection the pool hands out:
    /// a statement timeout, plus read-only mode when configured.
    fn on_acquire(&self, conn: &mut PgConnection) -> std::result::Result<(), diesel::r2d2::Error> {
        use diesel::sql_query;

        // Postgres interprets a bare integer statement_timeout as milliseconds.
        let timeout_ms = self.statement_timeout.as_millis();
        sql_query(format!("SET statement_timeout = {}", timeout_ms))
            .execute(conn)
            .map_err(diesel::r2d2::Error::QueryError)?;

        if self.read_only {
            sql_query("SET default_transaction_read_only = 't'")
                .execute(conn)
                .map_err(diesel::r2d2::Error::QueryError)?;
        }

        Ok(())
    }
}

/// Builds an r2d2 connection pool for the Postgres database at `db_url`.
///
/// Pool sizing and timeouts come from `PgConnectionPoolConfig::default()`
/// (i.e. the `DB_*` environment variables, or built-in defaults); an explicit
/// `pool_size` argument overrides the configured size.
pub fn new_pg_connection_pool(
    db_url: &str,
    pool_size: Option<u32>,
) -> Result<PgConnectionPool, IndexerError> {
    let config = PgConnectionPoolConfig::default();
    // A caller-supplied size wins over env/default configuration.
    let max_size = pool_size.unwrap_or(config.pool_size);
    let builder = diesel::r2d2::Pool::builder()
        .max_size(max_size)
        .connection_timeout(config.connection_timeout)
        .connection_customizer(Box::new(config.connection_config()));
    match builder.build(ConnectionManager::<PgConnection>::new(db_url)) {
        Ok(pool) => Ok(pool),
        Err(e) => Err(IndexerError::PgConnectionPoolInitError(format!(
            "Failed to initialize connection pool with error: {:?}",
            e
        ))),
    }
}

/// Checks a connection out of `pool`, wrapping any r2d2 failure in
/// [`IndexerError::PgPoolConnectionError`].
pub fn get_pg_pool_connection(pool: &PgConnectionPool) -> Result<PgPoolConnection, IndexerError> {
    match pool.get() {
        Ok(conn) => Ok(conn),
        Err(e) => Err(IndexerError::PgPoolConnectionError(format!(
            "Failed to get connection from PG connection pool with error: {:?}",
            e
        ))),
    }
}

// Migrations embedded at compile time from the crate's `migrations` directory.
const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");

/// Resets the database by reverting all migrations and reapplying them.
///
/// If `drop_all` is set to `true`, the function will drop all tables in the database before
/// resetting the migrations. This option is destructive and will result in the loss of all
/// data in the tables. Use with caution, especially in production environments.
pub fn reset_database(conn: &mut PgPoolConnection, drop_all: bool) -> Result<(), anyhow::Error> {
    info!("Resetting database ...");
    if drop_all {
        drop_all_tables(conn)
            .map_err(|e| anyhow!("Encountering error when dropping all tables {e}"))?;
    } else {
        conn.revert_all_migrations(MIGRATIONS)
            .map_err(|e| anyhow!("Error reverting all migrations {e}"))?;
    }
    // Propagate embedded-migration loading failures as an error instead of
    // panicking (this previously unwrapped the `migrations()` result).
    let migrations = MIGRATIONS
        .migrations()
        .map_err(|e| anyhow!("Failed to load migrations {e}"))?;
    conn.run_migrations(&migrations)
        .map_err(|e| anyhow!("Failed to run migrations {e}"))?;
    info!("Reset database complete.");
    Ok(())
}

/// Drops every table in the `public` schema, then recreates the
/// `__diesel_schema_migrations` bookkeeping table so migrations can be
/// re-applied from scratch. Destructive: all table data is lost.
fn drop_all_tables(conn: &mut PgConnection) -> Result<(), diesel::result::Error> {
    info!("Dropping all tables in the database");
    // Enumerate user tables; the indexer keeps everything in `public`.
    let table_names: Vec<String> = diesel::dsl::sql::<diesel::sql_types::Text>(
        "
SELECT tablename FROM pg_tables WHERE schemaname = 'public'
",
    )
    .load(conn)?;

    for table in table_names {
        diesel::sql_query(format!("DROP TABLE IF EXISTS {} CASCADE", table)).execute(conn)?;
    }

    // Recreate the __diesel_schema_migrations table
    diesel::sql_query(
        "
CREATE TABLE __diesel_schema_migrations (
version VARCHAR(50) PRIMARY KEY,
run_on TIMESTAMP NOT NULL DEFAULT NOW()
)
",
    )
    .execute(conn)?;
    info!("Dropped all tables in the database");
    Ok(())
}
10 changes: 3 additions & 7 deletions crates/sui-indexer/src/handlers/checkpoint_handler_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,13 @@ use crate::errors::IndexerError;
use crate::framework::interface::Handler;
use crate::metrics::IndexerMetrics;

use crate::db::PgConnectionPool;
use crate::store::module_resolver_v2::{IndexerStorePackageModuleResolver, InterimPackageResolver};
use crate::store::{IndexerStoreV2, PgIndexerStoreV2};
use crate::types::IndexedEpochInfo;
use crate::types::{
IndexedCheckpoint, IndexedEvent, IndexedTransaction, IndexerResult, TransactionKind, TxIndex,
IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, IndexedEvent, IndexedObject,
IndexedPackage, IndexedTransaction, IndexerResult, TransactionKind, TxIndex,
};
use crate::types::{IndexedDeletedObject, IndexedObject, IndexedPackage};
use crate::{IndexerConfig, PgConnectionPool};

use super::tx_processor::EpochEndIndexingObjectStore;
use super::tx_processor::TxChangesProcessor;
Expand All @@ -62,7 +61,6 @@ const CHECKPOINT_QUEUE_SIZE: usize = 1000;
pub async fn new_handlers<S>(
state: S,
metrics: IndexerMetrics,
config: &IndexerConfig,
) -> Result<CheckpointHandler<S>, IndexerError>
where
S: IndexerStoreV2 + Clone + Sync + Send + 'static,
Expand All @@ -82,12 +80,10 @@ where

let state_clone = state.clone();
let metrics_clone = metrics.clone();
let config_clone = config.clone();
let (tx, package_tx) = watch::channel(None);
spawn_monitored_task!(start_tx_checkpoint_commit_task(
state_clone,
metrics_clone,
config_clone,
indexed_checkpoint_receiver,
tx,
));
Expand Down
Loading

0 comments on commit 4f52e05

Please sign in to comment.