feat: Support S3 paths in Parquet inputs (#798)
Remaining work for S3 support includes writing prepared files to S3 and writing to S3 destinations.
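
In practice this means a Parquet source can now be created directly from an `s3://` URL. A minimal sketch based on the `test_read_parquet_from_s3_minio` test added below; the bucket, key, and credential values are placeholders:

```python
import asyncio
import os

import kaskada as kd


async def main() -> None:
    # The object-store client is created lazily and cached, so the AWS
    # environment variables must be set before the first s3:// URL is read.
    os.environ["AWS_ACCESS_KEY_ID"] = "my-access-key"      # placeholder
    os.environ["AWS_SECRET_ACCESS_KEY"] = "my-secret-key"  # placeholder
    os.environ["AWS_DEFAULT_REGION"] = "us-east-1"

    source = await kd.sources.Parquet.create(
        "s3://my-bucket/purchases_part1.parquet",  # placeholder bucket/key
        time_column="purchase_time",
        key_column="customer_id",
    )
    # `source` can now be queried like any other Kaskada source.


kd.init_session()
asyncio.run(main())
```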
bjchambers authored Oct 9, 2023
1 parent 691c346 commit d893f39
Showing 20 changed files with 1,175 additions and 469 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci_python.yml
@@ -125,7 +125,7 @@ jobs:
pip install ${WHEEL} --force-reinstall
echo "::endgroup::"
echo "::group::Test Python $V"
poetry run pytest
poetry run pytest pytests -S minio
echo "::endgroup::"
echo "::group::MyPy Python $V"
poetry run mypy -- --install-types --non-interactive pysrc pytests
2 changes: 1 addition & 1 deletion crates/sparrow-runtime/src/execute/operation/spread_zip.rs
@@ -396,7 +396,7 @@ mod tests {

let expected_offsets =
Int32Array::from(vec![Some(0), Some(3), Some(6), Some(6), Some(8), Some(9)]);
let offsets: Vec<i32> = result.offsets().into_iter().map(|i| *i).collect();
let offsets: Vec<i32> = result.offsets().iter().copied().collect();
let offsets = Int32Array::from(offsets);
assert_eq!(expected_offsets, offsets);
assert_eq!(&expected, values);
2 changes: 1 addition & 1 deletion crates/sparrow-runtime/src/lib.rs
@@ -29,7 +29,7 @@ pub mod key_hash_inverse;
mod metadata;
mod min_heap;
pub mod prepare;
mod read;
pub mod read;
pub mod stores;
mod streams;
mod util;
10 changes: 3 additions & 7 deletions crates/sparrow-runtime/src/prepare/preparer.rs
@@ -97,7 +97,7 @@ impl Preparer {
/// - `prepare_prefix`: The prefix to write prepared files to.
pub async fn prepare_parquet(
&self,
to_prepare: &str,
to_prepare: ObjectStoreUrl,
prepare_prefix: &ObjectStoreUrl,
) -> error_stack::Result<Vec<PreparedFile>, Error> {
// TODO: Support Slicing
@@ -110,14 +110,10 @@
.object_store(&output_url)
.change_context(Error::Internal)?;

// TODO: support preparing from remote stores
let to_prepare = std::path::Path::new(to_prepare);
let extension = to_prepare.extension().and_then(|ext| ext.to_str());
let to_prepare_url = ObjectStoreUrl::from_file_path(to_prepare)
.change_context_lazy(|| Error::InvalidUrl(to_prepare.display().to_string()))?;
let extension = Some("parquet");
let source_data = SourceData {
source: Some(
SourceData::try_from_url(to_prepare_url.url(), extension)
SourceData::try_from_url(to_prepare.url(), extension)
.into_report()
.change_context(Error::Internal)?,
),
6 changes: 5 additions & 1 deletion crates/sparrow-runtime/src/stores/object_store_url.rs
@@ -106,7 +106,11 @@ impl FromStr for ObjectStoreUrl {
type Err = error_stack::Report<ParseError>;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Url::from_str(s)
let base = std::env::current_dir()
.into_report()
.change_context(ParseError(s.to_owned()))?;
let base = Url::from_directory_path(base).map_err(|_| ParseError(s.to_owned()))?;
base.join(s)
.into_report()
.change_context(ParseError(s.to_owned()))
.map(|it| ObjectStoreUrl { url: it })
9 changes: 9 additions & 0 deletions crates/sparrow-session/src/lib.rs
@@ -1,3 +1,12 @@
#![warn(
rust_2018_idioms,
nonstandard_style,
future_incompatible,
clippy::mod_module_files,
clippy::print_stdout,
clippy::print_stderr
)]

mod error;
mod execution;
mod expr;
2 changes: 1 addition & 1 deletion crates/sparrow-session/src/session.rs
@@ -32,7 +32,7 @@ pub struct Session {
/// uuid. Once we run on multiple machines, we'll have to serialize/pickle the
/// udf as well.
udfs: HashMap<Uuid, Arc<dyn Udf>>,
object_store_registry: Arc<ObjectStoreRegistry>,
pub object_store_registry: Arc<ObjectStoreRegistry>,
rt: tokio::runtime::Runtime,
/// Default temporary path to prepare files to.
///
6 changes: 4 additions & 2 deletions crates/sparrow-session/src/table.rs
@@ -137,18 +137,20 @@ impl Table {
Ok(())
}

pub async fn add_parquet(&self, path: &str) -> error_stack::Result<(), Error> {
pub async fn add_parquet(&self, file: &str) -> error_stack::Result<(), Error> {
let file_sets = match &self.source {
Source::Parquet(file_sets) => file_sets.clone(),
other => error_stack::bail!(Error::internal_msg(format!(
"expected parquet data source, saw {:?}",
other
))),
};
let url =
ObjectStoreUrl::from_str(file).change_context(Error::InvalidUrl(file.to_owned()))?;

let prepared = self
.preparer
.prepare_parquet(path, &self.prepare_prefix)
.prepare_parquet(url, &self.prepare_prefix)
.await
.change_context(Error::Prepare)?;

6 changes: 3 additions & 3 deletions crates/sparrow-transforms/src/project.rs
@@ -68,14 +68,14 @@ mod tests {
let time = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2, 3]));
let subsort = Arc::new(UInt64Array::from(vec![0, 1, 2, 3]));
let key_hash = Arc::new(UInt64Array::from(vec![0, 1, 2, 3]));
let input_batch = Batch::new_with_data(

Batch::new_with_data(
input_data,
time,
subsort,
key_hash,
RowTime::from_timestamp_ns(3),
);
input_batch
)
}

#[test]
6 changes: 3 additions & 3 deletions crates/sparrow-transforms/src/select.rs
@@ -117,14 +117,14 @@ mod tests {
let time = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2, 3]));
let subsort = Arc::new(UInt64Array::from(vec![0, 1, 2, 3]));
let key_hash = Arc::new(UInt64Array::from(vec![0, 1, 2, 3]));
let input_batch = Batch::new_with_data(

Batch::new_with_data(
input_data,
time,
subsort,
key_hash,
RowTime::from_timestamp_ns(3),
);
input_batch
)
}

#[test]
1,420 changes: 991 additions & 429 deletions python/poetry.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions python/pyproject.toml
@@ -79,6 +79,7 @@ mypy = ">=0.930"
pandas-stubs = "^2.0.2"
typeguard = ">=2.13.3"
graphviz = { version = "^0.20.1" }
boto3-stubs = {extras = ["s3"], version = "^1.28.62"}

[tool.poetry.group.docs]
# Dependencies for documentation.
@@ -106,9 +107,11 @@ sphinx-social-cards = { version = "^0.3.0", python = ">=3.9,<3.12" }
optional = true

[tool.poetry.group.test.dependencies]
boto3 = "^1.28.54"
coverage = { extras = ["toml"], version = ">=6.2"}
pytest = ">=6.2.5"
pytest-asyncio = "^0.21.1"
pytest-docker-fixtures = "^1.3.17"
xdoctest = {extras = ["colors"], version = ">=0.15.10"}

[tool.poetry.group.release]
4 changes: 3 additions & 1 deletion python/pysrc/kaskada/_ffi.pyi
@@ -68,7 +68,9 @@ class Table:
def name(self) -> str: ...
def expr(self) -> Expr: ...
async def add_pyarrow(self, data: pa.RecordBatch) -> None: ...
async def add_parquet(self, path: str) -> None: ...
async def add_parquet(self, file: str) -> None: ...

class Udf(object):
def __init__(self, result_ty: str, result_fn: Callable[..., pa.Array]) -> None: ...

async def parquet_schema(session: Session, url: str) -> pa.Schema: ...
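
The new `parquet_schema` binding is what lets the Python layer read a Parquet schema through the Rust object-store support rather than `pyarrow.parquet.read_schema`. A minimal sketch of calling it directly, mirroring the usage in `kaskada/sources/arrow.py`; the helper name is hypothetical:

```python
import pyarrow as pa

from kaskada import _ffi
from kaskada._session import _get_session


async def schema_for(url: str) -> pa.Schema:
    # Accepts local paths as well as object-store URLs such as s3://...
    return await _ffi.parquet_schema(_get_session(), url)
```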
26 changes: 13 additions & 13 deletions python/pysrc/kaskada/sources/arrow.py
@@ -10,6 +10,8 @@
import pyarrow.json
import pyarrow.parquet

from .. import _ffi
from .._session import _get_session
from .source import Source, TimeUnit


@@ -537,7 +539,7 @@ def __init__(

@staticmethod
async def create(
path: Optional[str] = None,
url: Optional[str] = None,
*,
time_column: str,
key_column: str,
@@ -549,8 +551,9 @@ async def create(
"""Create a Parquet source.
Args:
path: The path to the Parquet file to add. This can be relative to the current
working directory or an absolute path (prefixed by '/').
file: The url or path of the Parquet file to add. Paths should be relative to the
current working directory or absolute. URLs may describe local file paths or
object-store locations.
time_column: The name of the column containing the time.
key_column: The name of the column containing the key.
schema: The schema to use. If not provided, it will be inferred from the input.
@@ -562,13 +565,10 @@
time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`.
If not specified (and not specified in the data), nanosecond will be assumed.
"""
path = Source._get_absolute_path(path)

if schema is None:
if path is None:
raise ValueError("Must provide schema or path to parquet file")
schema = pa.parquet.read_schema(path)

if url is None:
raise ValueError("Must provide schema or url to parquet file")
schema = await _ffi.parquet_schema(_get_session(), url)
source = Parquet(
time_column=time_column,
key_column=key_column,
Expand All @@ -578,10 +578,10 @@ async def create(
time_unit=time_unit,
)

if path:
await source.add_file(path)
if url:
await source.add_file(url)
return source

async def add_file(self, path: str) -> None:
async def add_file(self, file: str) -> None:
"""Add data to the source."""
await self._ffi_table.add_parquet(str(Source._get_absolute_path(path)))
await self._ffi_table.add_parquet(file)
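
For reference, a small sketch of the updated surface: `create` accepts a URL (or just a schema), and `add_file` passes relative paths, absolute paths, and object-store URLs straight through instead of resolving them with `Source._get_absolute_path`. The schema fields and the S3 bucket here are hypothetical:

```python
import pyarrow as pa

import kaskada as kd


async def build_source() -> kd.sources.Parquet:
    schema = pa.schema(
        [
            ("purchase_time", pa.timestamp("ns")),
            ("customer_id", pa.string()),
            ("amount", pa.int64()),
        ]
    )
    # With an explicit schema, no URL is required up front.
    source = await kd.sources.Parquet.create(
        schema=schema,
        time_column="purchase_time",
        key_column="customer_id",
    )
    # Files can then be added by relative path, absolute path, or URL.
    await source.add_file("../testdata/purchases/purchases_part1.parquet")
    await source.add_file("s3://my-bucket/purchases_part2.parquet")  # hypothetical
    return source
```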
34 changes: 30 additions & 4 deletions python/pytests/conftest.py
@@ -8,6 +8,36 @@
from kaskada import init_session


pytest_plugins = ["pytest_docker_fixtures"]


def pytest_addoption(parser: pytest.Parser):
parser.addoption("--save-golden", action="store_true", help="update golden files")
parser.addoption(
"-S",
action="append",
metavar="SVC",
help="run tests requiring the service SVC.",
)


def pytest_configure(config):
# register an additional marker
config.addinivalue_line(
"markers", "svc(name): tests that require the named service"
)


def pytest_runtest_setup(item):
"""Skip the test unless all of the marked services are present."""
required_svcs = {mark.args[0] for mark in item.iter_markers(name="svc")}
provided_svcs = set(item.config.getoption("-S") or [])

missing_svcs = required_svcs - provided_svcs
if missing_svcs:
pytest.skip(f"test requires services {missing_svcs!r}")


@pytest.fixture(autouse=True, scope="session")
def session() -> None:
init_session()
@@ -22,10 +52,6 @@ def event_loop():
loop.close()


def pytest_addoption(parser: pytest.Parser):
parser.addoption("--save-golden", action="store_true", help="update golden files")


class GoldenFixture(object):
def __init__(self, dirname: str, test_name: str, save: bool):
self._output = 0
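
Together, the `-S` option, the `svc` marker, and the setup hook make service-backed tests opt-in. A minimal sketch of declaring such a test (the body is illustrative only); it runs under `poetry run pytest pytests -S minio`, matching the CI change above, and is skipped otherwise:

```python
import pytest


@pytest.mark.svc("minio")
async def test_requires_minio(minio) -> None:
    # `minio` is the (host, port) fixture provided by pytest-docker-fixtures.
    minio_host, minio_port = minio
    assert minio_host is not None
```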
@@ -0,0 +1,10 @@
{"_time":"2020-01-01T00:00:00.000000000","_key":"karen","id":"cb_001","purchase_time":"2020-01-01T00:00:00.000000000","customer_id":"karen","vendor_id":"chum_bucket","amount":9,"subsort_id":0}
{"_time":"2020-01-01T00:00:00.000000000","_key":"patrick","id":"kk_001","purchase_time":"2020-01-01T00:00:00.000000000","customer_id":"patrick","vendor_id":"krusty_krab","amount":3,"subsort_id":1}
{"_time":"2020-01-02T00:00:00.000000000","_key":"karen","id":"cb_002","purchase_time":"2020-01-02T00:00:00.000000000","customer_id":"karen","vendor_id":"chum_bucket","amount":2,"subsort_id":2}
{"_time":"2020-01-02T00:00:00.000000000","_key":"patrick","id":"kk_002","purchase_time":"2020-01-02T00:00:00.000000000","customer_id":"patrick","vendor_id":"krusty_krab","amount":5,"subsort_id":3}
{"_time":"2020-01-03T00:00:00.000000000","_key":"karen","id":"cb_003","purchase_time":"2020-01-03T00:00:00.000000000","customer_id":"karen","vendor_id":"chum_bucket","amount":4,"subsort_id":4}
{"_time":"2020-01-03T00:00:00.000000000","_key":"patrick","id":"kk_003","purchase_time":"2020-01-03T00:00:00.000000000","customer_id":"patrick","vendor_id":"krusty_krab","amount":12,"subsort_id":5}
{"_time":"2020-01-04T00:00:00.000000000","_key":"patrick","id":"cb_004","purchase_time":"2020-01-04T00:00:00.000000000","customer_id":"patrick","vendor_id":"chum_bucket","amount":5000,"subsort_id":6}
{"_time":"2020-01-04T00:00:00.000000000","_key":"karen","id":"cb_005","purchase_time":"2020-01-04T00:00:00.000000000","customer_id":"karen","vendor_id":"chum_bucket","amount":3,"subsort_id":7}
{"_time":"2020-01-05T00:00:00.000000000","_key":"karen","id":"cb_006","purchase_time":"2020-01-05T00:00:00.000000000","customer_id":"karen","vendor_id":"chum_bucket","amount":5,"subsort_id":8}
{"_time":"2020-01-05T00:00:00.000000000","_key":"patrick","id":"kk_004","purchase_time":"2020-01-05T00:00:00.000000000","customer_id":"patrick","vendor_id":"krusty_krab","amount":9,"subsort_id":9}
55 changes: 55 additions & 0 deletions python/pytests/parquet_source_test.py
@@ -93,3 +93,58 @@ async def test_with_trailing_slash(golden) -> None:
subsort_column="subsort_id",
)
golden.jsonl(source)


@pytest.mark.svc("minio")
async def test_read_parquet_from_s3_minio(minio, golden) -> None:
# Upload a parquet file to minio for testing purposes
(minio_host, minio_port) = minio
import boto3

aws_endpoint = f"http://{minio_host}:{minio_port}"
# Defaults set in the `pytest-docker-fixtures`
# https://github.com/guillotinaweb/pytest-docker-fixtures/blob/236fdc1b6a9db03640040af2baf3bd3dfcc8d187/pytest_docker_fixtures/images.py#L42
aws_access_key_id = "x" * 10
aws_secret_access_key = "x" * 10
s3_client = boto3.client(
"s3",
endpoint_url=aws_endpoint,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
# aws_session_token=None,
# config=boto3.session.Config(signature_version='s3v4'),
# verify=False
)
s3_client.create_bucket(Bucket="test-bucket")
s3_client.upload_file(
"../testdata/purchases/purchases_part1.parquet",
"test-bucket",
"purchases_part1.parquet",
)

# This is a hack.
# The session (which is only created once) will cache the S3 client.
# This means that generally, changing the environment variables would
# require creating a new session to be picked up.
# Currently, we don't allow recreating the session (or scoping it).
# We probably should.
#
# But... this still works. The trick is that the S3 client isn't
# created until the first S3 URL is used. As long as we set the
# environment variables before that, they will be picked up and
# stored in the session appropriately.
import os

os.environ["AWS_ENDPOINT"] = aws_endpoint
os.environ["AWS_ACCESS_KEY_ID"] = aws_access_key_id
os.environ["AWS_SECRET_ACCESS_KEY"] = aws_secret_access_key
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
# Minio requires HTTP.
os.environ["AWS_ALLOW_HTTP"] = "true"

source = await kd.sources.Parquet.create(
"s3://test-bucket/purchases_part1.parquet",
time_column="purchase_time",
key_column="customer_id",
)
golden.jsonl(source)
10 changes: 10 additions & 0 deletions python/src/lib.rs
@@ -1,3 +1,12 @@
#![warn(
rust_2018_idioms,
nonstandard_style,
future_incompatible,
clippy::mod_module_files,
clippy::print_stdout,
clippy::print_stderr
)]

use pyo3::prelude::*;

mod error;
@@ -22,5 +31,6 @@ fn ffi(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
m.add_class::<table::Table>()?;
m.add_class::<execution::Execution>()?;

m.add_function(wrap_pyfunction!(session::parquet_schema, m)?)?;
Ok(())
}