diff --git a/temporalio/client.py b/temporalio/client.py index 7e5992f7..d54f7b8d 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -1108,6 +1108,46 @@ async def get_worker_task_reachability( ) ) + async def describe_task_queue( + self, + task_queue: str, + task_queue_types: Sequence[TaskQueueType] = [], + report_pollers: bool = False, + report_stats: bool = False, + rpc_metadata: Mapping[str, str] = {}, + rpc_timeout: Optional[timedelta] = None, + ) -> TaskQueueDescription: + """Describe task queue. + + .. note:: + This is only for unversioned workers. Worker versioning is not yet + supported for describing task queue. + + Args: + task_queue: Name of the task queue. Sticky queues are not supported. + task_queue_types: Task queue types to report info about. If not + present or empty, all types are considered. + report_pollers: Include list of pollers for requested task queue types. + report_stats: Include task queue stats for requested task queue types. + rpc_metadata: Headers used on each RPC call. Keys here override + client-level RPC metadata keys. + rpc_timeout: Optional RPC deadline to set for each RPC call. + """ + if not report_pollers and not report_stats: + raise ValueError( + "At least one of report_pollers or report_stats must be True" + ) + return await self._impl.describe_task_queue( + DescribeTaskQueueInput( + task_queue, + task_queue_types, + report_pollers, + report_stats, + rpc_metadata, + rpc_timeout, + ) + ) + class ClientConfig(TypedDict, total=False): """TypedDict of config originally passed to :py:meth:`Client`.""" @@ -4814,6 +4854,18 @@ class GetWorkerTaskReachabilityInput: rpc_timeout: Optional[timedelta] +@dataclass +class DescribeTaskQueueInput: + """Input for :py:meth:`OutboundInterceptor.describe_task_queue`.""" + + task_queue: str + task_queue_types: Sequence[TaskQueueType] + report_pollers: bool + report_stats: bool + rpc_metadata: Mapping[str, str] + rpc_timeout: Optional[timedelta] + + @dataclass class Interceptor: """Interceptor for clients. @@ -4983,6 +5035,14 @@ async def get_worker_task_reachability( """Called for every :py:meth:`Client.get_worker_task_reachability` call.""" return await self.next.get_worker_task_reachability(input) + ### Other + + async def describe_task_queue( + self, input: DescribeTaskQueueInput + ) -> TaskQueueDescription: + """Called for every :py:meth:`Client.describe_task_queue` call.""" + return await self.next.describe_task_queue(input) + class _ClientImpl(OutboundInterceptor): def __init__(self, client: Client) -> None: @@ -5726,6 +5786,27 @@ async def get_worker_task_reachability( ) return WorkerTaskReachability._from_proto(resp) + ### Other calls + + async def describe_task_queue( + self, input: DescribeTaskQueueInput + ) -> TaskQueueDescription: + req = temporalio.api.workflowservice.v1.DescribeTaskQueueRequest( + namespace=self._client.namespace, + task_queue=temporalio.api.taskqueue.v1.TaskQueue(name=input.task_queue), + api_mode=temporalio.api.enums.v1.DescribeTaskQueueMode.DESCRIBE_TASK_QUEUE_MODE_ENHANCED, + task_queue_types=[ + temporalio.api.enums.v1.TaskQueueType.ValueType(t) + for t in input.task_queue_types + ], + report_pollers=input.report_pollers, + report_stats=input.report_stats, + ) + resp = await self._client.workflow_service.describe_task_queue( + req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout + ) + return TaskQueueDescription._from_proto(resp) + def _history_from_json( history: Union[str, Dict[str, Any]], @@ -6114,6 +6195,142 @@ def _to_proto(self) -> temporalio.api.enums.v1.TaskReachability.ValueType: ) +class TaskQueueType(IntEnum): + """Type of task queue. + + See :py:class:`temporalio.api.enums.v1.TaskQueueType`. + """ + + WORKFLOW = int(temporalio.api.enums.v1.TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW) + ACTIVITY = int(temporalio.api.enums.v1.TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY) + NEXUS = int(temporalio.api.enums.v1.TaskQueueType.TASK_QUEUE_TYPE_NEXUS) + + +@dataclass +class TaskQueueDescription: + """Description of a task queue.""" + + types: Mapping[TaskQueueType, TaskQueueTypeInfo] + """ + Task queue type information, keyed by task queue type. + + .. note:: + This is only for unversioned workers. Worker versioning is not yet + supported for task queue description. + """ + + @staticmethod + def _from_proto( + resp: temporalio.api.workflowservice.v1.DescribeTaskQueueResponse, + ) -> TaskQueueDescription: + return TaskQueueDescription( + types={ + TaskQueueType(type): TaskQueueTypeInfo._from_proto(info) + for type, info in resp.versions_info[""].types_info.items() + } + ) + + +@dataclass +class TaskQueueTypeInfo: + """Information for a specific task queue type.""" + + pollers: Sequence[TaskQueuePollerInfo] + """ + Information about recent pollers, or empty if not requested or none + recently. + """ + + stats: Optional[TaskQueueStats] + """Task queue stats, or none if not requested.""" + + @staticmethod + def _from_proto( + info: temporalio.api.taskqueue.v1.TaskQueueTypeInfo, + ) -> TaskQueueTypeInfo: + return TaskQueueTypeInfo( + pollers=[ + TaskQueuePollerInfo._from_proto(poller_info) + for poller_info in info.pollers + ], + stats=TaskQueueStats._from_proto(info.stats) + if info.HasField("stats") + else None, + ) + + +@dataclass +class TaskQueuePollerInfo: + """Information for a specific task queue poller.""" + + last_access_time: Optional[datetime] + """Time of the last poll if any.""" + + identity: str + """Identity of the worker/client who is polling this task queue.""" + + rate_per_second: Optional[float] + """Polling rate.""" + + @staticmethod + def _from_proto( + info: temporalio.api.taskqueue.v1.PollerInfo, + ) -> TaskQueuePollerInfo: + return TaskQueuePollerInfo( + last_access_time=info.last_access_time.ToDatetime().replace( + tzinfo=timezone.utc + ) + if info.HasField("last_access_time") + else None, + identity=info.identity, + rate_per_second=info.rate_per_second if info.rate_per_second != 0 else None, + ) + + +@dataclass +class TaskQueueStats: + """Statistics for a specific task queue type.""" + + approximate_backlog_count: int + """ + The approximate number of tasks backlogged in this task queue. May count + expired tasks but eventually converges to the right value. + """ + + approximate_backlog_age: timedelta + """ + Approximate age of the oldest task in the backlog based on the create + timestamp of the task at the head of the queue. + """ + + backlog_increase_rate: float + """:py:attr:`tasks_add_rate` - :py:attr:`tasks_dispatch_rate`""" + + tasks_add_rate: float + """ + Approximate tasks per second added to the task queue based on activity + within a fixed window. This includes both backlogged and sync-matched tasks. + """ + + tasks_dispatch_rate: float + """ + Approximate tasks per second dispatched to workers based on activity within + a fixed window. This includes both backlogged and sync-matched tasks. + """ + + @staticmethod + def _from_proto( + stats: temporalio.api.taskqueue.v1.TaskQueueStats, + ) -> TaskQueueStats: + return TaskQueueStats( + approximate_backlog_count=stats.approximate_backlog_count, + approximate_backlog_age=stats.approximate_backlog_age.ToTimedelta(), + backlog_increase_rate=stats.tasks_add_rate - stats.tasks_dispatch_rate, + tasks_add_rate=stats.tasks_add_rate, + tasks_dispatch_rate=stats.tasks_dispatch_rate, + ) + + class CloudOperationsClient: """Client for accessing Temporal Cloud Operations API. diff --git a/tests/test_client.py b/tests/test_client.py index 03ce68ff..98245a02 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -65,6 +65,7 @@ SignalWorkflowInput, StartWorkflowInput, StartWorkflowUpdateInput, + TaskQueueType, TaskReachabilityType, TerminateWorkflowInput, WorkflowContinuedAsNewError, @@ -1323,3 +1324,58 @@ async def test_cloud_client_simple(): GetNamespaceRequest(namespace=os.environ["TEMPORAL_CLIENT_CLOUD_NAMESPACE"]) ) assert os.environ["TEMPORAL_CLIENT_CLOUD_NAMESPACE"] == result.namespace.namespace + + +@workflow.defn +class TaskQueueDescribeWorkflow: + @workflow.run + async def run(self, name: str) -> str: + return f"Hello, {name}!" + + +async def test_describe_task_queue(client: Client): + task_queue = f"tq-{uuid.uuid4()}" + # Simple describe when nothing present + desc = await client.describe_task_queue( + task_queue, report_pollers=True, report_stats=True + ) + # Confirm activity and workflow have no pollers + assert not desc.types[TaskQueueType.ACTIVITY].pollers + assert not desc.types[TaskQueueType.WORKFLOW].pollers + + # Confirm no add rate + stats = desc.types[TaskQueueType.ACTIVITY].stats + assert stats and stats.tasks_add_rate == 0.0 + stats = desc.types[TaskQueueType.WORKFLOW].stats + assert stats and stats.tasks_add_rate == 0.0 + + # Run some workflows + async with new_worker( + client, TaskQueueDescribeWorkflow, task_queue=task_queue + ) as worker: + for i in range(10): + await client.execute_workflow( + TaskQueueDescribeWorkflow.run, + f"user{i}", + id=f"tq-{uuid.uuid4()}", + task_queue=task_queue, + ) + + # Describe again (while poller still running) + desc = await client.describe_task_queue( + task_queue, report_pollers=True, report_stats=True + ) + + # Confirm activity still has no pollers, but workflow has this one + assert not desc.types[TaskQueueType.ACTIVITY].pollers + assert len(desc.types[TaskQueueType.WORKFLOW].pollers) == 1 + assert ( + desc.types[TaskQueueType.WORKFLOW].pollers[0].identity + == client.service_client.config.identity + ) + + # Confirm activity still has no stats, but workflow does + stats = desc.types[TaskQueueType.ACTIVITY].stats + assert stats and stats.tasks_add_rate == 0.0 + stats = desc.types[TaskQueueType.WORKFLOW].stats + assert stats and stats.tasks_add_rate != 0.0