Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add TopologyRequest
Browse files Browse the repository at this point in the history
dimastbk committed Jan 13, 2025

Verified

This commit was signed with the committer’s verified signature. The key has expired.
patrick-ogrady Patrick O'Grady
1 parent a916a5f commit 6e16473
Showing 11 changed files with 173 additions and 11 deletions.
17 changes: 17 additions & 0 deletions pyzeebe/client/client.py
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@
CreateProcessInstanceWithResultResponse,
DeployResourceResponse,
PublishMessageResponse,
TopologyResponse,
)
from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter
from pyzeebe.types import Variables
@@ -220,3 +221,19 @@ async def publish_message(
message_id=message_id,
tenant_id=tenant_id,
)

async def topology(self) -> TopologyResponse:
"""
Obtains the current topology of the cluster the gateway is part of.
Returns:
TopologyResponse: response from Zeebe.
Raises:
ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests)
ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable
ZeebeInternalError: If Zeebe experiences an internal error
UnknownGrpcStatusCodeError: If Zeebe returns an unexpected status code
"""
return await self.zeebe_adapter.topology()
27 changes: 18 additions & 9 deletions pyzeebe/client/sync_client.py
Original file line number Diff line number Diff line change
@@ -2,7 +2,6 @@

import asyncio
import os
from functools import partial, wraps

import grpc

@@ -14,18 +13,16 @@
CreateProcessInstanceWithResultResponse,
DeployResourceResponse,
PublishMessageResponse,
TopologyResponse,
)
from pyzeebe.types import Variables

copy_docstring = partial(wraps, assigned=["__doc__"], updated=[])


class SyncZeebeClient:
def __init__(self, grpc_channel: grpc.aio.Channel, max_connection_retries: int = 10) -> None:
self.loop = asyncio.get_event_loop()
self.client = ZeebeClient(grpc_channel, max_connection_retries)

