-
Notifications
You must be signed in to change notification settings - Fork 76
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Describe task queue #656
base: main
Are you sure you want to change the base?
Describe task queue #656
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -1108,6 +1108,46 @@ async def get_worker_task_reachability( | |||||
) | ||||||
) | ||||||
|
||||||
async def describe_task_queue( | ||||||
self, | ||||||
task_queue: str, | ||||||
task_queue_types: Sequence[TaskQueueType] = [], | ||||||
report_pollers: bool = False, | ||||||
report_stats: bool = False, | ||||||
rpc_metadata: Mapping[str, str] = {}, | ||||||
rpc_timeout: Optional[timedelta] = None, | ||||||
) -> TaskQueueDescription: | ||||||
"""Describe task queue. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: This comment is redundant. Can you give a flavor for the type of information I should expect as a response and why I would use each one? At this point I assumed I would get some info if I check neither and was wondering what it was. Wasn't until I looked at the code that I realized it wouldn't work. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 I will clarify that this returns poller and stats information and that currently you need to provide at least one of the booleans. |
||||||
|
||||||
.. note:: | ||||||
This is only for unversioned workers. Worker versioning is not yet | ||||||
supported for describing task queue. | ||||||
|
||||||
Args: | ||||||
task_queue: Name of the task queue. Sticky queues are not supported. | ||||||
task_queue_types: Task queue types to report info about. If not | ||||||
present or empty, all types are considered. | ||||||
report_pollers: Include list of pollers for requested task queue types. | ||||||
report_stats: Include task queue stats for requested task queue types. | ||||||
Comment on lines
+1130
to
+1131
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are these toggles here because it could degrade server performance to check them? Is there any kind of throughput limitation people should be aware of? (Otherwise, why are they necessary?) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is how the API was designed server side, but I would guess that they made these opt-in for performance reasons, yes. This is consistent with the Go SDK API that exists today already. |
||||||
rpc_metadata: Headers used on each RPC call. Keys here override | ||||||
client-level RPC metadata keys. | ||||||
rpc_timeout: Optional RPC deadline to set for each RPC call. | ||||||
""" | ||||||
if not report_pollers and not report_stats: | ||||||
raise ValueError( | ||||||
"At least one of report_pollers or report_stats must be True" | ||||||
) | ||||||
Comment on lines
+1136
to
+1139
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Oh I see, so one is required. I assumed not since they were both optional, and there was a default set of info. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. An enum is a bit rough here because there are more coming (e.g. "report reachability"). We designed this as simple booleans to match Go SDK. |
||||||
return await self._impl.describe_task_queue( | ||||||
DescribeTaskQueueInput( | ||||||
task_queue, | ||||||
task_queue_types, | ||||||
report_pollers, | ||||||
report_stats, | ||||||
rpc_metadata, | ||||||
rpc_timeout, | ||||||
) | ||||||
) | ||||||
|
||||||
|
||||||
class ClientConfig(TypedDict, total=False): | ||||||
"""TypedDict of config originally passed to :py:meth:`Client`.""" | ||||||
|
@@ -4814,6 +4854,18 @@ class GetWorkerTaskReachabilityInput: | |||||
rpc_timeout: Optional[timedelta] | ||||||
|
||||||
|
||||||
@dataclass | ||||||
class DescribeTaskQueueInput: | ||||||
"""Input for :py:meth:`OutboundInterceptor.describe_task_queue`.""" | ||||||
|
||||||
task_queue: str | ||||||
task_queue_types: Sequence[TaskQueueType] | ||||||
report_pollers: bool | ||||||
report_stats: bool | ||||||
rpc_metadata: Mapping[str, str] | ||||||
rpc_timeout: Optional[timedelta] | ||||||
|
||||||
|
||||||
@dataclass | ||||||
class Interceptor: | ||||||
"""Interceptor for clients. | ||||||
|
@@ -4983,6 +5035,14 @@ async def get_worker_task_reachability( | |||||
"""Called for every :py:meth:`Client.get_worker_task_reachability` call.""" | ||||||
return await self.next.get_worker_task_reachability(input) | ||||||
|
||||||
### Other | ||||||
|
||||||
async def describe_task_queue( | ||||||
self, input: DescribeTaskQueueInput | ||||||
) -> TaskQueueDescription: | ||||||
"""Called for every :py:meth:`Client.describe_task_queue` call.""" | ||||||
return await self.next.describe_task_queue(input) | ||||||
|
||||||
|
||||||
class _ClientImpl(OutboundInterceptor): | ||||||
def __init__(self, client: Client) -> None: | ||||||
|
@@ -5726,6 +5786,27 @@ async def get_worker_task_reachability( | |||||
) | ||||||
return WorkerTaskReachability._from_proto(resp) | ||||||
|
||||||
### Other calls | ||||||
|
||||||
async def describe_task_queue( | ||||||
self, input: DescribeTaskQueueInput | ||||||
) -> TaskQueueDescription: | ||||||
req = temporalio.api.workflowservice.v1.DescribeTaskQueueRequest( | ||||||
namespace=self._client.namespace, | ||||||
task_queue=temporalio.api.taskqueue.v1.TaskQueue(name=input.task_queue), | ||||||
api_mode=temporalio.api.enums.v1.DescribeTaskQueueMode.DESCRIBE_TASK_QUEUE_MODE_ENHANCED, | ||||||
task_queue_types=[ | ||||||
temporalio.api.enums.v1.TaskQueueType.ValueType(t) | ||||||
for t in input.task_queue_types | ||||||
], | ||||||
report_pollers=input.report_pollers, | ||||||
report_stats=input.report_stats, | ||||||
) | ||||||
resp = await self._client.workflow_service.describe_task_queue( | ||||||
req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout | ||||||
) | ||||||
return TaskQueueDescription._from_proto(resp) | ||||||
|
||||||
|
||||||
def _history_from_json( | ||||||
history: Union[str, Dict[str, Any]], | ||||||
|
@@ -6114,6 +6195,142 @@ def _to_proto(self) -> temporalio.api.enums.v1.TaskReachability.ValueType: | |||||
) | ||||||
|
||||||
|
||||||
class TaskQueueType(IntEnum): | ||||||
"""Type of task queue. | ||||||
|
||||||
See :py:class:`temporalio.api.enums.v1.TaskQueueType`. | ||||||
""" | ||||||
|
||||||
WORKFLOW = int(temporalio.api.enums.v1.TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW) | ||||||
ACTIVITY = int(temporalio.api.enums.v1.TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY) | ||||||
NEXUS = int(temporalio.api.enums.v1.TaskQueueType.TASK_QUEUE_TYPE_NEXUS) | ||||||
|
||||||
|
||||||
@dataclass | ||||||
class TaskQueueDescription: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How this struct will look like once the per-buildId info is added? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't know, so we are trying not to guess. We are trying to build this as if versioning does not exist from a user POV (because it doesn't). |
||||||
"""Description of a task queue.""" | ||||||
|
||||||
types: Mapping[TaskQueueType, TaskQueueTypeInfo] | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd prefer the following.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My reasoning is as follows. The following diagram shows different "levels of task queue". ["Task Queue" is a overloaded term and it's been used to refer to any of the boxes in this diagram! In the server code at least we've refactored things and gave each box a different name.] Now, from users perspective, Task Queue almost alway is the most top-level, black box. So DescribeTaskQueue, without any other specification, should describe that box. Unversioned is just a slice of the whole Task Queue. A top-level field such as As long as only unversioned stats are desired, it's totally fine, IMO, to put it in the top level proto but it should be qualified as such so user does not interpret it as something that holds the stats for all slices. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think people not using versioning should not have to know anything about versioning or have the term versioning in their code anywhere. Can we pretend these stats were made available before versioning was a thing (which is kinda the case since versioning is not a thing yet)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should call things what they really are. We cannot pretend versioning is not there because it is. Even if it is pre-release, we know it's going to be public-preview and GA eventually. If we call this only In your proposal as well you do have a note that talks about versioning and unversioned, so it's not like those terms will be completely absent from developers mind. Lastly, I'm not sure if not having the term "unversioned" in the code anywhere for users not using versioning should be a goal TBH. I agree that they should not have to use term "version" or "build ID" in their code though. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I don't expect we're going to change
I don't propose removing documentation about versioning, just API terms for those not using versioning.
I think we can document this away. Same as anywhere else where unversioned default differs for people using versioning. Versioning does not meaningfully exist right now, I don't think we should code as if it does. We're only punishing people with this terminology because we waited to implement backlog stats until some versioning parts were put into the API. |
||||||
""" | ||||||
Task queue type information, keyed by task queue type. | ||||||
|
||||||
.. note:: | ||||||
This is only for unversioned workers. Worker versioning is not yet | ||||||
supported for task queue description. | ||||||
""" | ||||||
|
||||||
@staticmethod | ||||||
def _from_proto( | ||||||
resp: temporalio.api.workflowservice.v1.DescribeTaskQueueResponse, | ||||||
) -> TaskQueueDescription: | ||||||
return TaskQueueDescription( | ||||||
types={ | ||||||
TaskQueueType(type): TaskQueueTypeInfo._from_proto(info) | ||||||
for type, info in resp.versions_info[""].types_info.items() | ||||||
} | ||||||
) | ||||||
|
||||||
|
||||||
@dataclass | ||||||
class TaskQueueTypeInfo: | ||||||
"""Information for a specific task queue type.""" | ||||||
|
||||||
pollers: Sequence[TaskQueuePollerInfo] | ||||||
""" | ||||||
Information about recent pollers, or empty if not requested or none | ||||||
recently. | ||||||
""" | ||||||
|
||||||
stats: Optional[TaskQueueStats] | ||||||
"""Task queue stats, or none if not requested.""" | ||||||
|
||||||
@staticmethod | ||||||
def _from_proto( | ||||||
info: temporalio.api.taskqueue.v1.TaskQueueTypeInfo, | ||||||
) -> TaskQueueTypeInfo: | ||||||
return TaskQueueTypeInfo( | ||||||
pollers=[ | ||||||
TaskQueuePollerInfo._from_proto(poller_info) | ||||||
for poller_info in info.pollers | ||||||
], | ||||||
stats=TaskQueueStats._from_proto(info.stats) | ||||||
if info.HasField("stats") | ||||||
else None, | ||||||
) | ||||||
|
||||||
|
||||||
@dataclass | ||||||
class TaskQueuePollerInfo: | ||||||
"""Information for a specific task queue poller.""" | ||||||
|
||||||
last_access_time: Optional[datetime] | ||||||
"""Time of the last poll if any.""" | ||||||
|
||||||
identity: str | ||||||
"""Identity of the worker/client who is polling this task queue.""" | ||||||
|
||||||
rate_per_second: Optional[float] | ||||||
"""Polling rate.""" | ||||||
|
||||||
@staticmethod | ||||||
def _from_proto( | ||||||
info: temporalio.api.taskqueue.v1.PollerInfo, | ||||||
) -> TaskQueuePollerInfo: | ||||||
return TaskQueuePollerInfo( | ||||||
last_access_time=info.last_access_time.ToDatetime().replace( | ||||||
tzinfo=timezone.utc | ||||||
) | ||||||
if info.HasField("last_access_time") | ||||||
else None, | ||||||
identity=info.identity, | ||||||
rate_per_second=info.rate_per_second if info.rate_per_second != 0 else None, | ||||||
) | ||||||
|
||||||
|
||||||
@dataclass | ||||||
class TaskQueueStats: | ||||||
"""Statistics for a specific task queue type.""" | ||||||
|
||||||
approximate_backlog_count: int | ||||||
""" | ||||||
The approximate number of tasks backlogged in this task queue. May count | ||||||
expired tasks but eventually converges to the right value. | ||||||
""" | ||||||
|
||||||
approximate_backlog_age: timedelta | ||||||
""" | ||||||
Approximate age of the oldest task in the backlog based on the create | ||||||
timestamp of the task at the head of the queue. | ||||||
""" | ||||||
|
||||||
backlog_increase_rate: float | ||||||
""":py:attr:`tasks_add_rate` - :py:attr:`tasks_dispatch_rate`""" | ||||||
|
||||||
tasks_add_rate: float | ||||||
""" | ||||||
Approximate tasks per second added to the task queue based on activity | ||||||
within a fixed window. This includes both backlogged and sync-matched tasks. | ||||||
""" | ||||||
|
||||||
tasks_dispatch_rate: float | ||||||
""" | ||||||
Approximate tasks per second dispatched to workers based on activity within | ||||||
a fixed window. This includes both backlogged and sync-matched tasks. | ||||||
""" | ||||||
|
||||||
@staticmethod | ||||||
def _from_proto( | ||||||
stats: temporalio.api.taskqueue.v1.TaskQueueStats, | ||||||
) -> TaskQueueStats: | ||||||
return TaskQueueStats( | ||||||
approximate_backlog_count=stats.approximate_backlog_count, | ||||||
approximate_backlog_age=stats.approximate_backlog_age.ToTimedelta(), | ||||||
backlog_increase_rate=stats.tasks_add_rate - stats.tasks_dispatch_rate, | ||||||
tasks_add_rate=stats.tasks_add_rate, | ||||||
tasks_dispatch_rate=stats.tasks_dispatch_rate, | ||||||
) | ||||||
|
||||||
|
||||||
class CloudOperationsClient: | ||||||
"""Client for accessing Temporal Cloud Operations API. | ||||||
|
||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have we considered a top level scaling/tuning API instead that takes task queue types as a filter? That might be more discoverable for somebody perusing the CLI/API command surface, who knows they want to do scaling but doesn't think of that when they see task queue.
It would also allow us to add deploy group as a filter once that ships.
And finally, it could allow us to embed other information within the API call such as related namespace-level information, for example if the namespace has rate limits that should also be taken into account.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Additional context. Versioning team originally shipped APIs within task queue, and Max asked us to make them top-level and simply refer to task queue from them. (Later on, we decided to switch to deploy group, but the top-level API concept remains.)
This feels like it could be analogous.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure. This is the Python review for the SDK API that already exists in Go and calls the already existing server API. I think existential questions concerning the API may need to be asked in a place with broader reach.