Skip to content

Commit

Permalink
feat: listen for run events
Browse files Browse the repository at this point in the history
  • Loading branch information
grutt committed Feb 5, 2024
1 parent 2bbf5db commit 05fcabc
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 16 deletions.
4 changes: 3 additions & 1 deletion python-sdk/examples/manual_trigger/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

client = new_client()

client.admin.run_workflow("ManualTriggerWorkflow", {
workflowRunId = client.admin.run_workflow("ManualTriggerWorkflow", {
"test": "test"
})

client.listener.on(workflowRunId, lambda event: print('YO ' + event))
50 changes: 35 additions & 15 deletions python-sdk/hatchet_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
from .clients.admin import AdminClientImpl, new_admin
from .clients.events import EventClientImpl, new_event
from .clients.dispatcher import DispatcherClientImpl, new_dispatcher
from .clients.listener import ListenerClientImpl, new_listener

from .loader import ConfigLoader, ClientConfig
import grpc


class Client:
def admin(self):
raise NotImplementedError
Expand All @@ -16,71 +18,89 @@ def dispatcher(self):

def event(self):
raise NotImplementedError


def listener(self):
raise NotImplementedError


class ClientImpl(Client):
def __init__(self, event_client, admin_client : AdminClientImpl, dispatcher_client):
def __init__(
self,
event_client: EventClientImpl,
admin_client: AdminClientImpl,
dispatcher_client: DispatcherClientImpl,
listener_client: ListenerClientImpl):
# self.conn = conn
# self.tenant_id = tenant_id
# self.logger = logger
# self.validator = validator
self.admin = admin_client
self.dispatcher = dispatcher_client
self.event = event_client
self.listener = listener_client

def admin(self) -> AdminClientImpl:
def admin(self) -> ListenerClientImpl:
return self.admin

def dispatcher(self) -> DispatcherClientImpl:
return self.dispatcher

def event(self) -> EventClientImpl:
return self.event


def listener(self) -> ListenerClientImpl:
return self.listener


def with_host_port(host: str, port: int):
def with_host_port_impl(config: ClientConfig):
config.host = host
config.port = port

return with_host_port_impl


def new_client(*opts_functions):
config : ClientConfig = ConfigLoader(".").load_client_config()
config: ClientConfig = ConfigLoader(".").load_client_config()

for opt_function in opts_functions:
opt_function(config)

if config.tls_config is None:
raise ValueError("TLS config is required")

if config.host_port is None:
raise ValueError("Host and port are required")
credentials : grpc.ChannelCredentials | None = None

credentials: grpc.ChannelCredentials | None = None

# load channel credentials
if config.tls_config.tls_strategy == 'tls':
root : Any | None = None
root: Any | None = None

if config.tls_config.ca_file:
root = open(config.tls_config.ca_file, "rb").read()

credentials = grpc.ssl_channel_credentials(root_certificates=root)
elif config.tls_config.tls_strategy == 'mtls':
root = open(config.tls_config.ca_file, "rb").read()
private_key = open(config.tls_config.key_file, "rb").read()
certificate_chain = open(config.tls_config.cert_file, "rb").read()

credentials = grpc.ssl_channel_credentials(root_certificates=root, private_key=private_key, certificate_chain=certificate_chain)

credentials = grpc.ssl_channel_credentials(
root_certificates=root, private_key=private_key, certificate_chain=certificate_chain)

conn = grpc.secure_channel(
target=config.host_port,
credentials=credentials,
options=[('grpc.ssl_target_name_override', config.tls_config.server_name)],
options=[('grpc.ssl_target_name_override',
config.tls_config.server_name)],
)

# Instantiate client implementations
event_client = new_event(conn, config)
admin_client = new_admin(conn, config)
dispatcher_client = new_dispatcher(conn, config)
listener_client = new_listener(conn, config)

return ClientImpl(event_client, admin_client, dispatcher_client)
return ClientImpl(event_client, admin_client, dispatcher_client, listener_client)
87 changes: 87 additions & 0 deletions python-sdk/hatchet_sdk/clients/listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from typing import List
import grpc
from ..dispatcher_pb2_grpc import DispatcherStub

from ..dispatcher_pb2 import SubscribeToWorkflowEventsRequest, WorkflowEvent
from ..loader import ClientConfig
from ..metadata import get_metadata
import json
import time


DEFAULT_ACTION_LISTENER_RETRY_INTERVAL = 5 # seconds
DEFAULT_ACTION_LISTENER_RETRY_COUNT = 5


class StepRunEventType:
STEP_RUN_EVENT_TYPE_STARTED = 'STEP_RUN_EVENT_TYPE_STARTED'
STEP_RUN_EVENT_TYPE_COMPLETED = 'STEP_RUN_EVENT_TYPE_COMPLETED'
STEP_RUN_EVENT_TYPE_FAILED = 'STEP_RUN_EVENT_TYPE_FAILED'
STEP_RUN_EVENT_TYPE_CANCELLED = 'STEP_RUN_EVENT_TYPE_CANCELLED'
STEP_RUN_EVENT_TYPE_TIMED_OUT = 'STEP_RUN_EVENT_TYPE_TIMED_OUT'


class StepRunEvent:
def __init__(self, type: StepRunEventType, payload: str):
self.type = type
self.payload = payload


def new_listener(conn, config: ClientConfig):
return ListenerClientImpl(
client=DispatcherStub(conn),
token=config.token,
)


class ListenerClientImpl:
def __init__(self, client: DispatcherStub, token):
self.client = client
self.token = token

def on(self, workflowRunId: str, handler: callable):
listener = self.retry_subscribe(workflowRunId)

print('x', listener)

while True:
try:
for workflow_event in listener:
print('y', workflow_event)
# TODO handler()

except grpc.RpcError as e:
# Handle different types of errors
if e.code() == grpc.StatusCode.CANCELLED:
# Context cancelled, unsubscribe and close
break
elif e.code() == grpc.StatusCode.UNAVAILABLE:
# Retry logic
logger.info("Could not connect to Hatchet, retrying...")
listener = self.retry_subscribe(workflowRunId)
elif e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
logger.info("Deadline exceeded, retrying subscription")
continue
else:
# Unknown error, report and break
logger.error(f"Failed to receive message: {e}")
break

def retry_subscribe(self, workflowRunId: str):
retries = 0

while retries < DEFAULT_ACTION_LISTENER_RETRY_COUNT:
try:
if retries > 0:
time.sleep(DEFAULT_ACTION_LISTENER_RETRY_INTERVAL)

listener = self.client.SubscribeToWorkflowEvents(
SubscribeToWorkflowEventsRequest(
workflowRunId=workflowRunId,
), metadata=get_metadata(self.token))
return listener
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.UNAVAILABLE:
retries = retries + 1
else:
raise ValueError(f"gRPC error: {e}")

0 comments on commit 05fcabc

Please sign in to comment.