Skip to content

Commit

Permalink
[bridge-indexer] revamp task (MystenLabs#19245)
Browse files Browse the repository at this point in the history
## Description 

This PR reworks `Tasks`:
1. Replace the `Tasks` trait with a `Tasks` struct.
2. Add an `is_live_task` field to `Task`.
3. Pass a `Task` to several functions instead of its individual fields.
4. In the ingestion framework, use a custom batch read size for backfill
tasks (this significantly improves data download speed).

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
longbowlu authored Sep 8, 2024
1 parent 2616286 commit dd951f8
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 92 deletions.
17 changes: 8 additions & 9 deletions crates/sui-bridge-indexer/src/eth_bridge_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use sui_bridge::error::BridgeError;
use sui_bridge::eth_client::EthClient;
use sui_bridge::metered_eth_provider::MeteredEthHttpProvier;
use sui_bridge::retry_with_max_elapsed_time;
use sui_indexer_builder::Task;
use tokio::task::JoinHandle;
use tracing::info;

Expand Down Expand Up @@ -63,14 +64,13 @@ impl EthSubscriptionDatasource {
impl Datasource<RawEthData> for EthSubscriptionDatasource {
async fn start_data_retrieval(
&self,
starting_checkpoint: u64,
target_checkpoint: u64,
task: Task,
data_sender: DataSender<RawEthData>,
) -> Result<JoinHandle<Result<(), Error>>, Error> {
let filter = Filter::new()
.address(self.bridge_address)
.from_block(starting_checkpoint)
.to_block(target_checkpoint);
.from_block(task.start_checkpoint)
.to_block(task.target_checkpoint);

let eth_ws_url = self.eth_ws_url.clone();
let indexer_metrics: BridgeIndexerMetrics = self.indexer_metrics.clone();
Expand Down Expand Up @@ -194,8 +194,7 @@ impl EthSyncDatasource {
impl Datasource<RawEthData> for EthSyncDatasource {
async fn start_data_retrieval(
&self,
starting_checkpoint: u64,
target_checkpoint: u64,
task: Task,
data_sender: DataSender<RawEthData>,
) -> Result<JoinHandle<Result<(), Error>>, Error> {
let provider = Arc::new(
Expand All @@ -214,8 +213,8 @@ impl Datasource<RawEthData> for EthSyncDatasource {
let Ok(Ok(logs)) = retry_with_max_elapsed_time!(
client.get_raw_events_in_range(
bridge_address,
starting_checkpoint,
target_checkpoint
task.start_checkpoint,
task.target_checkpoint
),
Duration::from_secs(30000)
) else {
Expand Down Expand Up @@ -254,7 +253,7 @@ impl Datasource<RawEthData> for EthSyncDatasource {
data.push((log, block, transaction));
}

data_sender.send((target_checkpoint, data)).await?;
data_sender.send((task.target_checkpoint, data)).await?;

indexer_metrics
.last_synced_eth_block
Expand Down
5 changes: 3 additions & 2 deletions crates/sui-bridge-indexer/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use diesel::data_types::PgTimestamp;
use diesel::{Identifiable, Insertable, Queryable, Selectable};

use sui_indexer_builder::Task;
use sui_indexer_builder::{Task, LIVE_TASK_TARGET_CHECKPOINT};

use crate::schema::{
progress_store, sui_error_transactions, sui_progress_store, token_transfer, token_transfer_data,
Expand All @@ -23,10 +23,11 @@ impl From<ProgressStore> for Task {
fn from(value: ProgressStore) -> Self {
Self {
task_name: value.task_name,
checkpoint: value.checkpoint as u64,
start_checkpoint: value.checkpoint as u64,
target_checkpoint: value.target_checkpoint as u64,
// Ok to unwrap, timestamp is defaulted to now() in database
timestamp: value.timestamp.expect("Timestamp not set").0 as u64,
is_live_task: value.target_checkpoint == LIVE_TASK_TARGET_CHECKPOINT,
}
}
}
Expand Down
31 changes: 27 additions & 4 deletions crates/sui-bridge-indexer/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::schema::progress_store::{columns, dsl};
use crate::schema::{sui_error_transactions, token_transfer, token_transfer_data};
use crate::{models, schema, ProcessedTxnData};
use sui_indexer_builder::indexer_builder::{IndexerProgressStore, Persistent};
use sui_indexer_builder::Task;
use sui_indexer_builder::{Task, Tasks, LIVE_TASK_TARGET_CHECKPOINT};

/// Persistent layer impl
#[derive(Clone)]
Expand Down Expand Up @@ -147,7 +147,7 @@ impl IndexerProgressStore for PgBridgePersistent {
Ok(None)
}

async fn get_ongoing_tasks(&self, prefix: &str) -> Result<Vec<Task>, anyhow::Error> {
async fn get_ongoing_tasks(&self, prefix: &str) -> Result<Tasks, anyhow::Error> {
let mut conn = self.pool.get().await?;
// get all unfinished tasks
let cp: Vec<models::ProgressStore> = dsl::progress_store
Expand All @@ -157,7 +157,8 @@ impl IndexerProgressStore for PgBridgePersistent {
.order_by(columns::target_checkpoint.desc())
.load(&mut conn)
.await?;
Ok(cp.into_iter().map(|d| d.into()).collect())
let tasks = cp.into_iter().map(|d| d.into()).collect();
Ok(Tasks::new(tasks)?)
}

async fn get_largest_backfill_task_target_checkpoint(
Expand All @@ -177,6 +178,8 @@ impl IndexerProgressStore for PgBridgePersistent {
Ok(cp.map(|c| c as u64))
}

/// Register a new task to progress store with a start checkpoint and target checkpoint.
/// Usually used for backfill tasks.
async fn register_task(
&mut self,
task_name: String,
Expand All @@ -197,11 +200,31 @@ impl IndexerProgressStore for PgBridgePersistent {
Ok(())
}

/// Register a live task to progress store with a start checkpoint.
///
/// The row is inserted with `LIVE_TASK_TARGET_CHECKPOINT` as its target checkpoint, which is
/// the value `From<ProgressStore> for Task` compares against to set `is_live_task`.
///
/// # Errors
/// Returns an error if `start_checkpoint` does not fit in the `i64` `checkpoint` field, if a
/// connection cannot be obtained from the pool, or if the insert fails.
async fn register_live_task(
    &mut self,
    task_name: String,
    start_checkpoint: u64,
) -> Result<(), anyhow::Error> {
    // Checked conversion: a plain `as i64` cast would silently wrap values above `i64::MAX`
    // into a negative checkpoint instead of reporting the problem.
    let checkpoint = i64::try_from(start_checkpoint).map_err(|e| {
        anyhow::anyhow!("start checkpoint {start_checkpoint} does not fit in i64: {e}")
    })?;
    let mut conn = self.pool.get().await?;
    diesel::insert_into(schema::progress_store::table)
        .values(models::ProgressStore {
            task_name,
            checkpoint,
            target_checkpoint: LIVE_TASK_TARGET_CHECKPOINT,
            // Timestamp is defaulted to current time in DB if None
            timestamp: None,
        })
        .execute(&mut conn)
        .await?;
    Ok(())
}

async fn update_task(&mut self, task: Task) -> Result<(), anyhow::Error> {
let mut conn = self.pool.get().await?;
diesel::update(dsl::progress_store.filter(columns::task_name.eq(task.task_name)))
.set((
columns::checkpoint.eq(task.checkpoint as i64),
columns::checkpoint.eq(task.start_checkpoint as i64),
columns::target_checkpoint.eq(task.target_checkpoint as i64),
columns::timestamp.eq(now),
))
Expand Down
37 changes: 26 additions & 11 deletions crates/sui-bridge-indexer/src/sui_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use sui_data_ingestion_core::{
DataIngestionMetrics, IndexerExecutor, ProgressStore, ReaderOptions, Worker, WorkerPool,
};
use sui_indexer_builder::indexer_builder::{DataSender, Datasource};
use sui_indexer_builder::Task;
use sui_sdk::SuiClient;
use sui_types::base_types::TransactionDigest;
use sui_types::full_checkpoint_content::CheckpointData as SuiCheckpointData;
use sui_types::full_checkpoint_content::CheckpointTransaction;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
Expand All @@ -23,6 +23,9 @@ use tokio::task::JoinHandle;

use crate::metrics::BridgeIndexerMetrics;

/// Default ingestion reader batch size (used as `ReaderOptions::batch_size`) for backfill
/// tasks; overridable through the `BACKFILL_TASK_INGESTION_READER_BATCH_SIZE` env var.
const BACKFILL_TASK_INGESTION_READER_BATCH_SIZE: usize = 300;
/// Ingestion reader batch size for live tasks; kept small to be cost effective.
const LIVE_TASK_INGESTION_READER_BATCH_SIZE: usize = 10;

pub struct SuiCheckpointDatasource {
remote_store_url: String,
sui_client: Arc<SuiClient>,
Expand Down Expand Up @@ -58,23 +61,32 @@ impl SuiCheckpointDatasource {
impl Datasource<CheckpointTxnData> for SuiCheckpointDatasource {
async fn start_data_retrieval(
&self,
starting_checkpoint: u64,
target_checkpoint: u64,
task: Task,
data_sender: DataSender<CheckpointTxnData>,
) -> Result<JoinHandle<Result<(), Error>>, Error> {
let (exit_sender, exit_receiver) = oneshot::channel();
let progress_store = PerTaskInMemProgressStore {
current_checkpoint: starting_checkpoint,
exit_checkpoint: target_checkpoint,
current_checkpoint: task.start_checkpoint,
exit_checkpoint: task.target_checkpoint,
exit_sender: Some(exit_sender),
};
// The max concurrency of checkpoints to fetch at the same time for the ingestion framework
let ingestion_reader_batch_size = if task.is_live_task {
// Live task uses smaller number to be cost effective
LIVE_TASK_INGESTION_READER_BATCH_SIZE
} else {
std::env::var("BACKFILL_TASK_INGESTION_READER_BATCH_SIZE")
.unwrap_or(BACKFILL_TASK_INGESTION_READER_BATCH_SIZE.to_string())
.parse::<usize>()
.unwrap()
};
tracing::info!(
"Starting Sui checkpoint data retrieval with batch size {}",
ingestion_reader_batch_size
);
let mut executor = IndexerExecutor::new(progress_store, 1, self.metrics.clone());
let worker = IndexerWorker::new(data_sender);
let worker_pool = WorkerPool::new(
worker,
TransactionDigest::random().to_string(),
self.concurrency,
);
let worker_pool = WorkerPool::new(worker, task.task_name.clone(), self.concurrency);
executor.register(worker_pool).await?;
let checkpoint_path = self.checkpoint_path.clone();
let remote_store_url = self.remote_store_url.clone();
Expand All @@ -84,7 +96,10 @@ impl Datasource<CheckpointTxnData> for SuiCheckpointDatasource {
checkpoint_path,
Some(remote_store_url),
vec![], // optional remote store access options
ReaderOptions::default(),
ReaderOptions {
batch_size: ingestion_reader_batch_size,
..Default::default()
},
exit_receiver,
)
.await?;
Expand Down
46 changes: 22 additions & 24 deletions crates/sui-indexer-builder/src/indexer_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,7 @@ impl<P, D, M> Indexer<P, D, M> {
let live_task_future = match ongoing_tasks.live_task() {
Some(live_task) if !self.disable_live_task => {
let live_task_future = self.datasource.start_ingestion_task(
live_task.task_name.clone(),
live_task.checkpoint,
live_task.target_checkpoint,
live_task,
self.storage.clone(),
self.data_mapper.clone(),
);
Expand All @@ -127,20 +125,18 @@ impl<P, D, M> Indexer<P, D, M> {
_ => None,
};

let backfill_tasks = ongoing_tasks.backfill_tasks();
let backfill_tasks = ongoing_tasks.backfill_tasks_ordered_desc();
let storage_clone = self.storage.clone();
let data_mapper_clone = self.data_mapper.clone();
let datasource_clone = self.datasource.clone();

let handle = spawn_monitored_task!(async {
// Execute tasks one by one
for backfill_task in backfill_tasks {
if backfill_task.checkpoint < backfill_task.target_checkpoint {
if backfill_task.start_checkpoint < backfill_task.target_checkpoint {
datasource_clone
.start_ingestion_task(
backfill_task.task_name.clone(),
backfill_task.checkpoint,
backfill_task.target_checkpoint,
backfill_task,
storage_clone.clone(),
data_mapper_clone.clone(),
)
Expand Down Expand Up @@ -181,10 +177,9 @@ impl<P, D, M> Indexer<P, D, M> {
match ongoing_tasks.live_task() {
None => {
self.storage
.register_task(
.register_live_task(
format!("{} - Live", self.name),
live_task_from_checkpoint,
i64::MAX as u64,
)
.await
.tap_err(|e| {
Expand All @@ -199,8 +194,8 @@ impl<P, D, M> Indexer<P, D, M> {
// We still check this because in the case of slow
// block generation (e.g. Ethereum), it's possible we will
// stay on the same block for a bit.
if live_task_from_checkpoint != live_task.checkpoint {
live_task.checkpoint = live_task_from_checkpoint;
if live_task_from_checkpoint != live_task.start_checkpoint {
live_task.start_checkpoint = live_task_from_checkpoint;
self.storage.update_task(live_task).await.tap_err(|e| {
tracing::error!(
"Failed to update live task to ({}-MAX): {:?}",
Expand Down Expand Up @@ -318,7 +313,7 @@ pub trait IndexerProgressStore: Send {
target_checkpoint_number: u64,
) -> anyhow::Result<Option<u64>>;

async fn get_ongoing_tasks(&self, task_prefix: &str) -> Result<Vec<Task>, Error>;
async fn get_ongoing_tasks(&self, task_prefix: &str) -> Result<Tasks, Error>;

async fn get_largest_backfill_task_target_checkpoint(
&self,
Expand All @@ -328,27 +323,34 @@ pub trait IndexerProgressStore: Send {
async fn register_task(
&mut self,
task_name: String,
checkpoint: u64,
start_checkpoint: u64,
target_checkpoint: u64,
) -> Result<(), anyhow::Error>;

/// Registers a live task named `task_name` that starts at `start_checkpoint`.
/// Unlike `register_task`, the caller does not supply a target checkpoint.
async fn register_live_task(
&mut self,
task_name: String,
start_checkpoint: u64,
) -> Result<(), anyhow::Error>;

async fn update_task(&mut self, task: Task) -> Result<(), Error>;
}

#[async_trait]
pub trait Datasource<T: Send>: Sync + Send {
async fn start_ingestion_task<M, P, R>(
&self,
task_name: String,
starting_checkpoint: u64,
target_checkpoint: u64,
task: Task,
mut storage: P,
data_mapper: M,
) -> Result<(), Error>
where
M: DataMapper<T, R>,
P: Persistent<R>,
{
let task_name = task.task_name.clone();
let starting_checkpoint = task.start_checkpoint;
let target_checkpoint = task.target_checkpoint;
let ingestion_batch_size = std::env::var("INGESTION_BATCH_SIZE")
.unwrap_or(INGESTION_BATCH_SIZE.to_string())
.parse::<usize>()
Expand All @@ -365,18 +367,15 @@ pub trait Datasource<T: Send>: Sync + Send {
starting_checkpoint,
target_checkpoint,
);
let is_live_task = target_checkpoint == i64::MAX as u64;
let (data_sender, data_rx) = metered_channel::channel(
checkpoint_channel_size,
&mysten_metrics::get_metrics()
.unwrap()
.channel_inflight
.with_label_values(&[&task_name]),
);
let join_handle = self
.start_data_retrieval(starting_checkpoint, target_checkpoint, data_sender)
.await?;

let is_live_task = task.is_live_task;
let join_handle = self.start_data_retrieval(task, data_sender).await?;
let processed_checkpoints_metrics = self
.get_tasks_processed_checkpoints_metric()
.with_label_values(&[&task_name]);
Expand Down Expand Up @@ -499,8 +498,7 @@ pub trait Datasource<T: Send>: Sync + Send {

async fn start_data_retrieval(
&self,
starting_checkpoint: u64,
target_checkpoint: u64,
task: Task,
data_sender: DataSender<T>,
) -> Result<JoinHandle<Result<(), Error>>, Error>;

Expand Down
Loading

0 comments on commit dd951f8

Please sign in to comment.