diff --git a/.codegen/__init__.py.tmpl b/.codegen/__init__.py.tmpl
index d54e9dfff..83bdee5e2 100644
--- a/.codegen/__init__.py.tmpl
+++ b/.codegen/__init__.py.tmpl
@@ -3,6 +3,7 @@ import databricks.sdk.dbutils as dbutils
 from databricks.sdk.credentials_provider import CredentialsStrategy
 from databricks.sdk.mixins.files import DbfsExt
+from databricks.sdk.mixins.files import FilesExt
 from databricks.sdk.mixins.compute import ClustersExt
 from databricks.sdk.mixins.workspace import WorkspaceExt
 from databricks.sdk.mixins.open_ai_client import ServingEndpointsExt
@@ -18,7 +19,7 @@ from typing import Optional
     "google_credentials" "google_service_account" }}
 {{- define "api" -}}
-    {{- $mixins := dict "ClustersAPI" "ClustersExt" "DbfsAPI" "DbfsExt" "WorkspaceAPI" "WorkspaceExt" "ServingEndpointsAPI" "ServingEndpointsExt" -}}
+    {{- $mixins := dict "ClustersAPI" "ClustersExt" "DbfsAPI" "DbfsExt" "FilesAPI" "FilesExt" "WorkspaceAPI" "WorkspaceExt" "ServingEndpointsAPI" "ServingEndpointsExt" -}}
     {{- $genApi := concat .PascalName "API" -}}
     {{- getOrDefault $mixins $genApi $genApi -}}
 {{- end -}}
diff --git a/databricks/sdk/__init__.py b/databricks/sdk/__init__.py
index 159946461..5cab39d27 100755
--- a/databricks/sdk/__init__.py
+++ b/databricks/sdk/__init__.py
@@ -5,7 +5,7 @@ from databricks.sdk import azure
 from databricks.sdk.credentials_provider import CredentialsStrategy
 from databricks.sdk.mixins.compute import ClustersExt
-from databricks.sdk.mixins.files import DbfsExt
+from databricks.sdk.mixins.files import DbfsExt, FilesExt
 from databricks.sdk.mixins.open_ai_client import ServingEndpointsExt
 from databricks.sdk.mixins.workspace import WorkspaceExt
 from databricks.sdk.service.apps import AppsAPI
@@ -202,7 +202,7 @@ def __init__(self,
         self._dbsql_permissions = DbsqlPermissionsAPI(self._api_client)
         self._experiments = ExperimentsAPI(self._api_client)
         self._external_locations = ExternalLocationsAPI(self._api_client)
-        self._files = FilesAPI(self._api_client)
+        self._files = FilesExt(self._api_client)
         self._functions = FunctionsAPI(self._api_client)
         self._genie = GenieAPI(self._api_client)
         self._git_credentials = GitCredentialsAPI(self._api_client)
@@ -408,7 +408,7 @@ def external_locations(self) -> ExternalLocationsAPI:
         return self._external_locations
 
     @property
-    def files(self) -> FilesAPI:
+    def files(self) -> FilesExt:
         """The Files API is a standard HTTP API that allows you to read, write, list, and delete files and
         directories by referring to their URI."""
         return self._files
diff --git a/databricks/sdk/mixins/files.py b/databricks/sdk/mixins/files.py
index 1e109a1a7..bb8e8fab1 100644
--- a/databricks/sdk/mixins/files.py
+++ b/databricks/sdk/mixins/files.py
@@ -8,7 +8,9 @@ import sys
 from abc import ABC, abstractmethod
 from collections import deque
-from io import BytesIO
+from datetime import datetime, timezone
+from enum import Enum
+from io import BytesIO, IOBase, BufferedIOBase, UnsupportedOperation
 from types import TracebackType
-from typing import (TYPE_CHECKING, AnyStr, BinaryIO, Generator, Iterable,
-                    Iterator, Type, Union)
+from typing import (TYPE_CHECKING, AnyStr, BinaryIO, Dict, Generator,
+                    Iterable, Iterator, List, Optional, Type, Union)
 
@@ -17,6 +19,11 @@ from .._property import _cached_property
 from ..errors import NotFound
 from ..service import files
+from ..service._internal import _escape_multi_segment_path_parameter
+from ..service.files import DownloadResponse
+
+_FILES_MIXIN_DEBUG_ENABLED = False
+_FILES_MIXIN_ENABLE_UNSUPPORTED_FEATURES = False
 
 if TYPE_CHECKING:
     from _typeshed import Self
@@ -636,3 +643,263 @@ def delete(self, path: str, *, recursive=False):
         if p.is_dir and not recursive:
             raise IOError('deleting directories requires recursive flag')
         p.delete(recursive=recursive)
+
+
+class FilesExt(files.FilesAPI):
+    """Extends the FilesAPI with support for complex multipart upload/download operations & more robust file I/O"""
+    __doc__ = files.FilesAPI.__doc__
+
+    class _FileTransferBackend(Enum):
+        DB_FILES_API = 1
+        PRESIGNED_URLS = 2
+
+    def __init__(self, api_client):
+        super().__init__(api_client)
+
+    def download(self, file_path: str, *, start_byte_offset: Optional[int] = None, if_unmodified_since_timestamp: Optional[datetime] = None) -> DownloadResponse:
+        """Download a file.
+
+        Downloads a file of up to 5 GiB. The file contents are the response body. This is a standard HTTP file
+        download, not a JSON RPC.
+
+        :param file_path: str
+          The absolute path of the file.
+        :param start_byte_offset: int (optional)
+          Begin the download at this byte offset instead of at the start of the file, expressed as an HTTP
+          Range header.
+        :param if_unmodified_since_timestamp: datetime (optional)
+          Fail the download if the remote file was modified after this timestamp.
+
+        :returns: :class:`DownloadResponse`
+        """
+
+        headers = {'Accept': 'application/octet-stream', }
+
+        if start_byte_offset is not None:
+            headers['Range'] = f'bytes={start_byte_offset}-'
+
+        if if_unmodified_since_timestamp is not None:
+            headers['If-Unmodified-Since'] = if_unmodified_since_timestamp.strftime("%a, %d %b %Y %H:%M:%S GMT")
+
+        response_headers = ['content-length', 'content-type', 'last-modified', ]
+        res = self._api.do('GET',
+                           f'/api/2.0/fs/files{_escape_multi_segment_path_parameter(file_path)}',
+                           headers=headers,
+                           response_headers=response_headers,
+                           raw=True)
+
+        return DownloadResponse.from_dict(res)
+
+
+class PresignedUrl:
+    """Represents all information needed to execute a presigned URL request"""
+
+    def __init__(self, method: str, url: str, headers: List[Dict[str, str]],
+                 headers_populated_by_client: List[str]):
+        self.method = method
+        self.url = url
+        self.headers_populated_by_client = set(headers_populated_by_client)
+        self.headers = {h["name"]: h["value"] for h in headers}
+
+    def all_client_headers_populated(self, user_headers: List[str]) -> bool:
+        return self.headers_populated_by_client.issubset(user_headers)
+
+
+class MultipartUploadCreatePartUrlsResponse:
+    """Represents the response of a request for presigned URLs for uploading parts of a file in a multipart upload session."""
+
+    def __init__(self, upload_part_urls: List[PresignedUrl], next_page_token: str):
+        self.upload_part_urls = upload_part_urls
+        self.next_page_token = next_page_token
+
+
+class MultipartUploadCreate:
+    """Represents the response to an initiated multipart upload session."""
+
+    def __init__(self, session_token: str, part_size: int):
+        self.session_token = session_token
+        self.part_size = part_size
+
+
+class SeekableDownloadBinaryIO(BufferedIOBase):
+    """Presents a remote filesystem object as a seekable BinaryIO stream"""
+    # TODO: The initial version will ONLY support resumption; it will NOT support truncation / end points
+    # TODO: This currently only handles situations where the underlying stream is closed at the start of a request. It should add:
+    #  - Automatic closure & reopening of a stream that throws an exception during normal operations
+    #  - Throwing an exception if we're unable to open a stream (i.e. a non-200 response)
+    def __init__(self, file_path: str, api: FilesExt):
+
+        # This is actively under development and should not be considered production-ready or API-stable.
+        if not _FILES_MIXIN_ENABLE_UNSUPPORTED_FEATURES:
+            raise NotImplementedError("SeekableDownloadBinaryIO is not yet supported")
+
+        self._file_path: str = file_path
+
+        self._api = api
+        self._initial_request_metadata: DownloadResponse = self._api.download(self._file_path)
+        self._dl_session_initiated = datetime.now(timezone.utc)
+        self._current_pos_of_underlying_stream: int = 0
+        self._start_pos_of_underlying_stream: int = 0
+        self._underlying_stream: BinaryIO = self._initial_request_metadata.contents
+        self._overall_file_size: int = self._initial_request_metadata.content_length
+        self._most_recent_dl_resp = None
+        self._closed: bool = False
+
+    def _replace_underlying_stream(self, __offset):
+        """Close the existing underlying stream and open a new one at the specified file offset"""
+        old_stream = self._underlying_stream
+        self._underlying_stream = self._api.download(self._file_path, start_byte_offset=__offset, if_unmodified_since_timestamp=self._dl_session_initiated).contents
+        # Keep position bookkeeping in sync: the new stream starts at __offset and nothing has been read from it yet.
+        self._start_pos_of_underlying_stream = __offset
+        self._current_pos_of_underlying_stream = 0
+        printd("Set underlying stream")
+        old_stream.close()
+        printd("Closed older stream")
+
+    def _underlying_stream_is_open(self):
+        """Convenience method indicating whether the underlying stream is open. TODO: This also assumes that the stream does not auto-close at EOF. Might need to revisit that"""
+        return self._underlying_stream is not None and not self._underlying_stream.closed
+
+    def _ensure_open_stream(self):
+        """Ensure that the underlying stream is open, smoothing over issues like socket timeouts to create the illusion of one indefinitely readable file stream"""
+        if not self._underlying_stream_is_open():
+            self._replace_underlying_stream(self.tell())
+
+    def detach(self):
+        raise UnsupportedOperation("Detaching from the buffer is not supported")
+
+    def read(self, __size = -1):
+        # Read and return up to size bytes.
+        # If omitted, None, or negative, data is read until EOF is reached.
+        # An empty bytes object is returned if the stream is already at EOF.
+        self._ensure_open_stream()
+        out = self._underlying_stream.read(__size)
+        self._current_pos_of_underlying_stream += len(out)
+        return out
+
+    def read1(self, __size = -1):
+        # Read and return up to size bytes, with at most one read() system call
+        self._ensure_open_stream()
+        out = self._underlying_stream.read1(__size)
+        self._current_pos_of_underlying_stream += len(out)
+        return out
+
+    def readinto(self, __buffer):
+        # Read up to len(buffer) bytes into buffer and return the number of bytes read.
+        # Note: readinto() returns a count, not the bytes themselves.
+        self._ensure_open_stream()
+        read_count = self._underlying_stream.readinto(__buffer)
+        self._current_pos_of_underlying_stream += read_count
+        return read_count
+
+    def readinto1(self, __buffer):
+        # Read up to len(buffer) bytes into buffer with at most one read() system call
+        self._ensure_open_stream()
+        read_count = self._underlying_stream.readinto1(__buffer)
+        self._current_pos_of_underlying_stream += read_count
+        return read_count
+
+    def write(self, __buffer):
+        raise UnsupportedOperation("SeekableDownloadBinaryIO is used exclusively for read operations")
+
+    def close(self):
+        """Close the underlying stream & mark the SeekableDownloadBinaryIO stream as closed as well"""
+        try:
+            # getattr guards against __del__ running after __init__ raised before the attribute existed
+            stream = getattr(self, '_underlying_stream', None)
+            if stream is not None:
+                stream.close()
+        except Exception:
+            self._underlying_stream = None
+        finally:
+            self._closed = True
+
+    @property
+    def closed(self):
+        """Reflects whether we permit additional operations on this stream"""
+        return self._closed
+
+    def fileno(self):
+        raise UnsupportedOperation("fileno() is not supported on this stream")
+
+    def flush(self):
+        return
+
+    def isatty(self):
+        return False
+
+    def readable(self):
+        return True
+
+    def readline(self, __size = -1):
+        self._ensure_open_stream()
+        out = self._underlying_stream.readline(__size)
+        self._current_pos_of_underlying_stream += len(out)
+        return out
+
+    def readlines(self, __hint = -1):
+        self._ensure_open_stream()
+        out = self._underlying_stream.readlines(__hint)
+        # readlines() returns a list of lines, so advance the position by the total bytes read
+        self._current_pos_of_underlying_stream += sum(len(line) for line in out)
+        return out
+
+    def seek(self, __offset, __whence = os.SEEK_SET):
+        """
+        Change the stream position to the given byte offset, which may necessitate closing the existing client connection and opening a new one.
+
+        :param __offset: Change the position to the byte offset, relative to the whence reference point
+        :param __whence:
+            - os.SEEK_SET / 0: start of the file; offset must be 0 or positive
+            - os.SEEK_CUR / 1: current position; offset may be positive, negative or zero
+            - os.SEEK_END / 2: end of the file; offset must be 0 or negative
+        :return: absolute position of the stream (in the overall file)
+        """
+
+        if self._underlying_stream.seekable() and __whence == os.SEEK_CUR and __offset > 0:
+            # Fast path: delegate forward relative seeks to the underlying stream
+            # and keep our position bookkeeping in sync with its result.
+            self._current_pos_of_underlying_stream = self._underlying_stream.seek(__offset, os.SEEK_CUR)
+            return self.tell()
+
+        if __whence == os.SEEK_SET:
+            if __offset < 0:
+                raise ValueError("Seek position must be 0 or positive")
+
+            printd(f"Setting up new underlying stream, START, {__offset}")
+            self._replace_underlying_stream(__offset)
+            return __offset
+
+        if __whence == os.SEEK_CUR:
+            if __offset == 0:
+                return self.tell()
+
+            new_offset = self.tell() + __offset
+            printd(f"Setting up new underlying stream, CUR, {new_offset}")
+            self._replace_underlying_stream(new_offset)
+            return new_offset
+
+        if __whence == os.SEEK_END:
+            if __offset > 0:
+                raise ValueError("Seek position must be 0 or negative")
+
+            new_offset = self._overall_file_size + __offset
+            printd(f"Setting up new underlying stream, END, {new_offset}")
+            self._replace_underlying_stream(new_offset)
+            return new_offset
+
+        raise ValueError(f"Unsupported whence value: {__whence}")
+
+    def seekable(self):
+        return True
+
+    def tell(self):
+        return self._current_pos_of_underlying_stream + self._start_pos_of_underlying_stream
+
+    def truncate(self, __size = None):
+        raise UnsupportedOperation("Truncation is not supported on this stream")
+
+    def writable(self):
+        return False
+
+    def writelines(self, __lines):
+        raise UnsupportedOperation("Writing lines is not supported on this stream")
+
+    def __del__(self):
+        self.close()
+
+
+def printd(s):
+    if _FILES_MIXIN_DEBUG_ENABLED:
+        print(s)
\ No newline at end of file
diff --git a/databricks/sdk/service/apps.py b/databricks/sdk/service/apps.py
index 52796d0e8..5f413f0be 100755
--- a/databricks/sdk/service/apps.py
+++ b/databricks/sdk/service/apps.py
@@ -813,29 +813,31 @@ def wait_get_app_active(self,
             attempt += 1
         raise TimeoutError(f'timed out after {timeout}: {status_message}')
 
-    def wait_get_app_stopped(self,
-                             name: str,
-                             timeout=timedelta(minutes=20),
-                             callback: Optional[Callable[[App], None]] = None) -> App:
+    def wait_get_deployment_app_succeeded(
+            self,
+            app_name: str,
+            deployment_id: str,
+            timeout=timedelta(minutes=20),
+            callback: Optional[Callable[[AppDeployment], None]] = None) -> AppDeployment:
         deadline = time.time() + timeout.total_seconds()
-        target_states = (ComputeState.STOPPED, )
-        failure_states = (ComputeState.ERROR, )
+        target_states = (AppDeploymentState.SUCCEEDED, )
+        failure_states = (AppDeploymentState.FAILED, )
         status_message = 'polling...'
         attempt = 1
         while time.time() < deadline:
-            poll = self.get(name=name)
-            status = poll.compute_status.state
+            poll = self.get_deployment(app_name=app_name, deployment_id=deployment_id)
+            status = poll.status.state
             status_message = f'current status: {status}'
-            if poll.compute_status:
-                status_message = poll.compute_status.message
+            if poll.status:
+                status_message = poll.status.message
             if status in target_states:
                 return poll
             if callback:
                 callback(poll)
             if status in failure_states:
-                msg = f'failed to reach STOPPED, got {status}: {status_message}'
+                msg = f'failed to reach SUCCEEDED, got {status}: {status_message}'
                 raise OperationFailed(msg)
-            prefix = f"name={name}"
+            prefix = f"app_name={app_name}, deployment_id={deployment_id}"
             sleep = attempt
             if sleep > 10:
                 # sleep 10s max per attempt
@@ -845,31 +847,29 @@ def wait_get_app_stopped(self,
             attempt += 1
         raise TimeoutError(f'timed out after {timeout}: {status_message}')
 
-    def wait_get_deployment_app_succeeded(
-            self,
-            app_name: str,
-            deployment_id: str,
-            timeout=timedelta(minutes=20),
-            callback: Optional[Callable[[AppDeployment], None]] = None) -> AppDeployment:
+    def wait_get_app_stopped(self,
+                             name: str,
+                             timeout=timedelta(minutes=20),
+                             callback: Optional[Callable[[App], None]] = None) -> App:
         deadline = time.time() + timeout.total_seconds()
-        target_states = (AppDeploymentState.SUCCEEDED, )
-        failure_states = (AppDeploymentState.FAILED, )
+        target_states = (ComputeState.STOPPED, )
+        failure_states = (ComputeState.ERROR, )
         status_message = 'polling...'
         attempt = 1
         while time.time() < deadline:
-            poll = self.get_deployment(app_name=app_name, deployment_id=deployment_id)
-            status = poll.status.state
+            poll = self.get(name=name)
+            status = poll.compute_status.state
             status_message = f'current status: {status}'
-            if poll.status:
-                status_message = poll.status.message
+            if poll.compute_status:
+                status_message = poll.compute_status.message
             if status in target_states:
                 return poll
             if callback:
                 callback(poll)
             if status in failure_states:
-                msg = f'failed to reach SUCCEEDED, got {status}: {status_message}'
+                msg = f'failed to reach STOPPED, got {status}: {status_message}'
                 raise OperationFailed(msg)
-            prefix = f"app_name={app_name}, deployment_id={deployment_id}"
+            prefix = f"name={name}"
             sleep = attempt
             if sleep > 10:
                 # sleep 10s max per attempt
diff --git a/databricks/sdk/service/compute.py b/databricks/sdk/service/compute.py
index 4a77496de..40def5df5 100755
--- a/databricks/sdk/service/compute.py
+++ b/databricks/sdk/service/compute.py
@@ -7865,20 +7865,19 @@ def wait_command_status_command_execution_cancelled(
             attempt += 1
         raise TimeoutError(f'timed out after {timeout}: {status_message}')
 
-    def wait_command_status_command_execution_finished_or_error(
+    def wait_context_status_command_execution_running(
             self,
             cluster_id: str,
-            command_id: str,
             context_id: str,
             timeout=timedelta(minutes=20),
-            callback: Optional[Callable[[CommandStatusResponse], None]] = None) -> CommandStatusResponse:
+            callback: Optional[Callable[[ContextStatusResponse], None]] = None) -> ContextStatusResponse:
         deadline = time.time() + timeout.total_seconds()
-        target_states = (CommandStatus.FINISHED, CommandStatus.ERROR, )
-        failure_states = (CommandStatus.CANCELLED, CommandStatus.CANCELLING, )
+        target_states = (ContextStatus.RUNNING, )
+        failure_states = (ContextStatus.ERROR, )
         status_message = 'polling...'
         attempt = 1
         while time.time() < deadline:
-            poll = self.command_status(cluster_id=cluster_id, command_id=command_id, context_id=context_id)
+            poll = self.context_status(cluster_id=cluster_id, context_id=context_id)
             status = poll.status
             status_message = f'current status: {status}'
             if status in target_states:
@@ -7886,9 +7885,9 @@ def wait_command_status_command_execution_finished_or_error(
             if callback:
                 callback(poll)
             if status in failure_states:
-                msg = f'failed to reach Finished or Error, got {status}: {status_message}'
+                msg = f'failed to reach Running, got {status}: {status_message}'
                 raise OperationFailed(msg)
-            prefix = f"cluster_id={cluster_id}, command_id={command_id}, context_id={context_id}"
+            prefix = f"cluster_id={cluster_id}, context_id={context_id}"
             sleep = attempt
             if sleep > 10:
                 # sleep 10s max per attempt
@@ -7898,19 +7897,20 @@ def wait_command_status_command_execution_finished_or_error(
             attempt += 1
         raise TimeoutError(f'timed out after {timeout}: {status_message}')
 
-    def wait_context_status_command_execution_running(
+    def wait_command_status_command_execution_finished_or_error(
             self,
             cluster_id: str,
+            command_id: str,
             context_id: str,
             timeout=timedelta(minutes=20),
-            callback: Optional[Callable[[ContextStatusResponse], None]] = None) -> ContextStatusResponse:
+            callback: Optional[Callable[[CommandStatusResponse], None]] = None) -> CommandStatusResponse:
         deadline = time.time() + timeout.total_seconds()
-        target_states = (ContextStatus.RUNNING, )
-        failure_states = (ContextStatus.ERROR, )
+        target_states = (CommandStatus.FINISHED, CommandStatus.ERROR, )
+        failure_states = (CommandStatus.CANCELLED, CommandStatus.CANCELLING, )
         status_message = 'polling...'
         attempt = 1
         while time.time() < deadline:
-            poll = self.context_status(cluster_id=cluster_id, context_id=context_id)
+            poll = self.command_status(cluster_id=cluster_id, command_id=command_id, context_id=context_id)
             status = poll.status
             status_message = f'current status: {status}'
             if status in target_states:
@@ -7918,9 +7918,9 @@ def wait_context_status_command_execution_running(
             if callback:
                 callback(poll)
             if status in failure_states:
-                msg = f'failed to reach Running, got {status}: {status_message}'
+                msg = f'failed to reach Finished or Error, got {status}: {status_message}'
                 raise OperationFailed(msg)
-            prefix = f"cluster_id={cluster_id}, context_id={context_id}"
+            prefix = f"cluster_id={cluster_id}, command_id={command_id}, context_id={context_id}"
             sleep = attempt
             if sleep > 10:
                 # sleep 10s max per attempt
diff --git a/databricks/sdk/service/pipelines.py b/databricks/sdk/service/pipelines.py
index 9c12f8788..f99201fde 100755
--- a/databricks/sdk/service/pipelines.py
+++ b/databricks/sdk/service/pipelines.py
@@ -2122,13 +2122,13 @@ class PipelinesAPI:
     def __init__(self, api_client):
         self._api = api_client
 
-    def wait_get_pipeline_idle(
+    def wait_get_pipeline_running(
             self,
             pipeline_id: str,
             timeout=timedelta(minutes=20),
             callback: Optional[Callable[[GetPipelineResponse], None]] = None) -> GetPipelineResponse:
         deadline = time.time() + timeout.total_seconds()
-        target_states = (PipelineState.IDLE, )
+        target_states = (PipelineState.RUNNING, )
         failure_states = (PipelineState.FAILED, )
         status_message = 'polling...'
         attempt = 1
@@ -2141,7 +2141,7 @@ def wait_get_pipeline_idle(
             if callback:
                 callback(poll)
             if status in failure_states:
-                msg = f'failed to reach IDLE, got {status}: {status_message}'
+                msg = f'failed to reach RUNNING, got {status}: {status_message}'
                 raise OperationFailed(msg)
             prefix = f"pipeline_id={pipeline_id}"
             sleep = attempt
@@ -2153,13 +2153,13 @@ def wait_get_pipeline_idle(
             attempt += 1
         raise TimeoutError(f'timed out after {timeout}: {status_message}')
 
-    def wait_get_pipeline_running(
+    def wait_get_pipeline_idle(
            self,
            pipeline_id: str,
            timeout=timedelta(minutes=20),
            callback: Optional[Callable[[GetPipelineResponse], None]] = None) -> GetPipelineResponse:
         deadline = time.time() + timeout.total_seconds()
-        target_states = (PipelineState.RUNNING, )
+        target_states = (PipelineState.IDLE, )
         failure_states = (PipelineState.FAILED, )
         status_message = 'polling...'
         attempt = 1
@@ -2172,7 +2172,7 @@ def wait_get_pipeline_running(
             if callback:
                 callback(poll)
             if status in failure_states:
-                msg = f'failed to reach RUNNING, got {status}: {status_message}'
+                msg = f'failed to reach IDLE, got {status}: {status_message}'
                 raise OperationFailed(msg)
             prefix = f"pipeline_id={pipeline_id}"
             sleep = attempt
diff --git a/docs/workspace/files/files.rst b/docs/workspace/files/files.rst
index db20b2192..a9b0f5159 100644
--- a/docs/workspace/files/files.rst
+++ b/docs/workspace/files/files.rst
@@ -2,7 +2,7 @@
 ==================
 
 .. currentmodule:: databricks.sdk.service.files
-.. py:class:: FilesAPI
+.. py:class:: FilesExt
 
     The Files API is a standard HTTP API that allows you to read, write, list, and delete files and
     directories by referring to their URI. The API makes working with file content as raw bytes easier and
@@ -60,16 +60,20 @@
 
-    .. py:method:: download(file_path: str) -> DownloadResponse
+    .. py:method:: download(file_path: str [, start_byte_offset: Optional[int], if_unmodified_since_timestamp: Optional[datetime]]) -> DownloadResponse
 
         Download a file.
-        
+
         Downloads a file of up to 5 GiB. The file contents are the response body. This is a standard HTTP file
         download, not a JSON RPC.
-        
+
         :param file_path: str
           The absolute path of the file.
-        
+        :param start_byte_offset: int (optional)
+          Begin the download at this byte offset instead of at the start of the file.
+        :param if_unmodified_since_timestamp: datetime (optional)
+          Fail the download if the remote file was modified after this timestamp.
+
         :returns: :class:`DownloadResponse`
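
Usage sketch: with `WorkspaceClient.files` now resolving to `FilesExt`, an interrupted download can be resumed from the last received byte via the new keyword arguments. A minimal example of that flow, assuming a configured workspace client; the volume path, local file name, and chunk size are hypothetical, a single retry is shown, and most error handling is elided:

    from datetime import datetime, timezone
    from databricks.sdk import WorkspaceClient

    w = WorkspaceClient()
    path = '/Volumes/main/default/my_volume/data.bin'  # hypothetical file

    # Remember when the transfer started so a resumed request fails fast
    # (via If-Unmodified-Since) if the remote file changes underneath us.
    session_started = datetime.now(timezone.utc)
    resp = w.files.download(path)

    received = 0
    with open('data.bin', 'wb') as sink:
        try:
            for chunk in iter(lambda: resp.contents.read(1024 * 1024), b''):
                sink.write(chunk)
                received += len(chunk)
        except OSError:
            # Connection dropped mid-transfer: resume from the first byte we
            # have not yet written instead of restarting from scratch.
            resp = w.files.download(path,
                                    start_byte_offset=received,
                                    if_unmodified_since_timestamp=session_started)
            for chunk in iter(lambda: resp.contents.read(1024 * 1024), b''):
                sink.write(chunk)
                received += len(chunk)

`SeekableDownloadBinaryIO` automates the same reopen-and-resume bookkeeping behind a file-like interface, but it remains gated behind `_FILES_MIXIN_ENABLE_UNSUPPORTED_FEATURES` until it is production-ready.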