Skip to content

Commit

Permalink
processed first round of feedback, plus extra's
Browse files Browse the repository at this point in the history
- ruff formatting
- docstrings
- variable names
- type annotations
- don't return ret = startup() which is annotated to return None
- moved time sleep intervals to singular global, for clarity
  • Loading branch information
µ committed Aug 14, 2024
1 parent bf95417 commit 25a7a47
Showing 1 changed file with 41 additions and 25 deletions.
66 changes: 41 additions & 25 deletions bittensor/axon.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@
from bittensor.utils import networking


"""
The quantum of time to sleep in waiting loops, in seconds.
"""
TIME_SLEEP_INTERVAL: float = 1e-3


class FastAPIThreadedServer(uvicorn.Server):
"""
The ``FastAPIThreadedServer`` class is a specialized server implementation for the Axon server in the Bittensor network.
Expand Down Expand Up @@ -109,27 +115,36 @@ class FastAPIThreadedServer(uvicorn.Server):
_thread: threading.Thread = None
_started: bool = False

def set_exception(self, ex):
def set_exception(self, exception: Exception) -> None:
"""
Set self._exception in a thread safe manner, so the worker thread can communicate exceptions to the main thread.
"""
with self._lock:
self._exception = ex
self._exception = exception

def get_exception(self):
def get_exception(self) -> Optional[Exception]:
with self._lock:
return self._exception

def set_thread(self, thread):
def set_thread(self, thread: threading.Thread):
"""
Set self._thread in a thread safe manner, so the main thread can get the worker thread object.
"""
with self._lock:
self._thread = thread

def get_thread(self):
def get_thread(self) -> Optional[threading.Thread]:
with self._lock:
return self._thread

def set_started(self, started):
def set_started(self, started: bool) -> None:
"""
Set self._started in a thread safe manner, so the main thread can get the worker thread status.
"""
with self._lock:
self._started = started

def get_started(self):
def get_started(self) -> bool:
with self._lock:
return self._started

Expand All @@ -143,9 +158,8 @@ async def startup(self, sockets: Optional[List[socket.socket]] = None) -> None:
"""
Adds a thread-safe call to set a 'started' flag on the object.
"""
ret = await super().startup(sockets)
await super().startup(sockets)
self.set_started(True)
return ret

@contextlib.contextmanager
def run_in_thread(self):
Expand All @@ -161,9 +175,9 @@ def run_in_thread(self):
thread = threading.Thread(target=self.run, daemon=True)
thread.start()
try:
t0 = time.time()
while not self.get_started() and time.time()-t0<1:
time.sleep(1e-3)
time_start = time.time()
while not self.get_started() and time.time() - time_start < 1:
time.sleep(TIME_SLEEP_INTERVAL)
if not self.get_started():
raise Exception("failed to start server")
yield thread
Expand All @@ -181,7 +195,7 @@ def _wrapper_run(self):
while not self.should_exit:
if not thread.is_alive():
raise Exception("worker thread died")
time.sleep(1e-3)
time.sleep(TIME_SLEEP_INTERVAL)
except Exception as e:
self.set_exception(e)

Expand Down Expand Up @@ -458,23 +472,25 @@ def info(self) -> "bittensor.AxonInfo":
placeholder2=0,
)

# Our instantiator should be able to test axon.exception to see if any
# exception occurred.
@property
def exception(self):
def exception(self) -> Optional[Exception]:
"""
Axon objects expose exceptions that occurred internally through the .exception property.
"""
# for future use: setting self._exception to signal an exception
e = getattr(self,'_exception',None)
if e:
return e
exception = getattr(self, "_exception", None)
if exception:
return exception
return self.fast_server.get_exception()

# Our instantiator should be able to test axon.is_running() to see if all
# required threads etc are running.
def is_running(self):
t = self.fast_server.get_thread()
if t is None:
def is_running(self) -> bool:
"""
Axon objects can be queried using .is_running() to test whether worker threads are running.
"""
thread = self.fast_server.get_thread()
if thread is None:
return False
return t.is_alive()
return thread.is_alive()

def attach(
self,
Expand Down

0 comments on commit 25a7a47

Please sign in to comment.