feat: Use object store for rocksdb and debugging (#503)

This uses the `object_store` crate for uploading and downloading snapshots, as
well as the query plan and flight record (debugging) information.

This removes the `s3` module, the `S3Helper`, and the direct dependencies on
the AWS SDK crates.

This is part of #465.
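
For context on the replacement API: `object_store` exposes one `ObjectStore` trait over S3, GCS, Azure, and the local filesystem, so upload/download code no longer needs S3-specific plumbing. Below is a minimal sketch of the upload/download round trip. It is not sparrow's actual code: the local store, paths, and contents are invented, and it assumes the 0.5/0.6-era API in which `put` takes `Bytes`.

```rust
use bytes::Bytes;
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};

#[tokio::main]
async fn main() -> object_store::Result<()> {
    // A LocalFileSystem store stands in for S3/GCS/Azure; swapping the
    // backend changes only this constructor, not the calls below.
    let store = LocalFileSystem::new_with_prefix(std::env::temp_dir())?;

    // Upload: the same call a snapshot or flight-record upload would make.
    let dest = Path::from("diagnostics/plan.yaml");
    store.put(&dest, Bytes::from_static(b"plan: {}\n")).await?;

    // Download: the snapshot-restore direction.
    let bytes = store.get(&dest).await?.bytes().await?;
    assert_eq!(&bytes[..], b"plan: {}\n");
    Ok(())
}
```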

---------

Co-authored-by: Eric Pinzur <[email protected]>
Co-authored-by: Therapon Skoteiniotis <[email protected]>
3 people authored Jul 15, 2023
1 parent 59a2096 commit 5728ab8
Showing 34 changed files with 492 additions and 1,263 deletions.
497 changes: 17 additions & 480 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 1 addition & 4 deletions Cargo.toml
@@ -28,10 +28,6 @@ async-stream = "0.3.4"
async-trait = "0.1.68"
avro-rs = "0.13.0"
avro-schema = "0.3.0"
aws-config = "0.11.0"
aws-sdk-s3 = "0.11.0"
pulsar = { version = "5.1.0", default-features = false, features = ["async-std-runtime", "tokio-runtime", "lz4"] }
aws-types = "0.11.0"
bigdecimal = { version = "0.3.1", features = ["serde"] }
bincode = "1.3.3"
bit-set = "0.5.3"
@@ -91,6 +87,7 @@ prost-types = "0.11.8"
prost-wkt = "0.4.1"
prost-wkt-build = "0.4.1"
prost-wkt-types = "0.4.1"
pulsar = { version = "5.1.0", default-features = false, features = ["async-std-runtime", "tokio-runtime", "lz4"] }
rand = "0.8.5"
reqwest = "0.11.14"
serde = { version = "1.0.159", features = ["derive", "rc"] }
3 changes: 0 additions & 3 deletions crates/sparrow-main/Cargo.toml
@@ -14,9 +14,6 @@ ahash.workspace = true
anyhow.workspace = true
arrow.workspace = true
async-stream.workspace = true
aws-config.workspace = true
aws-sdk-s3.workspace = true
aws-types.workspace = true
chrono.workspace = true
clap.workspace = true
dashmap.workspace = true
28 changes: 15 additions & 13 deletions crates/sparrow-main/src/serve.rs
@@ -5,17 +5,17 @@ mod compute_service;
mod error_status;
mod file_service;
pub(crate) mod preparation_service;
use error_stack::{IntoReport, IntoReportCompat, ResultExt};
use error_stack::{IntoReport, ResultExt};
pub use error_status::*;

use sparrow_api::kaskada::v1alpha::compute_service_server::ComputeServiceServer;
use sparrow_api::kaskada::v1alpha::file_service_server::FileServiceServer;
use sparrow_api::kaskada::v1alpha::preparation_service_server::PreparationServiceServer;

use sparrow_runtime::s3::{S3Helper, S3Object};
use sparrow_runtime::stores::ObjectStoreRegistry;
use sparrow_runtime::stores::{ObjectStoreRegistry, ObjectStoreUrl};
use std::net::SocketAddr;

use std::str::FromStr;
use std::sync::Arc;
use tonic::transport::Server;
use tracing::{info, info_span};
@@ -61,26 +61,28 @@ impl ServeCommand {

let _enter = span.enter();

let s3 = S3Helper::new().await;
let object_store_registry = Arc::new(ObjectStoreRegistry::new());
let file_service = FileServiceImpl::new(object_store_registry.clone());
let object_stores = Arc::new(ObjectStoreRegistry::default());
let file_service = FileServiceImpl::new(object_stores.clone());

// Leak the diagnostic prefix to create a `&'static` reference.
// This simplifies the lifetime management of the futures.
// This string is fixed for the lifetime of `serve`, so leaking
// it once doesn't create a problem.
let flight_record_path = if let Some(flight_record_path) = &self.flight_record_path {
Some(
S3Object::try_from_uri(flight_record_path)
.into_report()
.change_context(Error::InvalidFlightRecordPath)?,
)
let prefix = ObjectStoreUrl::from_str(flight_record_path)
.change_context(Error::InvalidFlightRecordPath)?;
assert!(
prefix.is_delimited(),
"Flight record path must end with `/` but was {flight_record_path}"
);

Some(prefix)
} else {
None
};
let flight_record_path = Box::leak(Box::new(flight_record_path));
let compute_service = ComputeServiceImpl::new(flight_record_path, s3.clone());
let preparation_service = PreparationServiceImpl::new(object_store_registry.clone());
let compute_service = ComputeServiceImpl::new(flight_record_path, object_stores.clone());
let preparation_service = PreparationServiceImpl::new(object_stores.clone());

let (mut health_reporter, health_service) = tonic_health::server::health_reporter();

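Two details in this hunk deserve a note. First, the trailing-`/` assertion: assuming `ObjectStoreUrl` follows standard URL resolution, joining a file name onto a non-delimited prefix silently replaces the prefix's last segment rather than appending under it. A standalone demonstration with the `url` crate (bucket and file names invented):

```rust
use url::Url;

fn main() {
    // Delimited prefix: the file name lands under `diagnostics/`.
    let delimited = Url::parse("s3://bucket/diagnostics/").unwrap();
    assert_eq!(
        delimited.join("plan.yaml").unwrap().as_str(),
        "s3://bucket/diagnostics/plan.yaml"
    );

    // Non-delimited prefix: `diagnostics` is silently dropped.
    let bare = Url::parse("s3://bucket/diagnostics").unwrap();
    assert_eq!(
        bare.join("plan.yaml").unwrap().as_str(),
        "s3://bucket/plan.yaml"
    );
}
```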
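Second, the `Box::leak` idiom the comment describes: leaking a single process-lifetime allocation yields a `&'static` borrow, so the futures spawned per request carry no lifetime parameter. In isolation:

```rust
fn main() {
    // One allocation, never freed -- acceptable for configuration that
    // lives as long as the process.
    let prefix: &'static Option<String> =
        Box::leak(Box::new(Some("s3://bucket/diagnostics/".to_owned())));

    // The &'static reference is Copy and satisfies spawn's 'static bound.
    let handle = std::thread::spawn(move || prefix.is_some());
    assert!(handle.join().unwrap());
}
```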
88 changes: 44 additions & 44 deletions crates/sparrow-main/src/serve/compute_service.rs
@@ -1,3 +1,5 @@
use std::sync::Arc;

use dashmap::DashMap;
use error_stack::{IntoReport, ResultExt};
use futures::stream::BoxStream;
@@ -17,31 +19,34 @@ use sparrow_compiler::InternalCompileOptions;
use sparrow_instructions::ComputeStore;
use sparrow_materialize::{Materialization, MaterializationControl};
use sparrow_qfr::kaskada::sparrow::v1alpha::{flight_record_header, FlightRecordHeader};
use sparrow_runtime::execute::Error;
use sparrow_runtime::s3::{S3Helper, S3Object};
use sparrow_runtime::execute::error::Error;
use sparrow_runtime::stores::{ObjectStoreRegistry, ObjectStoreUrl};
use tempfile::NamedTempFile;

use tonic::{Request, Response, Status};
use tracing::{error, info, Instrument};
use tracing::Instrument;
use uuid::Uuid;

use crate::serve::error_status::IntoStatus;
use crate::BuildInfo;

pub(super) struct ComputeServiceImpl {
flight_record_path: &'static Option<S3Object>,
s3_helper: S3Helper,
flight_record_path: &'static Option<ObjectStoreUrl>,
object_stores: Arc<ObjectStoreRegistry>,
/// Thread-safe map from materialization id to control handle.
materializations: DashMap<String, MaterializationControl>,
}

impl ComputeServiceImpl {
pub(super) fn new(flight_record_path: &'static Option<S3Object>, s3_helper: S3Helper) -> Self {
pub(super) fn new(
flight_record_path: &'static Option<ObjectStoreUrl>,
object_stores: Arc<ObjectStoreRegistry>,
) -> Self {
let materializations = DashMap::new();

Self {
flight_record_path,
s3_helper,
object_stores,
materializations,
}
}
@@ -89,7 +94,7 @@ impl ComputeService for ComputeServiceImpl {
let handle = tokio::spawn(
execute_impl(
self.flight_record_path,
self.s3_helper.clone(),
self.object_stores.clone(),
request.into_inner(),
)
.in_current_span(),
@@ -112,11 +117,10 @@
) -> Result<Response<StartMaterializationResponse>, Status> {
let span = tracing::info_span!("StartMaterialization");
let _enter = span.enter();
let s3_helper = self.s3_helper.clone();
let id = request.get_ref().materialization_id.clone();
tracing::info!("id: {}", id);

match start_materialization_impl(s3_helper, request.into_inner()) {
match start_materialization_impl(request.into_inner()) {
Ok(handle) => {
self.materializations.insert(id, handle);
Ok(Response::new(StartMaterializationResponse {}))
@@ -208,12 +212,12 @@ }
}

async fn execute_impl(
flight_record_path: &'static Option<S3Object>,
s3_helper: S3Helper,
flight_record_path: &'static Option<ObjectStoreUrl>,
object_stores: Arc<ObjectStoreRegistry>,
request: ExecuteRequest,
) -> error_stack::Result<
impl Stream<Item = Result<ExecuteResponse, Status>> + Send,
sparrow_runtime::execute::Error,
sparrow_runtime::execute::error::Error,
> {
// Create a path for the plan yaml tempfile (if needed).
// TODO: We could include the plan as part of the flight record proto.
@@ -235,7 +239,7 @@ async fn execute_impl(
serde_yaml::to_writer(writer, plan)
.into_report()
.change_context(Error::internal_msg("writing plan tempfile"))?;
info!("Wrote plan yaml to {:?}", tempfile);
tracing::info!("Wrote plan yaml to {:?}", tempfile);

Some(tempfile)
} else {
@@ -280,7 +284,7 @@

Ok(progress_stream
.chain(futures::stream::once(debug_message(
s3_helper,
object_stores,
flight_record_path,
plan_yaml_tempfile,
flight_record_tempfile,
Expand All @@ -289,7 +293,6 @@ async fn execute_impl(
}

fn start_materialization_impl(
s3_helper: S3Helper,
request: StartMaterializationRequest,
) -> error_stack::Result<MaterializationControl, Error> {
let id = request.materialization_id.clone();
@@ -302,46 +305,42 @@
let materialization = Materialization::new(id, plan, tables, destination);
// TODO: Support lateness
// Spawns the materialization thread and begins execution
Ok(MaterializationControl::start(
materialization,
s3_helper,
None,
))
Ok(MaterializationControl::start(materialization, None))
}

/// Sends the debug message after the end of the stream.
///
/// Uploads the flight record files (plan yaml and flight record) and
/// compute snapshots (if applicable), then marks this as the final message.
async fn debug_message(
s3_helper: S3Helper,
flight_record_path: &'static Option<S3Object>,
object_stores: Arc<ObjectStoreRegistry>,
flight_record_path: &'static Option<ObjectStoreUrl>,
plan_yaml_tempfile: Option<NamedTempFile>,
flight_record_tempfile: Option<NamedTempFile>,
) -> error_stack::Result<ExecuteResponse, Error> {
let diagnostic_id = Uuid::new_v4();

let uploaded_plan_yaml_path = upload_flight_record_file(
&s3_helper,
object_stores.as_ref(),
flight_record_path,
plan_yaml_tempfile,
DiagnosticFile::PlanYaml,
&diagnostic_id,
);
let uploaded_flight_record_path = upload_flight_record_file(
&s3_helper,
object_stores.as_ref(),
flight_record_path,
flight_record_tempfile,
DiagnosticFile::FlightRecord,
&diagnostic_id,
);
// Wait for all futures to complete
let uploaded_plan_yaml_path = uploaded_plan_yaml_path.await.unwrap_or_else(|e| {
error!("Failed to plan yaml: {:?}", e);
tracing::error!("Failed to plan yaml: {:?}", e);
None
});
let uploaded_flight_record_path = uploaded_flight_record_path.await.unwrap_or_else(|e| {
error!("Failed to upload flight record: {:?}", e);
tracing::error!("Failed to upload flight record: {:?}", e);
None
});

@@ -374,35 +373,36 @@ impl DiagnosticFile {
}

async fn upload_flight_record_file<'a>(
s3_helper: &'a S3Helper,
flight_record_path: &'static Option<S3Object>,
object_stores: &'a ObjectStoreRegistry,
flight_record_path: &'static Option<ObjectStoreUrl>,
tempfile: Option<NamedTempFile>,
kind: DiagnosticFile,
diagnostic_id: &'a Uuid,
) -> anyhow::Result<Option<String>> {
let tempfile = if let Some(tempfile) = tempfile {
tempfile
} else {
info!("No diagnostic to upload for kind {:?}", kind);
tracing::info!("No diagnostic to upload for kind {:?}", kind);
return Ok(None);
};

let path = if let Some(prefix) = flight_record_path {
prefix.join_delimited(&kind.file_name(diagnostic_id))
let destination = if let Some(prefix) = flight_record_path {
prefix
.join(&kind.file_name(diagnostic_id))
.map_err(|e| e.into_error())?
} else {
info!("No diagnostic prefix -- not uploading {:?}", kind);
tracing::info!("No diagnostic prefix -- not uploading {:?}", kind);
return Ok(None);
};

let destination = path.get_formatted_key();
s3_helper
.upload_tempfile_to_s3(path, tempfile.into_temp_path())
.await?;
info!(
"Uploaded {:?}. To retrieve: `s3 cp {} .`",
kind, destination
);
Ok(Some(destination))
let destination_string = destination.to_string();
object_stores
.upload(tempfile.path(), destination)
.await
.map_err(|e| e.into_error())?;

tracing::info!("Uploaded {kind:?} to {destination_string}");
Ok(Some(destination_string))
}

#[cfg(test)]
@@ -607,7 +607,6 @@ mod tests {
// make sure that the sliced file set is properly used.

let file_path = "eventdata/event_data.parquet";
let s3_helper = S3Helper::new().await;
let part1_file_path = sparrow_testing::testdata_path(file_path);
let table = TableConfig::new_with_table_source(
"Events",
@@ -742,10 +741,11 @@ let output_to = Destination {
let output_to = Destination {
destination: Some(destination::Destination::ObjectStore(store)),
};
let object_stores = Arc::new(ObjectStoreRegistry::default());

let mut results: Vec<ExecuteResponse> = execute_impl(
&None,
s3_helper,
object_stores,
ExecuteRequest {
plan: compile_response.plan,
tables: vec![ComputeTable {
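Across these hunks the cloned `S3Helper` gives way to a shared `Arc<ObjectStoreRegistry>`. The registry type itself is outside this diff; as a rough sketch of the pattern only (the struct name, caching key, and scheme handling are all invented), such a registry caches one store per scheme/host and hands out `Arc<dyn ObjectStore>` handles:

```rust
use std::sync::Arc;

use dashmap::DashMap;
use object_store::{local::LocalFileSystem, ObjectStore};
use url::Url;

/// Hypothetical registry: one cached store per (scheme, host) pair.
#[derive(Default)]
struct Registry {
    stores: DashMap<String, Arc<dyn ObjectStore>>,
}

impl Registry {
    fn store_for(&self, url: &Url) -> Arc<dyn ObjectStore> {
        let key = format!("{}://{}", url.scheme(), url.host_str().unwrap_or(""));
        self.stores
            .entry(key)
            .or_insert_with(|| match url.scheme() {
                // A real registry would also build "s3", "gs", ... stores here.
                "file" => Arc::new(LocalFileSystem::new()) as Arc<dyn ObjectStore>,
                other => panic!("unsupported scheme {other}"),
            })
            .clone()
    }
}

fn main() {
    let registry = Registry::default();
    let url = Url::parse("file:///tmp/snapshots/").unwrap();
    let first = registry.store_for(&url);
    // Later lookups for the same scheme reuse the cached instance, which is
    // why one shared registry can replace a helper cloned at every call site.
    assert!(Arc::ptr_eq(&first, &registry.store_for(&url)));
}
```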
10 changes: 4 additions & 6 deletions crates/sparrow-main/src/serve/file_service.rs
@@ -17,14 +17,12 @@ use crate::serve::error_status::IntoStatus;

#[derive(Debug)]
pub(super) struct FileServiceImpl {
object_store_registry: Arc<ObjectStoreRegistry>,
object_stores: Arc<ObjectStoreRegistry>,
}

impl FileServiceImpl {
pub fn new(object_store_registry: Arc<ObjectStoreRegistry>) -> Self {
Self {
object_store_registry,
}
pub fn new(object_stores: Arc<ObjectStoreRegistry>) -> Self {
Self { object_stores }
}
}

@@ -35,7 +33,7 @@ impl FileService for FileServiceImpl {
&self,
request: tonic::Request<GetMetadataRequest>,
) -> Result<tonic::Response<GetMetadataResponse>, tonic::Status> {
let object_store = self.object_store_registry.clone();
let object_store = self.object_stores.clone();
match tokio::spawn(get_metadata(object_store, request)).await {
Ok(result) => result.into_status(),
Err(panic) => {
4 changes: 2 additions & 2 deletions crates/sparrow-main/src/serve/preparation_service.rs
@@ -20,9 +20,9 @@ pub(super) struct PreparationServiceImpl {
}

impl PreparationServiceImpl {
pub fn new(object_store_registry: Arc<ObjectStoreRegistry>) -> Self {
pub fn new(object_stores: Arc<ObjectStoreRegistry>) -> Self {
Self {
object_store_registry,
object_store_registry: object_stores,
}
}
}
9 changes: 2 additions & 7 deletions crates/sparrow-main/tests/e2e/fixture/query_fixture.rs
@@ -67,15 +67,10 @@ impl QueryFixture {
pub fn with_rocksdb(
mut self,
snapshot_prefix: &std::path::Path,
resume_from: Option<&std::path::Path>,
resume_from: Option<String>,
) -> Self {
let resume_from = resume_from.map(|snapshot_dir| {
let snapshot_dir = snapshot_dir.strip_prefix(snapshot_prefix).unwrap();
snapshot_dir.to_string_lossy().into_owned()
});

self.execute_request.compute_snapshot_config = Some(ComputeSnapshotConfig {
output_prefix: snapshot_prefix.to_string_lossy().into_owned(),
output_prefix: format!("file:///{}/", snapshot_prefix.display()),
resume_from,
});
self
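With this fixture change, test snapshots address a local directory through the same URL machinery as remote stores. Hand-formatting `file://` URLs is easy to get subtly wrong (extra or missing slashes); `url::Url::from_directory_path`, shown below with an invented Unix path, produces the delimited form directly:

```rust
use url::Url;

fn main() {
    let prefix = std::path::Path::new("/tmp/snapshots");
    let url = Url::from_directory_path(prefix).expect("path must be absolute");
    // The trailing slash is included, so a delimited-prefix check passes.
    assert_eq!(url.as_str(), "file:///tmp/snapshots/");
}
```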
