Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add EvaluateDecision #564

Merged
merged 3 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/errors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ All ``pyzeebe`` errors inherit from :py:class:`PyZeebeError`

.. autoexception:: pyzeebe.errors.ProcessInvalidError

.. autoexception:: pyzeebe.errors.DecisionNotFoundError

.. autoexception:: pyzeebe.errors.InvalidJSONError

.. autoexception:: pyzeebe.errors.ZeebeError
Expand Down
10 changes: 10 additions & 0 deletions docs/zeebe_adapter_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ Zeebe GRPC Responses
:undoc-members:
:member-order: bysource

.. autoclass:: pyzeebe.grpc_internals.types.EvaluateDecisionResponse
:members:
:undoc-members:
:member-order: bysource

.. autoclass:: pyzeebe.grpc_internals.types.BroadcastSignalResponse
:members:
:undoc-members:
Expand All @@ -56,3 +61,8 @@ Zeebe GRPC Responses
:members:
:undoc-members:
:member-order: bysource

.. autoclass:: pyzeebe.grpc_internals.types.TopologyResponse
:members:
:undoc-members:
:member-order: bysource
35 changes: 35 additions & 0 deletions pyzeebe/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
CreateProcessInstanceResponse,
CreateProcessInstanceWithResultResponse,
DeployResourceResponse,
EvaluateDecisionResponse,
PublishMessageResponse,
TopologyResponse,
)
Expand Down Expand Up @@ -151,6 +152,40 @@ async def deploy_resource(
"""
return await self.zeebe_adapter.deploy_resource(*resource_file_path, tenant_id=tenant_id)

async def evaluate_decision(
self,
decision_key: int | None,
decision_id: str | None,
variables: Variables | None = None,
tenant_id: str | None = None,
) -> EvaluateDecisionResponse:
"""Evaluates a decision.

You specify the decision to evaluate either by using its unique KEY (as returned by :py:meth:`ZeebeClient.deploy_resource`), or using the decision ID.
When using the decision ID, the latest deployed version of the decision is used.

Args:
decision_key (int): The unique key identifying the decision to be evaluated
(e.g. returned from a decision in the DeployResourceResponse message)
decision_id (str): The ID of the decision to be evaluated
variables (dict): A dictionary containing all variables for the decision to be evaluated. Must be JSONable.
tenant_id (strc): The tenant ID of the resources to deploy. New in Zeebe 8.3.

Returns:
EvaluateDecisionResponse: response from Zeebe.

Raises:
DecisionNotFoundError: No decision with decision_key/decision_id exists
ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests)
ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable
ZeebeInternalError: If Zeebe experiences an internal error
UnknownGrpcStatusCodeError: If Zeebe returns an unexpected status code

