Skip to content

Commit

Permalink
Clean up Rados connection and notification watcher when we exit.
Browse files Browse the repository at this point in the history
Fixes ceph#712

Signed-off-by: Gil Bregman <[email protected]>
  • Loading branch information
gbregman committed Jun 20, 2024
1 parent 8bc6b4d commit cc04470
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 5 deletions.
4 changes: 4 additions & 0 deletions control/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
from .config import GatewayConfig
from .utils import GatewayLogger

gw = None
gw_logger = None
gw_name = None

def sigterm_handler(signum, frame):
if gw and gw.omap_state:
gw.omap_state.cleanup_omap()
if gw_logger and gw_name:
gw_logger.compress_final_log_file(gw_name)

Expand All @@ -40,6 +43,7 @@ def sigterm_handler(signum, frame):
config.display_environment_info(gw_logger.logger)
config.dump_config_file(gw_logger.logger)
with GatewayServer(config) as gateway:
gw = gateway
gw_name = gateway.name
gateway.serve()
gateway.keep_alive()
2 changes: 0 additions & 2 deletions control/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,6 @@ class DiscoveryService:
config: Basic gateway parameters
logger: Logger instance to track discovery controller events
omap_name: OMAP object name
ioctx: I/O context which allows OMAP access
discovery_addr: Discovery controller addr which allows initiator send command
discovery_port: Discovery controller's listening port
"""
Expand All @@ -321,7 +320,6 @@ def __init__(self, config):
if gateway_group else "nvmeof.state"
self.logger.info(f"log pages info from omap: {self.omap_name}")

self.ioctx = self.omap_state.open_rados_connection(config)
self.discovery_addr = self.config.get_with_default("discovery", "addr", "0.0.0.0")
self.discovery_port = self.config.get_with_default("discovery", "port", "8009")
if not self.discovery_addr or not self.discovery_port:
Expand Down
6 changes: 6 additions & 0 deletions control/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def __init__(self, config: GatewayConfig):
self.rpc_lock = threading.Lock()
self.group_id = 0
self.monitor_client = '/usr/bin/ceph-nvmeof-monitor-client'
self.omap_state = None

self.name = self.config.get("gateway", "name")
if not self.name:
Expand Down Expand Up @@ -127,6 +128,10 @@ def __exit__(self, exc_type, exc_value, traceback):
if self.discovery_pid:
self._stop_discovery()

if self.omap_state:
self.omap_state.cleanup_omap()
self.omap_state = None

if logger:
logger.info("Exiting the gateway process.")
gw_logger.compress_final_log_file(gw_name)
Expand Down Expand Up @@ -161,6 +166,7 @@ def serve(self):
self.logger.info(f"Starting serve, monitor client version: {self._monitor_client_version()}")

omap_state = OmapGatewayState(self.config, f"gateway-{self.name}")
self.omap_state = omap_state
local_state = LocalGatewayState()
omap_state.check_for_old_format_omap_files()

Expand Down
47 changes: 44 additions & 3 deletions control/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from collections import defaultdict
from abc import ABC, abstractmethod
from .utils import GatewayLogger
import atexit

class GatewayState(ABC):
"""Persists gateway NVMeoF target state.
Expand Down Expand Up @@ -269,6 +270,10 @@ def lock_omap(self):
got_lock = False
assert self.rpc_lock.locked(), "The RPC lock is not locked."

if not self.omap_state.ioctx:
self.logger.warning(f"Not locking OMAP as Rados connection is closed")
return

