diff --git a/.env b/.env
index d952581e..acaaf6b7 100644
--- a/.env
+++ b/.env
@@ -8,6 +8,7 @@ QUAY_CEPH="${CONTAINER_REGISTRY}/vstart-cluster"
 QUAY_NVMEOF="${CONTAINER_REGISTRY}/nvmeof"
 QUAY_NVMEOFCLI="${CONTAINER_REGISTRY}/nvmeof-cli"
 MAINTAINER="Ceph Developers "
+COMPOSE_PROJECT_NAME="ceph-nvmeof"
 
 # Performance
 NVMEOF_NOFILE=20480 # Max number of open files (depends on number of hosts connected)
diff --git a/ceph-nvmeof.conf b/ceph-nvmeof.conf
index f05cd065..a2d11810 100644
--- a/ceph-nvmeof.conf
+++ b/ceph-nvmeof.conf
@@ -18,6 +18,10 @@ state_update_interval_sec = 5
 #min_controller_id = 1
 #max_controller_id = 65519
 enable_discovery_controller = false
+#omap_file_lock_timeout = 60
+#omap_file_lock_retries = 15
+#omap_file_lock_retry_sleep_interval = 5
+#omap_file_update_retries = 10
 
 [discovery]
 addr = 0.0.0.0
diff --git a/control/grpc.py b/control/grpc.py
index 871ab56f..3f66cc74 100644
--- a/control/grpc.py
+++ b/control/grpc.py
@@ -13,6 +13,7 @@ import uuid
 import random
 import logging
+import errno
 
 import spdk.rpc.bdev as rpc_bdev
 import spdk.rpc.nvmf as rpc_nvmf
