Skip to content

Commit

Permalink
Move to url namespace
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Jul 4, 2024
1 parent bf7f204 commit d30ddf5
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 119 deletions.
8 changes: 7 additions & 1 deletion daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,6 @@ class PySchema:

class PyExpr:
def alias(self, name: str) -> PyExpr: ...
def binary_upload_to_folder(self, folder_location: str, io_config: IOConfig | None): ...
def cast(self, dtype: PyDataType) -> PyExpr: ...
def ceil(self) -> PyExpr: ...
def floor(self) -> PyExpr: ...
Expand Down Expand Up @@ -1148,6 +1147,13 @@ class PyExpr:
multi_thread: bool,
config: IOConfig,
) -> PyExpr: ...
def url_upload(
self,
folder_location: str,
max_connections: int,
multi_thread: bool,
io_config: IOConfig | None,
): ...
def partitioning_days(self) -> PyExpr: ...
def partitioning_hours(self) -> PyExpr: ...
def partitioning_months(self) -> PyExpr: ...
Expand Down
95 changes: 61 additions & 34 deletions daft/expressions/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,6 @@ class Expression:
def __init__(self) -> None:
    # Direct construction is intentionally unsupported: Expressions are created
    # via factory helpers (e.g. ``col(...)`` as seen in the docstring examples),
    # which build them from a PyExpr via Expression._from_pyexpr.
    raise NotImplementedError("We do not support creating a Expression via __init__ ")

@property
def bytes(self) -> ExpressionBinaryNamespace:
"""Access methods that work on columns of binary data"""
return ExpressionBinaryNamespace.from_expression(self)

@property
def str(self) -> ExpressionStringNamespace:
"""Access methods that work on columns of strings"""
Expand Down Expand Up @@ -821,6 +816,35 @@ def from_expression(cls: type[SomeExpressionNamespace], expr: Expression) -> Som


class ExpressionUrlNamespace(ExpressionNamespace):
@staticmethod
def _should_use_multithreading_tokio_runtime() -> bool:
    """Decide whether the underlying tokio I/O runtime should be multithreaded.

    On the Ray runner, each distributed process owns its own tokio I/O runtime. A
    multithreaded runtime (``N_CPU`` threads by default) would mean up to
    ``N_CPU * N_PROC * max_connections`` open connections per machine, which is far
    too many on hosts with many cores. Using a singlethreaded runtime there bounds
    this at ``1 * N_PROC * max_connections`` (roughly 2-4k connections), which is
    reasonable.

    Local execution runs in a single process sharing one runtime and connection
    pool, so a multithreaded runtime (``N_CPU * max_connections`` connections)
    remains acceptable.
    """
    # Multithreaded everywhere except under the Ray runner.
    return not context.get_context().is_ray_runner

@staticmethod
def _override_io_config_max_connections(max_connections: int, io_config: IOConfig | None) -> IOConfig:
    """Return an IOConfig whose S3 ``max_connections`` is set to the user-provided value.

    The Rust implementation applies ``min(S3Config's max_connections, the expression's
    max_connections)`` per thread, so the S3 config must be overridden here for the
    user's ``max_connections`` argument to actually take effect.

    Args:
        max_connections: user-requested connection cap to apply to the S3 config
        io_config: config to override; ``None`` falls back to the planning config's default

    Returns:
        IOConfig: a copy with ``s3.max_connections`` replaced
    """
    if io_config is None:
        io_config = context.get_context().daft_planning_config.default_io_config
    overridden_s3 = io_config.s3.replace(max_connections=max_connections)
    return io_config.replace(s3=overridden_s3)

