From f892617d3dfe30271ed02974a68aae4896bfaa57 Mon Sep 17 00:00:00 2001 From: Gil Bregman <134946773+gbregman@users.noreply.github.com> Date: Tue, 9 Jul 2024 19:22:37 +0300 Subject: [PATCH] Merge pull request #751 from gbregman/devel Close discovery service socket before exiting Signed-off-by: barakda --- control/discovery.py | 56 ++++++++++++++++++++++++++++++++++++-------- control/server.py | 2 +- control/state.py | 16 +++++++++++++ tests/ha/4gws.sh | 15 +++++++++++- 4 files changed, 77 insertions(+), 12 deletions(-) diff --git a/control/discovery.py b/control/discovery.py index e98c4d1a..3bb65d06 100644 --- a/control/discovery.py +++ b/control/discovery.py @@ -295,7 +295,7 @@ class DiscoveryLogEntry(AutoSerializableStructure): class DiscoveryService: """Implements discovery controller. - Response discover request from initiator. + Response discover request from initiator, this must be called from within a "with" block. Instance attributes: version: Discovery controller version @@ -329,10 +329,49 @@ def __init__(self, config): assert 0 self.logger.info(f"discovery addr: {self.discovery_addr} port: {self.discovery_port}") + self.sock = None self.conn_vals = {} self.connection_counter = 1 self.selector = selectors.DefaultSelector() + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + if self.omap_state: + self.omap_state.cleanup_omap() + self.omap_state = None + + if self.selector: + with self.lock: + for key in self.conn_vals: + try: + self.selector.unregister(self.conn_vals[key].connection) + except Except as ex: + pass + try: + self.conn_vals[key].connection.close() + except Except as ex: + pass + self.conn_vals = {} + + if self.sock: + try: + self.selector.unregister(self.sock) + except Exception as ex: + pass + try: + self.sock.close() + except Exception as ex: + pass + self.sock = None + + try: + self.selector.close() + except Exception as ex: + pass + self.selector = None + def _read_all(self) -> Dict[str, str]: """Reads OMAP and returns dict of all keys and values.""" @@ -1068,11 +1107,11 @@ def update_log_level(self): def start_service(self): """Enable listening on the server side.""" - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind((self.discovery_addr, int(self.discovery_port))) - sock.listen(MAX_CONNECTION) - sock.setblocking(False) - self.selector.register(sock, selectors.EVENT_READ, self.nvmeof_accept) + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.bind((self.discovery_addr, int(self.discovery_port))) + self.sock.listen(MAX_CONNECTION) + self.sock.setblocking(False) + self.selector.register(self.sock, selectors.EVENT_READ, self.nvmeof_accept) self.logger.debug("waiting for connection...") t = threading.Thread(target=self.handle_timeout) t.start() @@ -1090,10 +1129,7 @@ def start_service(self): callback = key.data callback(key.fileobj, mask) except KeyboardInterrupt: - for key in self.conn_vals: - self.conn_vals[key].connection.close() - self.selector.close() - self.logger.debug("received a ctrl+C interrupt. exiting...") + self.logger.debug("received a ctrl+C interrupt. exiting...") def main(args=None): parser = argparse.ArgumentParser(prog="python3 -m control", diff --git a/control/server.py b/control/server.py index 676acde0..c681d29e 100644 --- a/control/server.py +++ b/control/server.py @@ -466,7 +466,7 @@ def _stop_discovery(self): try: os.kill(self.discovery_pid, signal.SIGINT) os.waitpid(self.discovery_pid, 0) - except ChildProcessError: + except (ChildProcessError, ProcessLookupError): pass # ignore self.logger.info("Discovery service terminated") diff --git a/control/state.py b/control/state.py index de79feb4..6325a7bf 100644 --- a/control/state.py +++ b/control/state.py @@ -505,6 +505,22 @@ def _watcher_callback(notify_id, notifier_id, watch_id, data): else: self.logger.info(f"Watch already exists.") + def cleanup_omap(self): + self.logger.info(f"Cleanup OMAP on exit ({self.id_text})") + if self.watch: + try: + self.watch.close() + self.logger.debug(f"Unregistered watch ({self.id_text})") + self.watch = None + except Exception: + pass + if self.ioctx: + try: + self.ioctx.close() + self.logger.debug(f"Closed Rados connection ({self.id_text})") + self.ioctx = None + except Exception: + pass class GatewayStateHandler: """Maintains consistency in NVMeoF target state store instances. diff --git a/tests/ha/4gws.sh b/tests/ha/4gws.sh index 0e23dfef..fd29ebbf 100755 --- a/tests/ha/4gws.sh +++ b/tests/ha/4gws.sh @@ -7,7 +7,20 @@ expect_optimized() { EXPECTED_OPTIMIZED=$2 NQN=$3 - socket=$(docker exec "$GW_NAME" find /var/run/ceph -name spdk.sock) + socket_retries=0 + socket="" + while [ $socket_retries -lt 10 ] ; do + socket=$(docker exec "$GW_NAME" find /var/run/ceph -name spdk.sock) + if [ -n "$socket" ]; then + break + fi + socket_retries=$(expr $socket_retries + 1) + sleep 1 + done + if [ -z "$socket" ]; then + exit 1 # failed + fi + # Verify expected number of "optimized" for i in $(seq 50); do response=$(docker exec "$GW_NAME" "$rpc" "-s" "$socket" "$cmd" "$NQN")