for i in range(0, self.omap_file_lock_retries + 1):
try:
self.omap_state.ioctx.lock_exclusive(self.omap_state.omap_name, self.OMAP_FILE_LOCK_NAME,
Expand Down Expand Up @@ -310,6 +315,9 @@ def unlock_omap(self):
self.logger.warning(f"OMAP file unlock was disabled, will not unlock file")
return

if not self.omap_state.ioctx:
return

try:
self.omap_state.ioctx.unlock(self.omap_state.omap_name, self.OMAP_FILE_LOCK_NAME, self.OMAP_FILE_LOCK_COOKIE)
self.is_locked = False
Expand Down Expand Up @@ -347,6 +355,7 @@ def __init__(self, config, id_text=""):
self.config = config
self.version = 1
self.logger = GatewayLogger(self.config).logger
self.ioctx = None
self.watch = None
gateway_group = self.config.get("gateway", "group")
self.omap_name = f"nvmeof.{gateway_group}.state" if gateway_group else "nvmeof.state"
Expand All @@ -369,11 +378,10 @@ def __init__(self, config, id_text=""):
except Exception:
self.logger.exception(f"Unable to create OMAP, exiting!")
raise
atexit.register(self.cleanup_omap)

def __exit__(self, exc_type, exc_value, traceback):
if self.watch is not None:
self.watch.close()
self.ioctx.close()
self.cleanup_omap()

def check_for_old_format_omap_files(self):
omap_dict = self.get_state()
Expand Down Expand Up @@ -401,6 +409,10 @@ def set_local_version(self, version_update: int):

def get_omap_version(self) -> int:
"""Returns OMAP version."""
if not self.ioctx:
self.logger.warning(f"Trying to get OMAP version when Rados connection is closed")
return -1

with rados.ReadOpCtx() as read_op:
i, _ = self.ioctx.get_omap_vals_by_keys(read_op,
(self.OMAP_VERSION_KEY,))
Expand All @@ -419,6 +431,9 @@ def get_state(self) -> Dict[str, str]:
"""Returns dict of all OMAP keys and values."""
omap_list = [("", 0)] # Dummy, non empty, list value. Just so we would enter the while
omap_dict = {}
if not self.ioctx:
self.logger.warning(f"Trying to get OMAP state when Rados connection is closed")
return omap_dict
# The number of items returned is limited by Ceph, so we need to read in a loop until no more items are returned
while len(omap_list) > 0:
last_key_read = omap_list[-1][0]
Expand All @@ -431,6 +446,9 @@ def get_state(self) -> Dict[str, str]:

def _add_key(self, key: str, val: str):
"""Adds key and value to the OMAP."""
if not self.ioctx:
raise

try:
version_update = self.version + 1
with rados.WriteOpCtx() as write_op:
Expand All @@ -455,6 +473,9 @@ def _add_key(self, key: str, val: str):

def _remove_key(self, key: str):
"""Removes key from the OMAP."""
if not self.ioctx:
raise

try:
version_update = self.version + 1
with rados.WriteOpCtx() as write_op:
Expand All @@ -479,6 +500,9 @@ def _remove_key(self, key: str):

def delete_state(self):
"""Deletes OMAP object contents."""
if not self.ioctx:
raise

try:
with rados.WriteOpCtx() as write_op:
self.ioctx.clear_omap(write_op)
Expand All @@ -497,6 +521,9 @@ def register_watch(self, notify_event):
def _watcher_callback(notify_id, notifier_id, watch_id, data):
notify_event.set()

if not self.ioctx:
return

if self.watch is None:
try:
self.watch = self.ioctx.watch(self.omap_name, _watcher_callback)
Expand All @@ -505,6 +532,16 @@ def _watcher_callback(notify_id, notifier_id, watch_id, data):
else:
self.logger.info(f"Watch already exists.")

def cleanup_omap(self):
self.logger.info(f"Cleanup OMAP on exit ({self.id_text})")
if self.watch:
self.logger.debug("Unregistering watch")
self.watch.close()
self.watch = None
if self.ioctx:
self.logger.debug("Closing Rados connection")
self.ioctx.close()
self.ioctx = None

class GatewayStateHandler:
"""Maintains consistency in NVMeoF target state store instances.
Expand Down Expand Up @@ -635,6 +672,10 @@ def update(self) -> bool:
self.logger.warning(f"An update is already running, ignore")
return False

if not self.omap.ioctx:
self.logger.warning(f"Can't update when Rados connection is closed")
return False

with self.update_is_active_lock:
prefix_list = [
GatewayState.SUBSYSTEM_PREFIX,
Expand Down

0 comments on commit cc04470

Please sign in to comment.