From 8c20a13bf0e9671324ddb3a4200192974770ca14 Mon Sep 17 00:00:00 2001 From: g Date: Wed, 7 Feb 2024 16:00:54 -0500 Subject: [PATCH] wip: class based listener pattern --- .../manual_trigger/trigger-generator.py | 17 +++-- python-sdk/examples/manual_trigger/worker.py | 4 +- python-sdk/hatchet_sdk/clients/listener.py | 70 ++++++++++++++----- 3 files changed, 66 insertions(+), 25 deletions(-) diff --git a/python-sdk/examples/manual_trigger/trigger-generator.py b/python-sdk/examples/manual_trigger/trigger-generator.py index ba97d63ce..8f7a86385 100644 --- a/python-sdk/examples/manual_trigger/trigger-generator.py +++ b/python-sdk/examples/manual_trigger/trigger-generator.py @@ -2,15 +2,22 @@ from dotenv import load_dotenv import json +# StepRunEventType.STEP_RUN_EVENT_TYPE_COMPLETED + load_dotenv() -client = Hatchet().client +hatchet = Hatchet().client -workflowRunId = client.admin.run_workflow("ManualTriggerWorkflow", { +workflowRunId = hatchet.admin.run_workflow("ManualTriggerWorkflow", { "test": "test" }) -for event in client.listener.generator(workflowRunId): - print('EVENT: ' + event.type + ' ' + json.dumps(event.payload)) +listener = hatchet.listener.stream(workflowRunId) -# TODO - need to hangup the listener if the workflow is completed +for event in listener: + # TODO FIXME step run is not exported easily from the hatchet_sdk and event type and event.step is not defined on + # the event object, so fix this before merging... + + if event.step == 'step2' and event.type == StepRunEventType.STEP_RUN_EVENT_TYPE_COMPLETED: + listener.abort() + print('EVENT: ' + event.type + ' ' + json.dumps(event.payload)) diff --git a/python-sdk/examples/manual_trigger/worker.py b/python-sdk/examples/manual_trigger/worker.py index 85e06b723..690ee5f01 100644 --- a/python-sdk/examples/manual_trigger/worker.py +++ b/python-sdk/examples/manual_trigger/worker.py @@ -6,7 +6,7 @@ hatchet = Hatchet(debug=True) -@hatchet.workflow(on_events=["user:create"]) +@hatchet.workflow(on_events=["man:create"]) class ManualTriggerWorkflow: @hatchet.step() def step1(self, context): @@ -22,7 +22,7 @@ def step2(self, context): workflow = ManualTriggerWorkflow() -worker = hatchet.worker('test-worker', max_threads=4) +worker = hatchet.worker('manual-worker', max_threads=4) worker.register_workflow(workflow) worker.start() diff --git a/python-sdk/hatchet_sdk/clients/listener.py b/python-sdk/hatchet_sdk/clients/listener.py index a95364bc2..fc6258804 100644 --- a/python-sdk/hatchet_sdk/clients/listener.py +++ b/python-sdk/hatchet_sdk/clients/listener.py @@ -43,17 +43,29 @@ def new_listener(conn, config: ClientConfig): ) -class ListenerClientImpl: - def __init__(self, client: DispatcherStub, token): +class HatchetListener: + def __init__(self, client: DispatcherStub, workflow_run_id: str, token: str): self.client = client + self.stop_signal = False + self.workflow_run_id = workflow_run_id self.token = token - def generator(self, workflowRunId: str) -> List[StepRunEvent]: - listener = self.retry_subscribe(workflowRunId) + def __iter__(self): + return self._generator() + + def abort(self): + self.stop_signal = True + + def _generator(self, stop_step: str = None) -> List[StepRunEvent]: + listener = self.retry_subscribe() + while listener: + if self.stop_signal: + listener = None + break - while True: try: for workflow_event in listener: + print('workflow_event:', workflow_event) eventType = None if workflow_event.eventType in event_type_mapping: @@ -67,9 +79,22 @@ def generator(self, workflowRunId: str) -> List[StepRunEvent]: payload = json.loads(workflow_event.eventPayload) # call the handler - event = StepRunEvent(type=eventType, payload=payload) + event = StepRunEvent( + type=eventType, payload=payload, workflowRunId=self.workflow_run_id) yield event + # stop the listener if the stop event is received + if eventType == StepRunEventType.STEP_RUN_EVENT_TYPE_FAILED or eventType == StepRunEventType.STEP_RUN_EVENT_TYPE_CANCELLED or eventType == StepRunEventType.STEP_RUN_EVENT_TYPE_TIMED_OUT: + listener = None + print('failure stopping listener...') + break + + if payload and stop_step and stop_step in payload and eventType != StepRunEventType.STEP_RUN_EVENT_TYPE_STARTED: + listener = None + print('stopping listener...') + break + # TODO the stream is closed + except grpc.RpcError as e: # Handle different types of errors if e.code() == grpc.StatusCode.CANCELLED: @@ -77,23 +102,17 @@ def generator(self, workflowRunId: str) -> List[StepRunEvent]: break elif e.code() == grpc.StatusCode.UNAVAILABLE: # Retry logic - logger.info("Could not connect to Hatchet, retrying...") - listener = self.retry_subscribe(workflowRunId) + # logger.info("Could not connect to Hatchet, retrying...") + listener = self.retry_subscribe() elif e.code() == grpc.StatusCode.DEADLINE_EXCEEDED: - logger.info("Deadline exceeded, retrying subscription") + # logger.info("Deadline exceeded, retrying subscription") continue else: # Unknown error, report and break - logger.error(f"Failed to receive message: {e}") + # logger.error(f"Failed to receive message: {e}") break - def on(self, workflowRunId: str, handler: callable = None): - for event in self.generator(workflowRunId): - # call the handler if provided - if handler: - handler(event) - - def retry_subscribe(self, workflowRunId: str): + def retry_subscribe(self): retries = 0 while retries < DEFAULT_ACTION_LISTENER_RETRY_COUNT: @@ -103,7 +122,7 @@ def retry_subscribe(self, workflowRunId: str): listener = self.client.SubscribeToWorkflowEvents( SubscribeToWorkflowEventsRequest( - workflowRunId=workflowRunId, + workflowRunId=self.workflow_run_id, ), metadata=get_metadata(self.token)) return listener except grpc.RpcError as e: @@ -111,3 +130,18 @@ def retry_subscribe(self, workflowRunId: str): retries = retries + 1 else: raise ValueError(f"gRPC error: {e}") + + +class ListenerClientImpl: + def __init__(self, client: DispatcherStub, token: str): + self.client = client + self.token = token + + def stream(self, workflow_run_id: str): + return HatchetListener(self.client, workflow_run_id, self.token) + + def on(self, workflow_run_id: str, handler: callable = None): + for event in self.stream(workflow_run_id): + # call the handler if provided + if handler: + handler(event)