Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] [SC-179831] Preliminary support for ranged downloads in SDK #1

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .codegen/__init__.py.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import databricks.sdk.dbutils as dbutils
from databricks.sdk.credentials_provider import CredentialsStrategy

from databricks.sdk.mixins.files import DbfsExt
from databricks.sdk.mixins.files import FilesExt
from databricks.sdk.mixins.compute import ClustersExt
from databricks.sdk.mixins.workspace import WorkspaceExt
from databricks.sdk.mixins.open_ai_client import ServingEndpointsExt
Expand All @@ -18,7 +19,7 @@ from typing import Optional
"google_credentials" "google_service_account" }}

{{- define "api" -}}
{{- /* Emits the class name to use for a service: builds "<PascalName>API" and looks it up in $mixins via getOrDefault, passing the generated name itself as the default (presumably what is returned when no mixin is registered — confirm against getOrDefault). NOTE(review): $mixins is declared twice below; the second declaration, which adds "FilesAPI" -> "FilesExt", presumably supersedes the first, which looks like the pre-change line of this diff. */ -}}
{{- $mixins := dict "ClustersAPI" "ClustersExt" "DbfsAPI" "DbfsExt" "WorkspaceAPI" "WorkspaceExt" "ServingEndpointsAPI" "ServingEndpointsExt" -}}
{{- $mixins := dict "ClustersAPI" "ClustersExt" "DbfsAPI" "DbfsExt" "FilesAPI" "FilesExt" "WorkspaceAPI" "WorkspaceExt" "ServingEndpointsAPI" "ServingEndpointsExt" -}}
{{- $genApi := concat .PascalName "API" -}}
{{- getOrDefault $mixins $genApi $genApi -}}
{{- end -}}
Expand Down
6 changes: 3 additions & 3 deletions databricks/sdk/__init__.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

269 changes: 268 additions & 1 deletion databricks/sdk/mixins/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import sys
from abc import ABC, abstractmethod
from collections import deque
from io import BytesIO
from datetime import datetime, timezone
from enum import Enum
from io import BytesIO, IOBase, BufferedIOBase, UnsupportedOperation
from types import TracebackType
from typing import (TYPE_CHECKING, AnyStr, BinaryIO, Dict, Generator, Iterable,
                    Iterator, List, Optional, Type, Union)
Expand All @@ -17,6 +19,11 @@
from .._property import _cached_property
from ..errors import NotFound
from ..service import files
from ..service._internal import _escape_multi_segment_path_parameter
from ..service.files import DownloadResponse

# Debug switch read by printd(): while False, printd() prints nothing; when True it
# forwards its argument to print().
_FILES_MIXIN_DEBUG_ENABLED = False
# Gate for experimental code: while False, SeekableDownloadBinaryIO.__init__ raises
# NotImplementedError instead of opening a download.
_FILES_MIXIN_ENABLE_UNSUPPORTED_FEATURES = False

if TYPE_CHECKING:
from _typeshed import Self
Expand Down Expand Up @@ -636,3 +643,263 @@ def delete(self, path: str, *, recursive=False):
if p.is_dir and not recursive:
raise IOError('deleting directories requires recursive flag')
p.delete(recursive=recursive)


