Skip to content

Commit

Permalink
wip: class based listener pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
grutt committed Feb 7, 2024
1 parent 5aa0df7 commit 8c20a13
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 25 deletions.
17 changes: 12 additions & 5 deletions python-sdk/examples/manual_trigger/trigger-generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,22 @@
from dotenv import load_dotenv
import json

# StepRunEventType.STEP_RUN_EVENT_TYPE_COMPLETED

load_dotenv()

client = Hatchet().client
hatchet = Hatchet().client

workflowRunId = client.admin.run_workflow("ManualTriggerWorkflow", {
workflowRunId = hatchet.admin.run_workflow("ManualTriggerWorkflow", {
"test": "test"
})

for event in client.listener.generator(workflowRunId):
print('EVENT: ' + event.type + ' ' + json.dumps(event.payload))
listener = hatchet.listener.stream(workflowRunId)

# TODO - need to hangup the listener if the workflow is completed
for event in listener:
# TODO FIXME step run is not exported easily from the hatchet_sdk and event type and event.step is not defined on
# the event object, so fix this before merging...

if event.step == 'step2' and event.type == StepRunEventType.STEP_RUN_EVENT_TYPE_COMPLETED:
listener.abort()
print('EVENT: ' + event.type + ' ' + json.dumps(event.payload))
4 changes: 2 additions & 2 deletions python-sdk/examples/manual_trigger/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
hatchet = Hatchet(debug=True)


@hatchet.workflow(on_events=["user:create"])
@hatchet.workflow(on_events=["man:create"])
class ManualTriggerWorkflow:
@hatchet.step()
def step1(self, context):
Expand All @@ -22,7 +22,7 @@ def step2(self, context):


workflow = ManualTriggerWorkflow()
worker = hatchet.worker('test-worker', max_threads=4)
worker = hatchet.worker('manual-worker', max_threads=4)
worker.register_workflow(workflow)

worker.start()
70 changes: 52 additions & 18 deletions python-sdk/hatchet_sdk/clients/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,29 @@ def new_listener(conn, config: ClientConfig):
)


class ListenerClientImpl:
def __init__(self, client: DispatcherStub, token):
class HatchetListener:
def __init__(self, client: DispatcherStub, workflow_run_id: str, token: str):
self.client = client
self.stop_signal = False
self.workflow_run_id = workflow_run_id
self.token = token

def generator(self, workflowRunId: str) -> List[StepRunEvent]:
listener = self.retry_subscribe(workflowRunId)
def __iter__(self):
return self._generator()

def abort(self):
self.stop_signal = True

def _generator(self, stop_step: str = None) -> List[StepRunEvent]:
listener = self.retry_subscribe()
while listener:
if self.stop_signal:
listener = None
break

while True:
try:
for workflow_event in listener:
print('workflow_event:', workflow_event)
eventType = None

if workflow_event.eventType in event_type_mapping:
Expand All @@ -67,33 +79,40 @@ def generator(self, workflowRunId: str) -> List[StepRunEvent]:
payload = json.loads(workflow_event.eventPayload)

# call the handler
event = StepRunEvent(type=eventType, payload=payload)
event = StepRunEvent(
type=eventType, payload=payload, workflowRunId=self.workflow_run_id)
yield event

# stop the listener if the stop event is received
if eventType == StepRunEventType.STEP_RUN_EVENT_TYPE_FAILED or eventType == StepRunEventType.STEP_RUN_EVENT_TYPE_CANCELLED or eventType == StepRunEventType.STEP_RUN_EVENT_TYPE_TIMED_OUT:
listener = None
print('failure stopping listener...')
break

if payload and stop_step and stop_step in payload and eventType != StepRunEventType.STEP_RUN_EVENT_TYPE_STARTED:
listener = None
print('stopping listener...')
break
# TODO the stream is closed

except grpc.RpcError as e:
# Handle different types of errors
if e.code() == grpc.StatusCode.CANCELLED:
# Context cancelled, unsubscribe and close
break
elif e.code() == grpc.StatusCode.UNAVAILABLE:
# Retry logic
logger.info("Could not connect to Hatchet, retrying...")
listener = self.retry_subscribe(workflowRunId)
# logger.info("Could not connect to Hatchet, retrying...")
listener = self.retry_subscribe()
elif e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
logger.info("Deadline exceeded, retrying subscription")
# logger.info("Deadline exceeded, retrying subscription")
continue
else:
# Unknown error, report and break
logger.error(f"Failed to receive message: {e}")
# logger.error(f"Failed to receive message: {e}")
break

def on(self, workflowRunId: str, handler: callable = None):
for event in self.generator(workflowRunId):
# call the handler if provided
if handler:
handler(event)

def retry_subscribe(self, workflowRunId: str):
def retry_subscribe(self):
retries = 0

while retries < DEFAULT_ACTION_LISTENER_RETRY_COUNT:
Expand All @@ -103,11 +122,26 @@ def retry_subscribe(self, workflowRunId: str):

listener = self.client.SubscribeToWorkflowEvents(
SubscribeToWorkflowEventsRequest(
workflowRunId=workflowRunId,
workflowRunId=self.workflow_run_id,
), metadata=get_metadata(self.token))
return listener
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.UNAVAILABLE:
retries = retries + 1
else:
raise ValueError(f"gRPC error: {e}")


class ListenerClientImpl:
def __init__(self, client: DispatcherStub, token: str):
self.client = client
self.token = token

def stream(self, workflow_run_id: str):
return HatchetListener(self.client, workflow_run_id, self.token)

def on(self, workflow_run_id: str, handler: callable = None):
for event in self.stream(workflow_run_id):
# call the handler if provided
if handler:
handler(event)

0 comments on commit 8c20a13

Please sign in to comment.