From 3f3448f919de3868727a0255f43e830f6fadb57a Mon Sep 17 00:00:00 2001 From: Gil Bregman Date: Mon, 8 Jul 2024 20:56:33 +0300 Subject: [PATCH] Close discovery service socket before exiting. Fixes #748 Signed-off-by: Gil Bregman --- control/discovery.py | 35 ++++++++++++++++++++++++++--------- control/server.py | 2 +- control/state.py | 4 ++-- 3 files changed, 29 insertions(+), 12 deletions(-) diff --git a/control/discovery.py b/control/discovery.py index 6c6b9e2c..657ee555 100644 --- a/control/discovery.py +++ b/control/discovery.py @@ -327,17 +327,36 @@ def __init__(self, config): assert 0 self.logger.info(f"discovery addr: {self.discovery_addr} port: {self.discovery_port}") + self.sock = None + self.passed_enter = False self.conn_vals = {} self.connection_counter = 1 self.selector = selectors.DefaultSelector() def __enter__(self): + self.passed_enter = True return self def __exit__(self, exc_type, exc_value, traceback): if self.omap_state: self.omap_state.cleanup_omap() self.omap_state = None + try: + for key in self.conn_vals: + self.conn_vals[key].connection.close() + except Except as ex: + pass + self.conn_vals = {} + if self.selector: + self.selector.close() + self.selector = None + if self.sock: + try: + self.sock.close() + except Exception as ex: + pass + self.sock = None + self.passed_enter = False def _read_all(self) -> Dict[str, str]: """Reads OMAP and returns dict of all keys and values.""" @@ -1074,11 +1093,12 @@ def update_log_level(self): def start_service(self): """Enable listening on the server side.""" - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind((self.discovery_addr, int(self.discovery_port))) - sock.listen(MAX_CONNECTION) - sock.setblocking(False) - self.selector.register(sock, selectors.EVENT_READ, self.nvmeof_accept) + assert self.passed_enter, 'Discovery service was started outside of a "with" block' + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.bind((self.discovery_addr, int(self.discovery_port))) + self.sock.listen(MAX_CONNECTION) + self.sock.setblocking(False) + self.selector.register(self.sock, selectors.EVENT_READ, self.nvmeof_accept) self.logger.debug("waiting for connection...") t = threading.Thread(target=self.handle_timeout) t.start() @@ -1096,10 +1116,7 @@ def start_service(self): callback = key.data callback(key.fileobj, mask) except KeyboardInterrupt: - for key in self.conn_vals: - self.conn_vals[key].connection.close() - self.selector.close() - self.logger.debug("received a ctrl+C interrupt. exiting...") + self.logger.debug("received a ctrl+C interrupt. exiting...") def main(args=None): parser = argparse.ArgumentParser(prog="python3 -m control", diff --git a/control/server.py b/control/server.py index d386bae3..6e246bf4 100644 --- a/control/server.py +++ b/control/server.py @@ -476,7 +476,7 @@ def _stop_discovery(self): try: os.kill(self.discovery_pid, signal.SIGINT) os.waitpid(self.discovery_pid, 0) - except ChildProcessError: + except (ChildProcessError, ProcessLookupError): pass # ignore self.logger.info("Discovery service terminated") diff --git a/control/state.py b/control/state.py index cb746947..7bad93a9 100644 --- a/control/state.py +++ b/control/state.py @@ -537,14 +537,14 @@ def cleanup_omap(self): if self.watch: try: self.watch.close() - self.logger.debug("Unregistered watch") + self.logger.debug(f"Unregistered watch ({self.id_text})") self.watch = None except Exception: pass if self.ioctx: try: self.ioctx.close() - self.logger.debug("Closed Rados connection") + self.logger.debug(f"Closed Rados connection ({self.id_text})") self.ioctx = None except Exception: pass