Skip to content

Commit

Permalink
improve errors with more targetet unwrap fns
Browse files Browse the repository at this point in the history
  • Loading branch information
haakonvt committed Oct 31, 2023
1 parent 2c3067c commit 025b6f2
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 7 deletions.
10 changes: 6 additions & 4 deletions cognite/client/_api/datapoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
)
from cognite.client.utils._concurrency import execute_tasks, get_executor
from cognite.client.utils._experimental import FeaturePreviewWarning
from cognite.client.utils._identifier import Identifier, IdentifierSequence
from cognite.client.utils._identifier import Identifier, IdentifierSequence, IdentifierSequenceCore
from cognite.client.utils._importing import import_as_completed, import_legacy_protobuf, local_import
from cognite.client.utils._time import (
_unit_in_days,
Expand Down Expand Up @@ -1548,7 +1548,7 @@ def _insert_datapoints_concurrently(self, dps_object_lists: list[list[dict[str,
summary = execute_tasks(self._insert_datapoints, tasks, max_workers=self.dps_client._config.max_workers)
summary.raise_compound_exception_if_failed_tasks(
task_unwrap_fn=lambda x: x[0],
task_list_element_unwrap_fn=lambda x: {k: x[k] for k in ["id", "externalId"] if k in x},
task_list_element_unwrap_fn=IdentifierSequenceCore.extract_identifiers,
)

def _insert_datapoints(self, post_dps_objects: list[dict[str, Any]]) -> None:
Expand Down Expand Up @@ -1642,6 +1642,8 @@ def fetch_datapoints(self) -> list[dict[str, Any]]:
for chunk in split_into_chunks(self._all_identifiers, self.dps_client._RETRIEVE_LATEST_LIMIT)
]
tasks_summary = execute_tasks(self.dps_client._post, tasks, max_workers=self.dps_client._config.max_workers)
tasks_summary.raise_compound_exception_if_failed_tasks()

tasks_summary.raise_compound_exception_if_failed_tasks(
task_unwrap_fn=lambda task: task["json"]["items"],
task_list_element_unwrap_fn=IdentifierSequenceCore.extract_identifiers,
)
return tasks_summary.joined_results(lambda res: res.json()["items"])
6 changes: 4 additions & 2 deletions cognite/client/_api/sequences.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
)
from cognite.client.data_classes.shared import TimestampRange
from cognite.client.utils._concurrency import execute_tasks
from cognite.client.utils._identifier import Identifier, IdentifierSequence
from cognite.client.utils._identifier import Identifier, IdentifierSequence, IdentifierSequenceCore
from cognite.client.utils._validation import (
assert_type,
prepare_filter_sort,
Expand Down Expand Up @@ -1100,7 +1100,9 @@ def _fetch_sequence(post_obj: dict[str, Any]) -> SequenceRows:
return SequenceRows.load(sequence_rows)

tasks_summary = execute_tasks(_fetch_sequence, list(zip(identifiers)), max_workers=self._config.max_workers)
tasks_summary.raise_compound_exception_if_failed_tasks()
tasks_summary.raise_compound_exception_if_failed_tasks(
task_list_element_unwrap_fn=IdentifierSequenceCore.extract_identifiers
)
results = tasks_summary.joined_results()
if ident_sequence.is_singleton():
return results[0]
Expand Down
8 changes: 7 additions & 1 deletion cognite/client/utils/_identifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import numbers
from abc import ABC
from typing import (
Any,
Generic,
Literal,
NoReturn,
Expand Down Expand Up @@ -191,6 +192,11 @@ def unwrap_identifier(identifier: str | int | dict) -> str | int:
return identifier["space"]
raise ValueError(f"{identifier} does not contain 'id' or 'externalId' or 'space'")

@staticmethod
def extract_identifiers(dct: dict[str, Any]) -> dict[str, str | int]:
"""An API payload might look like {"id": 1, "before": "2w-ago", ...}. This function extracts the identifiers"""
return {k: dct[k] for k in ("id", "externalId") if k in dct}


T_IdentifierSequenceCore = TypeVar("T_IdentifierSequenceCore", bound=IdentifierSequenceCore)

Expand Down Expand Up @@ -304,4 +310,4 @@ def unwrap_identifier(identifier: str | dict) -> str | tuple[str, str]: # type:
return identifier
if "workflowExternalId" in identifier and "version" in identifier:
return identifier["workflowExternalId"], identifier["version"]
raise ValueError(f"{identifier} does not contain both 'workflowExternalId' and 'version''")
raise ValueError(f"{identifier} does not contain both 'workflowExternalId' and 'version'")

0 comments on commit 025b6f2

Please sign in to comment.