diff --git a/.github/workflows/test-python-package.yml b/.github/workflows/test-python-package.yml index 49cf1769..575f3acf 100644 --- a/.github/workflows/test-python-package.yml +++ b/.github/workflows/test-python-package.yml @@ -13,23 +13,17 @@ jobs: strategy: matrix: python-version: [3.5, 3.6, 3.7, 3.8] - + + container: python:${{ matrix.python-version }} steps: - uses: actions/checkout@v2 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 - with: - python-version: ${{ matrix.python-version }} - - name: Install pipenv - uses: dschep/install-pipenv-action@v1 - - name: Install dependencies run: | pip install pipenv pipenv install --dev - name: Test with pytest run: | - pipenv run coverage run --source=. -m py.test + pipenv run coverage run --source=. -m py.test pyzeebe - name: Upload to coveralls run: | pipenv run coveralls diff --git a/.github/workflows/test-zeebe-integration.yml b/.github/workflows/test-zeebe-integration.yml new file mode 100644 index 00000000..0ddd0d0a --- /dev/null +++ b/.github/workflows/test-zeebe-integration.yml @@ -0,0 +1,36 @@ +name: Integration test pyzeebe + +on: + push: + branches: [ master, development, feature/*, bugfix/* ] + pull_request: + branches: [ master, development, feature/*, bugfix/* ] + +jobs: + test: + env: + ZEEBE_ADDRESS: "zeebe:26500" + runs-on: ubuntu-latest + strategy: + matrix: + zeebe-version: [ "0.23.5", "0.24.2" ] + python-version: [ 3.5, 3.6, 3.7, 3.8 ] + + container: python:${{ matrix.python-version }} + + services: + zeebe: + image: camunda/zeebe:${{ matrix.zeebe-version }} + ports: + - 26500/tcp + + steps: + - uses: actions/checkout@v2 + - name: Install dependencies + run: | + pip install pipenv + pipenv install --dev + + - name: Run integration tests + run: | + pipenv run pytest tests diff --git a/README.md b/README.md index 62878135..dac9ea47 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ Zeebe version support: | Pyzeebe version | Tested Zeebe versions | 
|:---------------:|----------------| -| 1.0.1 | 0.24.2 | +| 1.1.0 | 0.23, 0.24 | ## Getting Started To install: @@ -27,6 +27,8 @@ To install: ### Worker +The `ZeebeWorker` class uses threading to get and run jobs. + ```python from pyzeebe import ZeebeWorker, Task, TaskStatusController, TaskContext @@ -48,6 +50,43 @@ worker.add_task(task) # Add task to zeebe worker worker.work() # Now every time that a task with type example is called example_task will be called ``` +Stop a worker: +```python +from threading import Event + + +stop_event = Event() +zeebe_worker.work(stop_event=stop_event) # Worker will begin working +stop_event.set() # Stops worker and all running jobs +``` + +### Client + +```python +from pyzeebe import ZeebeClient + +# Create a zeebe client +zeebe_client = ZeebeClient(hostname='localhost', port=26500) + +# Run a workflow +workflow_instance_key = zeebe_client.run_workflow(bpmn_process_id='My zeebe workflow', variables={}) + +# Run a workflow and receive the result +workflow_result = zeebe_client.run_workflow_with_result(bpmn_process_id='My zeebe workflow', + timeout=10000) # Will wait 10000 milliseconds (10 seconds) + +# Deploy a bpmn workflow definition +zeebe_client.deploy_workflow('workflow.bpmn') + +# Cancel a running workflow +zeebe_client.cancel_workflow_instance(workflow_instance_key=12345) + +# Publish message +zeebe_client.publish_message(name='message_name', correlation_key='some_id') + + +``` + ## Tests Use the package manager [pip](https://pip.pypa.io/en/stable/) to install pyzeebe diff --git a/examples/client.py b/examples/client.py new file mode 100644 index 00000000..ed4baaff --- /dev/null +++ b/examples/client.py @@ -0,0 +1,20 @@ +from pyzeebe import ZeebeClient + +# Create a zeebe client +zeebe_client = ZeebeClient(hostname='localhost', port=26500) + +# Run a workflow +workflow_instance_key = zeebe_client.run_workflow(bpmn_process_id='My zeebe workflow', variables={}) + +# Run a workflow and receive the result +workflow_result = 
zeebe_client.run_workflow_with_result(bpmn_process_id='My zeebe workflow', + timeout=10000) # Will wait 10000 milliseconds (10 seconds) + +# Deploy a bpmn workflow definition +zeebe_client.deploy_workflow('workflow.bpmn') + +# Cancel a running workflow +zeebe_client.cancel_workflow_instance(workflow_instance_key=12345) + +# Publish message +zeebe_client.publish_message(name='message_name', correlation_key='some_id') diff --git a/examples/worker.py b/examples/worker.py index b4b48a3d..f301e21a 100644 --- a/examples/worker.py +++ b/examples/worker.py @@ -3,8 +3,8 @@ from pyzeebe import Task, TaskContext, TaskStatusController, ZeebeWorker -def example_task(input: str) -> Dict: - return {'output': f'Hello world, {input}!'} +def example_task() -> Dict: + return {'output': f'Hello world, test!'} def example_exception_handler(exc: Exception, context: TaskContext, controller: TaskStatusController) -> None: @@ -13,7 +13,7 @@ def example_exception_handler(exc: Exception, context: TaskContext, controller: controller.error(f'Failed to run task {context.type}. 
Reason: {exc}') -task = Task(task_type='example', task_handler=example_task, exception_handler=example_exception_handler) +task = Task(task_type='test', task_handler=example_task, exception_handler=example_exception_handler) worker = ZeebeWorker() # Will use environment variable ZEEBE_ADDRESS or localhost:26500 diff --git a/pyzeebe/__init__.py b/pyzeebe/__init__.py index 409aa4eb..96eb1f74 100644 --- a/pyzeebe/__init__.py +++ b/pyzeebe/__init__.py @@ -1,3 +1,5 @@ +from pyzeebe.client.client import ZeebeClient +from pyzeebe.common import exceptions from pyzeebe.task.task import Task from pyzeebe.task.task_context import TaskContext from pyzeebe.task.task_status_controller import TaskStatusController diff --git a/pyzeebe/client/__init__.py b/pyzeebe/client/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pyzeebe/client/client.py b/pyzeebe/client/client.py new file mode 100644 index 00000000..49adf6af --- /dev/null +++ b/pyzeebe/client/client.py @@ -0,0 +1,34 @@ +from typing import Dict, List + +import grpc + +from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter + + +class ZeebeClient(object): + def __init__(self, hostname: str = None, port: int = None, channel: grpc.Channel = None): + self.zeebe_adapter = ZeebeAdapter(hostname=hostname, port=port, channel=channel) + + def run_workflow(self, bpmn_process_id: str, variables: Dict = None, version: int = -1) -> int: + return self.zeebe_adapter.create_workflow_instance(bpmn_process_id=bpmn_process_id, variables=variables or {}, + version=version) + + def run_workflow_with_result(self, bpmn_process_id: str, variables: Dict = None, version: int = -1, + timeout: int = 0, variables_to_fetch: List[str] = None) -> Dict: + return self.zeebe_adapter.create_workflow_instance_with_result(bpmn_process_id=bpmn_process_id, + variables=variables or {}, version=version, + timeout=timeout, + variables_to_fetch=variables_to_fetch or []) + + def cancel_workflow_instance(self, workflow_instance_key: int) -> 
int: + self.zeebe_adapter.cancel_workflow_instance(workflow_instance_key=workflow_instance_key) + return workflow_instance_key + + def deploy_workflow(self, *workflow_file_path: str): + self.zeebe_adapter.deploy_workflow(*workflow_file_path) + + def publish_message(self, name: str, correlation_key: str, variables: Dict = None, + time_to_live_in_milliseconds: int = 60000) -> None: + self.zeebe_adapter.publish_message(name=name, correlation_key=correlation_key, + time_to_live_in_milliseconds=time_to_live_in_milliseconds, + variables=variables or {}) diff --git a/pyzeebe/client/client_test.py b/pyzeebe/client/client_test.py new file mode 100644 index 00000000..f640ecd7 --- /dev/null +++ b/pyzeebe/client/client_test.py @@ -0,0 +1,75 @@ +from random import randint +from uuid import uuid4 + +import pytest + +from pyzeebe.client.client import ZeebeClient +from pyzeebe.common.exceptions import WorkflowNotFound +from pyzeebe.common.gateway_mock import GatewayMock +from pyzeebe.common.random_utils import RANDOM_RANGE + +zeebe_client: ZeebeClient + + +@pytest.fixture(scope='module') +def grpc_add_to_server(): + from pyzeebe.grpc_internals.zeebe_pb2_grpc import add_GatewayServicer_to_server + return add_GatewayServicer_to_server + + +@pytest.fixture(scope='module') +def grpc_servicer(): + return GatewayMock() + + +@pytest.fixture(scope='module') +def grpc_stub_cls(grpc_channel): + from pyzeebe.grpc_internals.zeebe_pb2_grpc import GatewayStub + return GatewayStub + + +@pytest.fixture(autouse=True) +def run_around_tests(grpc_channel): + global zeebe_client + zeebe_client = ZeebeClient(channel=grpc_channel) + yield + zeebe_client = ZeebeClient(channel=grpc_channel) + + +def test_run_workflow(grpc_servicer): + bpmn_process_id = str(uuid4()) + version = randint(0, 10) + grpc_servicer.mock_deploy_workflow(bpmn_process_id, version, []) + assert isinstance(zeebe_client.run_workflow(bpmn_process_id=bpmn_process_id, variables={}, version=version), int) + + +def 
test_run_workflow_with_result(grpc_servicer): + bpmn_process_id = str(uuid4()) + version = randint(0, 10) + grpc_servicer.mock_deploy_workflow(bpmn_process_id, version, []) + assert isinstance(zeebe_client.run_workflow(bpmn_process_id=bpmn_process_id, variables={}, version=version), int) + + +def test_deploy_workflow(grpc_servicer): + bpmn_process_id = str(uuid4()) + version = randint(0, 10) + grpc_servicer.mock_deploy_workflow(bpmn_process_id, version, []) + assert bpmn_process_id in grpc_servicer.deployed_workflows.keys() + + +def test_run_non_existent_workflow(): + with pytest.raises(WorkflowNotFound): + zeebe_client.run_workflow(bpmn_process_id=str(uuid4())) + + +def test_run_non_existent_workflow_with_result(): + with pytest.raises(WorkflowNotFound): + zeebe_client.run_workflow_with_result(bpmn_process_id=str(uuid4())) + + +def test_cancel_workflow_instance(): + assert isinstance(zeebe_client.cancel_workflow_instance(workflow_instance_key=randint(0, RANDOM_RANGE)), int) + + +def test_publish_message(): + zeebe_client.publish_message(name=str(uuid4()), correlation_key=str(uuid4())) diff --git a/pyzeebe/common/exceptions.py b/pyzeebe/common/exceptions.py index 96b6896e..9ed52b08 100644 --- a/pyzeebe/common/exceptions.py +++ b/pyzeebe/common/exceptions.py @@ -1,2 +1,77 @@ -class TaskNotFoundException(Exception): +class TaskNotFound(Exception): + pass + + +class WorkflowNotFound(Exception): + def __init__(self, bpmn_process_id: str, version: int): + super().__init__( + f'Workflow definition: {bpmn_process_id} with {version} was not found') + self.bpmn_process_id = bpmn_process_id + self.version = version + + +class WorkflowInstanceNotFound(Exception): + def __init__(self, workflow_instance_key: int): + super().__init__(f'Workflow instance key: {workflow_instance_key} was not found') + self.workflow_instance_key = workflow_instance_key + + +class WorkflowHasNoStartEvent(Exception): + def __init__(self, bpmn_process_id: str): + super().__init__(f"Workflow 
{bpmn_process_id} has no start event that can be called manually") + self.bpmn_process_id = bpmn_process_id + + +class ActivateJobsRequestInvalid(Exception): + def __init__(self, task_type: str, worker: str, timeout: int, max_jobs_to_activate: int): + msg = "Failed to activate jobs. Reasons:" + if task_type == "" or task_type is None: + msg = msg + "task_type is empty, " + if worker == "" or worker is None: + msg = msg + "worker is empty, " + if timeout < 1: + msg = msg + "job timeout is smaller than 1ms, " + if max_jobs_to_activate < 1: + msg = msg + "max_jobs_to_activate is smaller than 1, " + + super().__init__(msg) + + +class JobAlreadyDeactivated(Exception): + def __init__(self, job_key: int): + super().__init__(f"Job {job_key} was already stopped (Completed/Failed/Error)") + self.job_key = job_key + + +class JobNotFound(Exception): + def __init__(self, job_key: int): + super().__init__(f"Job {job_key} not found") + self.job_key = job_key + + +class WorkflowInvalid(Exception): + pass + + +class MessageAlreadyExists(Exception): + pass + + +class ElementNotFound(Exception): + pass + + +class InvalidJSON(Exception): + pass + + +class ZeebeBackPressure(Exception): + pass + + +class ZeebeGatewayUnavailable(Exception): + pass + + +class ZeebeInternalError(Exception): pass diff --git a/pyzeebe/common/gateway_mock.py b/pyzeebe/common/gateway_mock.py new file mode 100644 index 00000000..5e3fe079 --- /dev/null +++ b/pyzeebe/common/gateway_mock.py @@ -0,0 +1,135 @@ +import json +from random import randint +from typing import List, Dict +from unittest.mock import patch +from uuid import uuid4 + +import grpc + +from pyzeebe.common.random_utils import RANDOM_RANGE, random_task_context +from pyzeebe.grpc_internals.zeebe_pb2 import * +from pyzeebe.grpc_internals.zeebe_pb2_grpc import GatewayServicer +from pyzeebe.task.task import Task +from pyzeebe.task.task_context import TaskContext +from pyzeebe.task.task_status import TaskStatus + + +@patch('grpc.insecure_channel')
+def mock_channel(): + pass + + +class GatewayMock(GatewayServicer): + # TODO: Mock behaviour of zeebe + + def __init__(self): + self.deployed_workflows = {} + self.active_jobs: Dict[int, TaskContext] = {} + + def ActivateJobs(self, request, context): + if not request.type: + context.set_code(grpc.StatusCode.INVALID_ARGUMENT) + return ActivateJobsResponse() + + if request.maxJobsToActivate <= 0: + context.set_code(grpc.StatusCode.INVALID_ARGUMENT) + return ActivateJobsResponse() + + if request.timeout <= 0: + context.set_code(grpc.StatusCode.INVALID_ARGUMENT) + return ActivateJobsResponse() + + if not request.worker: + context.set_code(grpc.StatusCode.INVALID_ARGUMENT) + return ActivateJobsResponse() + + jobs = [] + for active_job in self.active_jobs.values(): + if active_job.type == request.type: + jobs.append(ActivatedJob(key=active_job.key, type=active_job.type, + workflowInstanceKey=active_job.workflow_instance_key, + bpmnProcessId=active_job.bpmn_process_id, + workflowDefinitionVersion=active_job.workflow_definition_version, + workflowKey=active_job.workflow_key, + elementId=active_job.element_id, + elementInstanceKey=active_job.element_instance_key, + customHeaders=json.dumps(active_job.custom_headers), + worker=active_job.worker, retries=active_job.retries, + deadline=active_job.deadline, + variables=json.dumps(active_job.variables))) + yield ActivateJobsResponse(jobs=jobs) + + def CompleteJob(self, request, context): + if request.jobKey in self.active_jobs.keys(): + active_job = self.active_jobs.get(request.jobKey) + self.handle_job(active_job, TaskStatus.Completed, context) + return CompleteJobResponse() + else: + context.set_code(grpc.StatusCode.NOT_FOUND) + return CompleteJobResponse() + + def FailJob(self, request, context): + if request.jobKey in self.active_jobs.keys(): + active_job = self.active_jobs.get(request.jobKey) + self.handle_job(active_job, TaskStatus.Failed, context) + return FailJobResponse() + else: + 
context.set_code(grpc.StatusCode.NOT_FOUND) + return FailJobResponse() + + def ThrowError(self, request, context): + if request.jobKey in self.active_jobs.keys(): + active_job = self.active_jobs.get(request.jobKey) + self.handle_job(active_job, TaskStatus.ErrorThrown, context) + return CompleteJobResponse() + else: + context.set_code(grpc.StatusCode.NOT_FOUND) + return CompleteJobResponse() + + @staticmethod + def handle_job(job: TaskContext, status_on_deactivate: TaskStatus, context): + if job.status != TaskStatus.Running: + context.set_code(grpc.StatusCode.FAILED_PRECONDITION) + else: + job.status = status_on_deactivate + return context + + def CreateWorkflowInstance(self, request, context): + if request.bpmnProcessId in self.deployed_workflows.keys(): + for task in self.deployed_workflows[request.bpmnProcessId]['tasks']: + task_context = random_task_context(task) + self.active_jobs[task_context.key] = task_context + return CreateWorkflowInstanceResponse(workflowKey=randint(0, RANDOM_RANGE), + bpmnProcessId=request.bpmnProcessId, + version=request.version, workflowInstanceKey=randint(0, RANDOM_RANGE)) + else: + context.set_code(grpc.StatusCode.NOT_FOUND) + return CreateWorkflowInstanceResponse() + + def CreateWorkflowInstanceWithResult(self, request, context): + if request.request.bpmnProcessId in self.deployed_workflows.keys(): + return CreateWorkflowInstanceWithResultResponse(workflowKey=request.request.workflowKey, + bpmnProcessId=request.request.bpmnProcessId, + version=randint(0, 10), variables=request.request.variables) + else: + context.set_code(grpc.StatusCode.NOT_FOUND) + return CreateWorkflowInstanceWithResultResponse() + + def CancelWorkflowInstance(self, request, context): + return CancelWorkflowInstanceResponse() + + def DeployWorkflow(self, request, context): + workflows = [] + for workflow in request.workflows: + workflow_metadata = WorkflowMetadata(bpmnProcessId=str(uuid4()), version=randint(0, 10), + workflowKey=randint(0, RANDOM_RANGE), 
resourceName=workflow.name) + workflows.append(workflow_metadata) + + return DeployWorkflowResponse(key=randint(0, RANDOM_RANGE), workflows=workflows) + + def PublishMessage(self, request, context): + return PublishMessageResponse() + + def mock_deploy_workflow(self, bpmn_process_id: str, version: int, tasks: List[Task]): + self.deployed_workflows[bpmn_process_id] = {'bpmn_process_id': bpmn_process_id, 'version': version, + 'tasks': tasks} diff --git a/pyzeebe/common/random_utils.py b/pyzeebe/common/random_utils.py index 7c48e4d3..f17c7e0f 100644 --- a/pyzeebe/common/random_utils.py +++ b/pyzeebe/common/random_utils.py @@ -4,7 +4,7 @@ from pyzeebe.task.task import Task from pyzeebe.task.task_context import TaskContext -RANDOM_RANGE = 100000 +RANDOM_RANGE = 1000000000 def random_task_context(task: Task = Task(task_type='test', task_handler=lambda x: {'x': x}, diff --git a/pyzeebe/grpc_internals/zeebe_adapter.py b/pyzeebe/grpc_internals/zeebe_adapter.py index 0b6f3a59..4f55cee5 100644 --- a/pyzeebe/grpc_internals/zeebe_adapter.py +++ b/pyzeebe/grpc_internals/zeebe_adapter.py @@ -5,22 +5,23 @@ import grpc +from pyzeebe.common.exceptions import * from pyzeebe.grpc_internals.zeebe_pb2 import * from pyzeebe.grpc_internals.zeebe_pb2_grpc import GatewayStub from pyzeebe.task.task_context import TaskContext -class ZeebeAdapter: +class ZeebeAdapter(object): def __init__(self, hostname: str = None, port: int = None, channel: grpc.Channel = None, **kwargs): - self._connection_uri = f'{hostname}:{port}' or os.getenv('ZEEBE_ADDRESS') or 'localhost:26500' if channel: self._channel = channel + self.connection_uri = None else: if hostname or port: - self._connection_uri = f'{hostname or "localhost"}:{port or 26500}' + self.connection_uri = f'{hostname or "localhost"}:{port or 26500}' else: - self._connection_uri = os.getenv('ZEEBE_ADDRESS') or 'localhost:26500' - self._channel = grpc.insecure_channel(self._connection_uri) + self.connection_uri = os.getenv('ZEEBE_ADDRESS', 
'localhost:26500') + self._channel = grpc.insecure_channel(self.connection_uri) self.connected = False self.retrying_connection = True @@ -41,18 +42,24 @@ def _check_connectivity(self, value: grpc.ChannelConnectivity) -> None: logging.error('Failed to establish connection to Zeebe. Non recoverable') self.connected = False self.retrying_connection = False - raise ConnectionAbortedError(f'Lost connection to {self._connection_uri}') + raise ConnectionAbortedError(f'Lost connection to {self.connection_uri}') def activate_jobs(self, task_type: str, worker: str, timeout: int, max_jobs_to_activate: int, variables_to_fetch: List[str], request_timeout: int) -> Generator[TaskContext, None, None]: - for response in self.gateway_stub.ActivateJobs( - ActivateJobsRequest(type=task_type, worker=worker, timeout=timeout, - maxJobsToActivate=max_jobs_to_activate, - fetchVariable=variables_to_fetch, requestTimeout=request_timeout)): - for job in response.jobs: - context = self._create_task_context_from_job(job) - logging.debug(f'Got job: {context} from zeebe') - yield context + try: + for response in self.gateway_stub.ActivateJobs( + ActivateJobsRequest(type=task_type, worker=worker, timeout=timeout, + maxJobsToActivate=max_jobs_to_activate, + fetchVariable=variables_to_fetch, requestTimeout=request_timeout)): + for job in response.jobs: + context = self._create_task_context_from_job(job) + logging.debug(f'Got job: {context} from zeebe') + yield context + except grpc.RpcError as rpc_error: + if self.is_error_status(rpc_error, grpc.StatusCode.INVALID_ARGUMENT): + raise ActivateJobsRequestInvalid(task_type, worker, timeout, max_jobs_to_activate) + else: + self._common_zeebe_grpc_errors(rpc_error) @staticmethod def _create_task_context_from_job(job) -> TaskContext: @@ -70,41 +77,122 @@ def _create_task_context_from_job(job) -> TaskContext: variables=json.loads(job.variables)) def complete_job(self, job_key: int, variables: Dict) -> CompleteJobResponse: - return 
self.gateway_stub.CompleteJob(CompleteJobRequest(jobKey=job_key, variables=json.dumps(variables))) + try: + return self.gateway_stub.CompleteJob(CompleteJobRequest(jobKey=job_key, variables=json.dumps(variables))) + except grpc.RpcError as rpc_error: + if self.is_error_status(rpc_error, grpc.StatusCode.NOT_FOUND): + raise JobNotFound(job_key=job_key) + elif self.is_error_status(rpc_error, grpc.StatusCode.FAILED_PRECONDITION): + raise JobAlreadyDeactivated(job_key=job_key) + else: + self._common_zeebe_grpc_errors(rpc_error) def fail_job(self, job_key: int, message: str) -> FailJobResponse: - return self.gateway_stub.FailJob(FailJobRequest(jobKey=job_key, errorMessage=message)) + try: + return self.gateway_stub.FailJob(FailJobRequest(jobKey=job_key, errorMessage=message)) + except grpc.RpcError as rpc_error: + if self.is_error_status(rpc_error, grpc.StatusCode.NOT_FOUND): + raise JobNotFound(job_key=job_key) + elif self.is_error_status(rpc_error, grpc.StatusCode.FAILED_PRECONDITION): + raise JobAlreadyDeactivated(job_key=job_key) + else: + self._common_zeebe_grpc_errors(rpc_error) def throw_error(self, job_key: int, message: str) -> ThrowErrorResponse: - return self.gateway_stub.ThrowError( - ThrowErrorRequest(jobKey=job_key, errorMessage=message)) + try: + return self.gateway_stub.ThrowError( + ThrowErrorRequest(jobKey=job_key, errorMessage=message)) + except grpc.RpcError as rpc_error: + if self.is_error_status(rpc_error, grpc.StatusCode.NOT_FOUND): + raise JobNotFound(job_key=job_key) + elif self.is_error_status(rpc_error, grpc.StatusCode.FAILED_PRECONDITION): + raise JobAlreadyDeactivated(job_key=job_key) + else: + self._common_zeebe_grpc_errors(rpc_error) - def create_workflow_instance(self, bpmn_process_id: str, version: int, variables: Dict) -> str: - response = self.gateway_stub.CreateWorkflowInstance( - CreateWorkflowInstanceRequest(bpmnProcessId=bpmn_process_id, version=version, - variables=json.dumps(variables))) - return response.workflowInstanceKey + def 
create_workflow_instance(self, bpmn_process_id: str, version: int, variables: Dict) -> int: + try: + response = self.gateway_stub.CreateWorkflowInstance( + CreateWorkflowInstanceRequest(bpmnProcessId=bpmn_process_id, version=version, + variables=json.dumps(variables))) + return response.workflowInstanceKey + except grpc.RpcError as rpc_error: + self._create_workflow_errors(rpc_error, bpmn_process_id, version, variables) def create_workflow_instance_with_result(self, bpmn_process_id: str, version: int, variables: Dict, timeout: int, variables_to_fetch) -> Dict: - response = self.gateway_stub.CreateWorkflowInstanceWithResult( - CreateWorkflowInstanceWithResultRequest( - request=CreateWorkflowInstanceRequest(bpmnProcessId=bpmn_process_id, version=version, - variables=json.dumps(variables)), - requestTimeout=timeout, fetchVariables=variables_to_fetch)) - return json.loads(response.variables) + try: + response = self.gateway_stub.CreateWorkflowInstanceWithResult( + CreateWorkflowInstanceWithResultRequest( + request=CreateWorkflowInstanceRequest(bpmnProcessId=bpmn_process_id, version=version, + variables=json.dumps(variables)), + requestTimeout=timeout, fetchVariables=variables_to_fetch)) + return json.loads(response.variables) + except grpc.RpcError as rpc_error: + self._create_workflow_errors(rpc_error, bpmn_process_id, version, variables) + + def _create_workflow_errors(self, rpc_error: grpc.RpcError, bpmn_process_id: str, version: int, + variables: Dict) -> None: + if self.is_error_status(rpc_error, grpc.StatusCode.NOT_FOUND): + raise WorkflowNotFound(bpmn_process_id=bpmn_process_id, version=version) + elif self.is_error_status(rpc_error, grpc.StatusCode.INVALID_ARGUMENT): + raise InvalidJSON( + f"Cannot start workflow: {bpmn_process_id} with version {version}. 
Variables: {variables}") + elif self.is_error_status(rpc_error, grpc.StatusCode.FAILED_PRECONDITION): + raise WorkflowHasNoStartEvent(bpmn_process_id=bpmn_process_id) + else: + self._common_zeebe_grpc_errors(rpc_error) + + def cancel_workflow_instance(self, workflow_instance_key: int) -> None: + try: + self.gateway_stub.CancelWorkflowInstance( + CancelWorkflowInstanceRequest(workflowInstanceKey=workflow_instance_key)) + except grpc.RpcError as rpc_error: + if self.is_error_status(rpc_error, grpc.StatusCode.NOT_FOUND): + raise WorkflowInstanceNotFound(workflow_instance_key=workflow_instance_key) + else: + self._common_zeebe_grpc_errors(rpc_error) def publish_message(self, name: str, correlation_key: str, time_to_live_in_milliseconds: int, variables: Dict) -> PublishMessageResponse: - return self.gateway_stub.PublishMessage( - PublishMessageRequest(name=name, correlationKey=correlation_key, timeToLive=time_to_live_in_milliseconds, - variables=json.dumps(variables))) + try: + return self.gateway_stub.PublishMessage( + PublishMessageRequest(name=name, correlationKey=correlation_key, + timeToLive=time_to_live_in_milliseconds, + variables=json.dumps(variables))) + except grpc.RpcError as rpc_error: + if self.is_error_status(rpc_error, grpc.StatusCode.ALREADY_EXISTS): + raise MessageAlreadyExists() + else: + self._common_zeebe_grpc_errors(rpc_error) def deploy_workflow(self, *workflow_file_path: str) -> DeployWorkflowResponse: - return self.gateway_stub.DeployWorkflow( - DeployWorkflowRequest(workflows=map(self._get_workflow_request_object, workflow_file_path))) + + try: + return self.gateway_stub.DeployWorkflow( + DeployWorkflowRequest(workflows=map(self._get_workflow_request_object, workflow_file_path))) + except grpc.RpcError as rpc_error: + if self.is_error_status(rpc_error, grpc.StatusCode.INVALID_ARGUMENT): + raise WorkflowInvalid() + else: + self._common_zeebe_grpc_errors(rpc_error) @staticmethod def _get_workflow_request_object(workflow_file_path: str) -> 
WorkflowRequestObject: - return WorkflowRequestObject(name=os.path.split(workflow_file_path)[-1], - definition=open(workflow_file_path).read()) + with open(workflow_file_path, 'rb') as file: + return WorkflowRequestObject(name=os.path.split(workflow_file_path)[-1], + definition=file.read()) + + def _common_zeebe_grpc_errors(self, rpc_error: grpc.RpcError): + if self.is_error_status(rpc_error, grpc.StatusCode.RESOURCE_EXHAUSTED): + raise ZeebeBackPressure() + elif self.is_error_status(rpc_error, grpc.StatusCode.UNAVAILABLE): + raise ZeebeGatewayUnavailable() + elif self.is_error_status(rpc_error, grpc.StatusCode.INTERNAL): + raise ZeebeInternalError() + else: + raise rpc_error + + @staticmethod + def is_error_status(rpc_error: grpc.RpcError, status_code: grpc.StatusCode): + return rpc_error._state.code == status_code diff --git a/pyzeebe/grpc_internals/zeebe_adapter_test.py b/pyzeebe/grpc_internals/zeebe_adapter_test.py index 8550daa0..7c20ef41 100644 --- a/pyzeebe/grpc_internals/zeebe_adapter_test.py +++ b/pyzeebe/grpc_internals/zeebe_adapter_test.py @@ -1,3 +1,4 @@ +from io import BytesIO from random import randint from unittest.mock import patch from uuid import uuid4 @@ -5,61 +6,17 @@ import grpc import pytest -from pyzeebe.common.random_utils import RANDOM_RANGE +from pyzeebe.common.exceptions import * +from pyzeebe.common.gateway_mock import GatewayMock +from pyzeebe.common.random_utils import RANDOM_RANGE, random_task_context from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter from pyzeebe.grpc_internals.zeebe_pb2 import * -from pyzeebe.grpc_internals.zeebe_pb2_grpc import GatewayServicer +from pyzeebe.task.task import Task +from pyzeebe.task.task_context import TaskContext zeebe_adapter: ZeebeAdapter -@patch('grpc.insecure_channel') -def mock_channel(): - pass - - -class TestGatewayServicer(GatewayServicer): - """ - def ActivateJobs(self, request, context): - return ActivateJobsResponse(jobs=[ActivatedJob()]) - """ - - def __init__(self): - 
self.workflows = {} - - def CompleteJob(self, request, context): - return CompleteJobResponse() - - def FailJob(self, request, context): - return FailJobResponse() - - def ThrowError(self, request, context): - return ThrowErrorResponse() - - def CreateWorkflowInstance(self, request, context): - return CreateWorkflowInstanceResponse(workflowKey=randint(0, RANDOM_RANGE), - bpmnProcessId=request.bpmnProcessId, - version=request.version, workflowInstanceKey=randint(0, RANDOM_RANGE)) - - def CreateWorkflowInstanceWithResult(self, request, context): - return CreateWorkflowInstanceWithResultResponse(workflowKey=request.request.workflowKey, - bpmnProcessId=request.request.bpmnProcessId, - version=randint(0, 10), variables=request.request.variables) - - def DeployWorkflow(self, request, context): - workflows = [] - for workflow in request.workflows: - workflow_metadata = WorkflowMetadata(bpmnProcessId=str(uuid4()), version=randint(0, 10), - workflowKey=randint(0, RANDOM_RANGE), resourceName=workflow.name) - workflows.append(workflow_metadata) - self.workflows[workflow_metadata.bpmnProcessId] = workflow_metadata - - return DeployWorkflowResponse(key=randint(0, RANDOM_RANGE), workflows=workflows) - - def PublishMessage(self, request, context): - return PublishMessageResponse() - - @pytest.fixture(scope='module') def grpc_add_to_server(): from pyzeebe.grpc_internals.zeebe_pb2_grpc import add_GatewayServicer_to_server @@ -68,7 +25,7 @@ def grpc_add_to_server(): @pytest.fixture(scope='module') def grpc_servicer(): - return TestGatewayServicer() + return GatewayMock() @pytest.fixture(scope='module') @@ -114,31 +71,138 @@ def test_connectivity_shutdown(): zeebe_adapter._check_connectivity(grpc.ChannelConnectivity.SHUTDOWN) -def test_complete_job(): - response = zeebe_adapter.complete_job(job_key=randint(0, RANDOM_RANGE), variables={}) +def test_only_port(): + port = randint(0, 10000) + zeebe_adapter = ZeebeAdapter(port=port) + assert zeebe_adapter.connection_uri == 
f'localhost:{port}' + + +def test_only_host(): + hostname = str(uuid4()) + zeebe_adapter = ZeebeAdapter(hostname=hostname) + assert zeebe_adapter.connection_uri == f'{hostname}:26500' + + +def test_host_and_port(): + hostname = str(uuid4()) + port = randint(0, 10000) + zeebe_adapter = ZeebeAdapter(hostname=hostname, port=port) + assert zeebe_adapter.connection_uri == f'{hostname}:{port}' + + +def test_activate_jobs(grpc_servicer): + task_type = create_random_task_and_activate(grpc_servicer) + active_jobs_count = randint(4, 100) + counter = 0 + for i in range(0, active_jobs_count): + create_random_task_and_activate(grpc_servicer, task_type) + + for job in zeebe_adapter.activate_jobs(task_type=task_type, worker=str(uuid4()), timeout=randint(10, 100), + request_timeout=100, max_jobs_to_activate=1, variables_to_fetch=[]): + counter += 1 + assert isinstance(job, TaskContext) + assert counter == active_jobs_count + 1 + + +def test_activate_jobs_invalid_worker(): + with pytest.raises(ActivateJobsRequestInvalid): + next(zeebe_adapter.activate_jobs(task_type=str(uuid4()), worker=None, timeout=randint(10, 100), + request_timeout=100, + max_jobs_to_activate=1, variables_to_fetch=[])) + + +def test_activate_jobs_invalid_job_timeout(): + with pytest.raises(ActivateJobsRequestInvalid): + next(zeebe_adapter.activate_jobs(task_type=str(uuid4()), worker=str(uuid4()), timeout=0, + request_timeout=100, max_jobs_to_activate=1, variables_to_fetch=[])) + + +def test_activate_jobs_invalid_task_type(): + with pytest.raises(ActivateJobsRequestInvalid): + next(zeebe_adapter.activate_jobs(task_type=None, worker=str(uuid4()), timeout=randint(10, 100), + request_timeout=100, max_jobs_to_activate=1, variables_to_fetch=[])) + + +def test_activate_jobs_invalid_max_jobs(): + with pytest.raises(ActivateJobsRequestInvalid): + next(zeebe_adapter.activate_jobs(task_type=str(uuid4()), worker=str(uuid4()), timeout=randint(10, 100), + request_timeout=100, max_jobs_to_activate=0, variables_to_fetch=[])) + 
+ +def test_complete_job(grpc_servicer): + task_type = create_random_task_and_activate(grpc_servicer) + job = get_first_active_job(task_type) + response = zeebe_adapter.complete_job(job_key=job.key, variables={}) assert isinstance(response, CompleteJobResponse) -def test_fail_job(): - response = zeebe_adapter.fail_job(job_key=randint(0, RANDOM_RANGE), message=str(uuid4())) +def test_complete_job_not_found(grpc_servicer): + with pytest.raises(JobNotFound): + zeebe_adapter.complete_job(job_key=randint(0, RANDOM_RANGE), variables={}) + + +def test_complete_job_already_completed(grpc_servicer): + task_type = create_random_task_and_activate(grpc_servicer) + job = get_first_active_job(task_type) + zeebe_adapter.complete_job(job_key=job.key, variables={}) + with pytest.raises(JobAlreadyDeactivated): + zeebe_adapter.complete_job(job_key=job.key, variables={}) + + +def test_fail_job(grpc_servicer): + task_type = create_random_task_and_activate(grpc_servicer) + job = get_first_active_job(task_type) + response = zeebe_adapter.fail_job(job_key=job.key, message=str(uuid4())) assert isinstance(response, FailJobResponse) -def test_throw_error(): - response = zeebe_adapter.throw_error(job_key=randint(0, RANDOM_RANGE), message=str(uuid4())) +def test_fail_job_not_found(): + with pytest.raises(JobNotFound): + zeebe_adapter.fail_job(job_key=randint(0, RANDOM_RANGE), message=str(uuid4())) + + +def test_fail_job_already_failed(grpc_servicer): + task_type = create_random_task_and_activate(grpc_servicer) + job = get_first_active_job(task_type) + zeebe_adapter.fail_job(job_key=job.key, message=str(uuid4())) + with pytest.raises(JobAlreadyDeactivated): + zeebe_adapter.fail_job(job_key=job.key, message=str(uuid4())) + + +def test_throw_error(grpc_servicer): + task_type = create_random_task_and_activate(grpc_servicer) + job = get_first_active_job(task_type) + response = zeebe_adapter.throw_error(job_key=job.key, message=str(uuid4())) assert isinstance(response, ThrowErrorResponse) -def 
test_create_workflow_instance(): - response = zeebe_adapter.create_workflow_instance(bpmn_process_id=str(uuid4()), variables={}, - version=randint(0, 10)) +def test_throw_error_job_not_found(): + with pytest.raises(JobNotFound): + zeebe_adapter.throw_error(job_key=randint(0, RANDOM_RANGE), message=str(uuid4())) + + +def test_throw_error_already_thrown(grpc_servicer): + task_type = create_random_task_and_activate(grpc_servicer) + job = get_first_active_job(task_type) + zeebe_adapter.throw_error(job_key=job.key, message=str(uuid4())) + with pytest.raises(JobAlreadyDeactivated): + zeebe_adapter.throw_error(job_key=job.key, message=str(uuid4())) + + +def test_create_workflow_instance(grpc_servicer): + bpmn_process_id = str(uuid4()) + version = randint(0, 10) + grpc_servicer.mock_deploy_workflow(bpmn_process_id, version, []) + response = zeebe_adapter.create_workflow_instance(bpmn_process_id=bpmn_process_id, variables={}, version=version) assert isinstance(response, int) -def test_create_workflow_instance_with_result(): - response = zeebe_adapter.create_workflow_instance_with_result(bpmn_process_id=str(uuid4()), variables={}, - version=randint(0, 10), timeout=0, - variables_to_fetch=[]) +def test_create_workflow_instance_with_result(grpc_servicer): + bpmn_process_id = str(uuid4()) + version = randint(0, 10) + grpc_servicer.mock_deploy_workflow(bpmn_process_id, version, []) + response = zeebe_adapter.create_workflow_instance_with_result(bpmn_process_id=bpmn_process_id, variables={}, + version=version, timeout=0, variables_to_fetch=[]) assert isinstance(response, dict) @@ -146,3 +210,27 @@ def test_publish_message(): response = zeebe_adapter.publish_message(name=str(uuid4()), variables={}, correlation_key=str(uuid4()), time_to_live_in_milliseconds=randint(0, RANDOM_RANGE)) assert isinstance(response, PublishMessageResponse) + + +def create_random_task_and_activate(grpc_servicer, task_type: str = None) -> str: + if task_type: + mock_task_type = task_type + else: + 
mock_task_type = str(uuid4()) + task = Task(task_type=mock_task_type, task_handler=lambda x: x, exception_handler=lambda x: x) + task_context = random_task_context(task) + grpc_servicer.active_jobs[task_context.key] = task_context + return mock_task_type + + +def get_first_active_job(task_type) -> TaskContext: + return next(zeebe_adapter.activate_jobs(task_type=task_type, max_jobs_to_activate=1, request_timeout=10, + timeout=100, variables_to_fetch=[], worker=str(uuid4()))) + + +def test_get_workflow_request_object(): + with patch('builtins.open') as mock_open: + mock_open.return_value = BytesIO() + file_path = str(uuid4()) + zeebe_adapter._get_workflow_request_object(file_path) + mock_open.assert_called_with(file_path, 'rb') diff --git a/pyzeebe/task/task_context.py b/pyzeebe/task/task_context.py index 748451e3..4c88e972 100644 --- a/pyzeebe/task/task_context.py +++ b/pyzeebe/task/task_context.py @@ -1,10 +1,13 @@ from typing import Dict +from pyzeebe.task.task_status import TaskStatus -class TaskContext: + +class TaskContext(object): def __init__(self, key: int, _type: str, workflow_instance_key: int, bpmn_process_id: str, workflow_definition_version: int, workflow_key: int, element_id: str, element_instance_key: int, - custom_headers: Dict, worker: str, retries: int, deadline: int, variables: Dict): + custom_headers: Dict, worker: str, retries: int, deadline: int, variables: Dict, + status: TaskStatus = TaskStatus.Running): self.key = key self.type = _type self.workflow_instance_key = workflow_instance_key @@ -18,6 +21,7 @@ def __init__(self, key: int, _type: str, workflow_instance_key: int, bpmn_proces self.retries = retries self.deadline = deadline self.variables = variables + self.status = status def __repr__(self): return str({'jobKey': self.key, 'taskType': self.type, 'workflowInstanceKey': self.workflow_instance_key, diff --git a/pyzeebe/task/task_status.py b/pyzeebe/task/task_status.py new file mode 100644 index 00000000..0655b701 --- /dev/null +++ 
b/pyzeebe/task/task_status.py @@ -0,0 +1,8 @@ +from enum import Enum + + +class TaskStatus(Enum): + Running = 'Running' + Completed = 'Completed' + Failed = 'Failed' + ErrorThrown = 'ErrorThrown' diff --git a/pyzeebe/task/task_status_controller.py b/pyzeebe/task/task_status_controller.py index 2c8b7a0a..59da34f4 100644 --- a/pyzeebe/task/task_status_controller.py +++ b/pyzeebe/task/task_status_controller.py @@ -2,7 +2,7 @@ from pyzeebe.task.task_context import TaskContext -class TaskStatusController: +class TaskStatusController(object): def __init__(self, context: TaskContext, zeebe_adapter: ZeebeAdapter): self.zeebe_adapter = zeebe_adapter self.context = context diff --git a/pyzeebe/worker/worker.py b/pyzeebe/worker/worker.py index 2d15c9b4..70a89017 100644 --- a/pyzeebe/worker/worker.py +++ b/pyzeebe/worker/worker.py @@ -1,9 +1,9 @@ import logging import socket -from concurrent.futures import ThreadPoolExecutor +from threading import Thread, Event from typing import List, Callable, Generator, Tuple -from pyzeebe.common.exceptions import TaskNotFoundException +from pyzeebe.common.exceptions import TaskNotFound from pyzeebe.decorators.zeebe_decorator_base import ZeebeDecoratorBase from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter from pyzeebe.task.task import Task @@ -31,27 +31,28 @@ def __init__(self, name: str = None, request_timeout: int = 0, hostname: str = N self.name = name or socket.gethostname() self.request_timeout = request_timeout self.tasks = [] + self._task_threads: List[Thread] = [] - def work(self): - with ThreadPoolExecutor(thread_name_prefix='zeebe-task') as executor: - for task in self.tasks: - executor.submit(self._handle_task, task) + def work(self, stop_event: Event = None): + for task in self.tasks: + task_thread = Thread(target=self._handle_task, args=(task, stop_event)) + self._task_threads.append(task_thread) + task_thread.start() - def _handle_task(self, task: Task): + def _handle_task(self, task: Task, stop_event: Event): 
logging.debug(f'Handling task {task}') - while self.zeebe_adapter.connected or self.zeebe_adapter.retrying_connection: + while not stop_event.is_set() and self.zeebe_adapter.connected or self.zeebe_adapter.retrying_connection: if self.zeebe_adapter.retrying_connection: - logging.debug(f'Retrying connection to {self.zeebe_adapter._connection_uri}') + logging.debug(f'Retrying connection to {self.zeebe_adapter.connection_uri}') continue self._handle_task_contexts(task) def _handle_task_contexts(self, task: Task): - executor = ThreadPoolExecutor(thread_name_prefix=f'zeebe-job-{task.type}') for task_context in self._get_task_contexts(task): + thread = Thread(target=task.handler, args=(task_context,)) logging.debug(f'Running job: {task_context}') - executor.submit(task.handler, task_context) - executor.shutdown(wait=False) + thread.start() def _get_task_contexts(self, task: Task) -> Generator[TaskContext, None, None]: logging.debug(f'Activating jobs for task: {task}') @@ -116,4 +117,4 @@ def _get_task_and_index(self, task_type: str) -> Tuple[Task, int]: for index, task in enumerate(self.tasks): if task.type == task_type: return task, index - raise TaskNotFoundException(f"Could not find task {task_type}") + raise TaskNotFound(f"Could not find task {task_type}") diff --git a/pyzeebe/worker/worker_test.py b/pyzeebe/worker/worker_test.py index 0ad19e44..9de282d4 100644 --- a/pyzeebe/worker/worker_test.py +++ b/pyzeebe/worker/worker_test.py @@ -1,10 +1,12 @@ from random import randint +from threading import Event from unittest.mock import patch from uuid import uuid4 import pytest -from pyzeebe.common.exceptions import TaskNotFoundException +from pyzeebe.common.exceptions import TaskNotFound +from pyzeebe.common.gateway_mock import GatewayMock from pyzeebe.common.random_utils import random_task_context from pyzeebe.task.task import Task from pyzeebe.task.task_context import TaskContext @@ -18,6 +20,23 @@ def decorator(context: TaskContext) -> TaskContext: return context 
+@pytest.fixture(scope='module') +def grpc_add_to_server(): + from pyzeebe.grpc_internals.zeebe_pb2_grpc import add_GatewayServicer_to_server + return add_GatewayServicer_to_server + + +@pytest.fixture(scope='module') +def grpc_servicer(): + return GatewayMock() + + +@pytest.fixture(scope='module') +def grpc_stub_cls(grpc_channel): + from pyzeebe.grpc_internals.zeebe_pb2_grpc import GatewayStub + return GatewayStub + + @pytest.fixture(autouse=True) def run_around_tests(): global zeebe_worker, task @@ -110,12 +129,12 @@ def test_remove_task_from_many(): def test_remove_fake_task(): - with pytest.raises(TaskNotFoundException): + with pytest.raises(TaskNotFound): zeebe_worker.remove_task(str(uuid4())) def test_get_fake_task(): - with pytest.raises(TaskNotFoundException): + with pytest.raises(TaskNotFound): zeebe_worker.get_task(str(uuid4())) @@ -206,3 +225,9 @@ def test_handle_many_jobs(): task_handler_mock.return_value = {'x': str(uuid4())} zeebe_worker._handle_task_contexts(task) task_handler_mock.assert_called_with(context) + + +def test_stop_worker(): + stop_event = Event() + zeebe_worker.work(stop_event=stop_event) + stop_event.set() diff --git a/setup.py b/setup.py index 68a99f1c..36076fb9 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="pyzeebe", - version="1.0.1", + version="1.1.0", author="Jonatan Martens", author_email="jonatanmartenstav@gmail.com", description="Zeebe client api", diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/integration_test.py b/tests/integration_test.py new file mode 100644 index 00000000..9c64f17e --- /dev/null +++ b/tests/integration_test.py @@ -0,0 +1,70 @@ +import os +from threading import Thread, Event +from typing import Dict +from uuid import uuid4 + +import pytest + +from pyzeebe import Task, ZeebeWorker, ZeebeClient, exceptions, TaskContext, TaskStatusController + + +def task_handler(should_throw: bool, input: str) -> Dict: + if 
should_throw: + raise Exception('Error thrown') + else: + return {'output': input + str(uuid4())} + + +def exception_handler(exc: Exception, context: TaskContext, controller: TaskStatusController) -> None: + controller.error(f'Failed to run task {context.type}. Reason: {exc}') + + +task = Task('test', task_handler, exception_handler) + +zeebe_client: ZeebeClient + + +def run_worker(stop_event): + zeebe_worker = ZeebeWorker() + zeebe_worker.add_task(task) + zeebe_worker.work(stop_event) + + +@pytest.fixture(scope='module', autouse=True) +def setup(): + global zeebe_client + + stop_event = Event() + t = Thread(target=run_worker, args=(stop_event,)) + t.start() + + zeebe_client = ZeebeClient() + try: + zeebe_client.deploy_workflow(os.path.join('tests', 'test.bpmn')) + except FileNotFoundError: + zeebe_client.deploy_workflow('test.bpmn') + + yield zeebe_client + stop_event.set() + + +def test_run_workflow(): + workflow_key = zeebe_client.run_workflow('test', {'input': str(uuid4()), 'should_throw': False}) + assert isinstance(workflow_key, int) + + +def test_non_existent_workflow(): + with pytest.raises(exceptions.WorkflowNotFound): + zeebe_client.run_workflow(str(uuid4())) + + +def test_run_workflow_with_result(): + input = str(uuid4()) + output = zeebe_client.run_workflow_with_result('test', {'input': input, 'should_throw': False}) + assert isinstance(output['output'], str) + assert output['output'].startswith(input) + + +def test_cancel_workflow(): + workflow_key = zeebe_client.run_workflow('test', {'input': str(uuid4()), 'should_throw': False}) + zeebe_client.cancel_workflow_instance(workflow_key) diff --git a/tests/test.bpmn b/tests/test.bpmn new file mode 100644 index 00000000..e6106603 --- /dev/null +++ b/tests/test.bpmn @@ -0,0 +1,46 @@ + + + + + Flow_1o209vx + + + + + + + + + + + + Flow_1o209vx + Flow_00xikey + + + Flow_00xikey + + + + + + + + + + + + + + + + + + + + + + + + +