@copy_docstring(ZeebeClient.run_process)
def run_process(
self,
bpmn_process_id: str,
@@ -35,7 +32,8 @@ def run_process(
) -> CreateProcessInstanceResponse:
return self.loop.run_until_complete(self.client.run_process(bpmn_process_id, variables, version, tenant_id))

@copy_docstring(ZeebeClient.run_process_with_result)
run_process.__doc__ = ZeebeClient.publish_message.__doc__

def run_process_with_result(
self,
bpmn_process_id: str,
@@ -51,17 +49,20 @@ def run_process_with_result(
)
)

@copy_docstring(ZeebeClient.cancel_process_instance)
run_process_with_result.__doc__ = ZeebeClient.publish_message.__doc__

def cancel_process_instance(self, process_instance_key: int) -> CancelProcessInstanceResponse:
return self.loop.run_until_complete(self.client.cancel_process_instance(process_instance_key))

@copy_docstring(ZeebeClient.deploy_resource)
cancel_process_instance.__doc__ = ZeebeClient.cancel_process_instance.__doc__

def deploy_resource(
self, *resource_file_path: str | os.PathLike[str], tenant_id: str | None = None
) -> DeployResourceResponse:
return self.loop.run_until_complete(self.client.deploy_resource(*resource_file_path, tenant_id=tenant_id))

@copy_docstring(ZeebeClient.broadcast_signal)
deploy_resource.__doc__ = ZeebeClient.deploy_resource.__doc__

def broadcast_signal(
self,
signal_name: str,
@@ -76,7 +77,8 @@ def broadcast_signal(
)
)

@copy_docstring(ZeebeClient.publish_message)
broadcast_signal.__doc__ = ZeebeClient.broadcast_signal.__doc__

def publish_message(
self,
name: str,
@@ -96,3 +98,10 @@ def publish_message(
tenant_id,
)
)

publish_message.__doc__ = ZeebeClient.publish_message.__doc__

def topology(self) -> TopologyResponse:
return self.loop.run_until_complete(self.client.topology())

topology.__doc__ = ZeebeClient.topology.__doc__
54 changes: 54 additions & 0 deletions pyzeebe/grpc_internals/types.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import enum
from dataclasses import dataclass

from pyzeebe.types import Variables
@@ -163,3 +164,56 @@ class FailJobResponse:
@dataclass(frozen=True)
class ThrowErrorResponse:
pass


@dataclass(frozen=True)
class TopologyResponse:

@dataclass(frozen=True)
class BrokerInfo:

@dataclass(frozen=True)
class Partition:

class PartitionBrokerRole(enum.IntEnum):
"""Describes the Raft role of the broker for a given partition"""

LEADER = 0
FOLLOWER = 1
INACTIVE = 2

class PartitionBrokerHealth(enum.IntEnum):
"""Describes the current health of the partition"""

HEALTHY = 0
UNHEALTHY = 1
DEAD = 2

partition_id: int
"""the unique ID of this partition"""
role: PartitionBrokerRole
"""the role of the broker for this partition"""
health: PartitionBrokerHealth
"""the health of this partition"""

node_id: int
"""unique (within a cluster) node ID for the broker"""
host: str
"""hostname of the broker"""
port: int
"""port for the broker"""
partitions: list[Partition]
"""list of partitions managed or replicated on this broker"""
version: str
"""broker version"""

brokers: list[BrokerInfo]
"""list of brokers part of this cluster"""
cluster_size: int
"""how many nodes are in the cluster"""
partitions_count: int
"""how many partitions are spread across the cluster"""
replication_factor: int
"""configured replication factor for this cluster"""
gateway_version: str
"""gateway version"""
3 changes: 2 additions & 1 deletion pyzeebe/grpc_internals/zeebe_adapter.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from pyzeebe.grpc_internals.zeebe_cluster_adapter import ZeebeClusterAdapter
from pyzeebe.grpc_internals.zeebe_job_adapter import ZeebeJobAdapter
from pyzeebe.grpc_internals.zeebe_message_adapter import ZeebeMessageAdapter
from pyzeebe.grpc_internals.zeebe_process_adapter import ZeebeProcessAdapter


# Mixin class
class ZeebeAdapter(ZeebeProcessAdapter, ZeebeJobAdapter, ZeebeMessageAdapter):
class ZeebeAdapter(ZeebeClusterAdapter, ZeebeProcessAdapter, ZeebeJobAdapter, ZeebeMessageAdapter):
pass
41 changes: 41 additions & 0 deletions pyzeebe/grpc_internals/zeebe_cluster_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from __future__ import annotations

import grpc

from pyzeebe.grpc_internals.zeebe_adapter_base import ZeebeAdapterBase
from pyzeebe.proto.gateway_pb2 import TopologyRequest
from pyzeebe.proto.gateway_pb2 import TopologyResponse as TopologyResponseStub

from .types import TopologyResponse


class ZeebeClusterAdapter(ZeebeAdapterBase):
async def topology(self) -> TopologyResponse:
try:
response: TopologyResponseStub = await self._gateway_stub.Topology(TopologyRequest())
except grpc.aio.AioRpcError as grpc_error:
await self._handle_grpc_error(grpc_error)

return TopologyResponse(
brokers=[
TopologyResponse.BrokerInfo(
node_id=broker.nodeId,
host=broker.host,
port=broker.port,
partitions=[
TopologyResponse.BrokerInfo.Partition(
partition_id=partition.partitionId,
role=TopologyResponse.BrokerInfo.Partition.PartitionBrokerRole(partition.role),
health=TopologyResponse.BrokerInfo.Partition.PartitionBrokerHealth(partition.health),
)
for partition in broker.partitions
],
version=broker.version,
)
for broker in response.brokers
],
cluster_size=response.clusterSize,
partitions_count=response.partitionsCount,
replication_factor=response.replicationFactor,
gateway_version=response.gatewayVersion,
)
10 changes: 10 additions & 0 deletions tests/integration/topology_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import pytest

from pyzeebe import ZeebeClient


@pytest.mark.e2e
async def test_topology(zeebe_client: ZeebeClient):
topology = await zeebe_client.topology()

assert topology.cluster_size == 1
2 changes: 1 addition & 1 deletion tests/unit/channel/utils_test.py
Original file line number Diff line number Diff line change
@@ -176,7 +176,7 @@ class TestGetCamundaAddress:
def test_is_calculated_from_parameters_as_highest_priority(self):
result = get_camunda_address("cluster_id_param", "camunda_region_param")

assert result == f"cluster_id_param.camunda_region_param.zeebe.camunda.io:443"
assert result == "cluster_id_param.camunda_region_param.zeebe.camunda.io:443"

def test_raises_error_if_cluster_id_is_none(self):
with pytest.raises(SettingsError):
5 changes: 5 additions & 0 deletions tests/unit/client/client_test.py
Original file line number Diff line number Diff line change
@@ -92,3 +92,8 @@ async def test_broadcast_signal(zeebe_client: ZeebeClient):
@pytest.mark.asyncio
async def test_publish_message(zeebe_client: ZeebeClient):
await zeebe_client.publish_message(name=str(uuid4()), correlation_key=str(uuid4()))


@pytest.mark.asyncio
async def test_topology(zeebe_client: ZeebeClient):
await zeebe_client.topology()
9 changes: 9 additions & 0 deletions tests/unit/client/sync_client_test.py
Original file line number Diff line number Diff line change
@@ -103,3 +103,12 @@ def test_calls_publish_message_of_zeebe_client(self, sync_zeebe_client: SyncZeeb
sync_zeebe_client.publish_message(name, correlation_key)

sync_zeebe_client.client.publish_message.assert_called_once()


class TestTopology:
def test_calls_topology_of_zeebe_client(self, sync_zeebe_client: SyncZeebeClient):
sync_zeebe_client.client.topology = AsyncMock()

sync_zeebe_client.topology()

sync_zeebe_client.client.topology.assert_called_once()
12 changes: 12 additions & 0 deletions tests/unit/grpc_internals/zeebe_cluster_adapter_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import pytest

from pyzeebe.grpc_internals.types import TopologyResponse
from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter


@pytest.mark.asyncio
class TestTopology:
async def test_response_is_of_correct_type(self, zeebe_adapter: ZeebeAdapter):
response = await zeebe_adapter.topology()

assert isinstance(response, TopologyResponse)
4 changes: 4 additions & 0 deletions tests/unit/utils/gateway_mock.py
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@
FormMetadata,
ProcessMetadata,
PublishMessageResponse,
TopologyResponse,
)
from pyzeebe.proto.gateway_pb2_grpc import GatewayServicer
from pyzeebe.task.task import Task
@@ -218,3 +219,6 @@ def mock_deploy_process(self, bpmn_process_id: str, version: int, tasks: list[Ta
"version": version,
"tasks": tasks,
}

def Topology(self, request, context):
return TopologyResponse()

0 comments on commit 6e16473

Please sign in to comment.