From 25a7a47ca6224e4b0e774a25f3d53281655bd537 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C2=B5?= Date: Wed, 14 Aug 2024 09:20:39 +0000 Subject: [PATCH] processed first round of feedback, plus extra's - ruff formatting - docstrings - variable names - type annotations - don't return ret = startup() which is annotated to return None - moved time sleep intervals to singular global, for clarity --- bittensor/axon.py | 66 +++++++++++++++++++++++++++++------------------ 1 file changed, 41 insertions(+), 25 deletions(-) diff --git a/bittensor/axon.py b/bittensor/axon.py index 348f257886..99f72eca44 100644 --- a/bittensor/axon.py +++ b/bittensor/axon.py @@ -63,6 +63,12 @@ from bittensor.utils import networking +""" +The quantum of time to sleep in waiting loops, in seconds. +""" +TIME_SLEEP_INTERVAL: float = 1e-3 + + class FastAPIThreadedServer(uvicorn.Server): """ The ``FastAPIThreadedServer`` class is a specialized server implementation for the Axon server in the Bittensor network. @@ -109,27 +115,36 @@ class FastAPIThreadedServer(uvicorn.Server): _thread: threading.Thread = None _started: bool = False - def set_exception(self, ex): + def set_exception(self, exception: Exception) -> None: + """ + Set self._exception in a thread safe manner, so the worker thread can communicate exceptions to the main thread. + """ with self._lock: - self._exception = ex + self._exception = exception - def get_exception(self): + def get_exception(self) -> Optional[Exception]: with self._lock: return self._exception - def set_thread(self, thread): + def set_thread(self, thread: threading.Thread): + """ + Set self._thread in a thread safe manner, so the main thread can get the worker thread object. + """ with self._lock: self._thread = thread - def get_thread(self): + def get_thread(self) -> Optional[threading.Thread]: with self._lock: return self._thread - def set_started(self, started): + def set_started(self, started: bool) -> None: + """ + Set self._started in a thread safe manner, so the main thread can get the worker thread status. + """ with self._lock: self._started = started - def get_started(self): + def get_started(self) -> bool: with self._lock: return self._started @@ -143,9 +158,8 @@ async def startup(self, sockets: Optional[List[socket.socket]] = None) -> None: """ Adds a thread-safe call to set a 'started' flag on the object. """ - ret = await super().startup(sockets) + await super().startup(sockets) self.set_started(True) - return ret @contextlib.contextmanager def run_in_thread(self): @@ -161,9 +175,9 @@ def run_in_thread(self): thread = threading.Thread(target=self.run, daemon=True) thread.start() try: - t0 = time.time() - while not self.get_started() and time.time()-t0<1: - time.sleep(1e-3) + time_start = time.time() + while not self.get_started() and time.time() - time_start < 1: + time.sleep(TIME_SLEEP_INTERVAL) if not self.get_started(): raise Exception("failed to start server") yield thread @@ -181,7 +195,7 @@ def _wrapper_run(self): while not self.should_exit: if not thread.is_alive(): raise Exception("worker thread died") - time.sleep(1e-3) + time.sleep(TIME_SLEEP_INTERVAL) except Exception as e: self.set_exception(e) @@ -458,23 +472,25 @@ def info(self) -> "bittensor.AxonInfo": placeholder2=0, ) - # Our instantiator should be able to test axon.exception to see if any - # exception occurred. @property - def exception(self): + def exception(self) -> Optional[Exception]: + """ + Axon objects expose exceptions that occurred internally through the .exception property. + """ # for future use: setting self._exception to signal an exception - e = getattr(self,'_exception',None) - if e: - return e + exception = getattr(self, "_exception", None) + if exception: + return exception return self.fast_server.get_exception() - # Our instantiator should be able to test axon.is_running() to see if all - # required threads etc are running. - def is_running(self): - t = self.fast_server.get_thread() - if t is None: + def is_running(self) -> bool: + """ + Axon objects can be queried using .is_running() to test whether worker threads are running. + """ + thread = self.fast_server.get_thread() + if thread is None: return False - return t.is_alive() + return thread.is_alive() def attach( self,