class FilesExt(files.FilesAPI):
    """Extends the FilesAPI with support for complex multipart upload/download operations & more robust file I/O"""
    # Rebinding __doc__ replaces the docstring above with the one from the generated
    # FilesAPI class, so help()/generated docs show the FilesAPI text.
    __doc__ = files.FilesAPI.__doc__

    class _FileTransferBackend(Enum):
        """Identifies the mechanism used to move file bytes."""
        DB_FILES_API = 1
        PRESIGNED_URLS = 2

    def __init__(self, api_client):
        super().__init__(api_client)

    def download(self,
                 file_path: str,
                 *,
                 start_byte_offset: Optional[int] = None,
                 if_unmodified_since_timestamp: Optional[datetime] = None) -> DownloadResponse:
        """Download a file.

        Downloads a file of up to 5 GiB. The file contents are the response body. This is a standard HTTP file
        download, not a JSON RPC.

        :param file_path: str
          The absolute path of the file.
        :param start_byte_offset: int (optional)
          When set to a positive value, only the bytes from this offset to the end of the file are
          requested (sent as a ``Range: bytes=<offset>-`` header). ``None`` and ``0`` both request the
          whole file without a ``Range`` header.
        :param if_unmodified_since_timestamp: datetime (optional)
          When set, sent as an ``If-Unmodified-Since`` header. Timezone-aware values are converted to
          UTC; naive values are taken to already be in UTC.

        :returns: :class:`DownloadResponse`
        :raises ValueError: if ``start_byte_offset`` is negative.
        """
        # Local import keeps this block self-contained; format_datetime always emits the
        # English day/month names HTTP requires, unlike strftime("%a ... %b"), which
        # follows the process locale.
        from email.utils import format_datetime

        headers = {'Accept': 'application/octet-stream', }

        if start_byte_offset is not None and start_byte_offset < 0:
            # "bytes=-5-" is not a valid byte-range spec; fail early with a clear message.
            raise ValueError('start_byte_offset must be 0 or positive')

        if start_byte_offset:
            headers['Range'] = f'bytes={start_byte_offset}-'

        if if_unmodified_since_timestamp:
            timestamp = if_unmodified_since_timestamp
            if timestamp.tzinfo is None:
                # Preserve the previous behaviour for naive values: the wall-clock fields are
                # sent unchanged and labelled GMT.
                timestamp = timestamp.replace(tzinfo=timezone.utc)
            else:
                # An aware value in another zone must be shifted before it is labelled GMT.
                timestamp = timestamp.astimezone(timezone.utc)
            headers['If-Unmodified-Since'] = format_datetime(timestamp, usegmt=True)

        response_headers = ['content-length', 'content-type', 'last-modified', ]
        res = self._api.do('GET',
                           f'/api/2.0/fs/files{_escape_multi_segment_path_parameter(file_path)}',
                           headers=headers,
                           response_headers=response_headers,
                           raw=True)

        return DownloadResponse.from_dict(res)


class PresignedUrl:
    """Bundle of everything required to issue one presigned-URL request.

    ``headers`` arrives as a list of ``{"name": ..., "value": ...}`` entries and is
    stored as a plain name -> value mapping. ``headers_populated_by_client`` is stored
    as a set of header names.
    """

    def __init__(self, method: str, url: str, headers: List[Dict[str, str]],
                 headers_populated_by_client: List[str]):
        self.method = method
        self.url = url
        self.headers_populated_by_client = set(headers_populated_by_client)

        # Flatten the list of name/value records into a single dict; a repeated
        # name keeps the value of its last occurrence.
        header_map = {}
        for entry in headers:
            header_map[entry["name"]] = entry["value"]
        self.headers = header_map

    def all_client_headers_populated(self, user_headers: List[str]):
        """Return True when every client-populated header name appears in ``user_headers``."""
        supplied = set(user_headers)
        return self.headers_populated_by_client <= supplied


class MultipartUploadCreatePartUrlsResponse:
    """Result of asking for presigned part-upload URLs within a multipart upload session.

    Holds the presigned URLs for the requested parts together with the token for
    fetching the next page of URLs.
    """

    def __init__(self, upload_part_urls: List[PresignedUrl], next_page_token: str):
        # Both values are stored exactly as received; no copying or validation.
        self.upload_part_urls, self.next_page_token = upload_part_urls, next_page_token


class MultipartUploadCreate:
    """Result of initiating a multipart upload session: its session token and part size."""

    def __init__(self, session_token: str, part_size: int):
        # Both values are stored exactly as received; no validation is performed.
        self.session_token, self.part_size = session_token, part_size


