Skip to content

Commit

Permalink
[data ingestion] split core engine and mysten pipelines (MystenLabs#1…
Browse files Browse the repository at this point in the history
…6103)

This enables a cleaner experience for external developers, eliminating
the need to include dependencies specific to the Mysten ingestion
pipelines, such as S3, DynamoDB, etc.
  • Loading branch information
phoenix-o authored Feb 8, 2024
1 parent d2df8af commit 52f14b1
Show file tree
Hide file tree
Showing 22 changed files with 103 additions and 34 deletions.
28 changes: 27 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ members = [
"crates/sui-core",
"crates/sui-cost",
"crates/sui-data-ingestion",
"crates/sui-data-ingestion-core",
"crates/sui-e2e-tests",
"crates/sui-enum-compat-util",
"crates/sui-faucet",
Expand Down Expand Up @@ -575,6 +576,7 @@ sui-config = { path = "crates/sui-config" }
sui-core = { path = "crates/sui-core" }
sui-cost = { path = "crates/sui-cost" }
sui-data-ingestion = { path = "crates/sui-data-ingestion" }
sui-data-ingestion-core = { path = "crates/sui-data-ingestion-core" }
sui-e2e-tests = { path = "crates/sui-e2e-tests" }
sui-enum-compat-util = { path = "crates/sui-enum-compat-util" }
sui-faucet = { path = "crates/sui-faucet" }
Expand Down
31 changes: 31 additions & 0 deletions crates/sui-data-ingestion-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[package]
name = "sui-data-ingestion-core"
authors = ["Mysten Labs <build@mystenlabs.com>"]
license = "Apache-2.0"
publish = false
edition = "2021"
version = "0.1.0"

[dependencies]
anyhow.workspace = true
async-trait.workspace = true
backoff.workspace = true
futures.workspace = true
mysten-metrics.workspace = true
notify.workspace = true
serde.workspace = true
serde_json.workspace = true
object_store.workspace = true
prometheus.workspace = true
telemetry-subscribers.workspace = true
tokio = { workspace = true, features = ["full"] }
tracing.workspace = true
sui-storage.workspace = true
sui-types.workspace = true
url.workspace = true
workspace-hack.workspace = true

[dev-dependencies]
rand.workspace = true
tempfile.workspace = true
sui-types = { workspace = true, features = ["test-utils"] }
File renamed without changes.
17 changes: 17 additions & 0 deletions crates/sui-data-ingestion-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! Core engine of the data ingestion framework.
//!
//! This crate was split out of `sui-data-ingestion` so that external
//! developers can depend on the engine without also pulling in dependencies
//! that are specific to the Mysten ingestion pipelines (S3, DynamoDB, etc.).
//! The Mysten pipelines remain in `sui-data-ingestion`, which depends on this
//! crate.

// All modules are private; the public API is exactly the set of re-exports
// at the bottom of this file.
mod executor;
mod metrics;
mod progress_store;
// `reader` has no re-export below, so nothing from it is visible outside the crate.
mod reader;
// Test module, compiled only for test builds.
#[cfg(test)]
mod tests;
mod worker_pool;
mod workers;

// Public API of the crate.
pub use executor::IndexerExecutor;
pub use metrics::DataIngestionMetrics;
// Only the file-backed progress store is exported from the core crate.
pub use progress_store::{FileProgressStore, ProgressStore};
pub use worker_pool::WorkerPool;
pub use workers::Worker;
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use anyhow::Result;
use async_trait::async_trait;
use std::collections::HashMap;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
mod dynamodb;
mod file;
pub use dynamodb::DynamoDBProgressStore;
pub use file::FileProgressStore;

pub type ExecutorProgress = HashMap<String, CheckpointSequenceNumber>;
Expand Down
File renamed without changes.
File renamed without changes.
14 changes: 14 additions & 0 deletions crates/sui-data-ingestion-core/src/workers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use async_trait::async_trait;
use sui_types::full_checkpoint_content::CheckpointData;

/// Interface implemented by checkpoint-processing workers.
///
/// Every implementor must be `Send + Sync`. The trait uses `#[async_trait]`
/// so that its methods can be declared `async`.
#[async_trait]
pub trait Worker: Send + Sync {
/// Processes a single checkpoint, taking ownership of its data.
///
/// # Errors
///
/// Returns an error if the implementor fails to process the checkpoint.
async fn process_checkpoint(&self, checkpoint: CheckpointData) -> Result<()>;
/// Returns `true` unless the implementor overrides this default.
///
/// NOTE(review): presumably tells the caller whether progress should be
/// recorded for this worker — confirm against the worker pool / executor.
async fn save_progress(&self) -> bool {
true
}
}
1 change: 1 addition & 0 deletions crates/sui-data-ingestion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ tokio = { workspace = true, features = ["full"] }
tracing.workspace = true
sui-archival.workspace = true
sui-storage.workspace = true
sui-data-ingestion-core.workspace = true
sui-types.workspace = true
url.workspace = true
workspace-hack.workspace = true
Expand Down
12 changes: 1 addition & 11 deletions crates/sui-data-ingestion/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,10 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

mod executor;
mod metrics;
mod progress_store;
mod reader;
#[cfg(test)]
mod tests;
mod worker_pool;
mod workers;

pub use executor::IndexerExecutor;
pub use metrics::DataIngestionMetrics;
pub use progress_store::{DynamoDBProgressStore, FileProgressStore};
pub use worker_pool::WorkerPool;
pub use progress_store::DynamoDBProgressStore;
pub use workers::{
ArchivalConfig, ArchivalWorker, BlobTaskConfig, BlobWorker, KVStoreTaskConfig, KVStoreWorker,
Worker,
};
7 changes: 4 additions & 3 deletions crates/sui-data-ingestion/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ use serde::{Deserialize, Serialize};
use std::env;
use std::path::PathBuf;
use sui_data_ingestion::{
ArchivalConfig, ArchivalWorker, BlobTaskConfig, BlobWorker, DataIngestionMetrics,
DynamoDBProgressStore, KVStoreTaskConfig, KVStoreWorker,
ArchivalConfig, ArchivalWorker, BlobTaskConfig, BlobWorker, DynamoDBProgressStore,
KVStoreTaskConfig, KVStoreWorker,
};
use sui_data_ingestion::{IndexerExecutor, WorkerPool};
use sui_data_ingestion_core::DataIngestionMetrics;
use sui_data_ingestion_core::{IndexerExecutor, WorkerPool};
use tokio::signal;
use tokio::sync::oneshot;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::progress_store::ProgressStore;
use anyhow::Result;
use async_trait::async_trait;
use aws_sdk_dynamodb::types::AttributeValue;
use aws_sdk_dynamodb::Client;
use aws_sdk_s3::config::{Credentials, Region};
use std::str::FromStr;
use sui_data_ingestion_core::ProgressStore;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;

pub struct DynamoDBProgressStore {
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-data-ingestion/src/workers/archival.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::Worker;
use anyhow::Result;
use async_trait::async_trait;
use byteorder::BigEndian;
Expand All @@ -17,6 +16,7 @@ use sui_archival::{
create_file_metadata_from_bytes, finalize_manifest, read_manifest_from_bytes, FileType,
Manifest, CHECKPOINT_FILE_MAGIC, SUMMARY_FILE_MAGIC,
};
use sui_data_ingestion_core::Worker;
use sui_storage::blob::{Blob, BlobEncoding};
use sui_storage::{compress, FileCompression, StorageFormat};
use sui_types::base_types::{EpochId, ExecutionData};
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-data-ingestion/src/workers/blob.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::Worker;
use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
Expand All @@ -11,6 +10,7 @@ use object_store::{ClientConfigKey, ClientOptions, ObjectStore, RetryConfig};
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use std::time::Duration;
use sui_data_ingestion_core::Worker;
use sui_storage::blob::{Blob, BlobEncoding};
use sui_types::full_checkpoint_content::CheckpointData;
use url::Url;
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-data-ingestion/src/workers/kv_store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::Worker;
use anyhow::Result;
use async_trait::async_trait;
use aws_sdk_dynamodb::primitives::Blob;
Expand All @@ -15,6 +14,7 @@ use serde::{Deserialize, Serialize};
use std::borrow::Borrow;
use std::collections::{HashMap, HashSet, VecDeque};
use std::iter::repeat;
use sui_data_ingestion_core::Worker;
use sui_storage::http_key_value_store::TaggedKey;
use sui_types::full_checkpoint_content::CheckpointData;
use sui_types::storage::ObjectKey;
Expand Down
11 changes: 0 additions & 11 deletions crates/sui-data-ingestion/src/workers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,9 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use async_trait::async_trait;
use sui_types::full_checkpoint_content::CheckpointData;
mod archival;
mod blob;
mod kv_store;
pub use archival::{ArchivalConfig, ArchivalWorker};
pub use blob::{BlobTaskConfig, BlobWorker};
pub use kv_store::{KVStoreTaskConfig, KVStoreWorker};

#[async_trait]
pub trait Worker: Send + Sync {
async fn process_checkpoint(&self, checkpoint: CheckpointData) -> Result<()>;
async fn save_progress(&self) -> bool {
true
}
}
2 changes: 1 addition & 1 deletion crates/suins-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ version = "0.1.0"

[dependencies]
diesel = { version = "2.1.4", features = ["postgres", "r2d2"] }
sui-data-ingestion.workspace = true
sui-data-ingestion-core.workspace = true
anyhow.workspace = true
async-trait.workspace = true
aws-config.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion crates/suins-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use async_trait::async_trait;
use diesel::{dsl::sql, Connection, ExpressionMethods, RunQueryDsl};
use prometheus::Registry;
use std::path::PathBuf;
use sui_data_ingestion::{
use sui_data_ingestion_core::{
DataIngestionMetrics, FileProgressStore, IndexerExecutor, Worker, WorkerPool,
};
use sui_types::full_checkpoint_content::CheckpointData;
Expand Down

0 comments on commit 52f14b1

Please sign in to comment.