Skip to content

Commit

Permalink
feat(expressions): Extend Expression.url.upload() to support row-spec…
Browse files Browse the repository at this point in the history
…ific URLs (#3518)

Addresses #3320

`Expression.url.upload()` can now take in a column of urls to upload
each row to a specific url.
  • Loading branch information
desmondcheongzx authored Dec 21, 2024
1 parent 1c0f780 commit 1a4ae66
Show file tree
Hide file tree
Showing 7 changed files with 283 additions and 67 deletions.
4 changes: 3 additions & 1 deletion daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1033,9 +1033,11 @@ def url_download(
) -> PyExpr: ...
def url_upload(
expr: PyExpr,
folder_location: str,
folder_location: PyExpr,
max_connections: int,
raise_error_on_failure: bool,
multi_thread: bool,
is_single_folder: bool,
io_config: IOConfig | None,
) -> PyExpr: ...
def tokenize_encode(
Expand Down
36 changes: 31 additions & 5 deletions daft/expressions/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1373,21 +1373,28 @@ def download(

def upload(
self,
location: str,
location: str | Expression,
max_connections: int = 32,
on_error: Literal["raise", "null"] = "raise",
io_config: IOConfig | None = None,
) -> Expression:
"""Uploads a column of binary data to the provided location (also supports S3, local etc).
"""Uploads a column of binary data to the provided location(s) (also supports S3, local etc).
Files will be written into the location (folder) with a generated UUID filename, and the result
Files will be written into the location (folder(s)) with a generated UUID filename, and the result
will be returned as a column of string paths that is compatible with the ``.url.download()`` Expression.
Example:
>>> col("data").url.upload("s3://my-bucket/my-folder") # doctest: +SKIP
Upload to row-specific URLs
>>> col("data").url.upload(col("paths")) # doctest: +SKIP
Args:
location: a folder location to upload data into
location: a folder location or column of folder locations to upload data into
max_connections: The maximum number of connections to use per thread to use for uploading data. Defaults to 32.
on_error: Behavior when a URL upload error is encountered - "raise" to raise the error immediately or "null" to log
the error but fallback to a Null value. Defaults to "raise".
io_config: IOConfig to use when uploading data
Returns:
Expand All @@ -1396,10 +1403,29 @@ def upload(
if not (isinstance(max_connections, int) and max_connections > 0):
raise ValueError(f"Invalid value for `max_connections`: {max_connections}")

location_expr = Expression._to_expression(location)
raise_on_error = False
if on_error == "raise":
raise_on_error = True
elif on_error == "null":
raise_on_error = False
else:
raise NotImplementedError(f"Unimplemented on_error option: {on_error}.")
multi_thread = ExpressionUrlNamespace._should_use_multithreading_tokio_runtime()
# If the user specifies a single location via a string, we should upload to a single folder. Otherwise,
# if the user gave an expression, we assume that each row has a specific url to upload to.
is_single_folder = isinstance(location, str)
io_config = ExpressionUrlNamespace._override_io_config_max_connections(max_connections, io_config)
return Expression._from_pyexpr(
native.url_upload(self._expr, location, max_connections, multi_thread, io_config)
native.url_upload(
self._expr,
location_expr._expr,
max_connections,
raise_on_error,
multi_thread,
is_single_folder,
io_config,
)
)


Expand Down
8 changes: 6 additions & 2 deletions src/daft-functions/src/python/uri.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ pub fn url_download(
#[pyfunction]
pub fn url_upload(
expr: PyExpr,
folder_location: &str,
folder_location: PyExpr,
max_connections: i64,
raise_error_on_failure: bool,
multi_thread: bool,
is_single_folder: bool,
io_config: Option<IOConfig>,
) -> PyResult<PyExpr> {
if max_connections <= 0 {
Expand All @@ -41,9 +43,11 @@ pub fn url_upload(
}
Ok(crate::uri::upload(
expr.into(),
folder_location,
folder_location.into(),
max_connections as usize,
raise_error_on_failure,
multi_thread,
is_single_folder,
io_config.map(|io_config| io_config.config),
)
.into())
Expand Down
9 changes: 6 additions & 3 deletions src/daft-functions/src/uri/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,22 @@ pub fn download(
#[must_use]
pub fn upload(
input: ExprRef,
location: &str,
location: ExprRef,
max_connections: usize,
raise_error_on_failure: bool,
multi_thread: bool,
is_single_folder: bool,
config: Option<IOConfig>,
) -> ExprRef {
ScalarFunction::new(
UploadFunction {
location: location.to_string(),
max_connections,
raise_error_on_failure,
multi_thread,
is_single_folder,
config: config.unwrap_or_default().into(),
},
vec![input],
vec![input, location],
)
.into()
}
Loading

0 comments on commit 1a4ae66

Please sign in to comment.