@@ -35,15 +36,18 @@ class GatewayService(pb2_grpc.GatewayServicer):
         spdk_rpc_client: Client of SPDK RPC server
     """
 
-    def __init__(self, config, gateway_state, spdk_rpc_client) -> None:
+    def __init__(self, config, gateway_state, omap_state, spdk_rpc_client) -> None:
         """Constructor"""
         self.logger = logging.getLogger(__name__)
         self.config = config
         self.gateway_state = gateway_state
+        self.omap_state = omap_state
         self.spdk_rpc_client = spdk_rpc_client
         self.gateway_name = self.config.get("gateway", "name")
         if not self.gateway_name:
             self.gateway_name = socket.gethostname()
+        self.omap_file_lock_timeout = self.config.getint_with_default("gateway", "omap_file_lock_timeout", 60)
+        self.omap_file_update_retries = self.config.getint_with_default("gateway", "omap_file_update_retries", 10)
         self._init_cluster_context()
 
     def _init_cluster_context(self) -> None:
@@ -86,9 +90,42 @@ def _alloc_cluster(self) -> str:
         )
         return name
 
+    def lock_and_update_omap_file(self):
+        got_omap_lock = False
+
+        if self.omap_file_lock_timeout <= 0:
+            self.logger.warning(f"Will not lock OMAP file, timeout is not positive")
+            return False
+
+        need_to_update = False
+        for i in range(1, self.omap_file_update_retries):
+            try:
+                if need_to_update:
+                    self.logger.warning(f"An update is required before locking the OMAP file")
+                    self.gateway_state.update()
+                    need_to_update = False
+                got_omap_lock = self.omap_state.lock_omap(self.omap_file_lock_timeout)
+                break
+            except OSError as err:
+                if err.errno == errno.EAGAIN:
+                    self.logger.warning(f"Error locking OMAP file. The file is not current, will read the file and try again")
+                    need_to_update = True
+                else:
+                    self.logger.error(f"Error locking OMAP file, got exception: {err}")
+                    raise
+            except Exception as ex:
+                self.logger.error(f"Error locking OMAP file, exception: {ex}")
+                raise
+
+        if need_to_update:
+            raise Exception(f"Unable to lock OMAP file after updating {self.omap_file_update_retries} times, exiting")
+
+        return got_omap_lock
+
     def create_bdev(self, request, context=None):
         """Creates a bdev from an RBD image."""
+        got_omap_lock = False
         if not request.uuid:
             request.uuid = str(uuid.uuid4())
@@ -96,6 +133,9 @@ def create_bdev(self, request, context=None):
         self.logger.info(f"Received request to create bdev {name} from"
                          f" {request.rbd_pool_name}/{request.rbd_image_name}"
                          f" with block size {request.block_size}")
+        if context:
+            got_omap_lock = self.lock_and_update_omap_file()
+
         try:
             bdev_name = rpc_bdev.bdev_rbd_create(
                 self.spdk_rpc_client,
@@ -110,6 +150,8 @@ def create_bdev(self, request, context=None):
         except Exception as ex:
             self.logger.error(f"create_bdev failed with: \n {ex}")
             if context:
+                if got_omap_lock:
+                    self.omap_state.unlock_omap()
                 context.set_code(grpc.StatusCode.INTERNAL)
                 context.set_details(f"{ex}")
             return pb2.bdev()
@@ -124,6 +166,9 @@ def create_bdev(self, request, context=None):
                 self.logger.error(
                     f"Error persisting create_bdev {bdev_name}: {ex}")
                 raise
+        finally:
+            if got_omap_lock:
+                self.omap_state.unlock_omap()
 
         return pb2.bdev(bdev_name=bdev_name, status=True)
 
@@ -131,34 +176,44 @@ def delete_bdev(self, request, context=None):
         """Deletes a bdev."""
         self.logger.info(f"Received request to delete bdev {request.bdev_name}")
 
+        got_omap_lock = False
         use_excep = None
-        req_get_subsystems = pb2.get_subsystems_req()
-        ret = self.get_subsystems(req_get_subsystems, context)
-        subsystems = json.loads(ret.subsystems)
-        for subsystem in subsystems:
-            for namespace in subsystem['namespaces']:
-                if namespace['bdev_name'] == request.bdev_name:
-                    # We found a namespace still using this bdev. If --force was used we will try to remove this namespace.
-                    # Otherwise fail with EBUSY
-                    if request.force:
-                        self.logger.info(f"Will remove namespace {namespace['nsid']} from {subsystem['nqn']} as it is using bdev {request.bdev_name}")
-                        try:
-                            req_rm_ns = pb2.remove_namespace_req(subsystem_nqn=subsystem['nqn'], nsid=namespace['nsid'])
-                            ret = self.remove_namespace(req_rm_ns, context)
-                            self.logger.info(
-                                f"Removed namespace {namespace['nsid']} from {subsystem['nqn']}: {ret.status}")
-                        except Exception as ex:
-                            self.logger.error(f"Error removing namespace {namespace['nsid']} from {subsystem['nqn']}, will delete bdev {request.bdev_name} anyway: {ex}")
-                            pass
-                    else:
-                        self.logger.error(f"Namespace {namespace['nsid']} from {subsystem['nqn']} is still using bdev {request.bdev_name}. You need to either remove it or use the '--force' command line option")
-                        req = {"name": request.bdev_name, "method": "bdev_rbd_delete", "req_id": 0}
-                        ret = {"code": -16, "message": "Device or resource busy"}
-                        msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2),
-                                         "Got JSON-RPC error response",
-                                         "response:",
-                                         json.dumps(ret, indent=2)])
-                        use_excep = Exception(msg)
+
+        if context:
+            req_get_subsystems = pb2.get_subsystems_req()
+            ret = self.get_subsystems(req_get_subsystems, context)
+            subsystems = json.loads(ret.subsystems)
+            for subsystem in subsystems:
+                if use_excep is not None:
+                    break
+                for namespace in subsystem['namespaces']:
+                    if use_excep is not None:
+                        break
+                    if namespace['bdev_name'] == request.bdev_name:
+                        # We found a namespace still using this bdev. If --force was used we will try to remove this namespace.
+                        # Otherwise fail with EBUSY
+                        if request.force:
+                            self.logger.info(f"Will remove namespace {namespace['nsid']} from {subsystem['nqn']} as it is using bdev {request.bdev_name}")
+                            try:
+                                req_rm_ns = pb2.remove_namespace_req(subsystem_nqn=subsystem['nqn'], nsid=namespace['nsid'])
+                                ret = self.remove_namespace(req_rm_ns, context)
+                                self.logger.info(
+                                    f"Removed namespace {namespace['nsid']} from {subsystem['nqn']}: {ret.status}")
+                            except Exception as ex:
+                                self.logger.error(f"Error removing namespace {namespace['nsid']} from {subsystem['nqn']}, will delete bdev {request.bdev_name} anyway: {ex}")
+                                pass
+                        else:
+                            self.logger.error(f"Namespace {namespace['nsid']} from {subsystem['nqn']} is still using bdev {request.bdev_name}. You need to either remove it or use the '--force' command line option")
+                            req = {"name": request.bdev_name, "method": "bdev_rbd_delete", "req_id": 0}
+                            ret = {"code": -16, "message": "Device or resource busy"}
+                            msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2),
+                                             "Got JSON-RPC error response",
+                                             "response:",
+                                             json.dumps(ret, indent=2)])
+                            use_excep = Exception(msg)
+
+        if not use_excep:
+            got_omap_lock = self.lock_and_update_omap_file()
 
         try:
             if use_excep:
@@ -171,6 +226,8 @@ def delete_bdev(self, request, context=None):
         except Exception as ex:
             self.logger.error(f"delete_bdev failed with: \n {ex}")
             if context:
+                if not use_excep and got_omap_lock:
+                    self.omap_state.unlock_omap()
                 context.set_code(grpc.StatusCode.INTERNAL)
                 context.set_details(f"{ex}")
             return pb2.req_status()
@@ -183,6 +240,9 @@ def delete_bdev(self, request, context=None):
                 self.logger.error(
                     f"Error persisting delete_bdev {request.bdev_name}: {ex}")
                 raise
+        finally:
+            if got_omap_lock:
+                self.omap_state.unlock_omap()
 
         return pb2.req_status(status=ret)
 
@@ -191,12 +251,17 @@ def create_subsystem(self, request, context=None):
         self.logger.info(
             f"Received request to create subsystem {request.subsystem_nqn}")
+        got_omap_lock = False
         min_cntlid = self.config.getint_with_default("gateway", "min_controller_id", 1)
         max_cntlid = self.config.getint_with_default("gateway", "max_controller_id", 65519)
         if not request.serial_number:
             random.seed()
             randser = random.randint(2, 99999999999999)
             request.serial_number = f"SPDK{randser}"
+
+        if context:
+            got_omap_lock = self.lock_and_update_omap_file()
+
         try:
             ret = rpc_nvmf.nvmf_create_subsystem(
                 self.spdk_rpc_client,
@@ -212,6 +277,8 @@ def create_subsystem(self, request, context=None):
             if context:
                 context.set_code(grpc.StatusCode.INTERNAL)
                 context.set_details(f"{ex}")
+            if got_omap_lock:
+                self.omap_state.unlock_omap()
             return pb2.req_status()
 
         if context:
@@ -225,6 +292,9 @@ def create_subsystem(self, request, context=None):
                 self.logger.error(f"Error persisting create_subsystem"
                                   f" {request.subsystem_nqn}: {ex}")
                 raise
+        finally:
+            if got_omap_lock:
+                self.omap_state.unlock_omap()
 
         return pb2.req_status(status=ret)
 
@@ -233,6 +303,11 @@ def delete_subsystem(self, request, context=None):
         self.logger.info(
             f"Received request to delete subsystem {request.subsystem_nqn}")
+        got_omap_lock = False
+
+        if context:
+            got_omap_lock = self.lock_and_update_omap_file()
+
         try:
             ret = rpc_nvmf.nvmf_delete_subsystem(
                 self.spdk_rpc_client,
@@ -242,6 +317,8 @@ def delete_subsystem(self, request, context=None):
         except Exception as ex:
             self.logger.error(f"delete_subsystem failed with: \n {ex}")
             if context:
+                if got_omap_lock:
+                    self.omap_state.unlock_omap()
                 context.set_code(grpc.StatusCode.INTERNAL)
                 context.set_details(f"{ex}")
             return pb2.req_status()
@@ -254,6 +331,9 @@ def delete_subsystem(self, request, context=None):
                 self.logger.error(f"Error persisting delete_subsystem"
                                   f" {request.subsystem_nqn}: {ex}")
                 raise
+        finally:
+            if got_omap_lock:
+                self.omap_state.unlock_omap()
 
         return pb2.req_status(status=ret)
 
@@ -262,6 +342,11 @@ def add_namespace(self, request, context=None):
         self.logger.info(f"Received request to add {request.bdev_name} to"
                          f" {request.subsystem_nqn}")
+
+        got_omap_lock = False
+        if context:
+            got_omap_lock = self.lock_and_update_omap_file()
+
         try:
             nsid = rpc_nvmf.nvmf_subsystem_add_ns(
                 self.spdk_rpc_client,
@@ -273,6 +358,8 @@ def add_namespace(self, request, context=None):
         except Exception as ex:
             self.logger.error(f"add_namespace failed with: \n {ex}")
             if context:
+                if got_omap_lock:
+                    self.omap_state.unlock_omap()
                 context.set_code(grpc.StatusCode.INTERNAL)
                 context.set_details(f"{ex}")
             return pb2.nsid()
@@ -290,6 +377,9 @@ def add_namespace(self, request, context=None):
                 self.logger.error(
                     f"Error persisting add_namespace {nsid}: {ex}")
                 raise
+        finally:
+            if got_omap_lock:
+                self.omap_state.unlock_omap()
 
         return pb2.nsid(nsid=nsid, status=True)
 
@@ -298,6 +388,11 @@ def remove_namespace(self, request, context=None):
         self.logger.info(f"Received request to remove {request.nsid} from"
                          f" {request.subsystem_nqn}")
+
+        got_omap_lock = False
+        if context:
+            got_omap_lock = self.lock_and_update_omap_file()
+
         try:
             ret = rpc_nvmf.nvmf_subsystem_remove_ns(
                 self.spdk_rpc_client,
@@ -308,6 +403,8 @@ def remove_namespace(self, request, context=None):
         except Exception as ex:
             self.logger.error(f"remove_namespace failed with: \n {ex}")
             if context:
+                if got_omap_lock:
+                    self.omap_state.unlock_omap()
                 context.set_code(grpc.StatusCode.INTERNAL)
                 context.set_details(f"{ex}")
             return pb2.req_status()
@@ -321,12 +418,19 @@ def remove_namespace(self, request, context=None):
                 self.logger.error(
                     f"Error persisting remove_namespace {request.nsid}: {ex}")
                 raise
+        finally:
+            if got_omap_lock:
+                self.omap_state.unlock_omap()
 
         return pb2.req_status(status=ret)
 
     def add_host(self, request, context=None):
         """Adds a host to a subsystem."""
+        got_omap_lock = False
+        if context:
+            got_omap_lock = self.lock_and_update_omap_file()
+
         try:
             if request.host_nqn == "*":  # Allow any host access to subsystem
                 self.logger.info(f"Received request to allow any host to"
@@ -350,6 +454,8 @@ def add_host(self, request, context=None):
         except Exception as ex:
             self.logger.error(f"add_host failed with: \n {ex}")
             if context:
+                if got_omap_lock:
+                    self.omap_state.unlock_omap()
                 context.set_code(grpc.StatusCode.INTERNAL)
                 context.set_details(f"{ex}")
             return pb2.req_status()
@@ -365,12 +471,19 @@ def add_host(self, request, context=None):
                 self.logger.error(
                     f"Error persisting add_host {request.host_nqn}: {ex}")
                 raise
+        finally:
+            if got_omap_lock:
+                self.omap_state.unlock_omap()
 
         return pb2.req_status(status=ret)
 
     def remove_host(self, request, context=None):
         """Removes a host from a subsystem."""
+        got_omap_lock = False
+        if context:
+            got_omap_lock = self.lock_and_update_omap_file()
+
         try:
             if request.host_nqn == "*":  # Disable allow any host access
                 self.logger.info(
@@ -395,6 +508,8 @@ def remove_host(self, request, context=None):
         except Exception as ex:
             self.logger.error(f"remove_host failed with: \n {ex}")
             if context:
+                if got_omap_lock:
+                    self.omap_state.unlock_omap()
                 context.set_code(grpc.StatusCode.INTERNAL)
                 context.set_details(f"{ex}")
             return pb2.req_status()
@@ -407,16 +522,24 @@ def remove_host(self, request, context=None):
             except Exception as ex:
                 self.logger.error(f"Error persisting remove_host: {ex}")
                 raise
+        finally:
+            if got_omap_lock:
+                self.omap_state.unlock_omap()
 
         return pb2.req_status(status=ret)
 
     def create_listener(self, request, context=None):
         """Creates a listener for a subsystem at a given IP/Port."""
+        got_omap_lock = False
         ret = True
         self.logger.info(f"Received request to create {request.gateway_name}"
                          f" {request.trtype} listener for {request.nqn} at"
                          f" {request.traddr}:{request.trsvcid}.")
+
+        if context:
+            got_omap_lock = self.lock_and_update_omap_file()
+
         try:
             if request.gateway_name == self.gateway_name:
                 ret = rpc_nvmf.nvmf_subsystem_add_listener(
@@ -434,6 +557,8 @@ def create_listener(self, request, context=None):
         except Exception as ex:
             self.logger.error(f"create_listener failed with: \n {ex}")
             if context:
+                if got_omap_lock:
+                    self.omap_state.unlock_omap()
                 context.set_code(grpc.StatusCode.INTERNAL)
                 context.set_details(f"{ex}")
             return pb2.req_status()
@@ -451,16 +576,24 @@ def create_listener(self, request, context=None):
                 self.logger.error(
                     f"Error persisting add_listener {request.trsvcid}: {ex}")
                 raise
+        finally:
+            if got_omap_lock:
+                self.omap_state.unlock_omap()
 
         return pb2.req_status(status=ret)
 
     def delete_listener(self, request, context=None):
         """Deletes a listener from a subsystem at a given IP/Port."""
+        got_omap_lock = False
        ret = True
         self.logger.info(f"Received request to delete {request.gateway_name}"
                          f" {request.trtype} listener for {request.nqn} at"
                          f" {request.traddr}:{request.trsvcid}.")
+
+        if context:
+            got_omap_lock = self.lock_and_update_omap_file()
+
         try:
             if request.gateway_name == self.gateway_name:
                 ret = rpc_nvmf.nvmf_subsystem_remove_listener(
@@ -478,6 +611,8 @@ def delete_listener(self, request, context=None):
         except Exception as ex:
             self.logger.error(f"delete_listener failed with: \n {ex}")
             if context:
+                if got_omap_lock:
+                    self.omap_state.unlock_omap()
                 context.set_code(grpc.StatusCode.INTERNAL)
                 context.set_details(f"{ex}")
             return pb2.req_status()
@@ -494,6 +629,9 @@ def delete_listener(self, request, context=None):
                 self.logger.error(
                     f"Error persisting delete_listener {request.trsvcid}: {ex}")
                 raise
+        finally:
+            if got_omap_lock:
+                self.omap_state.unlock_omap()
 
         return pb2.req_status(status=ret)
diff --git a/control/server.py b/control/server.py
index bc740fc7..f25ef5f3 100644
--- a/control/server.py
+++ b/control/server.py
@@ -100,7 +100,7 @@ def serve(self):
         local_state = LocalGatewayState()
         gateway_state = GatewayStateHandler(self.config, local_state,
                                             omap_state, self.gateway_rpc_caller)
-        self.gateway_rpc = GatewayService(self.config, gateway_state,
+        self.gateway_rpc = GatewayService(self.config, gateway_state, omap_state,
                                           self.spdk_rpc_client)
         self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
         pb2_grpc.add_GatewayServicer_to_server(self.gateway_rpc, self.server)
diff --git a/control/state.py b/control/state.py
index b99664c3..8db2d79f 100644
--- a/control/state.py
+++ b/control/state.py
@@ -162,6 +162,8 @@ class OmapGatewayState(GatewayState):
     """
 
     OMAP_VERSION_KEY = "omap_version"