"""
return await self.zeebe_adapter.evaluate_decision(
decision_key, decision_id, variables=variables or {}, tenant_id=tenant_id
)

async def broadcast_signal(
self,
signal_name: str,
Expand Down
14 changes: 14 additions & 0 deletions pyzeebe/client/sync_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
CreateProcessInstanceResponse,
CreateProcessInstanceWithResultResponse,
DeployResourceResponse,
EvaluateDecisionResponse,
PublishMessageResponse,
TopologyResponse,
)
Expand Down Expand Up @@ -63,6 +64,19 @@ def deploy_resource(

deploy_resource.__doc__ = ZeebeClient.deploy_resource.__doc__

def evaluate_decision(
self,
decision_key: int | None,
decision_id: str | None,
variables: Variables | None = None,
tenant_id: str | None = None,
) -> EvaluateDecisionResponse:
return self.loop.run_until_complete(
self.client.evaluate_decision(decision_key, decision_id, variables=variables, tenant_id=tenant_id)
)

evaluate_decision.__doc__ = ZeebeClient.evaluate_decision.__doc__

def broadcast_signal(
self,
signal_name: str,
Expand Down
2 changes: 2 additions & 0 deletions pyzeebe/errors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
)
from .message_errors import MessageAlreadyExistsError
from .process_errors import (
DecisionNotFoundError,
InvalidJSONError,
ProcessDefinitionHasNoStartEventError,
ProcessDefinitionNotFoundError,
Expand Down Expand Up @@ -42,6 +43,7 @@
"ProcessInstanceNotFoundError",
"ProcessInvalidError",
"ProcessTimeoutError",
"DecisionNotFoundError",
"BusinessError",
"DuplicateTaskTypeError",
"NoVariableNameGivenError",
Expand Down
13 changes: 13 additions & 0 deletions pyzeebe/errors/process_errors.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

from pyzeebe.errors.pyzeebe_errors import PyZeebeError


Expand Down Expand Up @@ -32,3 +34,14 @@ class ProcessTimeoutError(PyZeebeError, TimeoutError):
def __init__(self, bpmn_process_id: str):
super().__init__(f"Timeout while waiting for process {bpmn_process_id} to complete")
self.bpmn_process_id = bpmn_process_id


class DecisionNotFoundError(PyZeebeError):
def __init__(self, decision_key: int | None, decision_id: str | None):
if decision_id is not None:
msg = f"Decision with id '{decision_id}' was not found"
else:
msg = f"Decision with key '{decision_key}' was not found"
super().__init__(msg)
self.decision_key = decision_key
self.decision_id = decision_id
97 changes: 96 additions & 1 deletion pyzeebe/grpc_internals/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import enum
from dataclasses import dataclass

from pyzeebe.types import Variables
from pyzeebe.types import JsonType, Variables


@dataclass(frozen=True)
Expand Down Expand Up @@ -135,6 +135,101 @@ class FormMetadata:
"""the tenant ID of the deployed resources"""


@dataclass(frozen=True)
class EvaluateDecisionResponse:

@dataclass(frozen=True)
class EvaluatedDecision:

@dataclass(frozen=True)
class MatchedDecisionRule:

@dataclass(frozen=True)
class EvaluatedDecisionOutput:
output_id: str
"""the id of the evaluated decision output"""
output_name: str
"""the name of the evaluated decision output"""
output_value: JsonType
"""the value of the evaluated decision output"""

rule_id: str
"""the id of the matched rule"""
rule_index: int
"""the index of the matched rule"""
evaluated_outputs: list[EvaluatedDecisionOutput]
"""the evaluated decision outputs"""

@dataclass(frozen=True)
class EvaluatedDecisionInput:
input_id: str
"""the id of the evaluated decision input"""
input_name: str
"""the name of the evaluated decision input"""
input_value: JsonType
"""the value of the evaluated decision input"""

decision_key: int
"""the unique key identifying the decision which was evaluated (e.g. returned
from a decision in the DeployResourceResponse message)
"""
decision_id: str
"""the ID of the decision which was evaluated"""
decision_name: str
"""the name of the decision which was evaluated"""
decision_version: int
"""the version of the decision which was evaluated"""
decision_type: str
"""the type of the decision which was evaluated"""
decision_output: JsonType
"""JSON document that will instantiate the result of the decision which was
evaluated; it will be a JSON object, as the result output will be mapped
in a key-value fashion, e.g. { "a": 1 }.
"""
matched_rules: list[MatchedDecisionRule]
"""the decision rules that matched within this decision evaluation"""
evaluated_inputs: list[EvaluatedDecisionInput]
"""the decision inputs that were evaluated within this decision evaluation"""
tenant_id: str | None
"""the tenant identifier of the evaluated decision"""

decision_key: int
"""the unique key identifying the decision which was evaluated (e.g. returned
from a decision in the DeployResourceResponse message)
"""
decision_id: str
"""the ID of the decision which was evaluated"""
decision_name: str
"""the name of the decision which was evaluated"""
decision_version: int
"""the version of the decision which was evaluated"""
decision_requirements_id: str
"""the ID of the decision requirements graph that the decision which was
evaluated is part of.
"""
decision_requirements_key: int
"""the unique key identifying the decision requirements graph that the
decision which was evaluated is part of.
"""
decision_output: JsonType
"""JSON document that will instantiate the result of the decision which was
evaluated; it will be a JSON object, as the result output will be mapped
in a key-value fashion, e.g. { "a": 1 }.
"""
evaluated_decisions: list[EvaluatedDecision]
"""a list of decisions that were evaluated within the requested decision evaluation"""
failed_decision_id: str
"""an optional string indicating the ID of the decision which
failed during evaluation
"""
failure_message: str
"""an optional message describing why the decision which was evaluated failed"""
tenant_id: str | None
"""the tenant identifier of the evaluated decision"""
decision_instance_key: int
"""the unique key identifying this decision evaluation"""


@dataclass(frozen=True)
class BroadcastSignalResponse:
key: int
Expand Down
97 changes: 97 additions & 0 deletions pyzeebe/grpc_internals/zeebe_process_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import grpc

from pyzeebe.errors import (
DecisionNotFoundError,
InvalidJSONError,
ProcessDefinitionHasNoStartEventError,
ProcessDefinitionNotFoundError,
Expand All @@ -25,7 +26,12 @@
DecisionMetadata,
DecisionRequirementsMetadata,
DeployResourceRequest,
EvaluatedDecision,
EvaluatedDecisionInput,
EvaluatedDecisionOutput,
EvaluateDecisionRequest,
FormMetadata,
MatchedDecisionRule,
ProcessMetadata,
Resource,
)
Expand All @@ -36,6 +42,7 @@
CreateProcessInstanceResponse,
CreateProcessInstanceWithResultResponse,
DeployResourceResponse,
EvaluateDecisionResponse,
)


Expand Down Expand Up @@ -205,6 +212,96 @@ def _create_form_from_raw_form(response: FormMetadata) -> DeployResourceResponse
tenant_id=response.tenantId,
)

async def evaluate_decision(
self,
decision_key: int | None,
decision_id: str | None,
variables: Variables,
tenant_id: str | None = None,
) -> EvaluateDecisionResponse:
if decision_id is None and decision_key is None:
raise ValueError("decision_key or decision_id must be not None")

try:
response = await self._gateway_stub.EvaluateDecision(
EvaluateDecisionRequest(
decisionKey=decision_key, # type: ignore[arg-type]
decisionId=decision_id, # type: ignore[arg-type]
variables=json.dumps(variables),
tenantId=tenant_id, # type: ignore[arg-type]
)
)
except grpc.aio.AioRpcError as grpc_error:
if is_error_status(grpc_error, grpc.StatusCode.INVALID_ARGUMENT) and (details := grpc_error.details()):
if "but no decision found for" in details:
raise DecisionNotFoundError(decision_id=decision_id, decision_key=decision_key) from grpc_error
await self._handle_grpc_error(grpc_error)

return EvaluateDecisionResponse(
decision_key=response.decisionKey,
decision_id=response.decisionId,
decision_name=response.decisionName,
decision_version=response.decisionVersion,
decision_requirements_id=response.decisionRequirementsId,
decision_requirements_key=response.decisionRequirementsKey,
decision_output=json.loads(response.decisionOutput),
evaluated_decisions=[
self._create_evaluated_decision_from_raw(evaluated_decision)
for evaluated_decision in response.evaluatedDecisions
],
failed_decision_id=response.failedDecisionId,
failure_message=response.failureMessage,
tenant_id=response.tenantId,
decision_instance_key=response.decisionInstanceKey,
)

def _create_evaluated_decision_from_raw(
self, response: EvaluatedDecision
) -> EvaluateDecisionResponse.EvaluatedDecision:
return EvaluateDecisionResponse.EvaluatedDecision(
decision_key=response.decisionKey,
decision_id=response.decisionId,
decision_name=response.decisionName,
decision_version=response.decisionVersion,
decision_type=response.decisionType,
decision_output=json.loads(response.decisionOutput),
matched_rules=[self._create_matched_rule_from_raw(matched_rule) for matched_rule in response.matchedRules],
evaluated_inputs=[
self._create_evaluated_input_from_raw(evaluated_input) for evaluated_input in response.evaluatedInputs
],
tenant_id=response.tenantId,
)

def _create_matched_rule_from_raw(
self, response: MatchedDecisionRule
) -> EvaluateDecisionResponse.EvaluatedDecision.MatchedDecisionRule:
return EvaluateDecisionResponse.EvaluatedDecision.MatchedDecisionRule(
rule_id=response.ruleId,
rule_index=response.ruleIndex,
evaluated_outputs=[
self._create_evaluated_output_from_raw(evaluated_output)
for evaluated_output in response.evaluatedOutputs
],
)

def _create_evaluated_input_from_raw(
self, response: EvaluatedDecisionInput
) -> EvaluateDecisionResponse.EvaluatedDecision.EvaluatedDecisionInput:
return EvaluateDecisionResponse.EvaluatedDecision.EvaluatedDecisionInput(
input_id=response.inputId,
input_name=response.inputName,
input_value=json.loads(response.inputValue),
)

def _create_evaluated_output_from_raw(
self, response: EvaluatedDecisionOutput
) -> EvaluateDecisionResponse.EvaluatedDecision.MatchedDecisionRule.EvaluatedDecisionOutput:
return EvaluateDecisionResponse.EvaluatedDecision.MatchedDecisionRule.EvaluatedDecisionOutput(
output_id=response.outputId,
output_name=response.outputName,
output_value=json.loads(response.outputValue),
)


_METADATA_PARSERS: dict[
str,
Expand Down
7 changes: 5 additions & 2 deletions pyzeebe/types.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from collections.abc import Mapping, Sequence
from typing import Any
from typing import Any, Union

from typing_extensions import TypeAlias

Headers: TypeAlias = Mapping[str, Any]
Variables: TypeAlias = Mapping[str, Any]
Unset = "UNSET"

ChannelArgumentType: TypeAlias = Sequence[tuple[str, Any]]

JsonType: TypeAlias = Union[Mapping[str, "JsonType"], Sequence["JsonType"], str, int, float, bool, None]
JsonDictType: TypeAlias = Mapping[str, JsonType]
Variables: TypeAlias = JsonDictType
Loading
Loading