[data ingestion] enforce limit on reader side (MystenLabs#15486)
This PR prevents various deadlock issues related to mpsc channels in the
data ingestion daemon by limiting the number of concurrent tasks on the
reader side. The restriction ensures that actor channels won't get
congested, as the number of in-flight checkpoints produced by the reader is now capped.
phoenix-o authored Jan 11, 2024
1 parent 2aaa700 commit cb68833
Showing 3 changed files with 47 additions and 24 deletions.
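At its core, the change adds reader-side backpressure: the reader stops handing out checkpoints once the distance between the last pruned (fully processed) checkpoint and the next candidate reaches a fixed window, so slow downstream workers throttle the reader instead of flooding the mpsc channels. A minimal sketch of that guard, using a simplified stand-in struct and an illustrative window size rather than the crate's actual MAX_CHECKPOINTS_IN_PROGRESS value:

// Sketch only: a bounded "in flight" window on the reader side.
// The constant mirrors the crate's MAX_CHECKPOINTS_IN_PROGRESS; the value and
// the struct are simplified stand-ins for illustration.
const MAX_CHECKPOINTS_IN_PROGRESS: u64 = 10_000;

struct BoundedReader {
    current_checkpoint_number: u64, // next checkpoint to send downstream
    last_pruned_watermark: u64,     // highest checkpoint confirmed processed
}

impl BoundedReader {
    // One more checkpoint may be sent only while the unacknowledged window
    // stays below the cap; otherwise the reader waits for progress reports.
    fn can_send(&self, sequence_number: u64) -> bool {
        sequence_number < self.last_pruned_watermark + MAX_CHECKPOINTS_IN_PROGRESS
    }
}

With this guard, the number of unacknowledged checkpoints sitting in the channels is bounded by the window size, which is what removes the congestion-driven deadlocks described in the commit message.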
8 changes: 4 additions & 4 deletions crates/sui-data-ingestion/src/executor.rs
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::progress_store::{ExecutorProgress, ProgressStore, ProgressStoreWrapper};
use crate::reader::LocalReader;
use crate::reader::CheckpointReader;
use crate::worker_pool::WorkerPool;
use crate::workers::Worker;
use crate::DataIngestionMetrics;
@@ -60,10 +60,10 @@ impl<P: ProgressStore> IndexerExecutor<P> {
path: PathBuf,
mut exit_receiver: oneshot::Receiver<()>,
) -> Result<ExecutorProgress> {
let (checkpoint_reader, mut checkpoint_recv, gc_sender, _exit_sender) =
LocalReader::initialize(path);
let mut reader_checkpoint_number = self.progress_store.min_watermark()?;
spawn_monitored_task!(checkpoint_reader.run(reader_checkpoint_number));
let (checkpoint_reader, mut checkpoint_recv, gc_sender, _exit_sender) =
CheckpointReader::initialize(path, reader_checkpoint_number);
spawn_monitored_task!(checkpoint_reader.run());

for pool in std::mem::take(&mut self.pools) {
spawn_monitored_task!(pool);
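For context, the tuple returned by initialize is what closes the backpressure loop: the executor consumes checkpoints from checkpoint_recv and, once a checkpoint has been fully processed, reports its sequence number back through gc_sender, which lets the reader prune the file and advance its watermark. A hedged sketch of that consumer side, assuming the same CheckpointData and CheckpointSequenceNumber types the crate already imports, with a placeholder process function standing in for the real worker pools:

// Sketch of the consumer side of the channels returned by initialize.
// `process` is a placeholder; the real executor fans checkpoints out to
// WorkerPool tasks rather than handling them inline.
async fn process(_checkpoint: CheckpointData) -> anyhow::Result<()> {
    Ok(()) // stand-in for real checkpoint processing
}

async fn drive_reader(
    mut checkpoint_recv: tokio::sync::mpsc::Receiver<CheckpointData>,
    gc_sender: tokio::sync::mpsc::Sender<CheckpointSequenceNumber>,
) -> anyhow::Result<()> {
    while let Some(checkpoint) = checkpoint_recv.recv().await {
        let sequence_number = checkpoint.checkpoint_summary.sequence_number;
        process(checkpoint).await?; // placeholder for real processing
        gc_sender.send(sequence_number).await?; // lets the reader prune and read ahead
    }
    Ok(())
}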
58 changes: 39 additions & 19 deletions crates/sui-data-ingestion/src/reader.rs
@@ -23,47 +23,69 @@ pub(crate) const ENV_VAR_LOCAL_READ_TIMEOUT_MS: &str = "LOCAL_READ_TIMEOUT_MS";
/// Implements a checkpoint reader that monitors a local directory.
/// Designed for setups where the indexer daemon is colocated with FN.
/// This implementation is push-based and utilizes the inotify API.
pub struct LocalReader {
pub struct CheckpointReader {
path: PathBuf,
current_checkpoint_number: CheckpointSequenceNumber,
last_pruned_watermark: CheckpointSequenceNumber,
checkpoint_sender: mpsc::Sender<CheckpointData>,
processed_receiver: mpsc::Receiver<CheckpointSequenceNumber>,
exit_receiver: oneshot::Receiver<()>,
}

impl LocalReader {
impl CheckpointReader {
/// Represents a single iteration of the reader.
/// Reads files in a local directory, validates them, and forwards `CheckpointData` to the executor.
async fn read_files(
&self,
current_checkpoint_number: CheckpointSequenceNumber,
) -> Result<CheckpointSequenceNumber> {
async fn read_local_files(&self) -> Result<Vec<CheckpointData>> {
let mut files = vec![];
for entry in fs::read_dir(self.path.clone())? {
let entry = entry?;
let filename = entry.file_name();
if let Some(sequence_number) = Self::checkpoint_number_from_file_path(&filename) {
if sequence_number >= current_checkpoint_number {
if sequence_number >= self.current_checkpoint_number {
files.push((sequence_number, entry.path()));
}
}
}
files.sort();
info!(
"local reader: current checkpoint number is {}. Unprocessed local files are {:?}",
current_checkpoint_number, files
self.current_checkpoint_number, files
);
let mut checkpoints = vec![];
for (idx, (sequence_number, filename)) in files.iter().enumerate() {
if current_checkpoint_number + idx as u64 != *sequence_number {
if self.current_checkpoint_number + idx as u64 != *sequence_number {
return Err(anyhow!("checkpoint sequence should not have any gaps"));
}
let checkpoint = Blob::from_bytes::<CheckpointData>(&fs::read(filename)?)?;
checkpoints.push(checkpoint);
}
Ok(checkpoints)
}

async fn sync(&mut self) -> Result<()> {
let backoff = backoff::ExponentialBackoff::default();
let checkpoints = backoff::future::retry(backoff, || async {
self.read_local_files()
.await
.map_err(backoff::Error::transient)
})
.await?;
for checkpoint in checkpoints {
if (MAX_CHECKPOINTS_IN_PROGRESS as u64 + self.last_pruned_watermark)
<= checkpoint.checkpoint_summary.sequence_number
{
break;
}
self.checkpoint_sender.send(checkpoint).await?;
self.current_checkpoint_number += 1;
}
Ok(current_checkpoint_number + files.len() as u64)
Ok(())
}

/// Cleans the local directory by removing all processed checkpoint files.
fn gc_processed_files(&self, watermark: CheckpointSequenceNumber) -> Result<()> {
fn gc_processed_files(&mut self, watermark: CheckpointSequenceNumber) -> Result<()> {
info!("cleaning processed files, watermark is {}", watermark);
self.last_pruned_watermark = watermark;
for entry in fs::read_dir(self.path.clone())? {
let entry = entry?;
let filename = entry.file_name();
@@ -85,6 +107,7 @@ impl LocalReader {

pub fn initialize(
path: PathBuf,
starting_checkpoint_number: CheckpointSequenceNumber,
) -> (
Self,
mpsc::Receiver<CheckpointData>,
@@ -96,14 +119,16 @@ impl LocalReader {
let (exit_sender, exit_receiver) = oneshot::channel();
let reader = Self {
path,
current_checkpoint_number: starting_checkpoint_number,
last_pruned_watermark: starting_checkpoint_number,
checkpoint_sender,
processed_receiver,
exit_receiver,
};
(reader, checkpoint_recv, processed_sender, exit_sender)
}

pub async fn run(mut self, mut checkpoint_number: CheckpointSequenceNumber) -> Result<()> {
pub async fn run(mut self) -> Result<()> {
let (inotify_sender, mut inotify_recv) = mpsc::channel(1);
std::fs::create_dir_all(self.path.clone()).expect("failed to create a directory");
let mut watcher = notify::recommended_watcher(move |res| {
@@ -121,18 +146,13 @@
.expect("Inotify watcher failed");

let timeout_ms = std::env::var(ENV_VAR_LOCAL_READ_TIMEOUT_MS)
.unwrap_or("60000".to_string())
.unwrap_or("1000".to_string())
.parse::<u64>()?;

loop {
tokio::select! {
Ok(Some(_)) | Err(_) = timeout(Duration::from_millis(timeout_ms), inotify_recv.recv()) => {
let backoff = backoff::ExponentialBackoff::default();
checkpoint_number = backoff::future::retry(backoff, || async {
self.read_files(checkpoint_number).await.map_err(backoff::Error::transient)
})
.await
.expect("Failed to read checkpoint files");
self.sync().await.expect("Failed to read checkpoint files");
}
Some(gc_checkpoint_number) = self.processed_receiver.recv() => {
self.gc_processed_files(gc_checkpoint_number).expect("Failed to clean the directory");
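Putting the reader hunks together, the run loop now reacts to three events: a filesystem notification or the read timeout triggers sync (which enforces the window and retries reads with exponential backoff), a message on the processed channel triggers garbage collection, and the exit channel stops the loop. A stripped-down sketch of that loop body, consolidated from the hunks above; the exit branch is inferred from the struct's exit_receiver field and may differ slightly from the actual code:

// Stripped-down sketch of CheckpointReader::run's event loop. The timeout is a
// fallback so new checkpoint files are still picked up even if an inotify
// event is missed.
loop {
    tokio::select! {
        // A filesystem event arrived or the timeout elapsed: read new files,
        // bounded by the in-flight window enforced in sync().
        Ok(Some(_)) | Err(_) = timeout(Duration::from_millis(timeout_ms), inotify_recv.recv()) => {
            self.sync().await.expect("Failed to read checkpoint files");
        }
        // The executor reported progress: delete processed files and advance
        // the pruning watermark, which re-opens the window.
        Some(gc_checkpoint_number) = self.processed_receiver.recv() => {
            self.gc_processed_files(gc_checkpoint_number).expect("Failed to clean the directory");
        }
        // Shutdown requested via the exit channel (inferred from the struct).
        _ = &mut self.exit_receiver => break,
    }
}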
5 changes: 4 additions & 1 deletion crates/sui-data-ingestion/src/worker_pool.rs
@@ -62,7 +62,10 @@ impl<W: Worker + 'static> WorkerPool<W> {
.clone()
.process_checkpoint(checkpoint.clone())
.await
.map_err(backoff::Error::transient)
.map_err(|err| {
info!("transient worker execution error {:?}", err);
backoff::Error::transient(err)
})
})
.await
.expect("checkpoint processing failed for checkpoint");
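The worker_pool.rs change is purely about visibility: transient process_checkpoint failures are now logged before backoff retries them, instead of staying invisible until the retry budget runs out. The same pattern in isolation, as a sketch with a stand-in fallible operation:

use backoff::ExponentialBackoff;
use tracing::info;

// Stand-in for a fallible worker call such as process_checkpoint.
async fn fallible_operation() -> anyhow::Result<()> {
    Ok(())
}

// Retry with exponential backoff, logging every transient failure so that long
// retry loops show up in the logs instead of failing silently.
async fn retry_with_logging() -> anyhow::Result<()> {
    backoff::future::retry(ExponentialBackoff::default(), || async {
        fallible_operation().await.map_err(|err| {
            info!("transient worker execution error {:?}", err);
            backoff::Error::transient(err)
        })
    })
    .await
}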