+    OMAP_FILE_LOCK_NAME = "omap_file_lock"
+    OMAP_FILE_LOCK_COOKIE = ""
 
     def __init__(self, config):
         self.config = config
@@ -173,6 +175,8 @@ def __init__(self, config):
         ceph_pool = self.config.get("ceph", "pool")
         ceph_conf = self.config.get("ceph", "config_file")
         rados_id = self.config.get_with_default("ceph", "id", "")
+        self.omap_file_lock_retries = self.config.getint_with_default("gateway", "omap_file_lock_retries", 15)
+        self.omap_file_lock_retry_sleep_interval = self.config.getint_with_default("gateway", "omap_file_lock_retry_sleep_interval", 5)
 
         try:
             conn = rados.Rados(conffile=ceph_conf, rados_id=rados_id)
@@ -222,6 +226,55 @@ def get_omap_version(self) -> int:
                               f" invalid number of values ({value_list}).")
             raise
 
+    def lock_omap(self, timeout = None):
+        got_lock = False
+        need_to_unlock = False
+
+        if timeout is None:
+            self.logger.warning(f"Can't lock OMAP file without a lock timeout")
+            return False
+
+        for i in range(1, self.omap_file_lock_retries):
+            try:
+                with rados.WriteOpCtx() as write_op:
+                    self.ioctx.lock_exclusive(self.omap_name, self.OMAP_FILE_LOCK_NAME, self.OMAP_FILE_LOCK_COOKIE, "OMAP file changes lock", timeout, 0)
+                    got_lock = True
+                    need_to_unlock = True
+                break
+            except rados.ObjectExists as ex:
+                self.logger.info(f"We already locked the OMAP file")
+                got_lock = True
+                break
+            except rados.ObjectBusy as ex:
+                self.logger.warning(f"Someone else locked the OMAP file, will try again in {self.omap_file_lock_retry_sleep_interval} seconds")
+                time.sleep(self.omap_file_lock_retry_sleep_interval)
+            except Exception as ex:
+                self.logger.error(f"Unable to lock OMAP file: {ex}. Exiting!")
+                raise
+
+        if not got_lock:
+            self.logger.error(f"Unable to lock OMAP file after {self.omap_file_lock_retries} retries. Exiting!")
+            raise Exception("Unable to lock OMAP file")
+
+        omap_version = self.get_omap_version()
+
+        if omap_version > self.version:
+            self.logger.warning(f"Local version {self.version} differs from OMAP file version {omap_version}, need to read the OMAP file")
+            self.unlock_omap()
+            raise OSError(11, "Unable to lock OMAP file, file not current", self.omap_name)
+
+        return True
+
+    def unlock_omap(self):
+        try:
+            with rados.WriteOpCtx() as write_op:
+                self.ioctx.unlock(self.omap_name, self.OMAP_FILE_LOCK_NAME, self.OMAP_FILE_LOCK_COOKIE)
+        except rados.ObjectNotFound as ex:
+            self.logger.warning(f"No such lock, maybe the lock timeout has expired")
+        except Exception as ex:
+            self.logger.error(f"Unable to unlock OMAP file: {ex}. Exiting!")
+            raise
+
     def get_state(self) -> Dict[str, str]:
         """Returns dict of all OMAP keys and values."""
         with rados.ReadOpCtx() as read_op:
diff --git a/mk/demo.mk b/mk/demo.mk
index 4f2f9c80..c70b49ef 100644
--- a/mk/demo.mk
+++ b/mk/demo.mk
@@ -7,7 +7,7 @@ rbd: CMD = bash -c "rbd -p $(RBD_POOL) info $(RBD_IMAGE_NAME) || rbd -p $(RBD_PO
 
 # demo
 # the fist gateway in docker enviroment, hostname defaults to container id
-demo: export NVMEOF_HOSTNAME != docker ps -q -f name=ceph-nvmeof_nvmeof_1
+demo: export NVMEOF_HOSTNAME != docker ps -q -f name=ceph-nvmeof-nvmeof-1
 demo: rbd ## Expose RBD_IMAGE_NAME as NVMe-oF target
 	$(NVMEOF_CLI) create_bdev --pool $(RBD_POOL) --image $(RBD_IMAGE_NAME) --bdev $(BDEV_NAME)
 	$(NVMEOF_CLI) create_subsystem --subnqn $(NQN)