class SeekableDownloadBinaryIO(BufferedIOBase):
    """Presents a remote filesystem object as a seekable, read-only BinaryIO stream.

    Data is fetched through ``api.download``. The absolute position within the remote
    file is tracked as the sum of two counters:

    * ``_start_pos_of_underlying_stream`` - file offset at which the current underlying
      stream begins;
    * ``_current_pos_of_underlying_stream`` - number of bytes consumed from (or the
      position within) the current underlying stream.

    Whenever the underlying stream is found closed, or a seek cannot be served by it, a
    new download is opened at the required offset.
    """

    # TODO: The initial version will ONLY support resumption; it will NOT support truncation / end points
    # TODO: This currently is only handling situations where the underlying stream is closed at the start of a request. It should add:
    #   - Automatic closure & reopening of a stream that throws an exception during normal operations
    #   - Throwing an exception if we're unable to open a stream (i.e. non-200 response)
    def __init__(self, file_path: str, api: "FilesExt"):
        """Open ``file_path`` for reading via ``api``.

        :param file_path: path of the remote file, passed unchanged to ``api.download``.
        :param api: object providing ``download(file_path, *, start_byte_offset, if_unmodified_since_timestamp)``.
        :raises NotImplementedError: while ``_FILES_MIXIN_ENABLE_UNSUPPORTED_FEATURES`` is false.
        """
        # This is actively under development and should not be considered production ready or API stable.
        if not _FILES_MIXIN_ENABLE_UNSUPPORTED_FEATURES:
            raise NotImplementedError("SeekableDownloadBinaryIO is not yet supported")

        self._file_path: str = file_path

        self._api = api
        self._initial_request_metadata: DownloadResponse = self._api.download(self._file_path)
        # Sent as If-Unmodified-Since on every later request for this file.
        self._dl_session_initiated = datetime.now(timezone.utc)
        self._current_pos_of_underlying_stream: int = 0
        self._start_pos_of_underlying_stream: int = 0
        self._underlying_stream: BinaryIO = self._initial_request_metadata.contents
        self._overall_file_size: int = self._initial_request_metadata.content_length
        self._most_recent_dl_resp = None
        self._closed: bool = False

    def _check_not_closed(self):
        """Raise ValueError if close() has already been called on this wrapper."""
        if self._closed:
            raise ValueError("I/O operation on closed file")

    def _replace_underlying_stream(self, __offset):
        """Close the existing underlying stream and open a new one at the specified file offset"""
        old_stream = self._underlying_stream
        self._underlying_stream = self._api.download(
            self._file_path,
            start_byte_offset=__offset,
            if_unmodified_since_timestamp=self._dl_session_initiated).contents
        # The new stream starts at __offset and nothing has been consumed from it yet;
        # updating both counters here keeps tell() correct for every caller.
        self._start_pos_of_underlying_stream = __offset
        self._current_pos_of_underlying_stream = 0
        if old_stream is not None:
            old_stream.close()
            printd("Closed older stream")
        printd("Set underlying stream")

    def _underlying_stream_is_open(self):
        """Convenience method indicating that the underlying stream is open. TODO: This also assumes that the stream does not auto-close at EOF. Might need to revisit that"""
        return self._underlying_stream is not None and not self._underlying_stream.closed

    def _ensure_open_stream(self):
        """Calling this will ensure that the underlying stream is open, smoothing over issues like socket timeouts to create the illusion of one indefinitely readable file stream"""
        # A wrapper that was explicitly closed must not silently start a new download.
        self._check_not_closed()
        if not self._underlying_stream_is_open():
            self._replace_underlying_stream(self.tell())

    def detach(self):
        raise UnsupportedOperation("Detaching from the buffer is not supported")

    def read(self, __size=-1):
        """Read and return up to size bytes; if omitted, None, or negative, read until EOF.

        Returns an empty bytes object at EOF.
        """
        self._ensure_open_stream()
        out = self._underlying_stream.read(__size)
        self._current_pos_of_underlying_stream += len(out)
        return out

    def read1(self, __size=-1):
        """Read and return up to size bytes via the underlying stream's read1()."""
        self._ensure_open_stream()
        out = self._underlying_stream.read1(__size)
        self._current_pos_of_underlying_stream += len(out)
        return out

    def readinto(self, __buffer):
        """Read up to len(buffer) bytes into buffer and return the number of bytes read."""
        self._ensure_open_stream()
        count = self._underlying_stream.readinto(__buffer)
        # readinto() returns a byte count (or None when no data is available on a
        # non-blocking stream), not a bytes object.
        if count:
            self._current_pos_of_underlying_stream += count
        return count

    def readinto1(self, __buffer):
        """Read up to len(buffer) bytes into buffer via the underlying stream's readinto1()."""
        self._ensure_open_stream()
        count = self._underlying_stream.readinto1(__buffer)
        if count:
            self._current_pos_of_underlying_stream += count
        return count

    def write(self, __buffer):
        raise UnsupportedOperation("SeekableDownloadBinaryIO is used exclusively for read operations")

    def close(self):
        """Close the underlying stream & mark the SeekableBinaryIO stream as closed as well"""
        # getattr: __del__ also lands here when __init__ raised before any attribute was set.
        stream = getattr(self, "_underlying_stream", None)
        try:
            if stream is not None:
                stream.close()
        except Exception:
            # Best effort: drop the reference to a stream that refuses to close.
            self._underlying_stream = None
        finally:
            self._closed = True

    @property
    def closed(self):
        """Reflects whether we permit additional operations on this stream"""
        # Must be an attribute, not a method: the io base classes (with-statement
        # support, iteration) test the truthiness of `self.closed`, and a bound
        # method is always truthy.
        return getattr(self, "_closed", True)

    def fileno(self):
        raise UnsupportedOperation("fileno() is not supported on this stream")

    def flush(self):
        return

    def isatty(self):
        return False

    def readable(self):
        return True

    def readline(self, __size=-1):
        """Read and return one line (up to size bytes) from the stream."""
        self._ensure_open_stream()
        out = self._underlying_stream.readline(__size)
        self._current_pos_of_underlying_stream += len(out)
        return out

    def readlines(self, __hint=-1):
        """Read and return a list of lines from the stream."""
        self._ensure_open_stream()
        out = self._underlying_stream.readlines(__hint)
        # Advance by the number of bytes read, not by the number of lines.
        self._current_pos_of_underlying_stream += sum(len(line) for line in out)
        return out

    def seek(self, __offset, __whence=os.SEEK_SET):
        """
        Change the stream position to the given byte offset, which may necessitate closing the existing client connection and opening a new one.

        :param __offset: Change the position to the byte offset, relative to the whence reference point
        :param __whence:
            - os.SEEK_SET / 0: Start of the file, offset must be 0 or positive
            - os.SEEK_CUR / 1: Current position, offset may be pos/neg/0
            - os.SEEK_END / 2: End of the file, offset must be 0 or negative
        :return: absolute position of the stream (in the overall file)
        :raises ValueError: if the stream is closed, the whence value is unknown, or the
            offset is invalid for the whence / resolves to a negative position
        :raises UnsupportedOperation: for os.SEEK_END when the file size is unknown
        """
        self._check_not_closed()

        # Step 1: resolve (offset, whence) to an absolute position in the remote file.
        if __whence == os.SEEK_SET:
            if __offset < 0:
                raise ValueError("Seek position must be 0 or positive")
            label, target = "START", __offset
        elif __whence == os.SEEK_CUR:
            label, target = "CUR", self.tell() + __offset
        elif __whence == os.SEEK_END:
            if __offset > 0:
                raise ValueError("Seek position must be 0 or negative")
            if self._overall_file_size is None:
                raise UnsupportedOperation("Cannot seek relative to the end of a file of unknown size")
            label, target = "END", self._overall_file_size + __offset
        else:
            raise ValueError(f"Invalid whence value: {__whence}")

        if target < 0:
            raise ValueError("Resulting seek position must be 0 or positive")

        # Step 2: already there - no need to touch the connection.
        if target == self.tell():
            return target

        # Step 3: a seekable underlying stream can serve any position at or after the
        # offset it was opened at; translate the absolute target to its local position.
        if self._underlying_stream_is_open() and self._underlying_stream.seekable():
            relative = target - self._start_pos_of_underlying_stream
            if relative >= 0:
                self._underlying_stream.seek(relative)
                self._current_pos_of_underlying_stream = relative
                return target

        # Step 4: otherwise open a new download starting at the target offset.
        # NOTE(review): a target at or past the end of the file produces a Range request
        # starting beyond the last byte - confirm how the service responds to that.
        printd(f"Closing underlying stream, {label}")
        printd(f"Setting up new underlying stream, {label}, {target}")
        self._replace_underlying_stream(target)
        return target

    def seekable(self):
        return True

    def tell(self):
        """Return the absolute position within the remote file."""
        return self._current_pos_of_underlying_stream + self._start_pos_of_underlying_stream

    def truncate(self, __size=None):
        raise UnsupportedOperation("Truncation is not supported on this stream")

    def writable(self):
        return False

    def writelines(self, __lines):
        raise UnsupportedOperation("Writing lines is not supported on this stream")

    def __del__(self):
        self.close()

def printd(s):
    """Print ``s`` only when the module-level debug flag ``_FILES_MIXIN_DEBUG_ENABLED`` is set."""
    if not _FILES_MIXIN_DEBUG_ENABLED:
        return
    print(s)
Loading
Loading