diff --git a/dlt/helpers/dbt_cloud/__init__.py b/dlt/helpers/dbt_cloud/__init__.py index 1e8effd7a6..90a9c5bf6c 100644 --- a/dlt/helpers/dbt_cloud/__init__.py +++ b/dlt/helpers/dbt_cloud/__init__.py @@ -1,5 +1,5 @@ import time -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Union import dlt from dlt.common.configuration import known_sections, with_config @@ -13,7 +13,7 @@ ) def run_dbt_cloud_job( credentials: DBTCloudConfiguration = dlt.secrets.value, - data: Optional[dict] = None, + data: Optional[Dict[Any, Any]] = None, wait_for_outcome: bool = True, wait_seconds: int = 10, ) -> Dict[Any, Any]: @@ -55,7 +55,7 @@ def run_dbt_cloud_job( ) def get_dbt_cloud_run_status( credentials: DBTCloudConfiguration = dlt.secrets.value, - run_id: Optional[int | str] = None, + run_id: Union[int, str, None] = None, wait_for_outcome: bool = True, wait_seconds: int = 10, ) -> Dict[Any, Any]: diff --git a/dlt/helpers/dbt_cloud/client.py b/dlt/helpers/dbt_cloud/client.py index b3b0b7e01d..a1c6a8becc 100644 --- a/dlt/helpers/dbt_cloud/client.py +++ b/dlt/helpers/dbt_cloud/client.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Union from dlt.sources.helpers import requests @@ -40,7 +40,7 @@ def __init__( self.account_id = account_id self.accounts_url = f"accounts/{self.account_id}" - def get_endpoint(self, endpoint: str) -> Dict[Any, Any]: + def get_endpoint(self, endpoint: str) -> Any: response = requests.get( f"{self.base_api_url}/{endpoint}", headers=self._headers ) @@ -49,7 +49,7 @@ def get_endpoint(self, endpoint: str) -> Dict[Any, Any]: def post_endpoint( self, endpoint: str, json_body: Optional[dict] = None - ) -> Dict[Any, Any]: + ) -> Any: response = requests.post( f"{self.base_api_url}/{endpoint}", headers=self._headers, @@ -58,7 +58,7 @@ def post_endpoint( results = response.json() return results - def trigger_job_run(self, job_id: int | str, data: Optional[dict] = None) -> int: + def trigger_job_run(self, job_id: Union[int, str], data: Optional[dict] = None) -> int: """ Trigger a job run in dbt Cloud. @@ -113,9 +113,9 @@ def trigger_job_run(self, job_id: int | str, data: Optional[dict] = None) -> int response = self.post_endpoint( f"{self.accounts_url}/jobs/{job_id}/run", json_body=json_body ) - return response["data"]["id"] + return int(response["data"]["id"]) - def get_run_status(self, run_id: int | str) -> Dict[Any, Any]: + def get_run_status(self, run_id: Union[int, str]) -> Dict[Any, Any]: """ Get the status of a dbt Cloud job run by run_id. @@ -140,4 +140,4 @@ def get_run_status(self, run_id: int | str) -> Dict[Any, Any]: ) response = self.get_endpoint(f"{self.accounts_url}/runs/{run_id}") - return response["data"] + return dict(response["data"])