Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support S3 paths in Parquet inputs #798

Merged
merged 5 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci_python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ jobs:
pip install ${WHEEL} --force-reinstall
echo "::endgroup::"
echo "::group::Test Python $V"
poetry run pytest
poetry run pytest pytests -S minio
bjchambers marked this conversation as resolved.
Show resolved Hide resolved
echo "::endgroup::"
echo "::group::MyPy Python $V"
poetry run mypy -- --install-types --non-interactive pysrc pytests
Expand Down
2 changes: 1 addition & 1 deletion crates/sparrow-runtime/src/execute/operation/spread_zip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ mod tests {

let expected_offsets =
Int32Array::from(vec![Some(0), Some(3), Some(6), Some(6), Some(8), Some(9)]);
let offsets: Vec<i32> = result.offsets().into_iter().map(|i| *i).collect();
let offsets: Vec<i32> = result.offsets().iter().copied().collect();
let offsets = Int32Array::from(offsets);
assert_eq!(expected_offsets, offsets);
assert_eq!(&expected, values);
Expand Down
2 changes: 1 addition & 1 deletion crates/sparrow-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub mod key_hash_inverse;
mod metadata;
mod min_heap;
pub mod prepare;
mod read;
pub mod read;
pub mod stores;
mod streams;
mod util;
Expand Down
10 changes: 3 additions & 7 deletions crates/sparrow-runtime/src/prepare/preparer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl Preparer {
/// - `prepare_prefix`: The prefix to write prepared files to.
pub async fn prepare_parquet(
&self,
to_prepare: &str,
to_prepare: ObjectStoreUrl,
prepare_prefix: &ObjectStoreUrl,
) -> error_stack::Result<Vec<PreparedFile>, Error> {
// TODO: Support Slicing
Expand All @@ -110,14 +110,10 @@ impl Preparer {
.object_store(&output_url)
.change_context(Error::Internal)?;

// TODO: support preparing from remote stores
let to_prepare = std::path::Path::new(to_prepare);
let extension = to_prepare.extension().and_then(|ext| ext.to_str());
let to_prepare_url = ObjectStoreUrl::from_file_path(to_prepare)
.change_context_lazy(|| Error::InvalidUrl(to_prepare.display().to_string()))?;
let extension = Some("parquet");
let source_data = SourceData {
source: Some(
SourceData::try_from_url(to_prepare_url.url(), extension)
SourceData::try_from_url(to_prepare.url(), extension)
.into_report()
.change_context(Error::Internal)?,
),
Expand Down
6 changes: 5 additions & 1 deletion crates/sparrow-runtime/src/stores/object_store_url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ impl FromStr for ObjectStoreUrl {
type Err = error_stack::Report<ParseError>;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Url::from_str(s)
let base = std::env::current_dir()
.into_report()
.change_context(ParseError(s.to_owned()))?;
let base = Url::from_directory_path(base).map_err(|_| ParseError(s.to_owned()))?;
base.join(s)
bjchambers marked this conversation as resolved.
Show resolved Hide resolved
.into_report()
.change_context(ParseError(s.to_owned()))
.map(|it| ObjectStoreUrl { url: it })
Expand Down
9 changes: 9 additions & 0 deletions crates/sparrow-session/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
#![warn(
rust_2018_idioms,
nonstandard_style,
future_incompatible,
clippy::mod_module_files,
clippy::print_stdout,
clippy::print_stderr
)]

mod error;
mod execution;
mod expr;
Expand Down
2 changes: 1 addition & 1 deletion crates/sparrow-session/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct Session {
/// uuid. Once we run on multiple machines, we'll have to serialize/pickle the
/// udf as well.
udfs: HashMap<Uuid, Arc<dyn Udf>>,
object_store_registry: Arc<ObjectStoreRegistry>,
pub object_store_registry: Arc<ObjectStoreRegistry>,
rt: tokio::runtime::Runtime,
/// Default temporary path to prepare files to.
///
Expand Down
6 changes: 4 additions & 2 deletions crates/sparrow-session/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,20 @@ impl Table {
Ok(())
}

pub async fn add_parquet(&self, path: &str) -> error_stack::Result<(), Error> {
pub async fn add_parquet(&self, url: &str) -> error_stack::Result<(), Error> {
let file_sets = match &self.source {
Source::Parquet(file_sets) => file_sets.clone(),
other => error_stack::bail!(Error::internal_msg(format!(
"expected parquet data source, saw {:?}",
other
))),
};
let url =
ObjectStoreUrl::from_str(url).change_context(Error::InvalidUrl(url.to_owned()))?;

let prepared = self
.preparer
.prepare_parquet(path, &self.prepare_prefix)
.prepare_parquet(url, &self.prepare_prefix)
.await
.change_context(Error::Prepare)?;

Expand Down
6 changes: 3 additions & 3 deletions crates/sparrow-transforms/src/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,14 @@ mod tests {
let time = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2, 3]));
let subsort = Arc::new(UInt64Array::from(vec![0, 1, 2, 3]));
let key_hash = Arc::new(UInt64Array::from(vec![0, 1, 2, 3]));
let input_batch = Batch::new_with_data(

Batch::new_with_data(
input_data,
time,
subsort,
key_hash,
RowTime::from_timestamp_ns(3),
);
input_batch
)
}

#[test]
Expand Down
6 changes: 3 additions & 3 deletions crates/sparrow-transforms/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,14 @@ mod tests {
let time = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2, 3]));
let subsort = Arc::new(UInt64Array::from(vec![0, 1, 2, 3]));
let key_hash = Arc::new(UInt64Array::from(vec![0, 1, 2, 3]));
let input_batch = Batch::new_with_data(

Batch::new_with_data(
input_data,
time,
subsort,
key_hash,
RowTime::from_timestamp_ns(3),
);
input_batch
)
}

#[test]
Expand Down
Loading
Loading