diff --git a/potassium/potassium.py b/potassium/potassium.py index 694330d..b4ae24a 100644 --- a/potassium/potassium.py +++ b/potassium/potassium.py @@ -102,6 +102,7 @@ def __init__(self, name): self._worker_pool = None self.event_handler_thread = Thread(target=self._event_handler, daemon=True) + self.event_handler_thread.start() self._status = PotassiumStatus( num_started_inference_requests=0, @@ -240,13 +241,6 @@ def handle(path): status=resp.status, headers=resp.headers ) - - def on_close(): - print("on_close") - self._response_mailbox.cleanup(internal_id) - - flask_response.call_on_close(on_close) - elif endpoint.type == "background": self._worker_pool.apply_async(run_worker, args=(endpoint.func, req, internal_id)) diff --git a/potassium/status.py b/potassium/status.py index 72b4da7..3693866 100644 --- a/potassium/status.py +++ b/potassium/status.py @@ -35,7 +35,7 @@ def sequence_number(self): @property def idle_time(self): - if not self.gpu_available: + if not self.gpu_available or len(self.in_flight_request_start_times) > 0: return 0 return time.time() - self.idle_start_timestamp @@ -53,7 +53,7 @@ def update(self, event): event_data = event[1:] if event_type not in event_handlers: raise Exception(f"Invalid event {event}") - return event_handlers[event](self.clone(), *event_data) + return event_handlers[event_type](self.clone(), *event_data) def clone(self): @@ -91,7 +91,7 @@ def handle_worker_started(status: PotassiumStatus): StatusEvent.INFERENCE_REQUEST_RECEIVED: handle_inference_request_received, StatusEvent.INFERENCE_START: handle_start_inference, StatusEvent.INFERENCE_END: handle_end_inference, - StatusEvent.WORKER_STARTED: lambda status: status, + StatusEvent.WORKER_STARTED: handle_worker_started } diff --git a/potassium/worker.py b/potassium/worker.py index 6f30d12..ee3a65e 100644 --- a/potassium/worker.py +++ b/potassium/worker.py @@ -62,7 +62,7 @@ def init_worker(index_queue, event_queue, response_queue, init_func): else: context = init_func(worker_num) - event_queue.put((StatusEvent.WORKER_STARTED, worker_num)) + event_queue.put((StatusEvent.WORKER_STARTED,)) worker = Worker( context,