diff --git a/Cargo.lock b/Cargo.lock index c552ead7f..7f82ed17d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -674,306 +674,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "aws-config" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f91154572c69defdbe4ee7e7c86a0d1ad1f8c49655bedb7c6be61a7c1dc43105" -dependencies = [ - "aws-http", - "aws-sdk-sso", - "aws-sdk-sts", - "aws-smithy-async", - "aws-smithy-client", - "aws-smithy-http", - "aws-smithy-http-tower", - "aws-smithy-json", - "aws-smithy-types", - "aws-types", - "bytes", - "hex", - "http", - "hyper", - "ring", - "tokio", - "tower", - "tracing", - "zeroize", -] - -[[package]] -name = "aws-endpoint" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "515bd0f038107827309daa28612941ff559e71a6e96335e336d4fdf4caffb34b" -dependencies = [ - "aws-smithy-http", - "aws-types", - "http", - "regex", - "tracing", -] - -[[package]] -name = "aws-http" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1378c2c2430a063076621ec8c6435cdbd97b3e053111aebb52c9333fc793f32c" -dependencies = [ - "aws-smithy-http", - "aws-smithy-types", - "aws-types", - "http", - "lazy_static", - "percent-encoding", - "tracing", -] - -[[package]] -name = "aws-sdk-s3" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25e2c859c7420c8564b6eb964056abf508c10c0269cbd624ecc8c8de5c33446c" -dependencies = [ - "aws-endpoint", - "aws-http", - "aws-sig-auth", - "aws-sigv4", - "aws-smithy-async", - "aws-smithy-client", - "aws-smithy-eventstream", - "aws-smithy-http", - "aws-smithy-http-tower", - "aws-smithy-types", - "aws-smithy-xml", - "aws-types", - "bytes", - "http", - "md5", - "tokio-stream", - "tower", -] - -[[package]] -name = "aws-sdk-sso" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d74bff9a790eceb16a7825b11c4d9526fe6d3649349ba04beb5cb4770b7822b4" -dependencies = [ - "aws-endpoint", - "aws-http", - "aws-sig-auth", - "aws-smithy-async", - "aws-smithy-client", - "aws-smithy-http", - "aws-smithy-http-tower", - "aws-smithy-json", - "aws-smithy-types", - "aws-types", - "bytes", - "http", - "tokio-stream", - "tower", -] - -[[package]] -name = "aws-sdk-sts" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e879f3619e0b444218ab76d28364d57b81820ad792fd58088ab675237e4d7b5f" -dependencies = [ - "aws-endpoint", - "aws-http", - "aws-sig-auth", - "aws-smithy-async", - "aws-smithy-client", - "aws-smithy-http", - "aws-smithy-http-tower", - "aws-smithy-query", - "aws-smithy-types", - "aws-smithy-xml", - "aws-types", - "bytes", - "http", - "tower", -] - -[[package]] -name = "aws-sig-auth" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b44c7698294a89fabbadb538560ad34b6fdd26e926dee3c5e710928c3093fcf0" -dependencies = [ - "aws-sigv4", - "aws-smithy-eventstream", - "aws-smithy-http", - "aws-types", - "http", - "thiserror", - "tracing", -] - -[[package]] -name = "aws-sigv4" -version = "0.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee3e0ad28dd29d4e87a681ffcab5897e9c29ff0f338369f52c7bd4fba5652393" -dependencies = [ - "aws-smithy-eventstream", - "aws-smithy-http", - "bytes", - "form_urlencoded", - "hex", - "http", - "once_cell", - "percent-encoding", - "regex", - "ring", - "time 0.3.21", - "tracing", -] - -[[package]] 
-name = "aws-smithy-async" -version = "0.41.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23dcbf7d119f514a627d236412626645c4378b126e30dc61db9de3e069fa1676" -dependencies = [ - "futures-util", - "pin-project-lite", - "tokio", - "tokio-stream", -] - -[[package]] -name = "aws-smithy-client" -version = "0.41.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "511ac6c65f2a89cfcd74fe78aa6d07216095a53cbaeab493b17f6df82cd65b86" -dependencies = [ - "aws-smithy-async", - "aws-smithy-http", - "aws-smithy-http-tower", - "aws-smithy-types", - "bytes", - "fastrand", - "http", - "http-body", - "hyper", - "hyper-rustls 0.22.1", - "lazy_static", - "pin-project", - "pin-project-lite", - "tokio", - "tower", - "tracing", -] - -[[package]] -name = "aws-smithy-eventstream" -version = "0.41.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "703e7d99d80156d5a41a3996a985701650ccb0d5edaf441ca40bda199d34284e" -dependencies = [ - "aws-smithy-types", - "bytes", - "crc32fast", -] - -[[package]] -name = "aws-smithy-http" -version = "0.41.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d800c8684fa567cdf1abd9654c7997b2a887e7b06022938756193472ec7ec251" -dependencies = [ - "aws-smithy-eventstream", - "aws-smithy-types", - "bytes", - "bytes-utils", - "futures-core", - "http", - "http-body", - "hyper", - "once_cell", - "percent-encoding", - "pin-project", - "tokio", - "tokio-util 0.6.10", - "tracing", -] - -[[package]] -name = "aws-smithy-http-tower" -version = "0.41.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8017959786cce64e690214d303d062c97fcd38a68df7cb444255e534c9bbce49" -dependencies = [ - "aws-smithy-http", - "bytes", - "http", - "http-body", - "pin-project", - "tower", - "tracing", -] - -[[package]] -name = "aws-smithy-json" -version = "0.41.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3796e2a4a3b7d15db2fd5aec2de9220919332648f0a56a77b5c53caf4a9653fa" -dependencies = [ - "aws-smithy-types", -] - -[[package]] -name = "aws-smithy-query" -version = "0.41.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76694d1a2dacefa347b921c61bf64b5cc493a898971b3c18fa636ce1788ceabe" -dependencies = [ - "aws-smithy-types", - "urlencoding", -] - -[[package]] -name = "aws-smithy-types" -version = "0.41.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c7f957a2250cc0fa4ccf155e00aeac9a81f600df7cd4ecc910c75030e6534f5" -dependencies = [ - "itoa", - "num-integer", - "ryu", - "time 0.3.21", -] - -[[package]] -name = "aws-smithy-xml" -version = "0.41.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b01d77433f248d9a2b08f519b403d6aa8435bf144b5d8585862f8dd599eb843" -dependencies = [ - "thiserror", - "xmlparser", -] - -[[package]] -name = "aws-types" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "394d5c945b747ab3292b94509b78c91191aacfd1deacbcd58371d6f61f8be78a" -dependencies = [ - "aws-smithy-async", - "aws-smithy-client", - "aws-smithy-http", - "aws-smithy-types", - "http", - "rustc_version", - "tracing", - "zeroize", -] - [[package]] name = "axum" version = "0.6.18" @@ -1213,16 +913,6 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" -[[package]] -name = "bytes-utils" -version = "0.1.3" 
-source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e47d3a8076e283f3acd27400535992edb3ba4b5bb72f8891ad8fbe7932a7d4b9" -dependencies = [ - "bytes", - "either", -] - [[package]] name = "bzip2-sys" version = "0.1.11+1.0.8" @@ -1281,7 +971,7 @@ dependencies = [ "js-sys", "num-traits", "serde", - "time 0.1.45", + "time", "wasm-bindgen", "winapi", ] @@ -1666,15 +1356,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "ct-logs" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1a816186fa68d9e426e3cb4ae4dff1fcd8e4a2c34b781bf7a822574a0d0aac8" -dependencies = [ - "sct 0.6.1", -] - [[package]] name = "dashmap" version = "5.4.0" @@ -2246,7 +1927,7 @@ dependencies = [ "indexmap 1.9.3", "slab", "tokio", - "tokio-util 0.7.8", + "tokio-util", "tracing", ] @@ -2433,23 +2114,6 @@ dependencies = [ "want", ] -[[package]] -name = "hyper-rustls" -version = "0.22.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f9f7a97316d44c0af9b0301e65010573a853a9fc97046d7331d7f6bc0fd5a64" -dependencies = [ - "ct-logs", - "futures-util", - "hyper", - "log", - "rustls 0.19.1", - "rustls-native-certs", - "tokio", - "tokio-rustls 0.22.0", - "webpki 0.21.4", -] - [[package]] name = "hyper-rustls" version = "0.24.0" @@ -2458,9 +2122,9 @@ checksum = "0646026eb1b3eea4cd9ba47912ea5ce9cc07713d105b1a14698f4e6433d348b7" dependencies = [ "http", "hyper", - "rustls 0.21.1", + "rustls", "tokio", - "tokio-rustls 0.24.0", + "tokio-rustls", ] [[package]] @@ -2943,12 +2607,6 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40" -[[package]] -name = "md5" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" - [[package]] name = "memchr" version = "2.5.0" @@ -3884,7 +3542,7 @@ dependencies = [ "regex", "tokio", "tokio-native-tls", - "tokio-util 0.7.8", + "tokio-util", "url", "uuid 1.3.3", ] @@ -4076,7 +3734,7 @@ dependencies = [ "http", "http-body", "hyper", - "hyper-rustls 0.24.0", + "hyper-rustls", "hyper-tls", "ipnet", "js-sys", @@ -4086,15 +3744,15 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.21.1", + "rustls", "rustls-pemfile", "serde", "serde_json", "serde_urlencoded", "tokio", "tokio-native-tls", - "tokio-rustls 0.24.0", - "tokio-util 0.7.8", + "tokio-rustls", + "tokio-util", "tower-service", "url", "wasm-bindgen", @@ -4182,19 +3840,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "rustls" -version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7" -dependencies = [ - "base64 0.13.1", - "log", - "ring", - "sct 0.6.1", - "webpki 0.21.4", -] - [[package]] name = "rustls" version = "0.21.1" @@ -4204,19 +3849,7 @@ dependencies = [ "log", "ring", "rustls-webpki", - "sct 0.7.0", -] - -[[package]] -name = "rustls-native-certs" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a07b7c1885bd8ed3831c289b7870b13ef46fe0e856d288c30d9cc17d75a2092" -dependencies = [ - "openssl-probe", - "rustls 0.19.1", - "schannel", - "security-framework", + "sct", ] [[package]] @@ -4286,16 +3919,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" -[[package]] -name = "sct" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "sct" version = "0.7.0" @@ -4802,9 +4425,6 @@ dependencies = [ "arrow", "assert_cmd", "async-stream", - "aws-config", - "aws-sdk-s3", - "aws-types", "chrono", "clap 4.3.2", "dashmap", @@ -4843,7 +4463,7 @@ dependencies = [ "tempfile", "tokio", "tokio-stream", - "tokio-util 0.7.8", + "tokio-util", "tonic", "tonic-health", "tonic-reflection", @@ -4867,7 +4487,7 @@ dependencies = [ "sparrow-runtime", "tokio", "tokio-stream", - "tokio-util 0.7.8", + "tokio-util", "tracing", ] @@ -4969,9 +4589,6 @@ dependencies = [ "async-trait", "avro-rs", "avro-schema", - "aws-config", - "aws-sdk-s3", - "aws-types", "bit-set", "bitvec", "bytes", @@ -5023,7 +4640,7 @@ dependencies = [ "tempfile", "tokio", "tokio-stream", - "tokio-util 0.7.8", + "tokio-util", "tonic", "tracing", "url", @@ -5401,32 +5018,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "time" -version = "0.3.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f3403384eaacbca9923fa06940178ac13e4edb725486d70e8e15881d0c836cc" -dependencies = [ - "serde", - "time-core", - "time-macros", -] - -[[package]] -name = "time-core" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" - -[[package]] -name = "time-macros" -version = "0.2.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "372950940a5f07bf38dbe211d7283c9e6d7327df53794992d293e534c733d09b" -dependencies = [ - "time-core", -] - [[package]] name = "tiny-keccak" version = "2.0.2" @@ -5509,24 +5100,13 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-rustls" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" -dependencies = [ - "rustls 0.19.1", - "tokio", - "webpki 0.21.4", -] - [[package]] name = "tokio-rustls" version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e0d409377ff5b1e3ca6437aa86c1eb7d40c134bfec254e44c830defa92669db5" dependencies = [ - "rustls 0.21.1", + "rustls", "tokio", ] @@ -5541,20 +5121,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-util" -version = "0.6.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507" -dependencies = [ - "bytes", - "futures-core", - "futures-sink", - "log", - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-util" version = "0.7.8" @@ -5602,7 +5168,7 @@ dependencies = [ "prost-derive", "tokio", "tokio-stream", - "tokio-util 0.7.8", + "tokio-util", "tower", "tower-layer", "tower-service", @@ -5665,7 +5231,7 @@ dependencies = [ "rand 0.8.5", "slab", "tokio", - "tokio-util 0.7.8", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -5690,7 +5256,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if", - "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -5984,12 +5549,6 @@ dependencies = [ "serde", ] -[[package]] -name = "urlencoding" -version 
= "2.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8db7427f936968176eaa7cdf81b7f98b980b18495ec28f1b5791ac3bfe3eea9" - [[package]] name = "utf8parse" version = "0.2.1" @@ -6182,16 +5741,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "webpki" -version = "0.21.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "webpki" version = "0.22.0" @@ -6208,7 +5757,7 @@ version = "0.22.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87" dependencies = [ - "webpki 0.22.0", + "webpki", ] [[package]] @@ -6427,12 +5976,6 @@ dependencies = [ "tap", ] -[[package]] -name = "xmlparser" -version = "0.13.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "114ba2b24d2167ef6d67d7d04c8cc86522b87f490025f39f0303b7db5bf5e3d8" - [[package]] name = "yaml-rust" version = "0.4.5" @@ -6463,12 +6006,6 @@ dependencies = [ "synstructure", ] -[[package]] -name = "zeroize" -version = "1.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9" - [[package]] name = "zstd" version = "0.12.3+zstd.1.5.2" diff --git a/Cargo.toml b/Cargo.toml index 68e22bf79..06e960ec7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,10 +28,6 @@ async-stream = "0.3.4" async-trait = "0.1.68" avro-rs = "0.13.0" avro-schema = "0.3.0" -aws-config = "0.11.0" -aws-sdk-s3 = "0.11.0" -pulsar = { version = "5.1.0", default-features = false, features = ["async-std-runtime", "tokio-runtime", "lz4"] } -aws-types = "0.11.0" bigdecimal = { version = "0.3.1", features = ["serde"] } bincode = "1.3.3" bit-set = "0.5.3" @@ -91,6 +87,7 @@ prost-types = "0.11.8" prost-wkt = "0.4.1" prost-wkt-build = "0.4.1" prost-wkt-types = "0.4.1" +pulsar = { version = "5.1.0", default-features = false, features = ["async-std-runtime", "tokio-runtime", "lz4"] } rand = "0.8.5" reqwest = "0.11.14" serde = { version = "1.0.159", features = ["derive", "rc"] } diff --git a/crates/sparrow-main/Cargo.toml b/crates/sparrow-main/Cargo.toml index 276051241..c84d0323d 100644 --- a/crates/sparrow-main/Cargo.toml +++ b/crates/sparrow-main/Cargo.toml @@ -14,9 +14,6 @@ ahash.workspace = true anyhow.workspace = true arrow.workspace = true async-stream.workspace = true -aws-config.workspace = true -aws-sdk-s3.workspace = true -aws-types.workspace = true chrono.workspace = true clap.workspace = true dashmap.workspace = true diff --git a/crates/sparrow-main/src/serve.rs b/crates/sparrow-main/src/serve.rs index cb67fdebe..90dc4845d 100644 --- a/crates/sparrow-main/src/serve.rs +++ b/crates/sparrow-main/src/serve.rs @@ -5,17 +5,17 @@ mod compute_service; mod error_status; mod file_service; pub(crate) mod preparation_service; -use error_stack::{IntoReport, IntoReportCompat, ResultExt}; +use error_stack::{IntoReport, ResultExt}; pub use error_status::*; use sparrow_api::kaskada::v1alpha::compute_service_server::ComputeServiceServer; use sparrow_api::kaskada::v1alpha::file_service_server::FileServiceServer; use sparrow_api::kaskada::v1alpha::preparation_service_server::PreparationServiceServer; -use sparrow_runtime::s3::{S3Helper, S3Object}; -use sparrow_runtime::stores::ObjectStoreRegistry; +use sparrow_runtime::stores::{ObjectStoreRegistry, ObjectStoreUrl}; use std::net::SocketAddr; 
+use std::str::FromStr; use std::sync::Arc; use tonic::transport::Server; use tracing::{info, info_span}; @@ -61,26 +61,28 @@ impl ServeCommand { let _enter = span.enter(); - let s3 = S3Helper::new().await; - let object_store_registry = Arc::new(ObjectStoreRegistry::new()); - let file_service = FileServiceImpl::new(object_store_registry.clone()); + let object_stores = Arc::new(ObjectStoreRegistry::default()); + let file_service = FileServiceImpl::new(object_stores.clone()); // Leak the diagnostic prefix to create a `&'static` reference. // This simplifies the lifetime management of the futures. // This string is fixed for the lifetime of `serve`, so leaking // it once doesn't create a problem. let flight_record_path = if let Some(flight_record_path) = &self.flight_record_path { - Some( - S3Object::try_from_uri(flight_record_path) - .into_report() - .change_context(Error::InvalidFlightRecordPath)?, - ) + let prefix = ObjectStoreUrl::from_str(flight_record_path) + .change_context(Error::InvalidFlightRecordPath)?; + assert!( + prefix.is_delimited(), + "Flight record path must end with `/` but was {flight_record_path}" + ); + + Some(prefix) } else { None }; let flight_record_path = Box::leak(Box::new(flight_record_path)); - let compute_service = ComputeServiceImpl::new(flight_record_path, s3.clone()); - let preparation_service = PreparationServiceImpl::new(object_store_registry.clone()); + let compute_service = ComputeServiceImpl::new(flight_record_path, object_stores.clone()); + let preparation_service = PreparationServiceImpl::new(object_stores.clone()); let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); diff --git a/crates/sparrow-main/src/serve/compute_service.rs b/crates/sparrow-main/src/serve/compute_service.rs index dea198f30..cb3959d56 100644 --- a/crates/sparrow-main/src/serve/compute_service.rs +++ b/crates/sparrow-main/src/serve/compute_service.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use dashmap::DashMap; use error_stack::{IntoReport, ResultExt}; use futures::stream::BoxStream; @@ -17,31 +19,34 @@ use sparrow_compiler::InternalCompileOptions; use sparrow_instructions::ComputeStore; use sparrow_materialize::{Materialization, MaterializationControl}; use sparrow_qfr::kaskada::sparrow::v1alpha::{flight_record_header, FlightRecordHeader}; -use sparrow_runtime::execute::Error; -use sparrow_runtime::s3::{S3Helper, S3Object}; +use sparrow_runtime::execute::error::Error; +use sparrow_runtime::stores::{ObjectStoreRegistry, ObjectStoreUrl}; use tempfile::NamedTempFile; use tonic::{Request, Response, Status}; -use tracing::{error, info, Instrument}; +use tracing::Instrument; use uuid::Uuid; use crate::serve::error_status::IntoStatus; use crate::BuildInfo; pub(super) struct ComputeServiceImpl { - flight_record_path: &'static Option, - s3_helper: S3Helper, + flight_record_path: &'static Option, + object_stores: Arc, /// Thread-safe map containing the materialization id to control handles. 
materializations: DashMap, } impl ComputeServiceImpl { - pub(super) fn new(flight_record_path: &'static Option, s3_helper: S3Helper) -> Self { + pub(super) fn new( + flight_record_path: &'static Option, + object_stores: Arc, + ) -> Self { let materializations = DashMap::new(); Self { flight_record_path, - s3_helper, + object_stores, materializations, } } @@ -89,7 +94,7 @@ impl ComputeService for ComputeServiceImpl { let handle = tokio::spawn( execute_impl( self.flight_record_path, - self.s3_helper.clone(), + self.object_stores.clone(), request.into_inner(), ) .in_current_span(), @@ -112,11 +117,10 @@ impl ComputeService for ComputeServiceImpl { ) -> Result, Status> { let span = tracing::info_span!("StartMaterialization"); let _enter = span.enter(); - let s3_helper = self.s3_helper.clone(); let id = request.get_ref().materialization_id.clone(); tracing::info!("id: {}", id); - match start_materialization_impl(s3_helper, request.into_inner()) { + match start_materialization_impl(request.into_inner()) { Ok(handle) => { self.materializations.insert(id, handle); Ok(Response::new(StartMaterializationResponse {})) @@ -208,12 +212,12 @@ async fn compile_impl( } async fn execute_impl( - flight_record_path: &'static Option, - s3_helper: S3Helper, + flight_record_path: &'static Option, + object_stores: Arc, request: ExecuteRequest, ) -> error_stack::Result< impl Stream> + Send, - sparrow_runtime::execute::Error, + sparrow_runtime::execute::error::Error, > { // Create a path for the plan yaml tempfile (if needed). // TODO: We could include the plan as part of the flight record proto. @@ -235,7 +239,7 @@ async fn execute_impl( serde_yaml::to_writer(writer, plan) .into_report() .change_context(Error::internal_msg("writing plan tempfile"))?; - info!("Wrote plan yaml to {:?}", tempfile); + tracing::info!("Wrote plan yaml to {:?}", tempfile); Some(tempfile) } else { @@ -280,7 +284,7 @@ async fn execute_impl( Ok(progress_stream .chain(futures::stream::once(debug_message( - s3_helper, + object_stores, flight_record_path, plan_yaml_tempfile, flight_record_tempfile, @@ -289,7 +293,6 @@ async fn execute_impl( } fn start_materialization_impl( - s3_helper: S3Helper, request: StartMaterializationRequest, ) -> error_stack::Result { let id = request.materialization_id.clone(); @@ -302,11 +305,7 @@ fn start_materialization_impl( let materialization = Materialization::new(id, plan, tables, destination); // TODO: Support lateness // Spawns the materialization thread and begin exeution - Ok(MaterializationControl::start( - materialization, - s3_helper, - None, - )) + Ok(MaterializationControl::start(materialization, None)) } /// Sends the debug message after the end of the stream. @@ -314,22 +313,22 @@ fn start_materialization_impl( /// Upload the flight record files (plan yaml and flight record), /// compute snapshots (if applicable), and marks this as the final message. 
async fn debug_message( - s3_helper: S3Helper, - flight_record_path: &'static Option, + object_stores: Arc, + flight_record_path: &'static Option, plan_yaml_tempfile: Option, flight_record_tempfile: Option, ) -> error_stack::Result { let diagnostic_id = Uuid::new_v4(); let uploaded_plan_yaml_path = upload_flight_record_file( - &s3_helper, + object_stores.as_ref(), flight_record_path, plan_yaml_tempfile, DiagnosticFile::PlanYaml, &diagnostic_id, ); let uploaded_flight_record_path = upload_flight_record_file( - &s3_helper, + object_stores.as_ref(), flight_record_path, flight_record_tempfile, DiagnosticFile::FlightRecord, @@ -337,11 +336,11 @@ async fn debug_message( ); // Wait for all futures to complete let uploaded_plan_yaml_path = uploaded_plan_yaml_path.await.unwrap_or_else(|e| { - error!("Failed to plan yaml: {:?}", e); + tracing::error!("Failed to plan yaml: {:?}", e); None }); let uploaded_flight_record_path = uploaded_flight_record_path.await.unwrap_or_else(|e| { - error!("Failed to upload flight record: {:?}", e); + tracing::error!("Failed to upload flight record: {:?}", e); None }); @@ -374,8 +373,8 @@ impl DiagnosticFile { } async fn upload_flight_record_file<'a>( - s3_helper: &'a S3Helper, - flight_record_path: &'static Option, + object_stores: &'a ObjectStoreRegistry, + flight_record_path: &'static Option, tempfile: Option, kind: DiagnosticFile, diagnostic_id: &'a Uuid, @@ -383,26 +382,27 @@ async fn upload_flight_record_file<'a>( let tempfile = if let Some(tempfile) = tempfile { tempfile } else { - info!("No diagnostic to upload for kind {:?}", kind); + tracing::info!("No diagnostic to upload for kind {:?}", kind); return Ok(None); }; - let path = if let Some(prefix) = flight_record_path { - prefix.join_delimited(&kind.file_name(diagnostic_id)) + let destination = if let Some(prefix) = flight_record_path { + prefix + .join(&kind.file_name(diagnostic_id)) + .map_err(|e| e.into_error())? } else { - info!("No diagnostic prefix -- not uploading {:?}", kind); + tracing::info!("No diagnostic prefix -- not uploading {:?}", kind); return Ok(None); }; - let destination = path.get_formatted_key(); - s3_helper - .upload_tempfile_to_s3(path, tempfile.into_temp_path()) - .await?; - info!( - "Uploaded {:?}. To retrieve: `s3 cp {} .`", - kind, destination - ); - Ok(Some(destination)) + let destination_string = destination.to_string(); + object_stores + .upload(tempfile.path(), destination) + .await + .map_err(|e| e.into_error())?; + + tracing::info!("Uploaded {kind:?} to {destination_string}"); + Ok(Some(destination_string)) } #[cfg(test)] @@ -607,7 +607,6 @@ mod tests { // make sure that the sliced file set is properly used. 
let file_path = "eventdata/event_data.parquet"; - let s3_helper = S3Helper::new().await; let part1_file_path = sparrow_testing::testdata_path(file_path); let table = TableConfig::new_with_table_source( "Events", @@ -742,10 +741,11 @@ mod tests { let output_to = Destination { destination: Some(destination::Destination::ObjectStore(store)), }; + let object_stores = Arc::new(ObjectStoreRegistry::default()); let mut results: Vec = execute_impl( &None, - s3_helper, + object_stores, ExecuteRequest { plan: compile_response.plan, tables: vec![ComputeTable { diff --git a/crates/sparrow-main/src/serve/file_service.rs b/crates/sparrow-main/src/serve/file_service.rs index 7ba5089c0..dc506b317 100644 --- a/crates/sparrow-main/src/serve/file_service.rs +++ b/crates/sparrow-main/src/serve/file_service.rs @@ -17,14 +17,12 @@ use crate::serve::error_status::IntoStatus; #[derive(Debug)] pub(super) struct FileServiceImpl { - object_store_registry: Arc, + object_stores: Arc, } impl FileServiceImpl { - pub fn new(object_store_registry: Arc) -> Self { - Self { - object_store_registry, - } + pub fn new(object_stores: Arc) -> Self { + Self { object_stores } } } @@ -35,7 +33,7 @@ impl FileService for FileServiceImpl { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let object_store = self.object_store_registry.clone(); + let object_store = self.object_stores.clone(); match tokio::spawn(get_metadata(object_store, request)).await { Ok(result) => result.into_status(), Err(panic) => { diff --git a/crates/sparrow-main/src/serve/preparation_service.rs b/crates/sparrow-main/src/serve/preparation_service.rs index a07177ed7..5523b0128 100644 --- a/crates/sparrow-main/src/serve/preparation_service.rs +++ b/crates/sparrow-main/src/serve/preparation_service.rs @@ -20,9 +20,9 @@ pub(super) struct PreparationServiceImpl { } impl PreparationServiceImpl { - pub fn new(object_store_registry: Arc) -> Self { + pub fn new(object_stores: Arc) -> Self { Self { - object_store_registry, + object_store_registry: object_stores, } } } diff --git a/crates/sparrow-main/tests/e2e/fixture/query_fixture.rs b/crates/sparrow-main/tests/e2e/fixture/query_fixture.rs index a8b6d158e..e65ff9bb9 100644 --- a/crates/sparrow-main/tests/e2e/fixture/query_fixture.rs +++ b/crates/sparrow-main/tests/e2e/fixture/query_fixture.rs @@ -67,15 +67,10 @@ impl QueryFixture { pub fn with_rocksdb( mut self, snapshot_prefix: &std::path::Path, - resume_from: Option<&std::path::Path>, + resume_from: Option, ) -> Self { - let resume_from = resume_from.map(|snapshot_dir| { - let snapshot_dir = snapshot_dir.strip_prefix(snapshot_prefix).unwrap(); - snapshot_dir.to_string_lossy().into_owned() - }); - self.execute_request.compute_snapshot_config = Some(ComputeSnapshotConfig { - output_prefix: snapshot_prefix.to_string_lossy().into_owned(), + output_prefix: format!("file:///{}/", snapshot_prefix.display()), resume_from, }); self diff --git a/crates/sparrow-main/tests/e2e/resumeable_tests.rs b/crates/sparrow-main/tests/e2e/resumeable_tests.rs index 3620532ac..c9326b44e 100644 --- a/crates/sparrow-main/tests/e2e/resumeable_tests.rs +++ b/crates/sparrow-main/tests/e2e/resumeable_tests.rs @@ -37,7 +37,7 @@ async fn assert_final_incremental_same_as_complete( Ok(mut result) => { // Only one snapshot per query execution is currently supported. 
assert_eq!(result.snapshots.len(), 1); - std::path::PathBuf::from(result.snapshots.remove(0).path) + result.snapshots.remove(0).path } }; @@ -65,7 +65,7 @@ async fn assert_final_incremental_same_as_complete( numbers.clear(); numbers.add_file_source(&csv2).await.unwrap(); let persistent_results = persistent_query - .with_rocksdb(snapshot_dir.path(), Some(&snapshot_path)) + .with_rocksdb(snapshot_dir.path(), Some(snapshot_path)) .run_to_csv(&data_fixture) .await .unwrap(); @@ -210,7 +210,7 @@ async fn test_resumeable_with_unordered_file_sets() { Ok(mut result) => { // Only one snapshot is currently supported. assert_eq!(result.snapshots.len(), 1); - std::path::PathBuf::from(result.snapshots.remove(0).path) + result.snapshots.remove(0).path } }; @@ -231,7 +231,7 @@ async fn test_resumeable_with_unordered_file_sets() { .unwrap(); let persistent_results = persistent_query - .with_rocksdb(snapshot_dir.path(), Some(&snapshot_path)) + .with_rocksdb(snapshot_dir.path(), Some(snapshot_path)) .run_to_csv(&data_fixture) .await .unwrap(); @@ -634,7 +634,7 @@ async fn test_resumeable_final_no_new_data() { Ok(mut result) => { // Only one snapshot is currently supported. assert_eq!(result.snapshots.len(), 1); - let snapshot_path = std::path::PathBuf::from(result.snapshots.remove(0).path); + let snapshot_path = result.snapshots.remove(0).path; (result.inner, snapshot_path) } }; @@ -646,7 +646,7 @@ async fn test_resumeable_final_no_new_data() { let result2 = query .clone() - .with_rocksdb(snapshot_dir.path(), Some(&snapshot_path)) + .with_rocksdb(snapshot_dir.path(), Some(snapshot_path.clone())) .run_to_csv(&data_fixture) .await .unwrap(); @@ -663,7 +663,7 @@ async fn test_resumeable_final_no_new_data() { // Run the query again let result3 = query - .with_rocksdb(snapshot_dir.path(), Some(&snapshot_path)) + .with_rocksdb(snapshot_dir.path(), Some(snapshot_path)) .run_to_csv(&data_fixture) .await .unwrap(); diff --git a/crates/sparrow-materialize/src/materialize/materialization.rs b/crates/sparrow-materialize/src/materialize/materialization.rs index c28f6a090..ce3240226 100644 --- a/crates/sparrow-materialize/src/materialize/materialization.rs +++ b/crates/sparrow-materialize/src/materialize/materialization.rs @@ -1,6 +1,5 @@ use error_stack::ResultExt; use sparrow_api::kaskada::v1alpha::{ComputePlan, ComputeTable, Destination, ExecuteResponse}; -use sparrow_runtime::s3::S3Helper; use tokio_stream::Stream; use crate::Error; @@ -48,18 +47,16 @@ impl Materialization { /// * stop_rx - receiver for the stop signal, allowing cancellation of the materialization process pub async fn start( materialization: Materialization, - s3_helper: S3Helper, bounded_lateness_ns: Option, stop_rx: tokio::sync::watch::Receiver, ) -> error_stack::Result< - impl Stream>, + impl Stream>, Error, > { let progress_stream = sparrow_runtime::execute::materialize( materialization.plan, materialization.destination, materialization.tables, - s3_helper, bounded_lateness_ns, stop_rx, ) diff --git a/crates/sparrow-materialize/src/materialize/materialization_control.rs b/crates/sparrow-materialize/src/materialize/materialization_control.rs index 3bdb798d0..fd77d0afa 100644 --- a/crates/sparrow-materialize/src/materialize/materialization_control.rs +++ b/crates/sparrow-materialize/src/materialize/materialization_control.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use error_stack::{IntoReport, ResultExt}; use futures::{StreamExt, TryStreamExt}; use sparrow_api::kaskada::v1alpha::ProgressInformation; -use sparrow_runtime::s3::S3Helper; use 
tokio::task::JoinHandle; use crate::{Error, Materialization}; @@ -28,11 +27,7 @@ impl MaterializationControl { /// /// Kicks off a query and begins producing results to the /// destination supplied in the `materialization`. - pub fn start( - materialization: Materialization, - s3_helper: S3Helper, - bounded_lateness_ns: Option, - ) -> Self { + pub fn start(materialization: Materialization, bounded_lateness_ns: Option) -> Self { let (stop_tx, stop_rx) = tokio::sync::watch::channel(false); let (progress_tx, progress_rx) = tokio::sync::watch::channel(MaterializationStatus { state: State::Uninitialized, @@ -43,8 +38,7 @@ impl MaterializationControl { let handle = tokio::spawn(async move { let id = materialization.id.clone(); let progress_stream = - Materialization::start(materialization, s3_helper, bounded_lateness_ns, stop_rx) - .await?; + Materialization::start(materialization, bounded_lateness_ns, stop_rx).await?; let mut progress_stream = progress_stream.boxed(); let mut last_progress = ProgressInformation::default(); while let Some(message) = progress_stream diff --git a/crates/sparrow-runtime/Cargo.toml b/crates/sparrow-runtime/Cargo.toml index ec15ceb77..77c7c9a36 100644 --- a/crates/sparrow-runtime/Cargo.toml +++ b/crates/sparrow-runtime/Cargo.toml @@ -21,9 +21,8 @@ arrow.workspace = true async-once-cell.workspace = true async-stream.workspace = true async-trait.workspace = true -aws-config.workspace = true -aws-sdk-s3.workspace = true -aws-types.workspace = true +avro-rs = { workspace = true } +avro-schema = { workspace = true, optional = true } bit-set.workspace = true bitvec.workspace = true bytes.workspace = true @@ -33,28 +32,27 @@ dashmap.workspace = true data-encoding.workspace = true derive_more.workspace = true enum-map.workspace = true -lz4 = { workspace = true, optional = true } -serde_json.workspace = true -pulsar = { workspace = true, optional = true } -avro-rs = { workspace = true } -avro-schema = { workspace = true, optional = true } erased-serde.workspace = true error-stack.workspace = true fallible-iterator.workspace = true -futures.workspace = true futures-lite.workspace = true +futures.workspace = true half.workspace = true hashbrown.workspace = true inventory.workspace = true itertools.workspace = true +lz4 = { workspace = true, optional = true } num-traits.workspace = true +object_store.workspace = true owning_ref.workspace = true parquet.workspace = true pin-project.workspace = true prost-wkt-types.workspace = true +pulsar = { workspace = true, optional = true } reqwest.workspace = true -serde.workspace = true +serde_json.workspace = true serde_yaml.workspace = true +serde.workspace = true sha2.workspace = true smallvec.workspace = true sparrow-api = { path = "../sparrow-api" } @@ -68,14 +66,13 @@ sparrow-qfr = { path = "../sparrow-qfr" } sparrow-syntax = { path = "../sparrow-syntax" } static_init.workspace = true tempfile.workspace = true -tokio.workspace = true tokio-stream.workspace = true tokio-util.workspace = true +tokio.workspace = true tonic.workspace = true tracing.workspace = true url.workspace = true uuid.workspace = true -object_store.workspace = true [dev-dependencies] criterion.workspace = true diff --git a/crates/sparrow-runtime/src/execute.rs b/crates/sparrow-runtime/src/execute.rs index 5f1e16de4..a64d58828 100644 --- a/crates/sparrow-runtime/src/execute.rs +++ b/crates/sparrow-runtime/src/execute.rs @@ -13,23 +13,23 @@ use sparrow_arrow::scalar_value::ScalarValue; use sparrow_compiler::{hash_compute_plan_proto, DataContext}; use 
sparrow_instructions::ComputeStore; use sparrow_qfr::kaskada::sparrow::v1alpha::FlightRecordHeader; +use tracing::Instrument; +use crate::execute::error::Error; use crate::execute::key_hash_inverse::{KeyHashInverse, ThreadSafeKeyHashInverse}; use crate::execute::operation::OperationContext; -use crate::s3::S3Helper; use crate::stores::ObjectStoreRegistry; use crate::RuntimeOptions; +mod checkpoints; mod compute_executor; -mod error; +pub mod error; pub(crate) mod key_hash_inverse; pub(crate) mod operation; pub mod output; mod progress_reporter; mod spawner; - pub use compute_executor::*; -pub use error::*; // The path prefix to the local compute store db. const STORE_PATH_PREFIX: &str = "compute_snapshot_"; @@ -86,7 +86,7 @@ pub async fn execute( .into_report() .change_context(Error::internal_msg("create data context"))?; - let s3_helper = S3Helper::new().await; + let object_stores = Arc::new(ObjectStoreRegistry::default()); // If the snapshot config exists, sparrow should attempt to resume from state, // and store new state. Create a new storage path for the local store to @@ -99,15 +99,18 @@ pub async fn execute( .change_context(Error::internal_msg("create snapshot dir"))?; // If a `resume_from` path is specified, download the existing state from s3. - if config.resume_from.is_some() { - crate::s3::download_snapshot(&s3_helper, dir.path(), config) + if let Some(resume_from) = &config.resume_from { + checkpoints::download(resume_from, object_stores.as_ref(), dir.path(), config) + .instrument(tracing::info_span!("Downloading checkpoint files")) .await - .into_report() .change_context(Error::internal_msg("download snapshot"))?; - }; + } else { + tracing::info!("No snapshot set to resume from. Using empty compute store."); + } Some(dir) } else { + tracing::info!("No snapshot config; not creating compute store."); None }; @@ -157,8 +160,6 @@ pub async fn execute( None }; - let object_stores = Arc::new(ObjectStoreRegistry::default()); - let primary_grouping_key_type = plan .primary_grouping_key_type .to_owned() @@ -234,11 +235,7 @@ pub async fn execute( .await .change_context(Error::internal_msg("spawn compute executor"))?; - Ok(compute_executor.execute_with_progress( - s3_helper, - storage_dir, - request.compute_snapshot_config, - )) + Ok(compute_executor.execute_with_progress(storage_dir, request.compute_snapshot_config)) } /// The main method for starting a materialization process. @@ -249,7 +246,6 @@ pub async fn materialize( plan: ComputePlan, destination: Destination, tables: Vec, - s3_helper: S3Helper, bounded_lateness_ns: Option, stop_signal_rx: tokio::sync::watch::Receiver, ) -> error_stack::Result>, Error> { @@ -350,5 +346,5 @@ pub async fn materialize( // TODO: the `execute_with_progress` method contains a lot of additional logic that is theoretically not needed, // as the materialization does not exit, and should not need to handle cleanup tasks that regular // queries do. We should likely refactor this to use a separate `materialize_with_progress` method. 
- Ok(compute_executor.execute_with_progress(s3_helper, storage_dir, None)) + Ok(compute_executor.execute_with_progress(storage_dir, None)) } diff --git a/crates/sparrow-runtime/src/execute/checkpoints.rs b/crates/sparrow-runtime/src/execute/checkpoints.rs new file mode 100644 index 000000000..36fdaa11c --- /dev/null +++ b/crates/sparrow-runtime/src/execute/checkpoints.rs @@ -0,0 +1,257 @@ +use std::path::Path; +use std::str::FromStr; + +use error_stack::{IntoReport, ResultExt}; +use futures::{StreamExt, TryStreamExt}; +use itertools::Itertools; +use sparrow_api::kaskada::v1alpha::{ComputeSnapshot, ComputeSnapshotConfig}; +use sparrow_instructions::ComputeStore; +use tempfile::TempDir; +use uuid::Uuid; + +use crate::execute::ComputeResult; +use crate::stores::{ObjectStoreRegistry, ObjectStoreUrl}; + +/// Number of concurrent download/upload requests. +const CONCURRENT_LIMIT: usize = 5; + +#[derive(derive_more::Display, Debug)] +pub(crate) enum Error { + #[display(fmt = "error while uploading checkpoint file")] + UploadIo, + #[display(fmt = "error while downloading checkpoint file")] + DownloadIo, + #[display(fmt = "invalid 'resume_from': '{_0}'")] + InvalidResumeFrom(String), + #[display(fmt = "invalid 'output_prefix': '{_0}'")] + InvalidOutputPrefix(String), + #[display(fmt = "expected path '{path:?}' to be in '{prefix:?}'")] + InvalidPrefix { prefix: String, path: String }, + #[display(fmt = "invalid object store")] + InvalidObjectStore, + #[display(fmt = "listing files in the checkpoint")] + ListingFiles, + #[display(fmt = "invalid path part '{_0}'")] + InvalidPathPart(String), +} + +impl error_stack::Context for Error {} + +/// Downloads a compute snapshot from s3 to a local directory. +pub(crate) async fn download( + resume_from: &str, + object_stores: &ObjectStoreRegistry, + storage_path: &Path, + config: &ComputeSnapshotConfig, +) -> error_stack::Result<(), Error> { + let output_prefix = ObjectStoreUrl::from_str(&config.output_prefix) + .change_context_lazy(|| Error::InvalidOutputPrefix(config.output_prefix.clone()))?; + error_stack::ensure!( + output_prefix.is_delimited(), + Error::InvalidOutputPrefix(config.output_prefix.clone()) + ); + let resume_from = output_prefix + .join(resume_from) + .change_context_lazy(|| Error::InvalidResumeFrom(resume_from.to_owned()))?; + + let object_store = object_stores + .object_store(&resume_from) + .change_context(Error::InvalidObjectStore)?; + + let prefix = resume_from.path().change_context(Error::ListingFiles)?; + let list_result = object_store + .list_with_delimiter(Some(&prefix)) + .await + .into_report() + .change_context(Error::ListingFiles)?; + + // Do the downloads, reporting progress (and remaining files). 
+ let count = list_result.objects.len(); + tracing::info!( + "Downloading {count} files for checkpoint from {resume_from} to {}", + storage_path.display() + ); + + let resume_from_path = resume_from.path().change_context(Error::DownloadIo)?; + futures::stream::iter(list_result.objects) + .map(|object| { + let Some(relative_path) = object.location.as_ref().strip_prefix(resume_from_path.as_ref()) else { + error_stack::bail!(Error::InvalidPrefix { + prefix: resume_from.to_string(), + path: resume_from_path.to_string() + }) + }; + // Drop the leading `/` (will be left over from the strip prefix above) + let relative_path = &relative_path[1..]; + let source_url = resume_from.join(relative_path).change_context(Error::DownloadIo)?; + let destination_path = storage_path.join(relative_path); + tracing::info!("For object {object:?}, relative path is {relative_path}, downloading {source_url} to {}", destination_path.display()); + Ok((source_url, destination_path)) + }) + .map_ok(|(source_url, destination_path)| async move { + // This is a bit hacky since we need to go back to a URL to use our + // existing `download` method and go back through the registry. + object_stores + .download(source_url, &destination_path) + .await + .change_context(Error::DownloadIo) + }) + .try_buffer_unordered(CONCURRENT_LIMIT) + .try_fold(count, |count, ()| { + let count = count - 1; + tracing::info!("Downloaded file for checkpoint. {count} remaining."); + futures::future::ok(count) + }) + .await?; + + Ok(()) +} + +/// Uploads a compute snapshot to s3. +/// +/// The owned `TempDir` will be dropped on completion of uploading the snapshot. +pub(crate) async fn upload( + object_stores: &ObjectStoreRegistry, + storage_dir: TempDir, + config: ComputeSnapshotConfig, + compute_result: ComputeResult, +) -> error_stack::Result { + // The name is a UUID that is referenced by snapshot metadata. + let dest_name = Uuid::new_v4().as_hyphenated().to_string(); + + let output_prefix = ObjectStoreUrl::from_str(&config.output_prefix) + .change_context_lazy(|| Error::InvalidOutputPrefix(config.output_prefix.clone()))?; + + let path = if let Some(local_output_path) = output_prefix.local_path() { + let destination = local_output_path.join(dest_name); + + // If this is a local path, we just need to move the directory to the + // destination. + tracing::info!( + "Moving Rocks DB from {:?} to local output {:?}", + storage_dir.path(), + destination + ); + + let storage_dir = storage_dir.into_path(); + tokio::fs::rename(&storage_dir, &destination) + .await + .into_report() + .change_context(Error::UploadIo)?; + + format!("{}/", destination.display()) + } else { + let dir = std::fs::read_dir(storage_dir.path()) + .into_report() + .change_context(Error::UploadIo)?; + let source_prefix = storage_dir.path(); + + let destination = output_prefix + .join(&format!("{dest_name}/")) + .change_context_lazy(|| Error::InvalidPathPart(dest_name.clone()))?; + + tracing::info!( + "Uploading compute snapshot files from '{:?}' to '{}' (output prefix '{}')", + source_prefix, + destination, + output_prefix, + ); + + let entries: Vec<_> = dir + .map(|entry| -> error_stack::Result<_, Error> { + let source_path = entry.into_report().change_context(Error::UploadIo)?.path(); + + // Figure out the path relative the directory. 
+ let relative = source_path + .strip_prefix(source_prefix) + .into_report() + .change_context_lazy(|| Error::InvalidPrefix { + prefix: source_prefix.display().to_string(), + path: source_path.display().to_string(), + })?; + + let relative = relative + .to_str() + .ok_or_else(|| Error::InvalidPathPart(relative.display().to_string()))?; + let destination = destination + .join(relative) + .change_context_lazy(|| Error::InvalidPathPart(relative.to_owned()))?; + Ok((source_path, destination)) + }) + .try_collect()?; + + // Then run the upload futures. + let count = entries.len(); + futures::stream::iter(entries) + .map(|(source_path, destination_url)| async move { + object_stores + .upload(&source_path, destination_url) + .await + .change_context(Error::UploadIo) + }) + .buffer_unordered(CONCURRENT_LIMIT) + .try_fold(count, |count, ()| { + let count = count - 1; + tracing::info!("Uploaded file for checkpoint. {count} remaining."); + futures::future::ok(count) + }) + .await?; + + // Explicitly close the storage dir so any problems cleaning it up + // are reported. + storage_dir + .close() + .into_report() + .change_context(Error::UploadIo)?; + + destination.to_string() + }; + + let snapshot = ComputeSnapshot { + path, + max_event_time: Some(compute_result.max_input_timestamp), + plan_hash: Some(compute_result.plan_hash), + snapshot_version: ComputeStore::current_version(), + }; + + Ok(snapshot) +} + +#[cfg(test)] +mod tests { + use super::*; + + // Test that the async method `download` produces futures that are + // Send. + // + // This test does not need to be executed -- just compiled. + #[tokio::test] + async fn require_download_to_be_send() { + let object_stores = ObjectStoreRegistry::default(); + let storage_path = std::path::Path::new("hello"); + let config = ComputeSnapshotConfig::default(); + + fn require_send(_t: T) {} + require_send(download( + "resume_from", + &object_stores, + storage_path, + &config, + )); + } + + // Test that the async method `upload` produces futures that are + // Send. + // + // This test does not need to be executed -- just compiled. 
+ #[tokio::test] + async fn require_upload_to_be_send() { + let object_stores = ObjectStoreRegistry::default(); + let storage_dir = TempDir::new().unwrap(); + let config = ComputeSnapshotConfig::default(); + let compute_result = ComputeResult::default(); + + fn require_send(_t: T) {} + require_send(upload(&object_stores, storage_dir, config, compute_result)); + } +} diff --git a/crates/sparrow-runtime/src/execute/compute_executor.rs b/crates/sparrow-runtime/src/execute/compute_executor.rs index 1b20e2075..86099f4c9 100644 --- a/crates/sparrow-runtime/src/execute/compute_executor.rs +++ b/crates/sparrow-runtime/src/execute/compute_executor.rs @@ -17,18 +17,19 @@ use sparrow_qfr::FlightRecorderFactory; use tempfile::TempDir; use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::StreamExt; -use tracing::{error, info, info_span}; +use tracing::{error, info, info_span, Instrument}; use crate::execute::operation::{OperationContext, OperationExecutor}; use crate::execute::progress_reporter::{progress_stream, ProgressUpdate}; use crate::execute::spawner::ComputeTaskSpawner; use crate::execute::Error; use crate::execute::Error::Internal; -use crate::s3::S3Helper; +use crate::stores::ObjectStoreRegistry; use crate::util::JoinTask; use crate::{Batch, RuntimeOptions}; pub(crate) struct ComputeExecutor { + object_stores: Arc, compute_store: Option>, plan_hash: PlanHash, futures: FuturesUnordered>, @@ -38,6 +39,7 @@ pub(crate) struct ComputeExecutor { } /// The final results returned after the compute executor finishes. +#[derive(Default)] pub struct ComputeResult { /// The timestamp of the maximum input event processed by the query. pub max_input_timestamp: Timestamp, @@ -151,6 +153,7 @@ impl ComputeExecutor { } Ok(Self { + object_stores: context.object_stores, compute_store: context.compute_store, plan_hash: context.plan_hash, futures: spawner.finish(), @@ -165,11 +168,11 @@ impl ComputeExecutor { /// created, but before progress information stops being streamed. pub fn execute_with_progress( self, - s3_helper: S3Helper, storage_dir: Option, compute_snapshot_config: Option, ) -> impl Stream> { let Self { + object_stores, compute_store, plan_hash, futures, @@ -218,11 +221,12 @@ impl ComputeExecutor { } let compute_snapshots = upload_compute_snapshots( - s3_helper.clone(), + object_stores, storage_dir, compute_snapshot_config, compute_result, ) + .instrument(tracing::info_span!("Uploading checkpoint files")) .await .unwrap_or_else(|e| { // Log, but don't fail if we couldn't upload snapshots. 
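
Review note: the new `checkpoints.rs` tests above rely on a compile-time check rather than a runtime one — a generic `require_send` helper whose `Send` bound fails compilation if the `download`/`upload` futures ever hold a non-`Send` value across an `.await`. Below is a standalone sketch of the same idea; the function names and bodies are illustrative stand-ins, not part of this patch.

```rust
use std::sync::Arc;

/// Compile-time assertion that a value (e.g. a future) is `Send`.
/// The body never runs; the `Send` bound does all the work.
fn require_send<T: Send>(_t: T) {}

/// A trivial await point so the future has state to carry across a suspend.
async fn yield_point() {}

/// Hypothetical async fn standing in for `checkpoints::download`/`upload`.
async fn hypothetical_transfer(data: Arc<Vec<u8>>) -> usize {
    // An `Arc<Vec<u8>>` is Send + Sync, so holding it across the await keeps
    // the future `Send`; an `Rc` or a live `RefCell` borrow here would not.
    yield_point().await;
    data.len()
}

#[test]
fn hypothetical_transfer_is_send() {
    // Constructing the future is enough; it is never polled.
    require_send(hypothetical_transfer(Arc::new(vec![1, 2, 3])));
}
```
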
@@ -265,7 +269,7 @@ fn select_biased( } async fn upload_compute_snapshots( - s3_helper: S3Helper, + object_stores: Arc, storage_dir: Option, compute_snapshot_config: Option, compute_result: ComputeResult, @@ -280,11 +284,17 @@ async fn upload_compute_snapshots( if let Some(snapshot_config) = compute_snapshot_config { let storage_dir = storage_dir.ok_or(Error::Internal("missing storage dir"))?; - let snapshot_metadata = - crate::s3::upload_snapshot(s3_helper, storage_dir, snapshot_config, compute_result) - .await - .change_context(Error::Internal("uploading snapshot"))?; + let snapshot_metadata = super::checkpoints::upload( + object_stores.as_ref(), + storage_dir, + snapshot_config, + compute_result, + ) + .await + .change_context(Error::Internal("uploading snapshot"))?; snapshots.push(snapshot_metadata); + } else { + tracing::info!("No snapshot config; not uploading compute store.") } Ok(snapshots) diff --git a/crates/sparrow-runtime/src/execute/error.rs b/crates/sparrow-runtime/src/execute/error.rs index 68e6c41f9..f29f85e4f 100644 --- a/crates/sparrow-runtime/src/execute/error.rs +++ b/crates/sparrow-runtime/src/execute/error.rs @@ -4,26 +4,16 @@ use sparrow_core::ErrorCode; pub enum Error { #[display(fmt = "compute request missing '{_0}'")] MissingField(&'static str), - #[display(fmt = "invalid output path '{_0}'")] - InvalidOutputPath(String), #[display(fmt = "unspecified per-entity behavior")] UnspecifiedPerEntityBehavior, - #[display(fmt = "unspecified output format")] - UnspecifiedOutputFormat, #[display(fmt = "invalid batch input bounds")] InvalidBounds, #[display(fmt = "internal compute error: {_0}")] Internal(&'static str), - #[display(fmt = "failed to create tempfile")] - CreateTempFile, #[display(fmt = "invalid operation: {_0}")] InvalidOperation(String), - #[display(fmt = "failed to get next input for operation")] - GetNextInput, #[display(fmt = "failed to pre-process next input for operation")] PreprocessNextInput, - #[display(fmt = "feature '{feature}' is not enabled")] - FeatureNotEnabled { feature: &'static str }, #[display(fmt = "output '{output}' is not supported")] UnsupportedOutput { output: &'static str }, } @@ -58,7 +48,7 @@ impl error_stack::Context for Error {} impl ErrorCode for Error { fn error_code(&self) -> tonic::Code { match self { - Error::MissingField(_) | Error::InvalidOutputPath(_) => tonic::Code::InvalidArgument, + Error::MissingField(_) => tonic::Code::InvalidArgument, _ => tonic::Code::Internal, } } diff --git a/crates/sparrow-runtime/src/execute/operation/lookup_request.rs b/crates/sparrow-runtime/src/execute/operation/lookup_request.rs index d262c5d28..53fcb1735 100644 --- a/crates/sparrow-runtime/src/execute/operation/lookup_request.rs +++ b/crates/sparrow-runtime/src/execute/operation/lookup_request.rs @@ -16,10 +16,10 @@ use sparrow_instructions::{ComputeStore, GroupingIndices}; use tokio_stream::wrappers::ReceiverStream; use super::BoxedOperation; +use crate::execute::error::{invalid_operation, Error}; use crate::execute::operation::expression_executor::InputColumn; use crate::execute::operation::single_consumer_helper::SingleConsumerHelper; use crate::execute::operation::{InputBatch, Operation}; -use crate::execute::{invalid_operation, Error}; use crate::Batch; #[derive(Debug)] diff --git a/crates/sparrow-runtime/src/execute/operation/lookup_response.rs b/crates/sparrow-runtime/src/execute/operation/lookup_response.rs index 0860ce6f3..f0b9949d2 100644 --- a/crates/sparrow-runtime/src/execute/operation/lookup_response.rs +++ 
b/crates/sparrow-runtime/src/execute/operation/lookup_response.rs @@ -12,10 +12,10 @@ use sparrow_instructions::ComputeStore; use tokio_stream::wrappers::ReceiverStream; use super::BoxedOperation; +use crate::execute::error::{invalid_operation, Error}; use crate::execute::operation::expression_executor::InputColumn; use crate::execute::operation::single_consumer_helper::SingleConsumerHelper; use crate::execute::operation::{InputBatch, Operation}; -use crate::execute::{invalid_operation, Error}; use crate::Batch; #[derive(Debug)] diff --git a/crates/sparrow-runtime/src/execute/operation/select.rs b/crates/sparrow-runtime/src/execute/operation/select.rs index 965723c62..f31cd886e 100644 --- a/crates/sparrow-runtime/src/execute/operation/select.rs +++ b/crates/sparrow-runtime/src/execute/operation/select.rs @@ -1,4 +1,3 @@ -use crate::execute::{invalid_operation, Error}; use anyhow::Context; use arrow::array::BooleanArray; use async_trait::async_trait; @@ -11,6 +10,7 @@ use sparrow_instructions::ComputeStore; use tokio_stream::wrappers::ReceiverStream; use super::BoxedOperation; +use crate::execute::error::{invalid_operation, Error}; use crate::execute::operation::expression_executor::InputColumn; use crate::execute::operation::single_consumer_helper::SingleConsumerHelper; use crate::execute::operation::{InputBatch, Operation}; diff --git a/crates/sparrow-runtime/src/execute/operation/shift_to.rs b/crates/sparrow-runtime/src/execute/operation/shift_to.rs index 3cdcdbcdd..ce22da3d7 100644 --- a/crates/sparrow-runtime/src/execute/operation/shift_to.rs +++ b/crates/sparrow-runtime/src/execute/operation/shift_to.rs @@ -1,7 +1,6 @@ use std::cmp; use std::sync::Arc; -use crate::execute::{invalid_operation, Error}; use anyhow::Context; use arrow::array::{Array, ArrayRef, TimestampNanosecondArray, UInt32Array, UInt64Array}; use arrow::compute::SortColumn; @@ -20,6 +19,7 @@ use tokio_stream::wrappers::ReceiverStream; use tracing::info; use super::BoxedOperation; +use crate::execute::error::{invalid_operation, Error}; use crate::execute::operation::expression_executor::InputColumn; use crate::execute::operation::single_consumer_helper::SingleConsumerHelper; use crate::execute::operation::spread_zip::spread_zip; diff --git a/crates/sparrow-runtime/src/execute/operation/shift_until.rs b/crates/sparrow-runtime/src/execute/operation/shift_until.rs index 8003d3d2e..1c2b7ec7c 100644 --- a/crates/sparrow-runtime/src/execute/operation/shift_until.rs +++ b/crates/sparrow-runtime/src/execute/operation/shift_until.rs @@ -16,10 +16,10 @@ use sparrow_instructions::{ComputeStore, StoreKey}; use tokio_stream::wrappers::ReceiverStream; use super::BoxedOperation; +use crate::execute::error::{invalid_operation, Error}; use crate::execute::operation::expression_executor::InputColumn; use crate::execute::operation::single_consumer_helper::SingleConsumerHelper; use crate::execute::operation::{InputBatch, Operation}; -use crate::execute::{invalid_operation, Error}; use crate::key_hash_index::KeyHashIndex; use crate::Batch; diff --git a/crates/sparrow-runtime/src/execute/operation/with_key.rs b/crates/sparrow-runtime/src/execute/operation/with_key.rs index 95062a6a6..9b50762c4 100644 --- a/crates/sparrow-runtime/src/execute/operation/with_key.rs +++ b/crates/sparrow-runtime/src/execute/operation/with_key.rs @@ -1,11 +1,11 @@ use std::sync::Arc; use super::BoxedOperation; +use crate::execute::error::{invalid_operation, Error}; use crate::execute::key_hash_inverse::ThreadSafeKeyHashInverse; use 
crate::execute::operation::expression_executor::InputColumn; use crate::execute::operation::single_consumer_helper::SingleConsumerHelper; use crate::execute::operation::{InputBatch, Operation, OperationContext}; -use crate::execute::{invalid_operation, Error}; use crate::Batch; use anyhow::Context; use async_trait::async_trait; diff --git a/crates/sparrow-runtime/src/execute/output/object_store.rs b/crates/sparrow-runtime/src/execute/output/object_store.rs index 059ef13d8..f94ac7995 100644 --- a/crates/sparrow-runtime/src/execute/output/object_store.rs +++ b/crates/sparrow-runtime/src/execute/output/object_store.rs @@ -260,7 +260,7 @@ pub(super) async fn write( if num_rows_in_file > ROWS_PER_FILE { let url = state.expect("opened above").close().await?; - tracing::info!("Wrote {num_rows_in_file} to {url}"); + tracing::info!("Wrote {num_rows_in_file} rows to file {url}"); progress_updates_tx .try_send(ProgressUpdate::FilesProduced { paths: vec![url] }) .into_report() diff --git a/crates/sparrow-runtime/src/lib.rs b/crates/sparrow-runtime/src/lib.rs index 31bc46656..0841848b6 100644 --- a/crates/sparrow-runtime/src/lib.rs +++ b/crates/sparrow-runtime/src/lib.rs @@ -30,7 +30,6 @@ mod metadata; mod min_heap; pub mod prepare; mod read; -pub mod s3; pub mod stores; mod streams; mod util; diff --git a/crates/sparrow-runtime/src/s3.rs b/crates/sparrow-runtime/src/s3.rs deleted file mode 100644 index b5e564844..000000000 --- a/crates/sparrow-runtime/src/s3.rs +++ /dev/null @@ -1,359 +0,0 @@ -use std::path::Path; - -use anyhow::{anyhow, Context}; -use aws_sdk_s3::types::ByteStream; -use aws_sdk_s3::{Client, Endpoint}; -use futures::stream::StreamExt; -use itertools::Itertools; -use tempfile::TempPath; -use tokio_util::io::StreamReader; -use tonic::transport::Uri; -use tracing::info; - -mod download_snapshot; -mod upload_snapshot; - -pub(crate) use download_snapshot::*; -pub(crate) use upload_snapshot::*; - -#[derive(Clone, Debug, Hash, PartialEq, Eq)] -pub struct S3Object { - pub bucket: String, - pub key: String, -} - -impl S3Object { - pub fn try_from_uri(path: &str) -> anyhow::Result { - if let Some(s3_path) = path.strip_prefix("s3://") { - if let Some((bucket, key)) = s3_path.split_once('/') { - Ok(S3Object { - bucket: bucket.to_owned(), - key: key.to_owned(), - }) - } else { - Err(anyhow!( - "invalid path provided: '{}' missing bucket/key format", - path - )) - } - } else { - Err(anyhow!( - "invalid path provided: '{}' missing s3:// prefix", - path - )) - } - } - - pub fn get_formatted_key(&self) -> String { - format!("s3://{}/{}", self.bucket, self.key) - } - - /// Returns the relative path of the key. - /// - /// example. - /// prefix: "compute/snapshot/" - /// self.key: "/compute/snapshot/file/data" - /// returns: "file/data" - pub fn get_relative_key_path(&self, prefix: &str) -> anyhow::Result<&str> { - anyhow::ensure!(!prefix.is_empty(), "Expected prefix to be non-empty"); - let prefix = if prefix.ends_with('/') { - &prefix[0..prefix.len() - 1] - } else { - prefix - }; - let res = self.key.strip_prefix(prefix).with_context(|| { - format!( - "Expected key '{}' to start with prefix '{}', but did not", - self.key, prefix - ) - })?; - // Strip the preceding '/' - Ok(&res[1..res.len()]) - } - - /// Joins the suffix onto the key separated by a slash. - /// - /// If the key has a trailing slash, or the suffix has a preceding - /// slash, this method will ensure the result is separated only by a - /// single slash. - /// - /// ex. 
self.key = "prefix" - /// suffix = "suffix" - /// result.key = "prefix/suffix" - #[must_use] - pub fn join_delimited(&self, suffix: &str) -> Self { - let mut result = self.clone(); - result.push_delimited(suffix); - result - } - - /// Pushes the suffix onto the key separated by a slash. - /// - /// If the key has a trailing slash, or the suffix has a preceding - /// slash, this method will ensure the result is separated only by a - /// single slash. - /// - /// ex. self.key = "prefix" - /// suffix = "suffix" - /// result.key = "prefix/suffix" - pub fn push_delimited(&mut self, suffix: &str) { - let existing_delimiter = self.key.ends_with('/'); - let new_delimiter = suffix.starts_with('/'); - - if existing_delimiter && new_delimiter { - // Double delimited. Drop one from the suffix. - self.key.push_str(&suffix[1..]); - } else if existing_delimiter || new_delimiter { - // Single delimited. Push the entire suffix. - self.key.push_str(suffix); - } else { - // No delimiter. Add one. - self.key.push('/'); - self.key.push_str(suffix); - } - } -} - -pub fn is_s3_path(path: &str) -> bool { - path.to_lowercase().starts_with("s3://") -} - -/// S3Helper is a wrapper around the S3 client providing useful methods. -/// -/// The underlying client contains only an `Arc`, so it should be cheap -/// to clone. -#[derive(Clone, Debug)] -pub struct S3Helper { - client: Client, -} - -impl S3Helper { - pub async fn new() -> Self { - let shared_config = aws_config::from_env().load().await; - - // Create the client. - let endpoint = std::env::var("AWS_ENDPOINT"); - let s3_config: aws_sdk_s3::config::Config = if let Ok(endpoint) = endpoint { - // Oeverride the endpoint if needed - let uri: Uri = endpoint.parse().unwrap(); - let endpoint = Endpoint::immutable(uri); - - aws_sdk_s3::config::Builder::from(&shared_config) - .endpoint_resolver(endpoint) - .build() - } else { - (&shared_config).into() - }; - info!("S3 Config: {:?}", s3_config); - let client = Client::from_conf(s3_config); - - Self { client } - } - - pub async fn upload_tempfile_to_s3( - &self, - s3_key: S3Object, - local_path: TempPath, - ) -> anyhow::Result<()> { - self.upload_s3(s3_key, &local_path).await?; - - // Close and remove the temporary file. This causes errors. 
- local_path.close()?; - Ok(()) - } - - pub async fn download_s3( - &self, - s3_object: S3Object, - target_local_path: impl AsRef, - ) -> anyhow::Result<()> { - let key = Some(s3_object.key); - let bucket = Some(s3_object.bucket); - let download = self - .client - .get_object() - .set_key(key) - .set_bucket(bucket) - .send() - .await - .context("Unable to download object from s3")?; - - let mut file = tokio::fs::File::create(target_local_path).await?; - let mut body = StreamReader::new( - download - .body - // Convert S3 download errors into `io:Error` so we can use `tokio::io::copy` - .map(|item| item.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))), - ); - // https://docs.rs/tokio/1.14.0/tokio/io/fn.copy.html - tokio::io::copy(&mut body, &mut file).await?; - - Ok(()) - } - - pub async fn upload_s3( - &self, - s3_key: S3Object, - target_local_path: &Path, - ) -> anyhow::Result<()> { - let body = ByteStream::from_path(target_local_path) - .await - .with_context(|| { - format!("unable to convert file at {target_local_path:?} to ByteStream") - })?; - - let s3_key_clone = s3_key.clone(); - self.client - .put_object() - .set_key(Some(s3_key.key)) - .set_bucket(Some(s3_key.bucket)) - .set_body(Some(body)) - .send() - .await - .with_context(|| { - format!("unable to upload file at {target_local_path:?} to S3 at {s3_key_clone:?}") - })?; - - Ok(()) - } - - /// Lists s3 objects with the given prefix in s3. - /// - /// Appends a trailing '/' to the `prefix` if it does not exist. - pub async fn list_prefix_delimited( - &self, - bucket: &str, - prefix: &str, - ) -> anyhow::Result> { - let prefix = if !prefix.is_empty() { - let c = &prefix[prefix.len() - 1..prefix.len()]; - if c != "/" { - let mut res = prefix.to_owned(); - res.push('/'); - res - } else { - prefix.to_owned() - } - } else { - String::new() - }; - - let list_obj_res = self - .client - .list_objects_v2() - .set_bucket(Some(bucket.to_owned())) - .set_prefix(Some(prefix)) - .send() - .await - .context("Unable to list objects")?; - - if list_obj_res.is_truncated() { - anyhow::bail!("Unsupported: did not expect result to be truncated!") - } - - let res = list_obj_res - .contents() - .unwrap_or_default() - .iter() - .map(|i| -> anyhow::Result<_> { - Ok(S3Object { - bucket: bucket.to_owned(), - key: i.key().context("Could not parse s3 key")?.to_owned(), - }) - }) - .try_collect()?; - - Ok(res) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_try_from_uri_valid() { - let valid_uri = "s3://bucket/some/key/path"; - let result = S3Object::try_from_uri(valid_uri).unwrap(); - assert_eq!(result.key, "some/key/path"); - assert_eq!(result.bucket, "bucket"); - } - - #[test] - fn test_try_from_uri_invalid() { - let invalid_uri = "s3:/bucket/some/key/path"; - assert_eq!( - S3Object::try_from_uri(invalid_uri).unwrap_err().to_string(), - "invalid path provided: 's3:/bucket/some/key/path' missing s3:// prefix", - ); - } - - #[test] - fn test_get_formatted_key() { - let expected = "s3://bucket/some/key/path"; - let obj = S3Object { - key: "some/key/path".to_owned(), - bucket: "bucket".to_owned(), - }; - assert_eq!(obj.get_formatted_key(), expected); - } - - #[test] - fn test_get_relative_key_path() { - let obj = S3Object { - key: "some/key/path".to_owned(), - bucket: "bucket".to_owned(), - }; - let relative_path = obj.get_relative_key_path("some/key").unwrap(); - assert_eq!(relative_path, "path"); - - // With trailing slash - // Regression test against splitting the key incorrectly. 
- let relative_path = obj.get_relative_key_path("some/key/").unwrap(); - assert_eq!(relative_path, "path"); - } - - #[test] - fn test_get_relative_key_path_fails_invalid_prefix() { - let obj = S3Object { - key: "some/key/path".to_owned(), - bucket: "bucket".to_owned(), - }; - - assert_eq!( - obj.get_relative_key_path("bogus").unwrap_err().to_string(), - "Expected key 'some/key/path' to start with prefix 'bogus', but did not" - ); - } - - #[test] - fn test_join_delimited_adds_delimiter() { - let obj = S3Object { - key: "some/key/path".to_owned(), - bucket: "bucket".to_owned(), - }; - let res = obj.join_delimited("suffix"); - assert_eq!(res.key, "some/key/path/suffix"); - } - - #[test] - fn test_join_delimited_with_existing_delimiters() { - let obj_key_trailing_slash = S3Object { - key: "some/key/path/".to_owned(), - bucket: "bucket".to_owned(), - }; - let res = obj_key_trailing_slash.join_delimited("suffix"); - assert_eq!(res.key, "some/key/path/suffix"); - - let res = obj_key_trailing_slash.join_delimited("/suffix"); - assert_eq!(res.key, "some/key/path/suffix"); - } - - #[test] - fn test_is_s3_path_valid() { - assert!(is_s3_path("s3://bucket/key")); - assert!(!is_s3_path("s3//not/valid")); - assert!(!is_s3_path("s3:/not/valid")); - assert!(!is_s3_path("bucket/not/valid")); - } -} diff --git a/crates/sparrow-runtime/src/s3/download_snapshot.rs b/crates/sparrow-runtime/src/s3/download_snapshot.rs deleted file mode 100644 index 33174b73b..000000000 --- a/crates/sparrow-runtime/src/s3/download_snapshot.rs +++ /dev/null @@ -1,112 +0,0 @@ -use std::path::Path; - -use anyhow::Context; -use futures::TryStreamExt; -use itertools::Itertools; -use sparrow_api::kaskada::v1alpha::ComputeSnapshotConfig; -use tokio_stream::wrappers::ReadDirStream; -use tracing::{info, info_span, Instrument}; - -use super::{S3Helper, S3Object}; - -/// Downloads a compute snapshot from s3 to a local directory. -pub(crate) async fn download_snapshot( - s3_helper: &S3Helper, - storage_path: &Path, - config: &ComputeSnapshotConfig, -) -> anyhow::Result<()> { - let s3_uri = config - .resume_from - .as_ref() - .context("No resume_from path set")?; - - if !crate::s3::is_s3_path(s3_uri) { - let source_dir = std::path::Path::new(&config.output_prefix).join(s3_uri); - copy_contents(&source_dir, storage_path).await?; - return Ok(()); - } - - let span = info_span!("Downloading snapshot files", ?s3_uri, ?storage_path); - let _enter = span.enter(); - let snapshot_key_prefix = S3Object::try_from_uri(s3_uri)?.key; - - let snapshot_object = S3Object::try_from_uri(&config.output_prefix)?; - let s3_objects = s3_helper - .list_prefix_delimited(&snapshot_object.bucket, &snapshot_key_prefix) - .in_current_span() - .await - .context("Listing s3 objects")?; - - let mut download_futures: futures::stream::FuturesUnordered<_> = s3_objects - .into_iter() - .map(|item| -> anyhow::Result<_> { - // Append key to local directory path - let file_name = item - .get_relative_key_path(&snapshot_key_prefix) - .context("Splitting path by prefix")?; - - let target_path = storage_path.join(file_name); - - Ok(s3_helper.download_s3(item, target_path)) - }) - .try_collect()?; - - let mut count = download_futures.len(); - while let Some(()) = download_futures.try_next().in_current_span().await? { - count -= 1; - info!("Downloaded file. 
{} remaining.", count); - } - - Ok(()) -} - -async fn copy_contents(from_dir: &Path, to_dir: &Path) -> anyhow::Result<()> { - anyhow::ensure!( - from_dir.is_dir(), - "Expected source '{:?}' to exist and be a directory", - from_dir - ); - anyhow::ensure!( - to_dir.is_dir(), - "Expected destination directory '{:?}' to exist and be a directory", - from_dir - ); - - let read_dir = tokio::fs::read_dir(from_dir).await.context("read dir")?; - ReadDirStream::new(read_dir) - .map_err(|e| anyhow::anyhow!("Invalid entry: {:?}", e)) - .try_for_each_concurrent(None, |entry| async move { - let entry = entry.path(); - anyhow::ensure!( - entry.is_file(), - "Expected all entries to be files, but {:?} was not", - entry - ); - let destination = to_dir.join(entry.file_name().context("file name")?); - tokio::fs::copy(entry, destination).await.context("copy")?; - Ok(()) - }) - .await -} - -#[cfg(test)] -mod tests { - use super::*; - - // Test that the async method `download_snapshot` produces futures that are - // Send. - // - // This test does not need to be executed -- just compiled. - #[tokio::test] - async fn require_download_snapshots_to_be_send() { - let s3_helper = S3Helper::new().await; - let storage_path = std::path::Path::new("hello"); - let config = ComputeSnapshotConfig { - output_prefix: "foo".to_owned(), - resume_from: None, - }; - - fn require_send(_t: T) {} - require_send(download_snapshot(&s3_helper, storage_path, &config)); - } -} diff --git a/crates/sparrow-runtime/src/s3/upload_snapshot.rs b/crates/sparrow-runtime/src/s3/upload_snapshot.rs deleted file mode 100644 index 2a7516178..000000000 --- a/crates/sparrow-runtime/src/s3/upload_snapshot.rs +++ /dev/null @@ -1,120 +0,0 @@ -use std::path::PathBuf; - -use anyhow::Context; -use error_stack::{IntoReport, IntoReportCompat, ResultExt}; -use sparrow_api::kaskada::v1alpha::{ComputeSnapshot, ComputeSnapshotConfig}; -use sparrow_instructions::ComputeStore; -use tempfile::TempDir; -use uuid::Uuid; - -use super::{S3Helper, S3Object}; -use crate::execute::ComputeResult; - -#[derive(derive_more::Display, Debug)] -pub enum Error { - #[display(fmt = "i/o error while uploading snapshot")] - Io, - #[display(fmt = "expected path '{source_path:?}' to be in '{source_prefix:?}'")] - InvalidPrefix { - source_prefix: PathBuf, - source_path: PathBuf, - }, -} - -impl error_stack::Context for Error {} - -/// Uploads a compute snapshot to s3. -/// -/// The owned `TempDir` will be dropped on completion of uploading the snapshot. -pub(crate) async fn upload_snapshot( - s3_helper: S3Helper, - storage_dir: TempDir, - config: ComputeSnapshotConfig, - compute_result: ComputeResult, -) -> error_stack::Result { - // The name is a UUID that is referenced by snapshot metadata. - let dest_name = Uuid::new_v4().to_string(); - - let path = if !crate::s3::is_s3_path(&config.output_prefix) { - let destination = std::path::Path::new(&config.output_prefix).join(dest_name); - - // If this is a local path, we just need to move the directory to the - // destination. 
- tracing::info!( - "Moving Rocks DB from {:?} to local output {:?}", - storage_dir.path(), - destination - ); - - let storage_dir = storage_dir.into_path(); - tokio::fs::rename(&storage_dir, &destination) - .await - .into_report() - .change_context(Error::Io)?; - destination.to_string_lossy().into_owned() - } else { - let dir = std::fs::read_dir(storage_dir.path()) - .into_report() - .change_context(Error::Io)?; - let source_prefix = storage_dir.path(); - - // The dest_prefix contains the bucket, so create an s3 object to parse. - let mut dest_path = S3Object::try_from_uri(&config.output_prefix) - .into_report() - .change_context(Error::Io)?; - dest_path.push_delimited(&dest_name); - let dest_path = dest_path; - - tracing::info!( - "Uploading compute snapshot files from '{:?}', to s3 at '{:?}'", - source_prefix, - dest_path.get_formatted_key(), - ); - - // Iterate over all files in the storage directory - for entry in dir { - let source_path = entry.into_report().change_context(Error::Io)?.path(); - - // Split the path to get the destination key - let source_key = source_path - .strip_prefix(source_prefix) - .into_report() - .change_context_lazy(|| Error::InvalidPrefix { - source_prefix: source_prefix.to_owned(), - source_path: source_path.clone(), - })?; - - let dest_path = dest_path.join_delimited(source_key.to_str().ok_or(Error::Io)?); - tracing::info!( - "Uploading snapshot to bucket {:?} with key {:?}", - dest_path.bucket, - dest_path.key - ); - - s3_helper - .upload_s3(dest_path, &source_path) - .await - .context("Uploading file to s3") - .into_report() - .change_context(Error::Io)?; - } - - // Explicitly close the storage dir so any problems cleaning it up - // are reported. - storage_dir - .close() - .into_report() - .change_context(Error::Io)?; - - dest_path.get_formatted_key() - }; - - let snapshot = ComputeSnapshot { - path, - max_event_time: Some(compute_result.max_input_timestamp), - plan_hash: Some(compute_result.plan_hash), - snapshot_version: ComputeStore::current_version(), - }; - - Ok(snapshot) -} diff --git a/crates/sparrow-runtime/src/stores/object_store_url.rs b/crates/sparrow-runtime/src/stores/object_store_url.rs index 298fa600a..a1ee68753 100644 --- a/crates/sparrow-runtime/src/stores/object_store_url.rs +++ b/crates/sparrow-runtime/src/stores/object_store_url.rs @@ -44,6 +44,13 @@ impl ObjectStoreUrl { Ok(Self { url }) } + /// Creates a relative URL if possible, with this URL as the base URL. + /// + /// This is the inverse of [`join`]. + pub fn relative_path(&self, url: &Self) -> Option<String> { + self.url.make_relative(&url.url) + } + /// Return the local path, if this is a local file. pub fn local_path(&self) -> Option<&Path> { if self.url.scheme() == "file" { @@ -53,6 +60,11 @@ impl ObjectStoreUrl { None } } + + /// Return true if the URL ends with a delimiter. 
+ pub fn is_delimited(&self) -> bool { + self.url.path().ends_with('/') + } } #[derive(derive_more::Display, Debug)] @@ -112,4 +124,24 @@ mod tests { None ); } + + #[test] + fn test_make_relative() { + let local_prefix = ObjectStoreUrl::from_str("file:///absolute/path/").unwrap(); + let local_file = ObjectStoreUrl::from_str("file:///absolute/path/some/file").unwrap(); + let s3_prefix = ObjectStoreUrl::from_str("s3://bucket/prefix/").unwrap(); + let s3_file = ObjectStoreUrl::from_str("s3://bucket/prefix/some/object").unwrap(); + + assert_eq!( + local_prefix.relative_path(&local_file), + Some("some/file".to_owned()) + ); + assert_eq!(local_prefix.relative_path(&s3_file), None); + + assert_eq!( + s3_prefix.relative_path(&s3_file), + Some("some/object".to_owned()) + ); + assert_eq!(s3_prefix.relative_path(&local_file), None); + } } diff --git a/crates/sparrow-runtime/src/stores/registry.rs b/crates/sparrow-runtime/src/stores/registry.rs index 21d0eb8c4..e9e170f7a 100644 --- a/crates/sparrow-runtime/src/stores/registry.rs +++ b/crates/sparrow-runtime/src/stores/registry.rs @@ -10,6 +10,9 @@ use tokio::io::AsyncWriteExt; use crate::stores::object_store_key::ObjectStoreKey; use crate::stores::ObjectStoreUrl; +/// If a file is smaller than this, use upload rather than multipart upload. +const SINGLE_PART_UPLOAD_LIMIT_BYTES: u64 = 5_000_000; + /// Map from URL scheme to object store for that prefix. /// /// Currently, we use a single object store or each scheme. This covers @@ -57,24 +60,51 @@ impl ObjectStoreRegistry { from: source_path.to_path_buf(), to: destination_url.clone(), }; - let mut source = tokio::fs::File::open(source_path) - .await - .into_report() - .change_context_lazy(upload_error)?; - let (_id, mut destination) = object_store - .put_multipart(&target_path) - .await - .into_report() - .change_context_lazy(upload_error)?; - tokio::io::copy(&mut source, &mut destination) - .await - .into_report() - .change_context_lazy(upload_error)?; - destination - .shutdown() + let metadata = tokio::fs::metadata(source_path) .await .into_report() .change_context_lazy(upload_error)?; + + let length = metadata.len(); + + tracing::info!( + "Uploading {length} bytes from {} to {destination_url}", + source_path.display() + ); + + if length <= SINGLE_PART_UPLOAD_LIMIT_BYTES { + let bytes = tokio::fs::read(source_path) + .await + .into_report() + .change_context_lazy(upload_error)?; + let bytes = bytes::Bytes::from(bytes); + object_store + .put(&target_path, bytes) + .await + .into_report() + .change_context_lazy(upload_error)?; + } else { + let mut source = tokio::fs::File::open(source_path) + .await + .into_report() + .change_context_lazy(upload_error)?; + + let (_id, mut destination) = object_store + .put_multipart(&target_path) + .await + .into_report() + .change_context_lazy(upload_error)?; + tokio::io::copy(&mut source, &mut destination) + .await + .into_report() + .change_context_lazy(upload_error)?; + destination + .shutdown() + .await + .into_report() + .change_context_lazy(upload_error)?; + } + Ok(()) } @@ -102,10 +132,15 @@ impl ObjectStoreRegistry { .into_report() .change_context_lazy(download_error)?; let mut source = tokio_util::io::StreamReader::new(stream); - tokio::io::copy(&mut source, &mut destination) + let length = tokio::io::copy(&mut source, &mut destination) .await .into_report() .change_context_lazy(download_error)?; + + tracing::info!( + "Downloaded {length} bytes from {source_url} to {}", + destination_path.display() + ); Ok(()) } } diff --git 
a/crates/sparrow-runtime/src/util/join_task.rs b/crates/sparrow-runtime/src/util/join_task.rs index a52b3ec8a..f9b29e133 100644 --- a/crates/sparrow-runtime/src/util/join_task.rs +++ b/crates/sparrow-runtime/src/util/join_task.rs @@ -7,7 +7,7 @@ use futures::ready; use futures::task::{Context, Poll}; use pin_project::pin_project; -use crate::execute::Error; +use crate::execute::error::Error; /// A custom `Future` that can be wrapped around a JoinHandle. /// diff --git a/proto/kaskada/kaskada/v1alpha/compute_service.proto b/proto/kaskada/kaskada/v1alpha/compute_service.proto index 30b70fae6..613171f1a 100644 --- a/proto/kaskada/kaskada/v1alpha/compute_service.proto +++ b/proto/kaskada/kaskada/v1alpha/compute_service.proto @@ -110,14 +110,14 @@ message ProgressInformation { } message ComputeSnapshotConfig { - // S3 URI prefix where *all* snapshots should be written. + // URI prefix where *all* snapshots should be written. // This should be unique to the query hash. // Snapshots will be written to this prefix. // // Example: `s3:///wren/v1alpha/computeSnapshots////data`. string output_prefix = 1; - // If set, the S3 URI prefix of a snapshot to resume from. + // If set, the URI prefix of a snapshot to resume from. // // Example: `s3:///wren/v1alpha/computeSnapshots////data/`. google.protobuf.StringValue resume_from = 2; diff --git a/tests/integration/api/query_v1_incremental_test.go b/tests/integration/api/query_v1_incremental_test.go index f889bcac4..1c298b6b2 100644 --- a/tests/integration/api/query_v1_incremental_test.go +++ b/tests/integration/api/query_v1_incremental_test.go @@ -413,7 +413,7 @@ var _ = Describe("Query V1 with incremental", Ordered, func() { // Run the query with the new table stream, err := queryClient.CreateQuery(ctx, queryRequestIncremental) - Expect(err).ShouldNot(HaveOccurred() ) + Expect(err).ShouldNot(HaveOccurred()) Expect(stream).ShouldNot(BeNil()) queryResponses, err := helpers.GetCreateQueryResponses(stream) diff --git a/wren/compute/compute_manager.go b/wren/compute/compute_manager.go index 6887e1d70..c632ff79c 100644 --- a/wren/compute/compute_manager.go +++ b/wren/compute/compute_manager.go @@ -111,7 +111,7 @@ func (m *computeManager) InitiateQuery(queryContext *QueryContext) (client.Compu subLogger.Info().Bool("incremental_enabled", queryContext.compileResp.IncrementalEnabled).Bool("is_current_data_token", queryContext.isCurrentDataToken).Msg("Populating snapshot config if needed") if queryContext.compileResp.IncrementalEnabled && queryContext.isCurrentDataToken && queryContext.compileResp.PlanHash != nil { executeRequest.ComputeSnapshotConfig = &v1alpha.ComputeSnapshotConfig{ - OutputPrefix: ConvertURIForCompute(m.getComputeSnapshotDataURI(queryContext.owner, *snapshotCacheBuster, queryContext.compileResp.PlanHash.Hash, queryContext.dataToken.DataVersionID)), + OutputPrefix: m.getComputeSnapshotDataURI(queryContext.owner, *snapshotCacheBuster, queryContext.compileResp.PlanHash.Hash, queryContext.dataToken.DataVersionID), } subLogger.Info().Str("SnapshotPrefix", executeRequest.ComputeSnapshotConfig.OutputPrefix).Msg("Snapshot output prefix") @@ -119,7 +119,7 @@ func (m *computeManager) InitiateQuery(queryContext *QueryContext) (client.Compu if err != nil { log.Warn().Err(err).Msg("issue getting existing snapshot. 
query will execute from scratch") } else if bestSnapshot != nil { - executeRequest.ComputeSnapshotConfig.ResumeFrom = &wrapperspb.StringValue{Value: ConvertURIForCompute(bestSnapshot.Path)} + executeRequest.ComputeSnapshotConfig.ResumeFrom = &wrapperspb.StringValue{Value: bestSnapshot.Path} subLogger.Info().Str("ResumeFrom", executeRequest.ComputeSnapshotConfig.ResumeFrom.Value).Msg("Found snapshot to resume compute from") } else { subLogger.Info().Msg("no valid snapshot to resume from") @@ -198,7 +198,7 @@ func (m *computeManager) runMaterializationQuery(queryContext *QueryContext) (*Q func (m *computeManager) SaveComputeSnapshots(queryContext *QueryContext, computeSnapshots []*v1alpha.ComputeSnapshot) { subLogger := log.Ctx(queryContext.ctx).With().Str("method", "manager.SaveComputeSnapshots").Logger() for _, computeSnapshot := range computeSnapshots { - if err := m.kaskadaTableClient.SaveComputeSnapshot(queryContext.ctx, queryContext.owner, computeSnapshot.PlanHash.Hash, computeSnapshot.SnapshotVersion, queryContext.dataToken, ConvertURIForManager(computeSnapshot.Path), computeSnapshot.MaxEventTime.AsTime(), queryContext.GetTableIDs()); err != nil { + if err := m.kaskadaTableClient.SaveComputeSnapshot(queryContext.ctx, queryContext.owner, computeSnapshot.PlanHash.Hash, computeSnapshot.SnapshotVersion, queryContext.dataToken, computeSnapshot.Path, computeSnapshot.MaxEventTime.AsTime(), queryContext.GetTableIDs()); err != nil { subLogger.Error().Err(err).Str("data_token_id", queryContext.dataToken.ID.String()).Msg("issue saving compute snapshot") } } diff --git a/wren/compute/helpers.go b/wren/compute/helpers.go index 00c86484a..35feea443 100644 --- a/wren/compute/helpers.go +++ b/wren/compute/helpers.go @@ -2,8 +2,6 @@ package compute import ( "context" - "fmt" - "strings" "time" "github.com/kaskada-ai/kaskada/wren/ent" @@ -16,17 +14,6 @@ import ( v1alpha "github.com/kaskada-ai/kaskada/gen/proto/go/kaskada/kaskada/v1alpha" ) -func ConvertURIForCompute(URI string) string { - return strings.TrimPrefix(URI, "file://") -} - -func ConvertURIForManager(URI string) string { - if strings.HasPrefix(URI, "/") { - return fmt.Sprintf("file://%s", URI) - } - return URI -} - func reMapSparrowError(ctx context.Context, err error) error { subLogger := log.Ctx(ctx).With().Str("method", "manager.reMapSparrowError").Logger() inStatus, ok := status.FromError(err)