diff --git a/.codegen/_openapi_sha b/.codegen/_openapi_sha index 8622b29c..dfe78790 100644 --- a/.codegen/_openapi_sha +++ b/.codegen/_openapi_sha @@ -1 +1 @@ -a6a317df8327c9b1e5cb59a03a42ffa2aabeef6d \ No newline at end of file +779817ed8d63031f5ea761fbd25ee84f38feec0d \ No newline at end of file diff --git a/.github/workflows/external-message.yml b/.github/workflows/external-message.yml index a2d9dc2e..6771057c 100644 --- a/.github/workflows/external-message.yml +++ b/.github/workflows/external-message.yml @@ -13,7 +13,10 @@ on: jobs: comment-on-pr: - runs-on: ubuntu-latest + runs-on: + group: databricks-deco-testing-runner-group + labels: ubuntu-latest-deco + permissions: pull-requests: write @@ -44,13 +47,13 @@ jobs: gh pr comment ${{ github.event.pull_request.number }} --body \ " If integration tests don't run automatically, an authorized user can run them manually by following the instructions below: - + Trigger: [go/deco-tests-run/sdk-py](https://go/deco-tests-run/sdk-py) Inputs: * PR number: ${{github.event.pull_request.number}} * Commit SHA: \`${{ env.COMMIT_SHA }}\` - + Checks will be approved automatically on success. " diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 93a6c267..c308cc03 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -6,12 +6,16 @@ on: types: [opened, synchronize] merge_group: - + jobs: check-token: name: Check secrets access - runs-on: ubuntu-latest + + runs-on: + group: databricks-deco-testing-runner-group + labels: ubuntu-latest-deco + environment: "test-trigger-is" outputs: has_token: ${{ steps.set-token-status.outputs.has_token }} @@ -26,14 +30,18 @@ jobs: echo "DECO_WORKFLOW_TRIGGER_APP_ID is set. User has access to secrets." echo "::set-output name=has_token::true" fi - + trigger-tests: name: Trigger Tests - runs-on: ubuntu-latest + + runs-on: + group: databricks-deco-testing-runner-group + labels: ubuntu-latest-deco + needs: check-token if: github.event_name == 'pull_request' && needs.check-token.outputs.has_token == 'true' environment: "test-trigger-is" - + steps: - uses: actions/checkout@v3 @@ -45,7 +53,7 @@ jobs: private-key: ${{ secrets.DECO_WORKFLOW_TRIGGER_PRIVATE_KEY }} owner: ${{ secrets.ORG_NAME }} repositories: ${{secrets.REPO_NAME}} - + - name: Trigger Workflow in Another Repo env: GH_TOKEN: ${{ steps.generate-token.outputs.token }} @@ -53,18 +61,22 @@ jobs: gh workflow run sdk-py-isolated-pr.yml -R ${{ secrets.ORG_NAME }}/${{secrets.REPO_NAME}} \ --ref main \ -f pull_request_number=${{ github.event.pull_request.number }} \ - -f commit_sha=${{ github.event.pull_request.head.sha }} + -f commit_sha=${{ github.event.pull_request.head.sha }} - # Statuses and checks apply to specific commits (by hash). + # Statuses and checks apply to specific commits (by hash). # Enforcement of required checks is done both at the PR level and the merge queue level. - # In case of multiple commits in a single PR, the hash of the squashed commit + # In case of multiple commits in a single PR, the hash of the squashed commit # will not match the one for the latest (approved) commit in the PR. # We auto approve the check for the merge queue for two reasons: # * Queue times out due to duration of tests. # * Avoid running integration tests twice, since it was already run at the tip of the branch before squashing. 
auto-approve: if: github.event_name == 'merge_group' - runs-on: ubuntu-latest + + runs-on: + group: databricks-deco-testing-runner-group + labels: ubuntu-latest-deco + steps: - name: Mark Check env: @@ -75,4 +87,4 @@ jobs: -H "X-GitHub-Api-Version: 2022-11-28" \ /repos/${{ github.repository }}/statuses/${{ github.sha }} \ -f 'state=success' \ - -f 'context=Integration Tests Check' \ No newline at end of file + -f 'context=Integration Tests Check' diff --git a/.github/workflows/release-test.yml b/.github/workflows/release-test.yml index c3349b75..0e8c4d8e 100644 --- a/.github/workflows/release-test.yml +++ b/.github/workflows/release-test.yml @@ -5,10 +5,15 @@ on: jobs: publish: - runs-on: ubuntu-latest + runs-on: + group: databricks-deco-testing-runner-group + labels: ubuntu-latest-deco + environment: release-test + permissions: id-token: write + steps: - uses: actions/checkout@v3 diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index ecde40e0..32890bde 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -7,11 +7,16 @@ on: jobs: publish: - runs-on: ubuntu-latest + runs-on: + group: databricks-deco-testing-runner-group + labels: ubuntu-latest-deco + environment: release + permissions: contents: write id-token: write + steps: - uses: actions/checkout@v3 diff --git a/databricks/sdk/__init__.py b/databricks/sdk/__init__.py index 068069f0..80fe188b 100755 --- a/databricks/sdk/__init__.py +++ b/databricks/sdk/__init__.py @@ -1,3 +1,5 @@ +# Code generated from OpenAPI specs by Databricks SDK Generator. DO NOT EDIT. + from typing import Optional import databricks.sdk.core as client @@ -5,7 +7,7 @@ from databricks.sdk import azure from databricks.sdk.credentials_provider import CredentialsStrategy from databricks.sdk.mixins.compute import ClustersExt -from databricks.sdk.mixins.files import DbfsExt +from databricks.sdk.mixins.files import DbfsExt, FilesExt from databricks.sdk.mixins.jobs import JobsExt from databricks.sdk.mixins.open_ai_client import ServingEndpointsExt from databricks.sdk.mixins.workspace import WorkspaceExt @@ -114,6 +116,13 @@ def _make_dbutils(config: client.Config): return runtime_dbutils +def _make_files_client(apiClient: client.ApiClient, config: client.Config): + if config.enable_experimental_files_api_client: + return FilesExt(apiClient, config) + else: + return FilesAPI(apiClient) + + class WorkspaceClient: """ The WorkspaceClient is a client for the workspace-level Databricks REST API. 
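
Note on the hunk above: `WorkspaceClient.files` is now routed through the new `FilesExt` client whenever `enable_experimental_files_api_client` is set (also controllable through the `DATABRICKS_ENABLE_EXPERIMENTAL_FILES_API_CLIENT` environment variable). A minimal usage sketch follows; it assumes workspace credentials are already configured in the environment, and the volume path and chunk size are purely illustrative:

    from databricks.sdk import WorkspaceClient
    from databricks.sdk.core import Config

    # Opt in to the experimental, fault-tolerant Files API client.
    cfg = Config()
    cfg.enable_experimental_files_api_client = True

    w = WorkspaceClient(config=cfg)

    # download() streams the response body; reading in bounded chunks lets the
    # resilient client resume from the current offset if the connection drops.
    download = w.files.download("/Volumes/main/default/my_volume/example.bin")
    with download.contents as stream:
        while True:
            chunk = stream.read(1024 * 1024)
            if not chunk:
                break
            # process chunk here

Bounded reads matter because the recovery logic added later in this diff (`_ResilientIterator`) re-requests the file from the last delivered offset; a single unbounded read gives it no chance to make progress between failures.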
@@ -203,7 +212,7 @@ def __init__(self, self._dbsql_permissions = DbsqlPermissionsAPI(self._api_client) self._experiments = ExperimentsAPI(self._api_client) self._external_locations = ExternalLocationsAPI(self._api_client) - self._files = FilesAPI(self._api_client) + self._files = _make_files_client(self._api_client, self._config) self._functions = FunctionsAPI(self._api_client) self._genie = GenieAPI(self._api_client) self._git_credentials = GitCredentialsAPI(self._api_client) diff --git a/databricks/sdk/_base_client.py b/databricks/sdk/_base_client.py index ed85dc47..e61dd39c 100644 --- a/databricks/sdk/_base_client.py +++ b/databricks/sdk/_base_client.py @@ -1,6 +1,7 @@ import io import logging import urllib.parse +from abc import ABC, abstractmethod from datetime import timedelta from types import TracebackType from typing import (Any, BinaryIO, Callable, Dict, Iterable, Iterator, List, @@ -285,8 +286,20 @@ def _record_request_log(self, response: requests.Response, raw: bool = False) -> logger.debug(RoundTrip(response, self._debug_headers, self._debug_truncate_bytes, raw).generate()) +class _RawResponse(ABC): + + @abstractmethod + # follows Response signature: https://github.com/psf/requests/blob/main/src/requests/models.py#L799 + def iter_content(self, chunk_size: int = 1, decode_unicode: bool = False): + pass + + @abstractmethod + def close(self): + pass + + class _StreamingResponse(BinaryIO): - _response: requests.Response + _response: _RawResponse _buffer: bytes _content: Union[Iterator[bytes], None] _chunk_size: Union[int, None] @@ -298,7 +311,7 @@ def fileno(self) -> int: def flush(self) -> int: pass - def __init__(self, response: requests.Response, chunk_size: Union[int, None] = None): + def __init__(self, response: _RawResponse, chunk_size: Union[int, None] = None): self._response = response self._buffer = b'' self._content = None @@ -308,7 +321,7 @@ def _open(self) -> None: if self._closed: raise ValueError("I/O operation on closed file") if not self._content: - self._content = self._response.iter_content(chunk_size=self._chunk_size) + self._content = self._response.iter_content(chunk_size=self._chunk_size, decode_unicode=False) def __enter__(self) -> BinaryIO: self._open() diff --git a/databricks/sdk/config.py b/databricks/sdk/config.py index 387fa65c..490c6ba4 100644 --- a/databricks/sdk/config.py +++ b/databricks/sdk/config.py @@ -92,6 +92,11 @@ class Config: max_connections_per_pool: int = ConfigAttribute() databricks_environment: Optional[DatabricksEnvironment] = None + enable_experimental_files_api_client: bool = ConfigAttribute( + env='DATABRICKS_ENABLE_EXPERIMENTAL_FILES_API_CLIENT') + files_api_client_download_max_total_recovers = None + files_api_client_download_max_total_recovers_without_progressing = 1 + def __init__( self, *, diff --git a/databricks/sdk/data_plane.py b/databricks/sdk/data_plane.py index 6f6ddf80..5ad9b79a 100644 --- a/databricks/sdk/data_plane.py +++ b/databricks/sdk/data_plane.py @@ -3,7 +3,6 @@ from typing import Callable, List from databricks.sdk.oauth import Token -from databricks.sdk.service.oauth2 import DataPlaneInfo @dataclass @@ -19,6 +18,7 @@ class DataPlaneDetails: class DataPlaneService: """Helper class to fetch and manage DataPlane details.""" + from .service.serving import DataPlaneInfo def __init__(self): self._data_plane_info = {} diff --git a/databricks/sdk/mixins/files.py b/databricks/sdk/mixins/files.py index 1e109a1a..678b4b63 100644 --- a/databricks/sdk/mixins/files.py +++ b/databricks/sdk/mixins/files.py @@ -1,6 +1,7 @@ from 
__future__ import annotations import base64 +import logging import os import pathlib import platform @@ -8,19 +9,27 @@ import sys from abc import ABC, abstractmethod from collections import deque +from collections.abc import Iterator from io import BytesIO from types import TracebackType from typing import (TYPE_CHECKING, AnyStr, BinaryIO, Generator, Iterable, - Iterator, Type, Union) + Optional, Type, Union) from urllib import parse +from requests import RequestException + +from .._base_client import _RawResponse, _StreamingResponse from .._property import _cached_property from ..errors import NotFound from ..service import files +from ..service._internal import _escape_multi_segment_path_parameter +from ..service.files import DownloadResponse if TYPE_CHECKING: from _typeshed import Self +_LOG = logging.getLogger(__name__) + class _DbfsIO(BinaryIO): MAX_CHUNK_SIZE = 1024 * 1024 @@ -636,3 +645,177 @@ def delete(self, path: str, *, recursive=False): if p.is_dir and not recursive: raise IOError('deleting directories requires recursive flag') p.delete(recursive=recursive) + + +class FilesExt(files.FilesAPI): + __doc__ = files.FilesAPI.__doc__ + + def __init__(self, api_client, config: Config): + super().__init__(api_client) + self._config = config.copy() + + def download(self, file_path: str) -> DownloadResponse: + """Download a file. + + Downloads a file of any size. The file contents are the response body. + This is a standard HTTP file download, not a JSON RPC. + + It is strongly recommended, for fault tolerance reasons, + to iteratively consume from the stream with a maximum read(size) + defined instead of using indefinite-size reads. + + :param file_path: str + The remote path of the file, e.g. /Volumes/path/to/your/file + + :returns: :class:`DownloadResponse` + """ + + initial_response: DownloadResponse = self._download_raw_stream(file_path=file_path, + start_byte_offset=0, + if_unmodified_since_timestamp=None) + + wrapped_response = self._wrap_stream(file_path, initial_response) + initial_response.contents._response = wrapped_response + return initial_response + + def _download_raw_stream(self, + file_path: str, + start_byte_offset: int, + if_unmodified_since_timestamp: Optional[str] = None) -> DownloadResponse: + headers = {'Accept': 'application/octet-stream', } + + if start_byte_offset and not if_unmodified_since_timestamp: + raise Exception("if_unmodified_since_timestamp is required if start_byte_offset is specified") + + if start_byte_offset: + headers['Range'] = f'bytes={start_byte_offset}-' + + if if_unmodified_since_timestamp: + headers['If-Unmodified-Since'] = if_unmodified_since_timestamp + + response_headers = ['content-length', 'content-type', 'last-modified', ] + res = self._api.do('GET', + f'/api/2.0/fs/files{_escape_multi_segment_path_parameter(file_path)}', + headers=headers, + response_headers=response_headers, + raw=True) + + result = DownloadResponse.from_dict(res) + if not isinstance(result.contents, _StreamingResponse): + raise Exception("Internal error: response contents is of unexpected type: " + + type(result.contents).__name__) + + return result + + def _wrap_stream(self, file_path: str, downloadResponse: DownloadResponse): + underlying_response = _ResilientIterator._extract_raw_response(downloadResponse) + return _ResilientResponse(self, + file_path, + downloadResponse.last_modified, + offset=0, + underlying_response=underlying_response) + + +class _ResilientResponse(_RawResponse): + + def __init__(self, api: FilesExt, file_path: str, file_last_modified: str, 
offset: int, + underlying_response: _RawResponse): + self.api = api + self.file_path = file_path + self.underlying_response = underlying_response + self.offset = offset + self.file_last_modified = file_last_modified + + def iter_content(self, chunk_size=1, decode_unicode=False): + if decode_unicode: + raise ValueError('Decode unicode is not supported') + + iterator = self.underlying_response.iter_content(chunk_size=chunk_size, decode_unicode=False) + self.iterator = _ResilientIterator(iterator, self.file_path, self.file_last_modified, self.offset, + self.api, chunk_size) + return self.iterator + + def close(self): + self.iterator.close() + + +class _ResilientIterator(Iterator): + # This class tracks current offset (returned to the client code) + # and recovers from failures by requesting download from the current offset. + + @staticmethod + def _extract_raw_response(download_response: DownloadResponse) -> _RawResponse: + streaming_response: _StreamingResponse = download_response.contents # this is an instance of _StreamingResponse + return streaming_response._response + + def __init__(self, underlying_iterator, file_path: str, file_last_modified: str, offset: int, + api: FilesExt, chunk_size: int): + self._underlying_iterator = underlying_iterator + self._api = api + self._file_path = file_path + + # Absolute current offset (0-based), i.e. number of bytes from the beginning of the file + # that were so far returned to the caller code. + self._offset = offset + self._file_last_modified = file_last_modified + self._chunk_size = chunk_size + + self._total_recovers_count: int = 0 + self._recovers_without_progressing_count: int = 0 + self._closed: bool = False + + def _should_recover(self) -> bool: + if self._total_recovers_count == self._api._config.files_api_client_download_max_total_recovers: + _LOG.debug("Total recovers limit exceeded") + return False + if self._api._config.files_api_client_download_max_total_recovers_without_progressing is not None and self._recovers_without_progressing_count >= self._api._config.files_api_client_download_max_total_recovers_without_progressing: + _LOG.debug("No progression recovers limit exceeded") + return False + return True + + def _recover(self) -> bool: + if not self._should_recover(): + return False # recover suppressed, rethrow original exception + + self._total_recovers_count += 1 + self._recovers_without_progressing_count += 1 + + try: + self._underlying_iterator.close() + + _LOG.debug("Trying to recover from offset " + str(self._offset)) + + # following call includes all the required network retries + downloadResponse = self._api._download_raw_stream(self._file_path, self._offset, + self._file_last_modified) + underlying_response = _ResilientIterator._extract_raw_response(downloadResponse) + self._underlying_iterator = underlying_response.iter_content(chunk_size=self._chunk_size, + decode_unicode=False) + _LOG.debug("Recover succeeded") + return True + except: + return False # recover failed, rethrow original exception + + def __next__(self): + if self._closed: + # following _BaseClient + raise ValueError("I/O operation on closed file") + + while True: + try: + returned_bytes = next(self._underlying_iterator) + self._offset += len(returned_bytes) + self._recovers_without_progressing_count = 0 + return returned_bytes + + except StopIteration: + raise + + # https://requests.readthedocs.io/en/latest/user/quickstart/#errors-and-exceptions + except RequestException: + if not self._recover(): + raise + + def close(self): + 
self._underlying_iterator.close() + self._closed = True diff --git a/databricks/sdk/service/apps.py b/databricks/sdk/service/apps.py index eee49a21..37af1011 100755 --- a/databricks/sdk/service/apps.py +++ b/databricks/sdk/service/apps.py @@ -967,12 +967,14 @@ def wait_get_app_stopped(self, attempt += 1 raise TimeoutError(f'timed out after {timeout}: {status_message}') - def create(self, *, app: Optional[App] = None) -> Wait[App]: + def create(self, *, app: Optional[App] = None, no_compute: Optional[bool] = None) -> Wait[App]: """Create an app. Creates a new app. :param app: :class:`App` (optional) + :param no_compute: bool (optional) + If true, the app will not be started after creation. :returns: Long-running operation waiter for :class:`App`. @@ -981,11 +983,15 @@ def create(self, *, app: Optional[App] = None) -> Wait[App]: body = app.as_dict() headers = {'Accept': 'application/json', 'Content-Type': 'application/json', } - op_response = self._api.do('POST', '/api/2.0/apps', body=body, headers=headers) + op_response = self._api.do('POST', '/api/2.0/apps', query=query, body=body, headers=headers) return Wait(self.wait_get_app_active, response=App.from_dict(op_response), name=op_response['name']) - def create_and_wait(self, *, app: Optional[App] = None, timeout=timedelta(minutes=20)) -> App: - return self.create(app=app).result(timeout=timeout) + def create_and_wait(self, + *, + app: Optional[App] = None, + no_compute: Optional[bool] = None, + timeout=timedelta(minutes=20)) -> App: + return self.create(app=app, no_compute=no_compute).result(timeout=timeout) def delete(self, name: str) -> App: """Delete an app. diff --git a/databricks/sdk/service/catalog.py b/databricks/sdk/service/catalog.py index f1b54933..c56acce3 100755 --- a/databricks/sdk/service/catalog.py +++ b/databricks/sdk/service/catalog.py @@ -5810,6 +5810,7 @@ def from_dict(cls, d: Dict[str, any]) -> ProvisioningInfo: class ProvisioningInfoState(Enum): ACTIVE = 'ACTIVE' + DEGRADED = 'DEGRADED' DELETING = 'DELETING' FAILED = 'FAILED' PROVISIONING = 'PROVISIONING' diff --git a/databricks/sdk/service/jobs.py b/databricks/sdk/service/jobs.py index 105c7cd2..c5fdb839 100755 --- a/databricks/sdk/service/jobs.py +++ b/databricks/sdk/service/jobs.py @@ -35,6 +35,11 @@ class BaseJob: Jobs UI in the job details page and Jobs API using `budget_policy_id` 3. Inferred default based on accessible budget policies of the run_as identity on job creation or modification.""" + has_more: Optional[bool] = None + """Indicates if the job has more sub-resources (`tasks`, `job_clusters`) that are not shown. They + can be accessed via :method:jobs/get endpoint. 
It is only relevant for API 2.2 :method:jobs/list + requests with `expand_tasks=true`.""" + job_id: Optional[int] = None """The canonical identifier for this job.""" @@ -49,6 +54,7 @@ def as_dict(self) -> dict: if self.creator_user_name is not None: body['creator_user_name'] = self.creator_user_name if self.effective_budget_policy_id is not None: body['effective_budget_policy_id'] = self.effective_budget_policy_id + if self.has_more is not None: body['has_more'] = self.has_more if self.job_id is not None: body['job_id'] = self.job_id if self.settings: body['settings'] = self.settings.as_dict() return body @@ -60,6 +66,7 @@ def as_shallow_dict(self) -> dict: if self.creator_user_name is not None: body['creator_user_name'] = self.creator_user_name if self.effective_budget_policy_id is not None: body['effective_budget_policy_id'] = self.effective_budget_policy_id + if self.has_more is not None: body['has_more'] = self.has_more if self.job_id is not None: body['job_id'] = self.job_id if self.settings: body['settings'] = self.settings return body @@ -70,6 +77,7 @@ def from_dict(cls, d: Dict[str, any]) -> BaseJob: return cls(created_time=d.get('created_time', None), creator_user_name=d.get('creator_user_name', None), effective_budget_policy_id=d.get('effective_budget_policy_id', None), + has_more=d.get('has_more', None), job_id=d.get('job_id', None), settings=_from_dict(d, 'settings', JobSettings)) @@ -124,10 +132,16 @@ class BaseRun: Note: dbt and SQL File tasks support only version-controlled sources. If dbt or SQL File tasks are used, `git_source` must be defined on the job.""" + has_more: Optional[bool] = None + """Indicates if the run has more sub-resources (`tasks`, `job_clusters`) that are not shown. They + can be accessed via :method:jobs/getrun endpoint. It is only relevant for API 2.2 + :method:jobs/listruns requests with `expand_tasks=true`.""" + job_clusters: Optional[List[JobCluster]] = None """A list of job cluster specifications that can be shared and reused by tasks of this job. Libraries cannot be declared in a shared job cluster. You must declare dependent libraries in - task settings.""" + task settings. If more than 100 job clusters are available, you can paginate through them using + :method:jobs/getrun.""" job_id: Optional[int] = None """The canonical identifier of the job that contains this run.""" @@ -198,7 +212,9 @@ class BaseRun: tasks: Optional[List[RunTask]] = None """The list of tasks performed by the run. Each task has its own `run_id` which you can use to call - `JobsGetOutput` to retrieve the run resutls.""" + `JobsGetOutput` to retrieve the run resutls. If more than 100 tasks are available, you can + paginate through them using :method:jobs/getrun. Use the `next_page_token` field at the object + root to determine if more results are available.""" trigger: Optional[TriggerType] = None """The type of trigger that fired this run. 
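
The new `has_more` and `next_page_token` fields back Jobs API 2.2 pagination of a job's sub-resources (`tasks`, `job_clusters`). A short sketch of how a caller might collect all tasks of a large job; the job ID is hypothetical and the aggregation loop is illustrative, not part of the SDK:

    from databricks.sdk import WorkspaceClient

    w = WorkspaceClient()

    job_id = 123  # hypothetical job with more than 100 tasks
    job = w.jobs.get(job_id=job_id)
    all_tasks = list(job.settings.tasks or []) if job.settings else []

    # Follow next_page_token until the server reports no further sub-resources.
    while job.next_page_token:
        job = w.jobs.get(job_id=job_id, page_token=job.next_page_token)
        if job.settings and job.settings.tasks:
            all_tasks.extend(job.settings.tasks)

The same pattern applies to `w.jobs.get_run(...)` with its `page_token` parameter when a run has more than 100 tasks or job clusters.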
@@ -227,6 +243,7 @@ def as_dict(self) -> dict: if self.end_time is not None: body['end_time'] = self.end_time if self.execution_duration is not None: body['execution_duration'] = self.execution_duration if self.git_source: body['git_source'] = self.git_source.as_dict() + if self.has_more is not None: body['has_more'] = self.has_more if self.job_clusters: body['job_clusters'] = [v.as_dict() for v in self.job_clusters] if self.job_id is not None: body['job_id'] = self.job_id if self.job_parameters: body['job_parameters'] = [v.as_dict() for v in self.job_parameters] @@ -264,6 +281,7 @@ def as_shallow_dict(self) -> dict: if self.end_time is not None: body['end_time'] = self.end_time if self.execution_duration is not None: body['execution_duration'] = self.execution_duration if self.git_source: body['git_source'] = self.git_source + if self.has_more is not None: body['has_more'] = self.has_more if self.job_clusters: body['job_clusters'] = self.job_clusters if self.job_id is not None: body['job_id'] = self.job_id if self.job_parameters: body['job_parameters'] = self.job_parameters @@ -301,6 +319,7 @@ def from_dict(cls, d: Dict[str, any]) -> BaseRun: end_time=d.get('end_time', None), execution_duration=d.get('execution_duration', None), git_source=_from_dict(d, 'git_source', GitSource), + has_more=d.get('has_more', None), job_clusters=_repeated_dict(d, 'job_clusters', JobCluster), job_id=d.get('job_id', None), job_parameters=_repeated_dict(d, 'job_parameters', JobParameter), @@ -754,7 +773,8 @@ class CreateJob: job_clusters: Optional[List[JobCluster]] = None """A list of job cluster specifications that can be shared and reused by tasks of this job. Libraries cannot be declared in a shared job cluster. You must declare dependent libraries in - task settings.""" + task settings. If more than 100 job clusters are available, you can paginate through them using + :method:jobs/get.""" max_concurrent_runs: Optional[int] = None """An optional maximum allowed number of concurrent runs of the job. Set this value if you want to @@ -795,7 +815,9 @@ class CreateJob: be added to the job.""" tasks: Optional[List[Task]] = None - """A list of task specifications to be executed by this job.""" + """A list of task specifications to be executed by this job. If more than 100 tasks are available, + you can paginate through them using :method:jobs/get. Use the `next_page_token` field at the + object root to determine if more results are available.""" timeout_seconds: Optional[int] = None """An optional timeout applied to each run of this job. A value of `0` means no timeout.""" @@ -1680,9 +1702,17 @@ class Job: Jobs UI in the job details page and Jobs API using `budget_policy_id` 3. Inferred default based on accessible budget policies of the run_as identity on job creation or modification.""" + has_more: Optional[bool] = None + """Indicates if the job has more sub-resources (`tasks`, `job_clusters`) that are not shown. They + can be accessed via :method:jobs/get endpoint. It is only relevant for API 2.2 :method:jobs/list + requests with `expand_tasks=true`.""" + job_id: Optional[int] = None """The canonical identifier for this job.""" + next_page_token: Optional[str] = None + """A token that can be used to list the next page of sub-resources.""" + run_as_user_name: Optional[str] = None """The email of an active workspace user or the application ID of a service principal that the job runs as. 
This value can be changed by setting the `run_as` field when creating or updating a @@ -1703,7 +1733,9 @@ def as_dict(self) -> dict: if self.creator_user_name is not None: body['creator_user_name'] = self.creator_user_name if self.effective_budget_policy_id is not None: body['effective_budget_policy_id'] = self.effective_budget_policy_id + if self.has_more is not None: body['has_more'] = self.has_more if self.job_id is not None: body['job_id'] = self.job_id + if self.next_page_token is not None: body['next_page_token'] = self.next_page_token if self.run_as_user_name is not None: body['run_as_user_name'] = self.run_as_user_name if self.settings: body['settings'] = self.settings.as_dict() return body @@ -1715,7 +1747,9 @@ def as_shallow_dict(self) -> dict: if self.creator_user_name is not None: body['creator_user_name'] = self.creator_user_name if self.effective_budget_policy_id is not None: body['effective_budget_policy_id'] = self.effective_budget_policy_id + if self.has_more is not None: body['has_more'] = self.has_more if self.job_id is not None: body['job_id'] = self.job_id + if self.next_page_token is not None: body['next_page_token'] = self.next_page_token if self.run_as_user_name is not None: body['run_as_user_name'] = self.run_as_user_name if self.settings: body['settings'] = self.settings return body @@ -1726,7 +1760,9 @@ def from_dict(cls, d: Dict[str, any]) -> Job: return cls(created_time=d.get('created_time', None), creator_user_name=d.get('creator_user_name', None), effective_budget_policy_id=d.get('effective_budget_policy_id', None), + has_more=d.get('has_more', None), job_id=d.get('job_id', None), + next_page_token=d.get('next_page_token', None), run_as_user_name=d.get('run_as_user_name', None), settings=_from_dict(d, 'settings', JobSettings)) @@ -2366,7 +2402,8 @@ class JobSettings: job_clusters: Optional[List[JobCluster]] = None """A list of job cluster specifications that can be shared and reused by tasks of this job. Libraries cannot be declared in a shared job cluster. You must declare dependent libraries in - task settings.""" + task settings. If more than 100 job clusters are available, you can paginate through them using + :method:jobs/get.""" max_concurrent_runs: Optional[int] = None """An optional maximum allowed number of concurrent runs of the job. Set this value if you want to @@ -2407,7 +2444,9 @@ class JobSettings: be added to the job.""" tasks: Optional[List[Task]] = None - """A list of task specifications to be executed by this job.""" + """A list of task specifications to be executed by this job. If more than 100 tasks are available, + you can paginate through them using :method:jobs/get. Use the `next_page_token` field at the + object root to determine if more results are available.""" timeout_seconds: Optional[int] = None """An optional timeout applied to each run of this job. A value of `0` means no timeout.""" @@ -3663,13 +3702,19 @@ class Run: Note: dbt and SQL File tasks support only version-controlled sources. If dbt or SQL File tasks are used, `git_source` must be defined on the job.""" + has_more: Optional[bool] = None + """Indicates if the run has more sub-resources (`tasks`, `job_clusters`) that are not shown. They + can be accessed via :method:jobs/getrun endpoint. It is only relevant for API 2.2 + :method:jobs/listruns requests with `expand_tasks=true`.""" + iterations: Optional[List[RunTask]] = None """Only populated by for-each iterations. 
The parent for-each task is located in tasks array.""" job_clusters: Optional[List[JobCluster]] = None """A list of job cluster specifications that can be shared and reused by tasks of this job. Libraries cannot be declared in a shared job cluster. You must declare dependent libraries in - task settings.""" + task settings. If more than 100 job clusters are available, you can paginate through them using + :method:jobs/getrun.""" job_id: Optional[int] = None """The canonical identifier of the job that contains this run.""" @@ -3743,7 +3788,9 @@ class Run: tasks: Optional[List[RunTask]] = None """The list of tasks performed by the run. Each task has its own `run_id` which you can use to call - `JobsGetOutput` to retrieve the run resutls.""" + `JobsGetOutput` to retrieve the run resutls. If more than 100 tasks are available, you can + paginate through them using :method:jobs/getrun. Use the `next_page_token` field at the object + root to determine if more results are available.""" trigger: Optional[TriggerType] = None """The type of trigger that fired this run. @@ -3772,6 +3819,7 @@ def as_dict(self) -> dict: if self.end_time is not None: body['end_time'] = self.end_time if self.execution_duration is not None: body['execution_duration'] = self.execution_duration if self.git_source: body['git_source'] = self.git_source.as_dict() + if self.has_more is not None: body['has_more'] = self.has_more if self.iterations: body['iterations'] = [v.as_dict() for v in self.iterations] if self.job_clusters: body['job_clusters'] = [v.as_dict() for v in self.job_clusters] if self.job_id is not None: body['job_id'] = self.job_id @@ -3811,6 +3859,7 @@ def as_shallow_dict(self) -> dict: if self.end_time is not None: body['end_time'] = self.end_time if self.execution_duration is not None: body['execution_duration'] = self.execution_duration if self.git_source: body['git_source'] = self.git_source + if self.has_more is not None: body['has_more'] = self.has_more if self.iterations: body['iterations'] = self.iterations if self.job_clusters: body['job_clusters'] = self.job_clusters if self.job_id is not None: body['job_id'] = self.job_id @@ -3850,6 +3899,7 @@ def from_dict(cls, d: Dict[str, any]) -> Run: end_time=d.get('end_time', None), execution_duration=d.get('execution_duration', None), git_source=_from_dict(d, 'git_source', GitSource), + has_more=d.get('has_more', None), iterations=_repeated_dict(d, 'iterations', RunTask), job_clusters=_repeated_dict(d, 'job_clusters', JobCluster), job_id=d.get('job_id', None), @@ -7066,6 +7116,7 @@ def create(self, :param job_clusters: List[:class:`JobCluster`] (optional) A list of job cluster specifications that can be shared and reused by tasks of this job. Libraries cannot be declared in a shared job cluster. You must declare dependent libraries in task settings. + If more than 100 job clusters are available, you can paginate through them using :method:jobs/get. :param max_concurrent_runs: int (optional) An optional maximum allowed number of concurrent runs of the job. Set this value if you want to be able to execute multiple runs of the same job concurrently. This is useful for example if you @@ -7097,7 +7148,9 @@ def create(self, clusters, and are subject to the same limitations as cluster tags. A maximum of 25 tags can be added to the job. :param tasks: List[:class:`Task`] (optional) - A list of task specifications to be executed by this job. + A list of task specifications to be executed by this job. 
If more than 100 tasks are available, you + can paginate through them using :method:jobs/get. Use the `next_page_token` field at the object root + to determine if more results are available. :param timeout_seconds: int (optional) An optional timeout applied to each run of this job. A value of `0` means no timeout. :param trigger: :class:`TriggerSettings` (optional) @@ -7193,19 +7246,28 @@ def export_run(self, run_id: int, *, views_to_export: Optional[ViewsToExport] = res = self._api.do('GET', '/api/2.1/jobs/runs/export', query=query, headers=headers) return ExportRunOutput.from_dict(res) - def get(self, job_id: int) -> Job: + def get(self, job_id: int, *, page_token: Optional[str] = None) -> Job: """Get a single job. Retrieves the details for a single job. + In Jobs API 2.2, requests for a single job support pagination of `tasks` and `job_clusters` when + either exceeds 100 elements. Use the `next_page_token` field to check for more results and pass its + value as the `page_token` in subsequent requests. Arrays with fewer than 100 elements in a page will + be empty on later pages. + :param job_id: int The canonical identifier of the job to retrieve information about. This field is required. + :param page_token: str (optional) + Use `next_page_token` returned from the previous GetJob to request the next page of the job's + sub-resources. :returns: :class:`Job` """ query = {} if job_id is not None: query['job_id'] = job_id + if page_token is not None: query['page_token'] = page_token headers = {'Accept': 'application/json', } res = self._api.do('GET', '/api/2.1/jobs/get', query=query, headers=headers) @@ -7251,7 +7313,12 @@ def get_run(self, page_token: Optional[str] = None) -> Run: """Get a single job run. - Retrieve the metadata of a run. + Retrieves the metadata of a run. + + In Jobs API 2.2, requests for a single job run support pagination of `tasks` and `job_clusters` when + either exceeds 100 elements. Use the `next_page_token` field to check for more results and pass its + value as the `page_token` in subsequent requests. Arrays with fewer than 100 elements in a page will + be empty on later pages. :param run_id: int The canonical identifier of the run for which to retrieve the metadata. This field is required. @@ -7260,8 +7327,8 @@ def get_run(self, :param include_resolved_values: bool (optional) Whether to include resolved parameter values in the response. :param page_token: str (optional) - To list the next page of job tasks, set this field to the value of the `next_page_token` returned in - the GetJob response. + Use `next_page_token` returned from the previous GetRun to request the next page of the run's + sub-resources. :returns: :class:`Run` """ @@ -7313,7 +7380,8 @@ def list(self, Retrieves a list of jobs. :param expand_tasks: bool (optional) - Whether to include task and cluster details in the response. + Whether to include task and cluster details in the response. Note that in API 2.2, only the first + 100 elements will be shown. Use :method:jobs/get to paginate through all tasks and clusters. :param limit: int (optional) The number of jobs to return. This value must be greater than 0 and less or equal to 100. The default value is 20. @@ -7370,7 +7438,8 @@ def list_runs(self, If completed_only is `true`, only completed runs are included in the results; otherwise, lists both active and completed runs. This field cannot be `true` when active_only is `true`. :param expand_tasks: bool (optional) - Whether to include task and cluster details in the response. 
+ Whether to include task and cluster details in the response. Note that in API 2.2, only the first + 100 elements will be shown. Use :method:jobs/getrun to paginate through all tasks and clusters. :param job_id: int (optional) The job for which to list runs. If omitted, the Jobs service lists runs from all jobs. :param limit: int (optional) diff --git a/databricks/sdk/service/oauth2.py b/databricks/sdk/service/oauth2.py index f7df5a25..1aac8bc1 100755 --- a/databricks/sdk/service/oauth2.py +++ b/databricks/sdk/service/oauth2.py @@ -202,35 +202,6 @@ def from_dict(cls, d: Dict[str, any]) -> CreateServicePrincipalSecretResponse: update_time=d.get('update_time', None)) -@dataclass -class DataPlaneInfo: - authorization_details: Optional[str] = None - """Authorization details as a string.""" - - endpoint_url: Optional[str] = None - """The URL of the endpoint for this operation in the dataplane.""" - - def as_dict(self) -> dict: - """Serializes the DataPlaneInfo into a dictionary suitable for use as a JSON request body.""" - body = {} - if self.authorization_details is not None: body['authorization_details'] = self.authorization_details - if self.endpoint_url is not None: body['endpoint_url'] = self.endpoint_url - return body - - def as_shallow_dict(self) -> dict: - """Serializes the DataPlaneInfo into a shallow dictionary of its immediate attributes.""" - body = {} - if self.authorization_details is not None: body['authorization_details'] = self.authorization_details - if self.endpoint_url is not None: body['endpoint_url'] = self.endpoint_url - return body - - @classmethod - def from_dict(cls, d: Dict[str, any]) -> DataPlaneInfo: - """Deserializes the DataPlaneInfo from a dictionary.""" - return cls(authorization_details=d.get('authorization_details', None), - endpoint_url=d.get('endpoint_url', None)) - - @dataclass class DeleteCustomAppIntegrationOutput: @@ -297,8 +268,13 @@ class FederationPolicy: """Description of the federation policy.""" name: Optional[str] = None - """Name of the federation policy. The name must contain only lowercase alphanumeric characters, - numbers, and hyphens. It must be unique within the account.""" + """Resource name for the federation policy. Example values include + `accounts//federationPolicies/my-federation-policy` for Account Federation Policies, + and + `accounts//servicePrincipals//federationPolicies/my-federation-policy` + for Service Principal Federation Policies. Typically an output parameter, which does not need to + be specified in create or update requests. If specified in a request, must match the value in + the request URL.""" oidc_policy: Optional[OidcFederationPolicy] = None """Specifies the policy to use for validating OIDC claims in your federated tokens.""" @@ -961,7 +937,8 @@ def create(self, :param policy: :class:`FederationPolicy` (optional) :param policy_id: str (optional) - The identifier for the federation policy. If unspecified, the id will be assigned by Databricks. + The identifier for the federation policy. The identifier must contain only lowercase alphanumeric + characters, numbers, hyphens, and slashes. If unspecified, the id will be assigned by Databricks. :returns: :class:`FederationPolicy` """ @@ -979,6 +956,7 @@ def delete(self, policy_id: str): """Delete account federation policy. :param policy_id: str + The identifier for the federation policy. """ @@ -993,6 +971,7 @@ def get(self, policy_id: str) -> FederationPolicy: """Get account federation policy. :param policy_id: str + The identifier for the federation policy. 
:returns: :class:`FederationPolicy` """ @@ -1035,17 +1014,20 @@ def list(self, def update(self, policy_id: str, - update_mask: str, *, - policy: Optional[FederationPolicy] = None) -> FederationPolicy: + policy: Optional[FederationPolicy] = None, + update_mask: Optional[str] = None) -> FederationPolicy: """Update account federation policy. :param policy_id: str - :param update_mask: str - Field mask is required to be passed into the PATCH request. Field mask specifies which fields of the - setting payload will be updated. The field mask needs to be supplied as single string. To specify - multiple fields in the field mask, use comma as the separator (no space). + The identifier for the federation policy. :param policy: :class:`FederationPolicy` (optional) + :param update_mask: str (optional) + The field mask specifies which fields of the policy to update. To specify multiple fields in the + field mask, use comma as the separator (no space). The special value '*' indicates that all fields + should be updated (full replacement). If unspecified, all fields that are set in the policy provided + in the update request will overwrite the corresponding fields in the existing policy. Example value: + 'description,oidc_policy.audiences'. :returns: :class:`FederationPolicy` """ @@ -1433,7 +1415,8 @@ def create(self, The service principal id for the federation policy. :param policy: :class:`FederationPolicy` (optional) :param policy_id: str (optional) - The identifier for the federation policy. If unspecified, the id will be assigned by Databricks. + The identifier for the federation policy. The identifier must contain only lowercase alphanumeric + characters, numbers, hyphens, and slashes. If unspecified, the id will be assigned by Databricks. :returns: :class:`FederationPolicy` """ @@ -1454,6 +1437,7 @@ def delete(self, service_principal_id: int, policy_id: str): :param service_principal_id: int The service principal id for the federation policy. :param policy_id: str + The identifier for the federation policy. """ @@ -1471,6 +1455,7 @@ def get(self, service_principal_id: int, policy_id: str) -> FederationPolicy: :param service_principal_id: int The service principal id for the federation policy. :param policy_id: str + The identifier for the federation policy. :returns: :class:`FederationPolicy` """ @@ -1519,19 +1504,22 @@ def list(self, def update(self, service_principal_id: int, policy_id: str, - update_mask: str, *, - policy: Optional[FederationPolicy] = None) -> FederationPolicy: + policy: Optional[FederationPolicy] = None, + update_mask: Optional[str] = None) -> FederationPolicy: """Update service principal federation policy. :param service_principal_id: int The service principal id for the federation policy. :param policy_id: str - :param update_mask: str - Field mask is required to be passed into the PATCH request. Field mask specifies which fields of the - setting payload will be updated. The field mask needs to be supplied as single string. To specify - multiple fields in the field mask, use comma as the separator (no space). + The identifier for the federation policy. :param policy: :class:`FederationPolicy` (optional) + :param update_mask: str (optional) + The field mask specifies which fields of the policy to update. To specify multiple fields in the + field mask, use comma as the separator (no space). The special value '*' indicates that all fields + should be updated (full replacement). 
If unspecified, all fields that are set in the policy provided + in the update request will overwrite the corresponding fields in the existing policy. Example value: + 'description,oidc_policy.audiences'. :returns: :class:`FederationPolicy` """ diff --git a/databricks/sdk/service/pipelines.py b/databricks/sdk/service/pipelines.py index 8f8b015c..db5d698d 100755 --- a/databricks/sdk/service/pipelines.py +++ b/databricks/sdk/service/pipelines.py @@ -85,6 +85,14 @@ class CreatePipeline: restart_window: Optional[RestartWindow] = None """Restart window of this pipeline.""" + run_as: Optional[RunAs] = None + """Write-only setting, available only in Create/Update calls. Specifies the user or service + principal that the pipeline runs as. If not specified, the pipeline runs as the user who created + the pipeline. + + Only `user_name` or `service_principal_name` can be specified. If both are specified, an error + is thrown.""" + schema: Optional[str] = None """The default schema (database) where tables are read from or published to. The presence of this field implies that the pipeline is in direct publishing mode.""" @@ -126,6 +134,7 @@ def as_dict(self) -> dict: if self.notifications: body['notifications'] = [v.as_dict() for v in self.notifications] if self.photon is not None: body['photon'] = self.photon if self.restart_window: body['restart_window'] = self.restart_window.as_dict() + if self.run_as: body['run_as'] = self.run_as.as_dict() if self.schema is not None: body['schema'] = self.schema if self.serverless is not None: body['serverless'] = self.serverless if self.storage is not None: body['storage'] = self.storage @@ -156,6 +165,7 @@ def as_shallow_dict(self) -> dict: if self.notifications: body['notifications'] = self.notifications if self.photon is not None: body['photon'] = self.photon if self.restart_window: body['restart_window'] = self.restart_window + if self.run_as: body['run_as'] = self.run_as if self.schema is not None: body['schema'] = self.schema if self.serverless is not None: body['serverless'] = self.serverless if self.storage is not None: body['storage'] = self.storage @@ -186,6 +196,7 @@ def from_dict(cls, d: Dict[str, any]) -> CreatePipeline: notifications=_repeated_dict(d, 'notifications', Notifications), photon=d.get('photon', None), restart_window=_from_dict(d, 'restart_window', RestartWindow), + run_as=_from_dict(d, 'run_as', RunAs), schema=d.get('schema', None), serverless=d.get('serverless', None), storage=d.get('storage', None), @@ -277,6 +288,19 @@ def from_dict(cls, d: Dict[str, any]) -> DataPlaneId: return cls(instance=d.get('instance', None), seq_no=d.get('seq_no', None)) +class DayOfWeek(Enum): + """Days of week in which the restart is allowed to happen (within a five-hour window starting at + start_hour). If not specified all days of the week will be used.""" + + FRIDAY = 'FRIDAY' + MONDAY = 'MONDAY' + SATURDAY = 'SATURDAY' + SUNDAY = 'SUNDAY' + THURSDAY = 'THURSDAY' + TUESDAY = 'TUESDAY' + WEDNESDAY = 'WEDNESDAY' + + @dataclass class DeletePipelineResponse: @@ -373,6 +397,14 @@ class EditPipeline: restart_window: Optional[RestartWindow] = None """Restart window of this pipeline.""" + run_as: Optional[RunAs] = None + """Write-only setting, available only in Create/Update calls. Specifies the user or service + principal that the pipeline runs as. If not specified, the pipeline runs as the user who created + the pipeline. + + Only `user_name` or `service_principal_name` can be specified. 
If both are specified, an error + is thrown.""" + schema: Optional[str] = None """The default schema (database) where tables are read from or published to. The presence of this field implies that the pipeline is in direct publishing mode.""" @@ -416,6 +448,7 @@ def as_dict(self) -> dict: if self.photon is not None: body['photon'] = self.photon if self.pipeline_id is not None: body['pipeline_id'] = self.pipeline_id if self.restart_window: body['restart_window'] = self.restart_window.as_dict() + if self.run_as: body['run_as'] = self.run_as.as_dict() if self.schema is not None: body['schema'] = self.schema if self.serverless is not None: body['serverless'] = self.serverless if self.storage is not None: body['storage'] = self.storage @@ -448,6 +481,7 @@ def as_shallow_dict(self) -> dict: if self.photon is not None: body['photon'] = self.photon if self.pipeline_id is not None: body['pipeline_id'] = self.pipeline_id if self.restart_window: body['restart_window'] = self.restart_window + if self.run_as: body['run_as'] = self.run_as if self.schema is not None: body['schema'] = self.schema if self.serverless is not None: body['serverless'] = self.serverless if self.storage is not None: body['storage'] = self.storage @@ -479,6 +513,7 @@ def from_dict(cls, d: Dict[str, any]) -> EditPipeline: photon=d.get('photon', None), pipeline_id=d.get('pipeline_id', None), restart_window=_from_dict(d, 'restart_window', RestartWindow), + run_as=_from_dict(d, 'run_as', RunAs), schema=d.get('schema', None), serverless=d.get('serverless', None), storage=d.get('storage', None), @@ -2105,7 +2140,7 @@ class RestartWindow: """An integer between 0 and 23 denoting the start hour for the restart window in the 24-hour day. Continuous pipeline restart is triggered only within a five-hour window starting at this hour.""" - days_of_week: Optional[List[RestartWindowDaysOfWeek]] = None + days_of_week: Optional[List[DayOfWeek]] = None """Days of week in which the restart is allowed to happen (within a five-hour window starting at start_hour). If not specified all days of the week will be used.""" @@ -2133,22 +2168,48 @@ def as_shallow_dict(self) -> dict: @classmethod def from_dict(cls, d: Dict[str, any]) -> RestartWindow: """Deserializes the RestartWindow from a dictionary.""" - return cls(days_of_week=_repeated_enum(d, 'days_of_week', RestartWindowDaysOfWeek), + return cls(days_of_week=_repeated_enum(d, 'days_of_week', DayOfWeek), start_hour=d.get('start_hour', None), time_zone_id=d.get('time_zone_id', None)) -class RestartWindowDaysOfWeek(Enum): - """Days of week in which the restart is allowed to happen (within a five-hour window starting at - start_hour). If not specified all days of the week will be used.""" +@dataclass +class RunAs: + """Write-only setting, available only in Create/Update calls. Specifies the user or service + principal that the pipeline runs as. If not specified, the pipeline runs as the user who created + the pipeline. + + Only `user_name` or `service_principal_name` can be specified. If both are specified, an error + is thrown.""" - FRIDAY = 'FRIDAY' - MONDAY = 'MONDAY' - SATURDAY = 'SATURDAY' - SUNDAY = 'SUNDAY' - THURSDAY = 'THURSDAY' - TUESDAY = 'TUESDAY' - WEDNESDAY = 'WEDNESDAY' + service_principal_name: Optional[str] = None + """Application ID of an active service principal. Setting this field requires the + `servicePrincipal/user` role.""" + + user_name: Optional[str] = None + """The email of an active workspace user. 
Users can only set this field to their own email.""" + + def as_dict(self) -> dict: + """Serializes the RunAs into a dictionary suitable for use as a JSON request body.""" + body = {} + if self.service_principal_name is not None: + body['service_principal_name'] = self.service_principal_name + if self.user_name is not None: body['user_name'] = self.user_name + return body + + def as_shallow_dict(self) -> dict: + """Serializes the RunAs into a shallow dictionary of its immediate attributes.""" + body = {} + if self.service_principal_name is not None: + body['service_principal_name'] = self.service_principal_name + if self.user_name is not None: body['user_name'] = self.user_name + return body + + @classmethod + def from_dict(cls, d: Dict[str, any]) -> RunAs: + """Deserializes the RunAs from a dictionary.""" + return cls(service_principal_name=d.get('service_principal_name', None), + user_name=d.get('user_name', None)) @dataclass @@ -2791,6 +2852,7 @@ def create(self, notifications: Optional[List[Notifications]] = None, photon: Optional[bool] = None, restart_window: Optional[RestartWindow] = None, + run_as: Optional[RunAs] = None, schema: Optional[str] = None, serverless: Optional[bool] = None, storage: Optional[str] = None, @@ -2843,6 +2905,12 @@ def create(self, Whether Photon is enabled for this pipeline. :param restart_window: :class:`RestartWindow` (optional) Restart window of this pipeline. + :param run_as: :class:`RunAs` (optional) + Write-only setting, available only in Create/Update calls. Specifies the user or service principal + that the pipeline runs as. If not specified, the pipeline runs as the user who created the pipeline. + + Only `user_name` or `service_principal_name` can be specified. If both are specified, an error is + thrown. :param schema: str (optional) The default schema (database) where tables are read from or published to. The presence of this field implies that the pipeline is in direct publishing mode. @@ -2879,6 +2947,7 @@ def create(self, if notifications is not None: body['notifications'] = [v.as_dict() for v in notifications] if photon is not None: body['photon'] = photon if restart_window is not None: body['restart_window'] = restart_window.as_dict() + if run_as is not None: body['run_as'] = run_as.as_dict() if schema is not None: body['schema'] = schema if serverless is not None: body['serverless'] = serverless if storage is not None: body['storage'] = storage @@ -3213,6 +3282,7 @@ def update(self, notifications: Optional[List[Notifications]] = None, photon: Optional[bool] = None, restart_window: Optional[RestartWindow] = None, + run_as: Optional[RunAs] = None, schema: Optional[str] = None, serverless: Optional[bool] = None, storage: Optional[str] = None, @@ -3268,6 +3338,12 @@ def update(self, Whether Photon is enabled for this pipeline. :param restart_window: :class:`RestartWindow` (optional) Restart window of this pipeline. + :param run_as: :class:`RunAs` (optional) + Write-only setting, available only in Create/Update calls. Specifies the user or service principal + that the pipeline runs as. If not specified, the pipeline runs as the user who created the pipeline. + + Only `user_name` or `service_principal_name` can be specified. If both are specified, an error is + thrown. :param schema: str (optional) The default schema (database) where tables are read from or published to. The presence of this field implies that the pipeline is in direct publishing mode. 
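
With the new `RunAs` setting, pipeline create and update calls can pin the execution identity. A hedged sketch, assuming a configured workspace client and using an illustrative pipeline name, service principal application ID, and user email; exactly one of `user_name` or `service_principal_name` may be set:

    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service.pipelines import RunAs

    w = WorkspaceClient()

    # Create a pipeline that runs as a service principal instead of the caller.
    created = w.pipelines.create(
        name="example-pipeline",
        serverless=True,
        run_as=RunAs(service_principal_name="11111111-2222-3333-4444-555555555555"),
    )

    # Later, switch the pipeline to run as a named user. run_as is write-only,
    # and update replaces the pipeline spec, so a real call would resend the
    # full desired settings alongside it.
    w.pipelines.update(
        pipeline_id=created.pipeline_id,
        run_as=RunAs(user_name="someone@example.com"),
    )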
@@ -3304,6 +3380,7 @@ def update(self, if notifications is not None: body['notifications'] = [v.as_dict() for v in notifications] if photon is not None: body['photon'] = photon if restart_window is not None: body['restart_window'] = restart_window.as_dict() + if run_as is not None: body['run_as'] = run_as.as_dict() if schema is not None: body['schema'] = schema if serverless is not None: body['serverless'] = serverless if storage is not None: body['storage'] = storage diff --git a/databricks/sdk/service/serving.py b/databricks/sdk/service/serving.py index fc606953..c992fe3f 100755 --- a/databricks/sdk/service/serving.py +++ b/databricks/sdk/service/serving.py @@ -12,14 +12,11 @@ import requests -from ..data_plane import DataPlaneService from ..errors import OperationFailed from ._internal import Wait, _enum, _from_dict, _repeated_dict _LOG = logging.getLogger('databricks.sdk') -from databricks.sdk.service import oauth2 - # all definitions in this file are in alphabetical order @@ -712,6 +709,35 @@ def from_dict(cls, d: Dict[str, any]) -> CreateServingEndpoint: tags=_repeated_dict(d, 'tags', EndpointTag)) +@dataclass +class DataPlaneInfo: + authorization_details: Optional[str] = None + """Authorization details as a string.""" + + endpoint_url: Optional[str] = None + """The URL of the endpoint for this operation in the dataplane.""" + + def as_dict(self) -> dict: + """Serializes the DataPlaneInfo into a dictionary suitable for use as a JSON request body.""" + body = {} + if self.authorization_details is not None: body['authorization_details'] = self.authorization_details + if self.endpoint_url is not None: body['endpoint_url'] = self.endpoint_url + return body + + def as_shallow_dict(self) -> dict: + """Serializes the DataPlaneInfo into a shallow dictionary of its immediate attributes.""" + body = {} + if self.authorization_details is not None: body['authorization_details'] = self.authorization_details + if self.endpoint_url is not None: body['endpoint_url'] = self.endpoint_url + return body + + @classmethod + def from_dict(cls, d: Dict[str, any]) -> DataPlaneInfo: + """Deserializes the DataPlaneInfo from a dictionary.""" + return cls(authorization_details=d.get('authorization_details', None), + endpoint_url=d.get('endpoint_url', None)) + + @dataclass class DatabricksModelServingConfig: databricks_workspace_url: str @@ -1444,7 +1470,7 @@ def from_dict(cls, d: Dict[str, any]) -> ListEndpointsResponse: @dataclass class ModelDataPlaneInfo: - query_info: Optional[oauth2.DataPlaneInfo] = None + query_info: Optional[DataPlaneInfo] = None """Information required to query DataPlane API 'query' endpoint.""" def as_dict(self) -> dict: @@ -1462,7 +1488,7 @@ def as_shallow_dict(self) -> dict: @classmethod def from_dict(cls, d: Dict[str, any]) -> ModelDataPlaneInfo: """Deserializes the ModelDataPlaneInfo from a dictionary.""" - return cls(query_info=_from_dict(d, 'query_info', oauth2.DataPlaneInfo)) + return cls(query_info=_from_dict(d, 'query_info', DataPlaneInfo)) @dataclass @@ -3725,6 +3751,7 @@ class ServingEndpointsDataPlaneAPI: def __init__(self, api_client, control_plane): self._api = api_client self._control_plane = control_plane + from ..data_plane import DataPlaneService self._data_plane_service = DataPlaneService() def query(self, diff --git a/tests/test_base_client.py b/tests/test_base_client.py index 4b6aaa71..a9a9d5cc 100644 --- a/tests/test_base_client.py +++ b/tests/test_base_client.py @@ -5,17 +5,17 @@ from unittest.mock import Mock import pytest -import requests from databricks.sdk 
diff --git a/tests/test_base_client.py b/tests/test_base_client.py
index 4b6aaa71..a9a9d5cc 100644
--- a/tests/test_base_client.py
+++ b/tests/test_base_client.py
@@ -5,17 +5,17 @@
 from unittest.mock import Mock
 
 import pytest
-import requests
 
 from databricks.sdk import errors, useragent
-from databricks.sdk._base_client import _BaseClient, _StreamingResponse
+from databricks.sdk._base_client import (_BaseClient, _RawResponse,
+                                         _StreamingResponse)
 from databricks.sdk.core import DatabricksError
 
 from .clock import FakeClock
 from .fixture_server import http_fixture_server
 
 
-class DummyResponse(requests.Response):
+class DummyResponse(_RawResponse):
     _content: Iterator[bytes]
     _closed: bool = False
 
@@ -293,9 +293,9 @@ def test_streaming_response_chunk_size(chunk_size, expected_chunks, data_size):
     test_data = bytes(rng.getrandbits(8) for _ in range(data_size))
 
     content_chunks = []
-    mock_response = Mock(spec=requests.Response)
+    mock_response = Mock(spec=_RawResponse)
 
-    def mock_iter_content(chunk_size):
+    def mock_iter_content(chunk_size: int, decode_unicode: bool):
         # Simulate how requests would chunk the data.
         for i in range(0, len(test_data), chunk_size):
             chunk = test_data[i:i + chunk_size]
diff --git a/tests/test_data_plane.py b/tests/test_data_plane.py
index a7465896..1eac9238 100644
--- a/tests/test_data_plane.py
+++ b/tests/test_data_plane.py
@@ -2,7 +2,7 @@
 
 from databricks.sdk.data_plane import DataPlaneService
 from databricks.sdk.oauth import Token
-from databricks.sdk.service.oauth2 import DataPlaneInfo
+from databricks.sdk.service.serving import DataPlaneInfo
 
 info = DataPlaneInfo(authorization_details="authDetails", endpoint_url="url")
diff --git a/tests/test_files.py b/tests/test_files.py
new file mode 100644
index 00000000..f4d916f6
--- /dev/null
+++ b/tests/test_files.py
@@ -0,0 +1,340 @@
+import logging
+import os
+import re
+from dataclasses import dataclass
+from typing import List, Union
+
+import pytest
+from requests import RequestException
+
+from databricks.sdk import WorkspaceClient
+from databricks.sdk.core import Config
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class RequestData:
+
+    def __init__(self, offset: int):
+        self._offset: int = offset
+
+
+class DownloadTestCase:
+
+    def __init__(self, name: str, enable_new_client: bool, file_size: int,
+                 failure_at_absolute_offset: List[int], max_recovers_total: Union[int, None],
+                 max_recovers_without_progressing: Union[int, None], expected_success: bool,
+                 expected_requested_offsets: List[int]):
+        self.name = name
+        self.enable_new_client = enable_new_client
+        self.file_size = file_size
+        self.failure_at_absolute_offset = failure_at_absolute_offset
+        self.max_recovers_total = max_recovers_total
+        self.max_recovers_without_progressing = max_recovers_without_progressing
+        self.expected_success = expected_success
+        self.expected_requested_offsets = expected_requested_offsets
+
+    @staticmethod
+    def to_string(test_case):
+        return test_case.name
+
+    def run(self, config: Config):
+        config = config.copy()
+        config.enable_experimental_files_api_client = self.enable_new_client
+        config.files_api_client_download_max_total_recovers = self.max_recovers_total
+        config.files_api_client_download_max_total_recovers_without_progressing = self.max_recovers_without_progressing
+
+        w = WorkspaceClient(config=config)
+
+        session = MockSession(self)
+        w.files._api._api_client._session = session
+
+        response = w.files.download("/test").contents
+        if self.expected_success:
+            actual_content = response.read()
+            assert (len(actual_content) == len(session.content))
+            assert (actual_content == session.content)
+        else:
+            with pytest.raises(RequestException):
+                response.read()
+
+        received_requests = session.received_requests
+
+        assert (len(self.expected_requested_offsets) == len(received_requests))
+        for idx, requested_offset in enumerate(self.expected_requested_offsets):
+            assert (requested_offset == received_requests[idx]._offset)
+
+
+class MockSession:
+
+    def __init__(self, test_case: DownloadTestCase):
+        self.test_case: DownloadTestCase = test_case
+        self.received_requests: List[RequestData] = []
+        self.content: bytes = os.urandom(self.test_case.file_size)
+        self.failure_pointer = 0
+        self.last_modified = 'Thu, 28 Nov 2024 16:39:14 GMT'
+
+    # following the signature of Session.request()
+    def request(self,
+                method,
+                url,
+                params=None,
+                data=None,
+                headers=None,
+                cookies=None,
+                files=None,
+                auth=None,
+                timeout=None,
+                allow_redirects=True,
+                proxies=None,
+                hooks=None,
+                stream=None,
+                verify=None,
+                cert=None,
+                json=None):
+        assert method == 'GET'
+        assert stream == True
+
+        offset = 0
+        if "Range" in headers:
+            range = headers["Range"]
+            match = re.search("^bytes=(\\d+)-$", range)
+            if match:
+                offset = int(match.group(1))
+            else:
+                raise Exception("Unexpected range header: " + range)
+
+            if "If-Unmodified-Since" in headers:
+                assert (headers["If-Unmodified-Since"] == self.last_modified)
+            else:
+                raise Exception("If-Unmodified-Since header should be passed along with Range")
+
+        logger.info("Client requested offset: %s", offset)
+
+        if offset > len(self.content):
+            raise Exception(f"Offset {offset} exceeds file length {len(self.content)}")
+
+        self.received_requests.append(RequestData(offset))
+        return MockResponse(self, offset, MockRequest(url))
+
+
+# required only for correct logging
+class MockRequest:
+
+    def __init__(self, url: str):
+        self.url = url
+        self.method = 'GET'
+        self.headers = dict()
+        self.body = None
+
+
+class MockResponse:
+
+    def __init__(self, session: MockSession, offset: int, request: MockRequest):
+        self.session = session
+        self.offset = offset
+        self.request = request
+        self.status_code = 200
+        self.reason = 'OK'
+        self.headers = dict()
+        self.headers['Content-Length'] = len(session.content) - offset
+        self.headers['Content-Type'] = 'application/octet-stream'
+        self.headers['Last-Modified'] = session.last_modified
+        self.ok = True
+        self.url = request.url
+
+    def iter_content(self, chunk_size: int, decode_unicode: bool):
+        assert decode_unicode == False
+        return MockIterator(self, chunk_size)
+
+
+class MockIterator:
+
+    def __init__(self, response: MockResponse, chunk_size: int):
+        self.response = response
+        self.chunk_size = chunk_size
+        self.offset = 0
+
+    def __next__(self):
+        start_offset = self.response.offset + self.offset
+        if start_offset == len(self.response.session.content):
+            raise StopIteration
+
+        end_offset = start_offset + self.chunk_size  # exclusive, might be out of range
+
+        if self.response.session.failure_pointer < len(
+                self.response.session.test_case.failure_at_absolute_offset):
+            failure_after_byte = self.response.session.test_case.failure_at_absolute_offset[
+                self.response.session.failure_pointer]
+            if failure_after_byte < end_offset:
+                self.response.session.failure_pointer += 1
+                raise RequestException("Fake error")
+
+        result = self.response.session.content[start_offset:end_offset]
+        self.offset += len(result)
+        return result
+
+    def close(self):
+        pass
+
+
+class _Constants:
+    underlying_chunk_size = 1024 * 1024  # see ticket #832
+
+
+@pytest.mark.parametrize(
+    "test_case",
+    [
+        DownloadTestCase(name="Old client: no failures, file of 5 bytes",
+                         enable_new_client=False,
+                         file_size=5,
+                         failure_at_absolute_offset=[],
+                         max_recovers_total=0,
+                         max_recovers_without_progressing=0,
+                         expected_success=True,
+                         expected_requested_offsets=[0]),
+        DownloadTestCase(name="Old client: no failures, file of 1.5 chunks",
+                         enable_new_client=False,
+                         file_size=int(1.5 * _Constants.underlying_chunk_size),
+                         failure_at_absolute_offset=[],
+                         max_recovers_total=0,
+                         max_recovers_without_progressing=0,
+                         expected_success=True,
+                         expected_requested_offsets=[0]),
+        DownloadTestCase(
+            name="Old client: failure",
+            enable_new_client=False,
+            file_size=1024,
+            failure_at_absolute_offset=[100],
+            max_recovers_total=None,  # unlimited but ignored
+            max_recovers_without_progressing=None,  # unlimited but ignored
+            expected_success=False,
+            expected_requested_offsets=[0]),
+        DownloadTestCase(name="New client: no failures, file of 5 bytes",
+                         enable_new_client=True,
+                         file_size=5,
+                         failure_at_absolute_offset=[],
+                         max_recovers_total=0,
+                         max_recovers_without_progressing=0,
+                         expected_success=True,
+                         expected_requested_offsets=[0]),
+        DownloadTestCase(name="New client: no failures, file of 1 Kb",
+                         enable_new_client=True,
+                         file_size=1024,
+                         max_recovers_total=None,
+                         max_recovers_without_progressing=None,
+                         failure_at_absolute_offset=[],
+                         expected_success=True,
+                         expected_requested_offsets=[0]),
+        DownloadTestCase(name="New client: no failures, file of 1.5 chunks",
+                         enable_new_client=True,
+                         file_size=int(1.5 * _Constants.underlying_chunk_size),
+                         failure_at_absolute_offset=[],
+                         max_recovers_total=0,
+                         max_recovers_without_progressing=0,
+                         expected_success=True,
+                         expected_requested_offsets=[0]),
+        DownloadTestCase(name="New client: no failures, file of 10 chunks",
+                         enable_new_client=True,
+                         file_size=10 * _Constants.underlying_chunk_size,
+                         failure_at_absolute_offset=[],
+                         max_recovers_total=0,
+                         max_recovers_without_progressing=0,
+                         expected_success=True,
+                         expected_requested_offsets=[0]),
+        DownloadTestCase(name="New client: recovers are disabled, first failure leads to download abort",
+                         enable_new_client=True,
+                         file_size=10000,
+                         failure_at_absolute_offset=[5],
+                         max_recovers_total=0,
+                         max_recovers_without_progressing=0,
+                         expected_success=False,
+                         expected_requested_offsets=[0]),
+        DownloadTestCase(
+            name="New client: unlimited recovers allowed",
+            enable_new_client=True,
+            file_size=_Constants.underlying_chunk_size * 5,
+            # causes errors on requesting the third chunk
+            failure_at_absolute_offset=[
+                _Constants.underlying_chunk_size - 1, _Constants.underlying_chunk_size - 1,
+                _Constants.underlying_chunk_size - 1, _Constants.underlying_chunk_size + 1,
+                _Constants.underlying_chunk_size * 3,
+            ],
+            max_recovers_total=None,
+            max_recovers_without_progressing=None,
+            expected_success=True,
+            expected_requested_offsets=[
+                0, 0, 0, 0, _Constants.underlying_chunk_size, _Constants.underlying_chunk_size * 3
+            ]),
+        DownloadTestCase(
+            name="New client: we respect limit on total recovers when progressing",
+            enable_new_client=True,
+            file_size=_Constants.underlying_chunk_size * 10,
+            failure_at_absolute_offset=[
+                1,
+                _Constants.underlying_chunk_size + 1,  # progressing
+                _Constants.underlying_chunk_size * 2 + 1,  # progressing
+                _Constants.underlying_chunk_size * 3 + 1  # progressing
+            ],
+            max_recovers_total=3,
+            max_recovers_without_progressing=None,
+            expected_success=False,
+            expected_requested_offsets=[
+                0, 0, _Constants.underlying_chunk_size * 1, _Constants.underlying_chunk_size * 2
+            ]),
+        DownloadTestCase(name="New client: we respect limit on total recovers when not progressing",
+                         enable_new_client=True,
+                         file_size=_Constants.underlying_chunk_size * 10,
+                         failure_at_absolute_offset=[1, 1, 1, 1],
+                         max_recovers_total=3,
+                         max_recovers_without_progressing=None,
+                         expected_success=False,
+                         expected_requested_offsets=[0, 0, 0, 0]),
+        DownloadTestCase(name="New client: we respect limit on non-progressing recovers",
+                         enable_new_client=True,
+                         file_size=_Constants.underlying_chunk_size * 2,
+                         failure_at_absolute_offset=[
+                             _Constants.underlying_chunk_size - 1, _Constants.underlying_chunk_size - 1,
+                             _Constants.underlying_chunk_size - 1, _Constants.underlying_chunk_size - 1
+                         ],
+                         max_recovers_total=None,
+                         max_recovers_without_progressing=3,
+                         expected_success=False,
+                         expected_requested_offsets=[0, 0, 0, 0]),
+        DownloadTestCase(
+            name="New client: non-progressing recovers count is reset when progressing",
+            enable_new_client=True,
+            file_size=_Constants.underlying_chunk_size * 10,
+            failure_at_absolute_offset=[
+                _Constants.underlying_chunk_size + 1,  # this recover is after progressing
+                _Constants.underlying_chunk_size + 1,  # this is not
+                _Constants.underlying_chunk_size * 2 + 1,  # this recover is after progressing
+                _Constants.underlying_chunk_size * 2 + 1,  # this is not
+                _Constants.underlying_chunk_size * 2 + 1,  # this is not, we abort here
+            ],
+            max_recovers_total=None,
+            max_recovers_without_progressing=2,
+            expected_success=False,
+            expected_requested_offsets=[
+                0, _Constants.underlying_chunk_size, _Constants.underlying_chunk_size,
+                _Constants.underlying_chunk_size * 2, _Constants.underlying_chunk_size * 2
+            ]),
+        DownloadTestCase(name="New client: non-progressing recovers count is reset when progressing - 2",
+                         enable_new_client=True,
+                         file_size=_Constants.underlying_chunk_size * 10,
+                         failure_at_absolute_offset=[
+                             1, _Constants.underlying_chunk_size + 1,
+                             _Constants.underlying_chunk_size * 2 + 1, _Constants.underlying_chunk_size * 3 + 1
+                         ],
+                         max_recovers_total=None,
+                         max_recovers_without_progressing=1,
+                         expected_success=True,
+                         expected_requested_offsets=[
+                             0, 0, _Constants.underlying_chunk_size, _Constants.underlying_chunk_size * 2,
+                             _Constants.underlying_chunk_size * 3
+                         ]),
+    ],
+    ids=DownloadTestCase.to_string)
+def test_download_recover(config: Config, test_case: DownloadTestCase):
+    test_case.run(config)
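For context on what tests/test_files.py exercises, here is a hedged sketch (not in the changeset itself) of how the experimental Files API client and its download-recovery limits could be enabled from user code. The volume path is a placeholder, standard environment-based authentication is assumed, and None means an unlimited number of recovers, mirroring the test cases above.

    from databricks.sdk import WorkspaceClient
    from databricks.sdk.core import Config

    cfg = Config()  # assumes standard Databricks auth via environment variables
    cfg.enable_experimental_files_api_client = True
    cfg.files_api_client_download_max_total_recovers = None  # unlimited total recovers
    cfg.files_api_client_download_max_total_recovers_without_progressing = 1

    w = WorkspaceClient(config=cfg)
    # On a mid-stream failure the client retries with a Range request (plus
    # If-Unmodified-Since), resuming from the last byte it successfully read.
    contents = w.files.download("/Volumes/catalog/schema/volume/file.bin").contents  # placeholder path
    data = contents.read()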