Skip to content

Commit

Permalink
Initial metrics work for the indexer (MystenLabs#18025)
Browse files Browse the repository at this point in the history
## Description 

Initial work for monitoring values. 
This is a good start for values to track and please advise on what else
you'd like to see.

## Test plan 

Not much now and need to come up with something to check that things are
good

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
dariorussi authored Jun 7, 2024
1 parent b7b6dfc commit f1a2d61
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 31 deletions.
6 changes: 5 additions & 1 deletion crates/sui-bridge-indexer/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,8 @@
# Ethereum to Sui bridge contract address
# eth_sui_bridge_contract_address: <contract_address>
# Starting block number
# start_block: <indexing_start_block>
# start_block: <indexing_start_block>
# Client metric URL
# metric_url: <url>
# Client metric port
# metric_port: <port>
2 changes: 2 additions & 0 deletions crates/sui-bridge-indexer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pub struct Config {
pub concurrency: u64,
pub eth_sui_bridge_contract_address: String,
pub start_block: u64,
pub metric_url: String,
pub metric_port: u16,
}

/// Load the config to run.
Expand Down
1 change: 1 addition & 0 deletions crates/sui-bridge-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::models::TokenTransferData as DBTokenTransferData;
use std::fmt::{Display, Formatter};

pub mod config;
pub mod metrics;
pub mod models;
pub mod postgres_writer;
pub mod schema;
Expand Down
38 changes: 23 additions & 15 deletions crates/sui-bridge-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ use prometheus::Registry;
use std::collections::HashMap;
use std::collections::HashSet;
use std::env;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
Expand All @@ -21,9 +18,9 @@ use sui_bridge::{
eth_client::EthClient,
eth_syncer::EthSyncer,
};
use sui_bridge_indexer::postgres_writer::get_connection_pool;
use sui_bridge_indexer::{
config::load_config, worker::process_eth_transaction, worker::BridgeWorker,
config::load_config, metrics::BridgeIndexerMetrics, postgres_writer::get_connection_pool,
worker::process_eth_transaction, worker::BridgeWorker,
};
use sui_data_ingestion_core::{
DataIngestionMetrics, FileProgressStore, IndexerExecutor, ReaderOptions, WorkerPool,
Expand Down Expand Up @@ -54,19 +51,24 @@ async fn main() -> Result<()> {
.expect("Current directory is invalid.")
.join("config.yaml")
};

let config = load_config(&config_path).unwrap();

// start metrics server
let (_exit_sender, exit_receiver) = oneshot::channel();
let metrics = DataIngestionMetrics::new(&Registry::new());

// Init metrics server
let metrics_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 1000);
let registry_service = start_prometheus_server(metrics_address);
let prometheus_registry = registry_service.default_registry();
mysten_metrics::init_metrics(&prometheus_registry);
info!("Metrics server started at port {}", 1000);
let registry_service = start_prometheus_server(
format!("{}:{}", config.metric_url, config.metric_port,)
.parse()
.unwrap_or_else(|err| panic!("Failed to parse metric address: {}", err)),
);
let registry: Registry = registry_service.default_registry();
mysten_metrics::init_metrics(&registry);
info!(
"Metrics server started at {}::{}",
config.metric_url, config.metric_port
);
let metrics = DataIngestionMetrics::new(&registry);
let indexer_meterics = BridgeIndexerMetrics::new(&registry);

// start eth client
let provider = Arc::new(
Expand Down Expand Up @@ -110,16 +112,22 @@ async fn main() -> Result<()> {

let pg_pool = get_connection_pool(config.db_url.clone());

let indexer_metrics_cloned = indexer_meterics.clone();
let _task_handle = spawn_logged_monitored_task!(
process_eth_transaction(eth_events_rx, provider.clone(), pg_pool),
process_eth_transaction(
eth_events_rx,
provider.clone(),
pg_pool,
indexer_metrics_cloned
),
"indexer handler"
);

// start sui side
let progress_store = FileProgressStore::new(config.progress_store_file.into());
let mut executor = IndexerExecutor::new(progress_store, 1 /* workflow types */, metrics);
let worker_pool = WorkerPool::new(
BridgeWorker::new(vec![], config.db_url.clone()),
BridgeWorker::new(vec![], config.db_url.clone(), indexer_meterics.clone()),
"bridge worker".into(),
config.concurrency as usize,
);
Expand Down
83 changes: 83 additions & 0 deletions crates/sui-bridge-indexer/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use prometheus::{register_int_counter_with_registry, IntCounter, Registry};

#[derive(Clone)]
pub struct BridgeIndexerMetrics {
pub(crate) total_sui_bridge_transactions: IntCounter,
pub(crate) total_sui_token_deposited: IntCounter,
pub(crate) total_sui_token_transfer_approved: IntCounter,
pub(crate) total_sui_token_transfer_claimed: IntCounter,
pub(crate) total_sui_bridge_txn_other: IntCounter,
pub(crate) total_eth_bridge_transactions: IntCounter,
pub(crate) total_eth_token_deposited: IntCounter,
pub(crate) total_eth_token_transfer_claimed: IntCounter,
pub(crate) total_eth_bridge_txn_other: IntCounter,
}

impl BridgeIndexerMetrics {
pub fn new(registry: &Registry) -> Self {
Self {
total_sui_bridge_transactions: register_int_counter_with_registry!(
"total_sui_bridge_transactions",
"Total number of sui bridge transactions",
registry,
)
.unwrap(),
total_sui_token_deposited: register_int_counter_with_registry!(
"total_sui_token_deposited",
"Total number of sui token deposited transactions",
registry,
)
.unwrap(),
total_sui_token_transfer_approved: register_int_counter_with_registry!(
"total_sui_token_transfer_approved",
"Total number of sui token approved transactions",
registry,
)
.unwrap(),
total_sui_token_transfer_claimed: register_int_counter_with_registry!(
"total_sui_token_transfer_claimed",
"Total number of sui token claimed transactions",
registry,
)
.unwrap(),
total_sui_bridge_txn_other: register_int_counter_with_registry!(
"total_sui_bridge_txn_other",
"Total number of other sui bridge transactions",
registry,
)
.unwrap(),
total_eth_bridge_transactions: register_int_counter_with_registry!(
"total_eth_bridge_transactions",
"Total number of eth bridge transactions",
registry,
)
.unwrap(),
total_eth_token_deposited: register_int_counter_with_registry!(
"total_eth_token_deposited",
"Total number of eth token deposited transactions",
registry,
)
.unwrap(),
total_eth_token_transfer_claimed: register_int_counter_with_registry!(
"total_eth_token_transfer_claimed",
"Total number of eth token claimed transactions",
registry,
)
.unwrap(),
total_eth_bridge_txn_other: register_int_counter_with_registry!(
"total_eth_bridge_txn_other",
"Total number of other eth bridge transactions",
registry,
)
.unwrap(),
}
}

pub fn new_for_testing() -> Self {
let registry = Registry::new();
Self::new(&registry)
}
}
65 changes: 50 additions & 15 deletions crates/sui-bridge-indexer/src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::postgres_writer::{get_connection_pool, write, PgPool};
use crate::{BridgeDataSource, TokenTransfer, TokenTransferData, TokenTransferStatus};
use crate::{
metrics::BridgeIndexerMetrics,
postgres_writer::{get_connection_pool, write, PgPool},
BridgeDataSource, TokenTransfer, TokenTransferData, TokenTransferStatus,
};
use anyhow::Result;
use async_trait::async_trait;
use ethers::providers::Provider;
Expand All @@ -25,21 +28,27 @@ use sui_types::{
transaction::{TransactionDataAPI, TransactionKind},
BRIDGE_ADDRESS, SUI_BRIDGE_OBJECT_ID,
};
use tracing::{debug, info};
use tracing::info;

pub struct BridgeWorker {
bridge_object_ids: BTreeSet<ObjectID>,
pg_pool: PgPool,
metrics: BridgeIndexerMetrics,
}

impl BridgeWorker {
pub fn new(bridge_object_ids: Vec<ObjectID>, db_url: String) -> Self {
pub fn new(
bridge_object_ids: Vec<ObjectID>,
db_url: String,
metrics: BridgeIndexerMetrics,
) -> Self {
let mut bridge_object_ids = bridge_object_ids.into_iter().collect::<BTreeSet<_>>();
bridge_object_ids.insert(SUI_BRIDGE_OBJECT_ID);
let pg_pool = get_connection_pool(db_url);
Self {
bridge_object_ids,
pg_pool,
metrics,
}
}

Expand All @@ -64,16 +73,19 @@ impl BridgeWorker {
checkpoint: u64,
timestamp_ms: u64,
) -> Result<Vec<TokenTransfer>> {
self.metrics.total_sui_bridge_transactions.inc();
if let Some(events) = &tx.events {
let token_transfers = events.data.iter().try_fold(vec![], |mut result, ev| {
if let Some(data) = Self::process_sui_event(ev, tx, checkpoint, timestamp_ms)? {
if let Some(data) =
Self::process_sui_event(ev, tx, checkpoint, timestamp_ms, &self.metrics)?
{
result.push(data);
}
Ok::<_, anyhow::Error>(result)
})?;

if !token_transfers.is_empty() {
debug!(
info!(
"SUI: Extracted {} bridge token transfer data entries for tx {}.",
token_transfers.len(),
tx.transaction.digest()
Expand All @@ -90,11 +102,13 @@ impl BridgeWorker {
tx: &CheckpointTransaction,
checkpoint: u64,
timestamp_ms: u64,
metrics: &BridgeIndexerMetrics,
) -> Result<Option<TokenTransfer>> {
Ok(if ev.type_.address == BRIDGE_ADDRESS {
match ev.type_.name.as_str() {
"TokenDepositedEvent" => {
debug!("Observed Sui Deposit {:?}", ev);
info!("Observed Sui Deposit {:?}", ev);
metrics.total_sui_token_deposited.inc();
let move_event: MoveTokenDepositedEvent = bcs::from_bytes(&ev.contents)?;
Some(TokenTransfer {
chain_id: move_event.source_chain,
Expand All @@ -116,7 +130,8 @@ impl BridgeWorker {
})
}
"TokenTransferApproved" => {
debug!("Observed Sui Approval {:?}", ev);
info!("Observed Sui Approval {:?}", ev);
metrics.total_sui_token_transfer_approved.inc();
let event: MoveTokenTransferApproved = bcs::from_bytes(&ev.contents)?;
Some(TokenTransfer {
chain_id: event.message_key.source_chain,
Expand All @@ -132,7 +147,8 @@ impl BridgeWorker {
})
}
"TokenTransferClaimed" => {
debug!("Observed Sui Claim {:?}", ev);
info!("Observed Sui Claim {:?}", ev);
metrics.total_sui_token_transfer_claimed.inc();
let event: MoveTokenTransferClaimed = bcs::from_bytes(&ev.contents)?;
Some(TokenTransfer {
chain_id: event.message_key.source_chain,
Expand All @@ -147,7 +163,10 @@ impl BridgeWorker {
data: None,
})
}
_ => None,
_ => {
metrics.total_sui_bridge_txn_other.inc();
None
}
}
} else {
None
Expand All @@ -159,6 +178,7 @@ pub async fn process_eth_transaction(
mut eth_events_rx: Receiver<(EthAddress, u64, Vec<EthLog>)>,
provider: Arc<Provider<Http>>,
pool: PgPool,
metrics: BridgeIndexerMetrics,
) {
while let Some((_, _, logs)) = eth_events_rx.recv().await {
let mut data = vec![];
Expand All @@ -177,9 +197,15 @@ pub async fn process_eth_transaction(
let gas = transaction.gas;
let tx_hash = log.tx_hash;
info!("Observed Eth bridge event: {:?}", bridge_event);
if let Some(token_transfer) =
process_eth_event(bridge_event, block_number, timestamp, tx_hash, gas.as_u64())
{
metrics.total_eth_bridge_transactions.inc();
if let Some(token_transfer) = process_eth_event(
bridge_event,
block_number,
timestamp,
tx_hash,
gas.as_u64(),
&metrics,
) {
data.push(token_transfer)
}
}
Expand All @@ -195,11 +221,13 @@ fn process_eth_event(
timestamp_ms: u64,
tx_hash: H256,
gas: u64,
metrics: &BridgeIndexerMetrics,
) -> Option<TokenTransfer> {
match bridge_event {
EthBridgeEvent::EthSuiBridgeEvents(bridge_event) => match bridge_event {
EthSuiBridgeEvents::TokensDepositedFilter(bridge_event) => {
info!("Observed Eth Deposit {:?}", bridge_event);
metrics.total_eth_token_deposited.inc();
Some(TokenTransfer {
chain_id: bridge_event.source_chain_id,
nonce: bridge_event.nonce,
Expand All @@ -221,6 +249,7 @@ fn process_eth_event(
}
EthSuiBridgeEvents::TokensClaimedFilter(bridge_event) => {
info!("Observed Eth Claim {:?}", bridge_event);
metrics.total_eth_token_transfer_claimed.inc();
Some(TokenTransfer {
chain_id: bridge_event.source_chain_id,
nonce: bridge_event.nonce,
Expand All @@ -237,12 +266,18 @@ fn process_eth_event(
EthSuiBridgeEvents::PausedFilter(_)
| EthSuiBridgeEvents::UnpausedFilter(_)
| EthSuiBridgeEvents::UpgradedFilter(_)
| EthSuiBridgeEvents::InitializedFilter(_) => None,
| EthSuiBridgeEvents::InitializedFilter(_) => {
metrics.total_eth_bridge_txn_other.inc();
None
}
},
EthBridgeEvent::EthBridgeCommitteeEvents(_)
| EthBridgeEvent::EthBridgeLimiterEvents(_)
| EthBridgeEvent::EthBridgeConfigEvents(_)
| EthBridgeEvent::EthCommitteeUpgradeableContractEvents(_) => None,
| EthBridgeEvent::EthCommitteeUpgradeableContractEvents(_) => {
metrics.total_eth_bridge_txn_other.inc();
None
}
}
}

Expand Down

0 comments on commit f1a2d61

Please sign in to comment.