feat: Support S3 paths in Parquet inputs #798

Merged · 5 commits · Oct 9, 2023
2 changes: 1 addition & 1 deletion .github/workflows/ci_python.yml
@@ -125,7 +125,7 @@ jobs:
pip install ${WHEEL} --force-reinstall
echo "::endgroup::"
echo "::group::Test Python $V"
-          poetry run pytest
+          poetry run pytest pytests -S minio
echo "::endgroup::"
echo "::group::MyPy Python $V"
poetry run mypy -- --install-types --non-interactive pysrc pytests
2 changes: 1 addition & 1 deletion crates/sparrow-runtime/src/execute/operation/spread_zip.rs
@@ -396,7 +396,7 @@ mod tests {

let expected_offsets =
Int32Array::from(vec![Some(0), Some(3), Some(6), Some(6), Some(8), Some(9)]);
-        let offsets: Vec<i32> = result.offsets().into_iter().map(|i| *i).collect();
+        let offsets: Vec<i32> = result.offsets().iter().copied().collect();
let offsets = Int32Array::from(offsets);
assert_eq!(expected_offsets, offsets);
assert_eq!(&expected, values);
2 changes: 1 addition & 1 deletion crates/sparrow-runtime/src/lib.rs
@@ -29,7 +29,7 @@ pub mod key_hash_inverse;
mod metadata;
mod min_heap;
pub mod prepare;
-mod read;
+pub mod read;
pub mod stores;
mod streams;
mod util;
10 changes: 3 additions & 7 deletions crates/sparrow-runtime/src/prepare/preparer.rs
@@ -97,7 +97,7 @@ impl Preparer {
/// - `prepare_prefix`: The prefix to write prepared files to.
pub async fn prepare_parquet(
&self,
-        to_prepare: &str,
+        to_prepare: ObjectStoreUrl,
prepare_prefix: &ObjectStoreUrl,
) -> error_stack::Result<Vec<PreparedFile>, Error> {
// TODO: Support Slicing
@@ -110,14 +110,10 @@
.object_store(&output_url)
.change_context(Error::Internal)?;

-        // TODO: support preparing from remote stores
-        let to_prepare = std::path::Path::new(to_prepare);
-        let extension = to_prepare.extension().and_then(|ext| ext.to_str());
-        let to_prepare_url = ObjectStoreUrl::from_file_path(to_prepare)
-            .change_context_lazy(|| Error::InvalidUrl(to_prepare.display().to_string()))?;
+        let extension = Some("parquet");
let source_data = SourceData {
source: Some(
-                SourceData::try_from_url(to_prepare_url.url(), extension)
+                SourceData::try_from_url(to_prepare.url(), extension)
.into_report()
.change_context(Error::Internal)?,
),
6 changes: 5 additions & 1 deletion crates/sparrow-runtime/src/stores/object_store_url.rs
@@ -106,7 +106,11 @@ impl FromStr for ObjectStoreUrl {
type Err = error_stack::Report<ParseError>;

fn from_str(s: &str) -> Result<Self, Self::Err> {
-        Url::from_str(s)
+        let base = std::env::current_dir()
+            .into_report()
+            .change_context(ParseError(s.to_owned()))?;
+        let base = Url::from_directory_path(base).map_err(|_| ParseError(s.to_owned()))?;
+        base.join(s)
.into_report()
.change_context(ParseError(s.to_owned()))
.map(|it| ObjectStoreUrl { url: it })
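Note on the `FromStr` change above: parsing the input against a `file://` URL for the current directory means a bare path still parses, while an input that already carries a scheme (such as `s3://`) passes through the join unchanged. A minimal Python sketch of the same WHATWG-style join semantics, with a hypothetical working directory:

from urllib.parse import urljoin

base = "file:///home/user/project/"  # stand-in for std::env::current_dir()

# An input that already has a scheme comes back from the join as-is:
urljoin(base, "s3://bucket/data.parquet")  # -> "s3://bucket/data.parquet"

# A relative path resolves against the file:// base:
urljoin(base, "data/events.parquet")  # -> "file:///home/user/project/data/events.parquet"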
9 changes: 9 additions & 0 deletions crates/sparrow-session/src/lib.rs
@@ -1,3 +1,12 @@
+#![warn(
+    rust_2018_idioms,
+    nonstandard_style,
+    future_incompatible,
+    clippy::mod_module_files,
+    clippy::print_stdout,
+    clippy::print_stderr
+)]

mod error;
mod execution;
mod expr;
2 changes: 1 addition & 1 deletion crates/sparrow-session/src/session.rs
@@ -32,7 +32,7 @@ pub struct Session {
/// uuid. Once we run on multiple machines, we'll have to serialize/pickle the
/// udf as well.
udfs: HashMap<Uuid, Arc<dyn Udf>>,
-    object_store_registry: Arc<ObjectStoreRegistry>,
+    pub object_store_registry: Arc<ObjectStoreRegistry>,
rt: tokio::runtime::Runtime,
/// Default temporary path to prepare files to.
///
6 changes: 4 additions & 2 deletions crates/sparrow-session/src/table.rs
@@ -137,18 +137,20 @@ impl Table {
Ok(())
}

-    pub async fn add_parquet(&self, path: &str) -> error_stack::Result<(), Error> {
+    pub async fn add_parquet(&self, file: &str) -> error_stack::Result<(), Error> {
let file_sets = match &self.source {
Source::Parquet(file_sets) => file_sets.clone(),
other => error_stack::bail!(Error::internal_msg(format!(
"expected parquet data source, saw {:?}",
other
))),
};
+        let url =
+            ObjectStoreUrl::from_str(file).change_context(Error::InvalidUrl(file.to_owned()))?;

let prepared = self
.preparer
-            .prepare_parquet(path, &self.prepare_prefix)
+            .prepare_parquet(url, &self.prepare_prefix)
.await
.change_context(Error::Prepare)?;

6 changes: 3 additions & 3 deletions crates/sparrow-transforms/src/project.rs
@@ -68,14 +68,14 @@ mod tests {
let time = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2, 3]));
let subsort = Arc::new(UInt64Array::from(vec![0, 1, 2, 3]));
let key_hash = Arc::new(UInt64Array::from(vec![0, 1, 2, 3]));
-        let input_batch = Batch::new_with_data(
+
+        Batch::new_with_data(
input_data,
time,
subsort,
key_hash,
RowTime::from_timestamp_ns(3),
-        );
-        input_batch
+        )
}

#[test]
6 changes: 3 additions & 3 deletions crates/sparrow-transforms/src/select.rs
@@ -117,14 +117,14 @@ mod tests {
let time = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2, 3]));
let subsort = Arc::new(UInt64Array::from(vec![0, 1, 2, 3]));
let key_hash = Arc::new(UInt64Array::from(vec![0, 1, 2, 3]));
-        let input_batch = Batch::new_with_data(
+
+        Batch::new_with_data(
input_data,
time,
subsort,
key_hash,
RowTime::from_timestamp_ns(3),
-        );
-        input_batch
+        )
}

#[test]
1,420 changes: 991 additions & 429 deletions python/poetry.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions python/pyproject.toml
@@ -79,6 +79,7 @@ mypy = ">=0.930"
pandas-stubs = "^2.0.2"
typeguard = ">=2.13.3"
graphviz = { version = "^0.20.1" }
+boto3-stubs = {extras = ["s3"], version = "^1.28.62"}

[tool.poetry.group.docs]
# Dependencies for documentation.
@@ -106,9 +107,11 @@ sphinx-social-cards = { version = "^0.3.0", python = ">=3.9,<3.12" }
optional = true

[tool.poetry.group.test.dependencies]
+boto3 = "^1.28.54"
coverage = { extras = ["toml"], version = ">=6.2"}
pytest = ">=6.2.5"
pytest-asyncio = "^0.21.1"
+pytest-docker-fixtures = "^1.3.17"
xdoctest = {extras = ["colors"], version = ">=0.15.10"}

[tool.poetry.group.release]
4 changes: 3 additions & 1 deletion python/pysrc/kaskada/_ffi.pyi
@@ -68,7 +68,9 @@ class Table:
def name(self) -> str: ...
def expr(self) -> Expr: ...
async def add_pyarrow(self, data: pa.RecordBatch) -> None: ...
-    async def add_parquet(self, path: str) -> None: ...
+    async def add_parquet(self, file: str) -> None: ...

class Udf(object):
def __init__(self, result_ty: str, result_fn: Callable[..., pa.Array]) -> None: ...

+async def parquet_schema(session: Session, url: str) -> pa.Schema: ...
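`parquet_schema` is the new FFI entry point that reads a Parquet schema through the Rust object-store layer instead of `pyarrow.parquet.read_schema`, which is what makes schema inference work for `s3://` URLs. A rough sketch of how it is used, mirroring the `arrow.py` change below (the wrapper function is hypothetical; `_ffi` and `_get_session` are internal):

import pyarrow as pa

from kaskada import _ffi
from kaskada._session import _get_session

async def infer_parquet_schema(url: str) -> pa.Schema:
    # Rust resolves the URL via the ObjectStoreRegistry, so local paths
    # and object-store URLs are handled uniformly.
    return await _ffi.parquet_schema(_get_session(), url)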
26 changes: 13 additions & 13 deletions python/pysrc/kaskada/sources/arrow.py
@@ -10,6 +10,8 @@
import pyarrow.json
import pyarrow.parquet

+from .. import _ffi
+from .._session import _get_session
from .source import Source, TimeUnit


@@ -537,7 +539,7 @@

@staticmethod
async def create(
-        path: Optional[str] = None,
+        url: Optional[str] = None,
*,
time_column: str,
key_column: str,
@@ -549,8 +551,9 @@
"""Create a Parquet source.

Args:
-            path: The path to the Parquet file to add. This can be relative to the current
-                working directory or an absolute path (prefixed by '/').
+            url: The URL or path of the Parquet file to add. Paths should be relative to the
+                current working directory or absolute. URLs may describe local file paths or
+                object-store locations.
time_column: The name of the column containing the time.
key_column: The name of the column containing the key.
schema: The schema to use. If not provided, it will be inferred from the input.
@@ -562,13 +565,10 @@
time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`.
If not specified (and not specified in the data), nanosecond will be assumed.
"""
-        path = Source._get_absolute_path(path)

if schema is None:
-            if path is None:
-                raise ValueError("Must provide schema or path to parquet file")
-            schema = pa.parquet.read_schema(path)

+            if url is None:
+                raise ValueError("Must provide schema or url to parquet file")
+            schema = await _ffi.parquet_schema(_get_session(), url)
source = Parquet(
time_column=time_column,
key_column=key_column,
Expand All @@ -578,10 +578,10 @@ async def create(
time_unit=time_unit,
)

-        if path:
-            await source.add_file(path)
+        if url:
+            await source.add_file(url)
return source

-    async def add_file(self, path: str) -> None:
+    async def add_file(self, file: str) -> None:
"""Add data to the source."""
-        await self._ffi_table.add_parquet(str(Source._get_absolute_path(path)))
+        await self._ffi_table.add_parquet(file)
34 changes: 30 additions & 4 deletions python/pytests/conftest.py
@@ -8,6 +8,36 @@
from kaskada import init_session


+pytest_plugins = ["pytest_docker_fixtures"]
+
+
+def pytest_addoption(parser: pytest.Parser):
+    parser.addoption("--save-golden", action="store_true", help="update golden files")
+    parser.addoption(
+        "-S",
+        action="append",
+        metavar="SVC",
+        help="run tests requiring the service SVC.",
+    )
+
+
+def pytest_configure(config):
+    # register an additional marker
+    config.addinivalue_line(
+        "markers", "svc(name): tests that require the named service"
+    )
+
+
+def pytest_runtest_setup(item):
+    """Skip the test unless all of the marked services are present."""
+    required_svcs = {mark.args[0] for mark in item.iter_markers(name="svc")}
+    provided_svcs = set(item.config.getoption("-S") or [])
+
+    missing_svcs = required_svcs - provided_svcs
+    if missing_svcs:
+        pytest.skip(f"test requires services {missing_svcs!r}")


@pytest.fixture(autouse=True, scope="session")
def session() -> None:
init_session()
@@ -22,10 +52,6 @@ def event_loop():
loop.close()


-def pytest_addoption(parser: pytest.Parser):
-    parser.addoption("--save-golden", action="store_true", help="update golden files")


class GoldenFixture(object):
def __init__(self, dirname: str, test_name: str, save: bool):
self._output = 0
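The marker and the new `-S` option work as a pair: a test declares the service it needs, and a run opts in per service, as the minio test later in this PR does. A minimal sketch (the test body is illustrative):

import pytest

@pytest.mark.svc("minio")
async def test_against_minio(minio) -> None:
    # Skipped unless the suite is invoked with `pytest -S minio`,
    # matching the CI change in ci_python.yml above.
    ...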
@@ -0,0 +1,10 @@
{"_time":"2020-01-01T00:00:00.000000000","_key":"karen","id":"cb_001","purchase_time":"2020-01-01T00:00:00.000000000","customer_id":"karen","vendor_id":"chum_bucket","amount":9,"subsort_id":0}
{"_time":"2020-01-01T00:00:00.000000000","_key":"patrick","id":"kk_001","purchase_time":"2020-01-01T00:00:00.000000000","customer_id":"patrick","vendor_id":"krusty_krab","amount":3,"subsort_id":1}
{"_time":"2020-01-02T00:00:00.000000000","_key":"karen","id":"cb_002","purchase_time":"2020-01-02T00:00:00.000000000","customer_id":"karen","vendor_id":"chum_bucket","amount":2,"subsort_id":2}
{"_time":"2020-01-02T00:00:00.000000000","_key":"patrick","id":"kk_002","purchase_time":"2020-01-02T00:00:00.000000000","customer_id":"patrick","vendor_id":"krusty_krab","amount":5,"subsort_id":3}
{"_time":"2020-01-03T00:00:00.000000000","_key":"karen","id":"cb_003","purchase_time":"2020-01-03T00:00:00.000000000","customer_id":"karen","vendor_id":"chum_bucket","amount":4,"subsort_id":4}
{"_time":"2020-01-03T00:00:00.000000000","_key":"patrick","id":"kk_003","purchase_time":"2020-01-03T00:00:00.000000000","customer_id":"patrick","vendor_id":"krusty_krab","amount":12,"subsort_id":5}
{"_time":"2020-01-04T00:00:00.000000000","_key":"patrick","id":"cb_004","purchase_time":"2020-01-04T00:00:00.000000000","customer_id":"patrick","vendor_id":"chum_bucket","amount":5000,"subsort_id":6}
{"_time":"2020-01-04T00:00:00.000000000","_key":"karen","id":"cb_005","purchase_time":"2020-01-04T00:00:00.000000000","customer_id":"karen","vendor_id":"chum_bucket","amount":3,"subsort_id":7}
{"_time":"2020-01-05T00:00:00.000000000","_key":"karen","id":"cb_006","purchase_time":"2020-01-05T00:00:00.000000000","customer_id":"karen","vendor_id":"chum_bucket","amount":5,"subsort_id":8}
{"_time":"2020-01-05T00:00:00.000000000","_key":"patrick","id":"kk_004","purchase_time":"2020-01-05T00:00:00.000000000","customer_id":"patrick","vendor_id":"krusty_krab","amount":9,"subsort_id":9}
55 changes: 55 additions & 0 deletions python/pytests/parquet_source_test.py
@@ -93,3 +93,58 @@ async def test_with_trailing_slash(golden) -> None:
subsort_column="subsort_id",
)
golden.jsonl(source)


@pytest.mark.svc("minio")
async def test_read_parquet_from_s3_minio(minio, golden) -> None:
# Upload a parquet file to minio for testing purposes
(minio_host, minio_port) = minio
import boto3

aws_endpoint = f"http://{minio_host}:{minio_port}"
# Defaults set in the `pytest-docker-fixtures`
# https://github.com/guillotinaweb/pytest-docker-fixtures/blob/236fdc1b6a9db03640040af2baf3bd3dfcc8d187/pytest_docker_fixtures/images.py#L42
aws_access_key_id = "x" * 10
aws_secret_access_key = "x" * 10
s3_client = boto3.client(
"s3",
endpoint_url=aws_endpoint,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
# aws_session_token=None,
# config=boto3.session.Config(signature_version='s3v4'),
# verify=False
)
s3_client.create_bucket(Bucket="test-bucket")
s3_client.upload_file(
"../testdata/purchases/purchases_part1.parquet",
"test-bucket",
"purchases_part1.parquet",
)

# This is a hack.
# The session (which is only created once) will cache the S3 client.
# This means that generally, changing the environment variables would
# require creating a new session to be picked up.
# Currently, we don't allow recreating the session (or scoping it).
# We probably should.
#
# But... this still works. The trick is that the S3 client isn't
# created until the first S3 URL is used. As long as we set the
# environment variables before that, they will be picked up and
# stored in the session appropriately.
import os

os.environ["AWS_ENDPOINT"] = aws_endpoint
os.environ["AWS_ACCESS_KEY_ID"] = aws_access_key_id
os.environ["AWS_SECRET_ACCESS_KEY"] = aws_secret_access_key
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
# Minio requires HTTP.
os.environ["AWS_ALLOW_HTTP"] = "true"

source = await kd.sources.Parquet.create(
"s3://test-bucket/purchases_part1.parquet",
time_column="purchase_time",
key_column="customer_id",
)
golden.jsonl(source)
10 changes: 10 additions & 0 deletions python/src/lib.rs
@@ -1,3 +1,12 @@
+#![warn(
+    rust_2018_idioms,
+    nonstandard_style,
+    future_incompatible,
+    clippy::mod_module_files,
+    clippy::print_stdout,
+    clippy::print_stderr
+)]

use pyo3::prelude::*;

mod error;
@@ -22,5 +31,6 @@ fn ffi(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
m.add_class::<table::Table>()?;
m.add_class::<execution::Execution>()?;

+    m.add_function(wrap_pyfunction!(session::parquet_schema, m)?)?;
Ok(())
}