def download(
self,
max_connections: int = 32,
Expand Down Expand Up @@ -862,16 +886,10 @@ def download(
if not (isinstance(max_connections, int) and max_connections > 0):
raise ValueError(f"Invalid value for `max_connections`: {max_connections}")

# Use the `max_connections` kwarg to override the value in S3Config
# This is because the max parallelism is actually `min(S3Config's max_connections, url_download's max_connections)` under the hood.
# However, default max_connections on S3Config is only 8, and even if we specify 32 here we are bottlenecked there.
# Therefore for S3 downloads, we override `max_connections` kwarg to have the intended effect.
io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config
io_config = io_config.replace(s3=io_config.s3.replace(max_connections=max_connections))

using_ray_runner = context.get_context().is_ray_runner
multi_thread = ExpressionUrlNamespace._should_use_multithreading_tokio_runtime()
io_config = ExpressionUrlNamespace._override_io_config_max_connections(max_connections, io_config)
return Expression._from_pyexpr(
self._expr.url_download(max_connections, raise_on_error, not using_ray_runner, io_config)
self._expr.url_download(max_connections, raise_on_error, multi_thread, io_config)
)
else:
from daft.udf_library import url_udfs
Expand All @@ -882,6 +900,35 @@ def download(
on_error=on_error,
)

def upload(
    self,
    location: str,
    max_connections: int = 32,
    io_config: IOConfig | None = None,
) -> Expression:
    """Uploads a column of binary data to the provided location (also supports S3, local etc)

    Files will be written into the location (folder) with a generated UUID filename, and the result
    will be returned as a column of string paths that is compatible with the ``.url.download()`` Expression.

    Example:
        >>> col("data").url.upload("s3://my-bucket/my-folder")

    Args:
        location: a folder location to upload data into
        max_connections: The maximum number of connections to use per thread to use for uploading data. Defaults to 32.
        io_config: IOConfig to use when uploading data

    Returns:
        Expression: a String expression containing the written filepath
    """
    # Validate before dispatching to the Rust layer so the user gets a clear error.
    if not isinstance(max_connections, int) or max_connections <= 0:
        raise ValueError(f"Invalid value for `max_connections`: {max_connections}")

    use_multi_thread = ExpressionUrlNamespace._should_use_multithreading_tokio_runtime()
    effective_io_config = ExpressionUrlNamespace._override_io_config_max_connections(max_connections, io_config)
    uploaded = self._expr.url_upload(location, max_connections, use_multi_thread, effective_io_config)
    return Expression._from_pyexpr(uploaded)


class ExpressionFloatNamespace(ExpressionNamespace):
def is_nan(self) -> Expression:
Expand Down Expand Up @@ -1095,26 +1142,6 @@ def truncate(self, interval: str, relative_to: Expression | None = None) -> Expr
return Expression._from_pyexpr(self._expr.dt_truncate(interval, relative_to._expr))


class ExpressionBinaryNamespace(ExpressionNamespace):
    def upload_to_folder(self, folder_location: str, io_config: IOConfig | None = None) -> Expression:
        """Uploads a column of binary data to the provided folder location (also supports S3, local etc)

        Files are written into the folder under generated UUID filenames; the returned column
        contains string paths compatible with the ``.url.download()`` Expression.

        Example:
            >>> col("data").bytes.upload_to_folder("s3://my-bucket/my-folder")

        Args:
            folder_location: a folder location to upload data into
            io_config: IOConfig to use when uploading data

        Returns:
            Expression: a String expression containing the written filepath
        """
        uploaded = self._expr.binary_upload_to_folder(folder_location, io_config)
        return Expression._from_pyexpr(uploaded)


class ExpressionStringNamespace(ExpressionNamespace):
def contains(self, substr: str | Expression) -> Expression:
"""Checks whether each string contains the given pattern in a string column
Expand Down
43 changes: 0 additions & 43 deletions src/daft-dsl/src/functions/binary/mod.rs

This file was deleted.

4 changes: 0 additions & 4 deletions src/daft-dsl/src/functions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod binary;
pub mod float;
pub mod hash;
pub mod image;
Expand All @@ -18,7 +17,6 @@ use std::fmt::{Display, Formatter, Result};

use crate::ExprRef;

use self::binary::BinaryExpr;
use self::image::ImageExpr;
use self::json::JsonExpr;
use self::list::ListExpr;
Expand Down Expand Up @@ -47,7 +45,6 @@ pub enum FunctionExpr {
Numeric(NumericExpr),
Float(FloatExpr),
Utf8(Utf8Expr),
Binary(BinaryExpr),
Temporal(TemporalExpr),
List(ListExpr),
Map(MapExpr),
Expand Down Expand Up @@ -82,7 +79,6 @@ impl FunctionExpr {
Numeric(expr) => expr.get_evaluator(),
Float(expr) => expr.get_evaluator(),
Utf8(expr) => expr.get_evaluator(),
Binary(expr) => expr.get_evaluator(),
Temporal(expr) => expr.get_evaluator(),
List(expr) => expr.get_evaluator(),
Map(expr) => expr.get_evaluator(),
Expand Down
28 changes: 28 additions & 0 deletions src/daft-dsl/src/functions/uri/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
mod download;
mod upload;

use std::sync::Arc;

use download::DownloadEvaluator;
use serde::{Deserialize, Serialize};
use upload::UploadEvaluator;

use crate::{Expr, ExprRef};

Expand All @@ -19,6 +21,12 @@ pub enum UriExpr {
multi_thread: bool,
config: Arc<IOConfig>,
},
Upload {
location: String,
max_connections: usize,
multi_thread: bool,
config: Arc<IOConfig>,
},
}

impl UriExpr {
Expand All @@ -27,6 +35,7 @@ impl UriExpr {
use UriExpr::*;
match self {
Download { .. } => &DownloadEvaluator {},
Upload { .. } => &UploadEvaluator {},
}
}
}
Expand All @@ -49,3 +58,22 @@ pub fn download(
}
.into()
}

pub fn upload(
input: ExprRef,
location: &str,
max_connections: usize,
multi_thread: bool,
config: Option<IOConfig>,
) -> ExprRef {
Expr::Function {
func: super::FunctionExpr::Uri(UriExpr::Upload {
location: location.to_string(),
max_connections,
multi_thread,
config: config.unwrap_or_default().into(),
}),
inputs: vec![input],
}
.into()
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use daft_core::{datatypes::Field, schema::Schema, series::Series, DataType};
use daft_io::upload_to_folder;
use daft_io::url_upload;

use crate::ExprRef;

use crate::functions::FunctionExpr;
use common_error::{DaftError, DaftResult};

use super::{super::FunctionEvaluator, BinaryExpr};
use super::{super::FunctionEvaluator, UriExpr};

pub(super) struct UploadToFolderEvaluator {}
pub(super) struct UploadEvaluator {}

impl FunctionEvaluator for UploadToFolderEvaluator {
impl FunctionEvaluator for UploadEvaluator {
fn fn_name(&self) -> &'static str {
"upload_to_folder"
"upload"
}

Check warning on line 16 in src/daft-dsl/src/functions/uri/upload.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-dsl/src/functions/uri/upload.rs#L14-L16

Added lines #L14 - L16 were not covered by tests

fn to_field(&self, inputs: &[ExprRef], schema: &Schema, _: &FunctionExpr) -> DaftResult<Field> {
Expand All @@ -21,7 +21,7 @@ impl FunctionEvaluator for UploadToFolderEvaluator {
let data_field = data.to_field(schema)?;
match data_field.dtype {
DataType::Binary | DataType::FixedSizeBinary(..) | DataType::Utf8 => Ok(Field::new(data_field.name, DataType::Utf8)),
_ => Err(DaftError::TypeError(format!("Expects input to upload_to_folder to be Binary, FixedSizeBinary or String, but received {}", data_field))),
_ => Err(DaftError::TypeError(format!("Expects input to url_upload to be Binary, FixedSizeBinary or String, but received {}", data_field))),

Check warning on line 24 in src/daft-dsl/src/functions/uri/upload.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-dsl/src/functions/uri/upload.rs#L24

Added line #L24 was not covered by tests
}
}
_ => Err(DaftError::SchemaMismatch(format!(
Expand All @@ -32,25 +32,24 @@ impl FunctionEvaluator for UploadToFolderEvaluator {
}

fn evaluate(&self, inputs: &[Series], expr: &FunctionExpr) -> DaftResult<Series> {
let (folder_location, io_config) = match expr {
FunctionExpr::Binary(BinaryExpr::UploadToFolder {
folder_location,
io_config,
}) => Ok((folder_location, io_config)),
let (location, io_config, max_connections, multi_thread) = match expr {
FunctionExpr::Uri(UriExpr::Upload {
location,
config,
max_connections,
multi_thread,
}) => Ok((location, config, max_connections, multi_thread)),
_ => Err(DaftError::ValueError(format!(
"Expected an UploadToFolder expression but received {expr}"
"Expected an Upload expression but received {expr}"
))),
}?;

Check warning on line 45 in src/daft-dsl/src/functions/uri/upload.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-dsl/src/functions/uri/upload.rs#L42-L45

Added lines #L42 - L45 were not covered by tests

match inputs {
[data] => upload_to_folder(
[data] => url_upload(
data,
folder_location,
// TODO: enable these to be configurable. We probably want to do multi_thread=False for the Ray
// case, and override the default s3.max_connections_per_io_thread to be a much higher number
// to optimize for small-file uploads
io_config.s3.max_connections_per_io_thread as usize,
true,
location,
*max_connections,
*multi_thread,
io_config.clone(),
None,
),
Expand Down
Loading

0 comments on commit d30ddf5

Please sign in to comment.