Skip to content

Commit

Permalink
Dynamically detect OS signal availability at use instead of import ti…
Browse files Browse the repository at this point in the history
…me. (#30)
  • Loading branch information
rafa-be authored Oct 8, 2024
1 parent 116f47e commit 7c2faf9
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 11 deletions.
2 changes: 1 addition & 1 deletion scaler/about.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.8.6"
__version__ = "1.8.7"
14 changes: 11 additions & 3 deletions scaler/worker/agent/processor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from scaler.utility.zmq_config import ZMQConfig
from scaler.worker.agent.processor.object_cache import ObjectCache

SUSPEND_SIGNAL: signal.Signals = signal.SIGUSR1
SUSPEND_SIGNAL = "SIGUSR1" # kept as the signal's name (str) rather than a signal.Signals member, so importing this module does not fail on platforms where the signal module has no SIGUSR1.

_current_processor: ContextVar[Optional["Processor"]] = ContextVar("_current_processor", default=None)

Expand Down Expand Up @@ -97,10 +97,10 @@ def __initialize(self):
self.__register_signals()

def __register_signals(self):
# Install the signal handlers of this processor: SIGTERM is always routed to __interrupt.
signal.signal(signal.SIGTERM, self.__interrupt)
self.__register_signal("SIGTERM", self.__interrupt)

# The suspend handler is only installed when a resume event is set on the instance.
if self._resume_event is not None:
signal.signal(SUSPEND_SIGNAL, self.__suspend)
self.__register_signal(SUSPEND_SIGNAL, self.__suspend)

def __interrupt(self, *args):
self._connector.close() # interrupts any blocking socket.
Expand Down Expand Up @@ -283,3 +283,11 @@ def __processor_context(self):
yield
finally:
self.__set_current_processor(None)

@staticmethod
def __register_signal(signal_name: str, handler: Callable) -> None:
    """Install ``handler`` for the OS signal called ``signal_name``.

    The signal is looked up by name on the ``signal`` module at call time, so a
    platform that lacks it fails here instead of when this module is imported.

    Args:
        signal_name: Name of a ``signal.Signals`` member, e.g. ``"SIGTERM"``.
        handler: Callable handed to ``signal.signal`` as the new handler.

    Raises:
        RuntimeError: If ``signal_name`` does not name a signal on this platform.
    """
    signal_instance = getattr(signal, signal_name, None)

    # getattr() alone also resolves unrelated module attributes ("SIG_DFL",
    # "signal", ...), so insist on an actual Signals member before registering.
    if not isinstance(signal_instance, signal.Signals):
        raise RuntimeError(f"unsupported platform, signal not available: {signal_name}.")

    signal.signal(signal_instance, handler)
18 changes: 11 additions & 7 deletions scaler/worker/agent/processor_holder.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def suspend(self):
assert self._suspended is False

if self._hard_suspend:
os.kill(self.pid(), signal.SIGSTOP)
self.__send_signal("SIGSTOP")
else:
# If we do not want to hard-suspend the processor's process (e.g. to keep network links alive), we request
# the process to wait on a synchronization event. That will stop the main thread while allowing the helper
Expand All @@ -94,7 +94,7 @@ def suspend(self):
assert self._resume_event is not None
self._resume_event.clear()

os.kill(self.pid(), SUSPEND_SIGNAL)
self.__send_signal(SUSPEND_SIGNAL)

self._suspended = True

Expand All @@ -103,26 +103,30 @@ def resume(self):
assert self._suspended is True

if self._hard_suspend:
os.kill(self.pid(), signal.SIGCONT)
self.__send_signal("SIGCONT")
else:
assert self._resume_event is not None
self._resume_event.set()

self._suspended = False

def kill(self):
# Terminate the processor: send SIGTERM, wait up to DEFAULT_PROCESSOR_KILL_DELAY_SECONDS for it to exit,
# escalate to SIGKILL if it has not exited by then, and finally clear the current task.
self.__send_signal(signal.SIGTERM)
self.__send_signal("SIGTERM")
self._processor.join(DEFAULT_PROCESSOR_KILL_DELAY_SECONDS)

# No exit code after the timed join: the processor has not terminated yet.
if self._processor.exitcode is None:
# TODO: some processors fail to interrupt because of a blocking 0mq call. Ideally we should interrupt
# these blocking calls instead of sending a SIGKILL signal.

logging.warning(f"Processor[{self.pid()}] does not terminate in time, send SIGKILL.")
self.__send_signal(signal.SIGKILL)
self.__send_signal("SIGKILL")
# Untimed join: wait for the processor to exit after SIGKILL.
self._processor.join()

self.set_task(None)

def __send_signal(self, sig: int):
os.kill(self.pid(), sig)
def __send_signal(self, signal_name: str):
    """Send the OS signal called ``signal_name`` to the process ``self.pid()``.

    The signal is looked up by name on the ``signal`` module at call time, so a
    platform that lacks it fails here instead of when this module is imported.

    Args:
        signal_name: Name of a ``signal.Signals`` member, e.g. ``"SIGTERM"``.

    Raises:
        RuntimeError: If ``signal_name`` does not name a signal on this platform.
    """
    signal_instance = getattr(signal, signal_name, None)

    # getattr() alone also resolves non-signal attributes: "SIG_IGN" yields
    # Handlers.SIG_IGN (integer 1), which os.kill() would deliver as signal 1.
    # Insist on an actual Signals member before sending anything.
    if not isinstance(signal_instance, signal.Signals):
        raise RuntimeError(f"unsupported platform, signal not available: {signal_name}.")

    os.kill(self.pid(), signal_instance)

0 comments on commit 7c2faf9

Please sign in to comment.