[data ingestion] archival worker (MystenLabs#16039)
Includes a slight change to the data ingestion interface to support this use case: tasks can now skip progress updates in the meta store, which lets a worker accumulate state across multiple checkpoints before saving while still behaving correctly through a random crash/restart.
Note: this workload is expected to run with concurrency=1 only.
phoenix-o authored Feb 6, 2024
1 parent 2db4292 commit f54311e
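To illustrate the interface change, the sketch below shows how a worker might accumulate several checkpoints and only allow its watermark to be saved once a batch has been flushed. The trait shape, the async-trait usage, and the BatchingWorker type are assumptions inferred from this diff, not the crate's exact definitions.

    use anyhow::Result;
    use async_trait::async_trait;
    use sui_types::full_checkpoint_content::CheckpointData;
    use tokio::sync::Mutex;

    // Assumed shape of the data ingestion Worker trait after this change:
    // save_progress() lets a task tell the pool whether the current watermark
    // may be persisted in the meta store.
    #[async_trait]
    trait Worker: Send + Sync + 'static {
        async fn process_checkpoint(&self, checkpoint: CheckpointData) -> Result<()>;
        async fn save_progress(&self) -> bool {
            true
        }
    }

    // Hypothetical worker that buffers checkpoints and only permits a progress
    // update after a full batch has been flushed, so a crash/restart simply
    // replays the unflushed checkpoints.
    struct BatchingWorker {
        buffer: Mutex<Vec<CheckpointData>>,
        batch_size: usize,
    }

    #[async_trait]
    impl Worker for BatchingWorker {
        async fn process_checkpoint(&self, checkpoint: CheckpointData) -> Result<()> {
            self.buffer.lock().await.push(checkpoint);
            Ok(())
        }

        async fn save_progress(&self) -> bool {
            let mut buffer = self.buffer.lock().await;
            if buffer.len() >= self.batch_size {
                // ... upload or persist the accumulated state here ...
                buffer.clear();
                return true; // safe to move the stored watermark forward
            }
            false // keep the stored watermark; state exists only in memory
        }
    }

Batching state like this lives in a single worker instance, which is consistent with the note above that the archival workload is expected to run with concurrency=1.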
Showing 8 changed files with 265 additions and 22 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

43 changes: 34 additions & 9 deletions crates/sui-archival/src/lib.rs
@@ -31,12 +31,13 @@ use sui_config::object_storage_config::ObjectStoreConfig;
use sui_storage::blob::{Blob, BlobEncoding};
use sui_storage::object_store::util::{get, put};
use sui_storage::object_store::{ObjectStoreGetExt, ObjectStorePutExt};
-use sui_storage::{compute_sha3_checksum, SHA3_BYTES};
+use sui_storage::{compute_sha3_checksum, compute_sha3_checksum_for_bytes, SHA3_BYTES};
use sui_types::base_types::ExecutionData;
use sui_types::messages_checkpoint::{FullCheckpointContents, VerifiedCheckpointContents};
use sui_types::storage::{SingleCheckpointSharedInMemoryStore, WriteStore};
use tracing::{error, info};

+#[allow(rustdoc::invalid_html_tags)]
/// Checkpoints and summaries are persisted as blob files. Files are committed to local store
/// by duration or file size. Committed files are synced with the remote store continuously. Files are
/// optionally compressed with the zstd compression format. Filenames follow the format
@@ -88,8 +89,8 @@ use tracing::{error, info};
///├──────────────────────────────┤
///│ sha3 <32 bytes> │
///└──────────────────────────────┘
-const CHECKPOINT_FILE_MAGIC: u32 = 0x0000DEAD;
-const SUMMARY_FILE_MAGIC: u32 = 0x0000CAFE;
+pub const CHECKPOINT_FILE_MAGIC: u32 = 0x0000DEAD;
+pub const SUMMARY_FILE_MAGIC: u32 = 0x0000CAFE;
const MANIFEST_FILE_MAGIC: u32 = 0x00C0FFEE;
const MAGIC_BYTES: usize = 4;
const CHECKPOINT_FILE_SUFFIX: &str = "chk";
@@ -262,9 +263,29 @@ pub fn create_file_metadata(
Ok(file_metadata)
}

+pub fn create_file_metadata_from_bytes(
+bytes: Bytes,
+file_type: FileType,
+epoch_num: u64,
+checkpoint_seq_range: Range<u64>,
+) -> Result<FileMetadata> {
+let sha3_digest = compute_sha3_checksum_for_bytes(bytes)?;
+let file_metadata = FileMetadata {
+file_type,
+epoch_num,
+checkpoint_seq_range,
+sha3_digest,
+};
+Ok(file_metadata)
+}
+
pub async fn read_manifest<S: ObjectStoreGetExt>(remote_store: S) -> Result<Manifest> {
let manifest_file_path = Path::from(MANIFEST_FILENAME);
let vec = get(&remote_store, &manifest_file_path).await?.to_vec();
+read_manifest_from_bytes(vec)
+}
+
+pub fn read_manifest_from_bytes(vec: Vec<u8>) -> Result<Manifest> {
let manifest_file_size = vec.len();
let mut manifest_reader = Cursor::new(vec);
manifest_reader.rewind()?;
@@ -293,11 +314,7 @@ pub async fn read_manifest<S: ObjectStoreGetExt>(remote_store: S) -> Result<Manifest> {
Blob::read(&mut manifest_reader)?.decode()
}

-pub async fn write_manifest<S: ObjectStorePutExt>(
-manifest: Manifest,
-remote_store: S,
-) -> Result<()> {
-let path = Path::from(MANIFEST_FILENAME);
+pub fn finalize_manifest(manifest: Manifest) -> Result<Bytes> {
let mut buf = BufWriter::new(vec![]);
buf.write_u32::<BigEndian>(MANIFEST_FILE_MAGIC)?;
let blob = Blob::encode(&manifest, BlobEncoding::Bcs)?;
@@ -307,7 +324,15 @@ pub async fn write_manifest<S: ObjectStorePutExt>(
hasher.update(buf.get_ref());
let computed_digest = hasher.finalize().digest;
buf.write_all(&computed_digest)?;
-let bytes = Bytes::from(buf.into_inner()?);
+Ok(Bytes::from(buf.into_inner()?))
+}
+
+pub async fn write_manifest<S: ObjectStorePutExt>(
+manifest: Manifest,
+remote_store: S,
+) -> Result<()> {
+let path = Path::from(MANIFEST_FILENAME);
+let bytes = finalize_manifest(manifest)?;
put(&remote_store, &path, bytes).await?;
Ok(())
}
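The refactor above separates manifest serialization from uploading, so a manifest (and, via create_file_metadata_from_bytes, the metadata of an in-memory archive file) can now be produced without touching an object store. A minimal round-trip sketch using only functions that this diff makes public:

    use anyhow::Result;
    use sui_archival::{finalize_manifest, read_manifest_from_bytes, Manifest};

    // Serialize a manifest (magic bytes + BCS blob + sha3 trailer) and decode it
    // back entirely in memory; useful for tests or for workers that stage archive
    // data before uploading it with write_manifest.
    fn manifest_roundtrip(manifest: Manifest) -> Result<Manifest> {
        let bytes = finalize_manifest(manifest)?;
        read_manifest_from_bytes(bytes.to_vec())
    }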
2 changes: 2 additions & 0 deletions crates/sui-data-ingestion/Cargo.toml
@@ -15,6 +15,7 @@ aws-sdk-s3.workspace = true
backoff.workspace = true
base64-url.workspace = true
bcs.workspace = true
+byteorder.workspace = true
bytes.workspace = true
futures.workspace = true
mysten-metrics.workspace = true
@@ -27,6 +28,7 @@ prometheus.workspace = true
telemetry-subscribers.workspace = true
tokio = { workspace = true, features = ["full"] }
tracing.workspace = true
+sui-archival.workspace = true
sui-storage.workspace = true
sui-types.workspace = true
url.workspace = true
5 changes: 4 additions & 1 deletion crates/sui-data-ingestion/src/lib.rs
@@ -14,4 +14,7 @@ pub use executor::IndexerExecutor;
pub use metrics::DataIngestionMetrics;
pub use progress_store::{DynamoDBProgressStore, FileProgressStore};
pub use worker_pool::WorkerPool;
-pub use workers::{BlobTaskConfig, BlobWorker, KVStoreTaskConfig, KVStoreWorker, Worker};
+pub use workers::{
+ArchivalConfig, ArchivalWorker, BlobTaskConfig, BlobWorker, KVStoreTaskConfig, KVStoreWorker,
+Worker,
+};
13 changes: 11 additions & 2 deletions crates/sui-data-ingestion/src/main.rs
@@ -7,8 +7,8 @@ use serde::{Deserialize, Serialize};
use std::env;
use std::path::PathBuf;
use sui_data_ingestion::{
-BlobTaskConfig, BlobWorker, DataIngestionMetrics, DynamoDBProgressStore, KVStoreTaskConfig,
-KVStoreWorker,
+ArchivalConfig, ArchivalWorker, BlobTaskConfig, BlobWorker, DataIngestionMetrics,
+DynamoDBProgressStore, KVStoreTaskConfig, KVStoreWorker,
};
use sui_data_ingestion::{IndexerExecutor, WorkerPool};
use tokio::signal;
@@ -17,6 +17,7 @@ use tokio::sync::oneshot;
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "lowercase")]
enum Task {
+Archival(ArchivalConfig),
Blob(BlobTaskConfig),
KV(KVStoreTaskConfig),
}
@@ -115,6 +116,14 @@ async fn main() -> Result<()> {
let mut executor = IndexerExecutor::new(progress_store, config.tasks.len(), metrics);
for task_config in config.tasks {
match task_config.task {
+Task::Archival(archival_config) => {
+let worker_pool = WorkerPool::new(
+ArchivalWorker::new(archival_config).await?,
+task_config.name,
+task_config.concurrency,
+);
+executor.register(worker_pool).await?;
+}
Task::Blob(blob_config) => {
let worker_pool = WorkerPool::new(
BlobWorker::new(blob_config),
26 changes: 16 additions & 10 deletions crates/sui-data-ingestion/src/worker_pool.rs
@@ -4,7 +4,7 @@
use crate::executor::MAX_CHECKPOINTS_IN_PROGRESS;
use crate::workers::Worker;
use mysten_metrics::spawn_monitored_task;
-use std::collections::{BTreeSet, HashSet, VecDeque};
+use std::collections::{BTreeSet, HashMap, VecDeque};
use std::sync::Arc;
use std::time::Instant;
use sui_types::full_checkpoint_content::CheckpointData;
@@ -37,7 +37,7 @@ impl<W: Worker + 'static> WorkerPool<W> {
"Starting indexing pipeline {} with concurrency {}. Current watermark is {}.",
self.task_name, self.concurrency, current_checkpoint_number
);
-let mut updates: HashSet<u64> = HashSet::new();
+let mut updates = HashMap::new();

let (progress_sender, mut progress_receiver) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
let mut workers = vec![];
@@ -76,7 +76,7 @@ impl<W: Worker + 'static> WorkerPool<W> {
.await
.expect("checkpoint processing failed for checkpoint");
info!("finished checkpoint processing {} for workflow {} in {:?}", sequence_number, task_name, start_time.elapsed());
-cloned_progress_sender.send((worker_id, sequence_number)).await.expect("failed to update progress");
+cloned_progress_sender.send((worker_id, sequence_number, worker.save_progress().await)).await.expect("failed to update progress");
}
}
}
@@ -97,17 +97,23 @@ impl<W: Worker + 'static> WorkerPool<W> {
workers[worker_id].0.send(checkpoint).await.expect("failed to dispatch a task");
}
}
-Some((worker_id, status_update)) = progress_receiver.recv() => {
+Some((worker_id, status_update, should_save_progress)) = progress_receiver.recv() => {
idle.insert(worker_id);
-updates.insert(status_update);
+updates.insert(status_update, should_save_progress);
if status_update == current_checkpoint_number {
-while updates.remove(&current_checkpoint_number) {
+let mut executor_status_update = None;
+while let Some(should_save_progress) = updates.remove(&current_checkpoint_number) {
+if should_save_progress {
+executor_status_update = Some(current_checkpoint_number + 1);
+}
current_checkpoint_number += 1;
}
-executor_progress_sender
-.send((self.task_name.clone(), current_checkpoint_number))
-.await
-.expect("Failed to send progress update to the executor");
+if let Some(update) = executor_status_update {
+executor_progress_sender
+.send((self.task_name.clone(), update))
+.await
+.expect("Failed to send progress update to the executor");
+}
}
while !checkpoints.is_empty() && !idle.is_empty() {
let checkpoint = checkpoints.pop_front().unwrap();
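The net effect of this change: the pool still advances its in-memory watermark over every contiguous completed checkpoint, but it only reports a watermark to the executor when at least one checkpoint in that run answered save_progress() with true, and the reported value is one past the last such checkpoint. A standalone sketch of that bookkeeping (function and variable names are illustrative, not the crate's):

    use std::collections::HashMap;

    // `updates` maps a completed checkpoint number to the worker's save_progress()
    // answer for it. Returns the watermark to report to the executor, if any
    // checkpoint in the drained run asked for its progress to be persisted.
    fn advance_watermark(
        updates: &mut HashMap<u64, bool>,
        current_checkpoint_number: &mut u64,
    ) -> Option<u64> {
        let mut executor_status_update = None;
        while let Some(should_save_progress) = updates.remove(current_checkpoint_number) {
            if should_save_progress {
                executor_status_update = Some(*current_checkpoint_number + 1);
            }
            *current_checkpoint_number += 1;
        }
        executor_status_update
    }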
(Diffs for the remaining changed files are not shown here.)