diff --git a/ceph-nvmeof.conf b/ceph-nvmeof.conf
index 5e8e1094..3224486f 100644
--- a/ceph-nvmeof.conf
+++ b/ceph-nvmeof.conf
@@ -21,7 +21,7 @@ enable_spdk_discovery_controller = False
 #omap_file_lock_retry_sleep_interval = 5
 #omap_file_update_reloads = 10
 log_level=debug
-bdevs_per_cluster = 1
+bdevs_per_cluster = 8
 #log_files_enabled = True
 #log_files_rotation_enabled = True
 #verbose_log_messages = True
diff --git a/control/grpc.py b/control/grpc.py
index 72f2cf17..f618bda3 100644
--- a/control/grpc.py
+++ b/control/grpc.py
@@ -12,7 +12,6 @@
 import json
 import uuid
 import random
-import logging
 import os
 import threading
 import errno
@@ -203,9 +202,9 @@ def parse_json_exeption(self, ex):
 
     def _init_cluster_context(self) -> None:
         """Init cluster context management variables"""
-        self.clusters = {}
-        self.current_cluster = None
-        self.bdevs_per_cluster = self.config.getint_with_default("spdk", "bdevs_per_cluster", 1)
+        self.clusters = defaultdict(dict)
+        self.current_cluster = {}
+        self.bdevs_per_cluster = self.config.getint_with_default("spdk", "bdevs_per_cluster", 8)
         if self.bdevs_per_cluster < 1:
             raise Exception(f"invalid configuration: spdk.bdevs_per_cluster_contexts {self.bdevs_per_cluster} < 1")
         self.logger.info(f"NVMeoF bdevs per cluster: {self.bdevs_per_cluster}")
@@ -214,25 +213,25 @@ def _init_cluster_context(self) -> None:
         if self.rados_id == "":
             self.rados_id = None
 
-    def _get_cluster(self) -> str:
+    def _get_cluster(self, anagrp: int) -> str:
         """Returns cluster name, enforcing bdev per cluster context"""
         cluster_name = None
-        if self.current_cluster is None:
-            cluster_name = self._alloc_cluster()
-            self.current_cluster = cluster_name
-            self.clusters[cluster_name] = 1
-        elif self.clusters[self.current_cluster] >= self.bdevs_per_cluster:
-            self.current_cluster = None
-            cluster_name = self._get_cluster()
+        if anagrp not in self.current_cluster:
+            cluster_name = self._alloc_cluster(anagrp)
+            self.current_cluster[anagrp] = cluster_name
+            self.clusters[anagrp][cluster_name] = 1
+        elif self.current_cluster[anagrp] in self.clusters[anagrp] and self.clusters[anagrp][self.current_cluster[anagrp]] >= self.bdevs_per_cluster:
+            self.current_cluster.pop(anagrp)
+            cluster_name = self._get_cluster(anagrp)
         else:
-            cluster_name = self.current_cluster
-            self.clusters[cluster_name] += 1
+            cluster_name = self.current_cluster[anagrp]
+            self.clusters[anagrp][cluster_name] += 1
 
         return cluster_name
 
-    def _alloc_cluster(self) -> str:
+    def _alloc_cluster(self, anagrp: int) -> str:
         """Allocates a new Rados cluster context"""
-        name = f"cluster_context_{len(self.clusters)}"
+        name = f"cluster_context_{anagrp}_{len(self.clusters[anagrp])}"
         nonce = rpc_bdev.bdev_rbd_register_cluster(
             self.spdk_rpc_client,
             name = name,
@@ -254,7 +253,7 @@ def execute_grpc_function(self, func, request, context):
         """
         return self.omap_lock.execute_omap_locking_function(self._grpc_function_with_lock, func, request, context)
 
-    def create_bdev(self, name, uuid, rbd_pool_name, rbd_image_name, block_size, create_image, rbd_image_size):
+    def create_bdev(self, anagrp: int, name, uuid, rbd_pool_name, rbd_image_name, block_size, create_image, rbd_image_size):
         """Creates a bdev from an RBD image."""
 
         if create_image:
@@ -287,7 +286,7 @@ def create_bdev(self, name, uuid, rbd_pool_name, rbd_image_name, block_size, cre
             return BdevStatus(status=errno.ENODEV, error_message=f"Failure creating bdev {name}: Can't create RBD image {rbd_image_name}")
 
         try:
-            cluster_name=self._get_cluster()
+            cluster_name=self._get_cluster(anagrp)
             bdev_name = rpc_bdev.bdev_rbd_create(
                 self.spdk_rpc_client,
                 name=name,
@@ -300,6 +299,7 @@ def create_bdev(self, name, uuid, rbd_pool_name, rbd_image_name, block_size, cre
             self.bdev_cluster[name] = cluster_name
             self.logger.info(f"bdev_rbd_create: {bdev_name}")
         except Exception as ex:
+            self.logger.exception("bdev_rbd_create")
             errmsg = f"bdev_rbd_create {name} failed with:\n{ex}"
             self.logger.error(errmsg)
             resp = self.parse_json_exeption(ex)
@@ -785,7 +785,8 @@ def namespace_add_safe(self, request, context):
             create_image = request.create_image
             if not context:
                 create_image = False
-            ret_bdev = self.create_bdev(bdev_name, request.uuid, request.rbd_pool_name,
+            anagrp = int(request.anagrpid) if request.anagrpid is not None else 0
+            ret_bdev = self.create_bdev(anagrp, bdev_name, request.uuid, request.rbd_pool_name,
                                         request.rbd_image_name, request.block_size, create_image, request.size)
             if ret_bdev.status != 0:
                 errmsg = f"Failure adding namespace {nsid_msg}to {request.subsystem_nqn}: {ret_bdev.error_message}"
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 4e82f06f..69566eb6 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -233,6 +233,7 @@ def test_add_namespace(self, caplog, gateway):
         caplog.clear()
         cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--uuid", uuid, "--size", "16MiB", "--rbd-create-image", "--force"])
         assert f"Adding namespace 1 to {subsystem}, load balancing group 1: Successful" in caplog.text
+        assert "Allocated cluster name='cluster_context_1_0'" in caplog.text
         caplog.clear()
         cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--size", "36M", "--rbd-create-image", "--force"])
         assert f"Image {pool}/{image2} already exists with a size of 16777216 bytes which differs from the requested size of 37748736 bytes" in caplog.text
@@ -742,6 +743,7 @@ def test_add_namespace_ana(self, caplog, gateway):
         caplog.clear()
         cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--load-balancing-group", anagrpid])
         assert f"Adding namespace {nsid} to {subsystem}, load balancing group {anagrpid}: Successful" in caplog.text
+        assert "Allocated cluster name='cluster_context_2_0'" in caplog.text
         caplog.clear()
         cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", nsid])
         assert f'"load_balancing_group": {anagrpid}' in caplog.text
diff --git a/tests/test_grpc.py b/tests/test_grpc.py
index 428965d7..46c65f3b 100644
--- a/tests/test_grpc.py
+++ b/tests/test_grpc.py
@@ -35,7 +35,6 @@ def test_create_get_subsys(caplog, config):
     for i in range(created_resource_count):
         create_resource_by_index(i)
 
-    assert "failed" not in caplog.text.lower()
     assert "Failure" not in caplog.text
    assert f"{subsystem_prefix}0 with ANA group id 1" in caplog.text
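
For context, the sketch below mirrors the allocation scheme this patch introduces: RBD bdevs are packed into Rados cluster contexts per ANA (load balancing) group, up to `bdevs_per_cluster` bdevs per context (new default 8), and each group numbers its contexts independently, which is where the `cluster_context_<anagrp>_<n>` names asserted in the tests come from. This is a minimal standalone model, not gateway code: `ClusterContextAllocator` is an illustrative name, and a `print` stub stands in for the SPDK `bdev_rbd_register_cluster` RPC.

```python
# Standalone sketch of the per-ANA-group cluster context packing that
# _get_cluster()/_alloc_cluster() implement in this patch (illustrative only).
from collections import defaultdict


class ClusterContextAllocator:
    def __init__(self, bdevs_per_cluster: int = 8):
        if bdevs_per_cluster < 1:
            raise ValueError("bdevs_per_cluster must be >= 1")
        self.bdevs_per_cluster = bdevs_per_cluster
        self.clusters = defaultdict(dict)   # anagrp -> {cluster_name: bdev count}
        self.current_cluster = {}           # anagrp -> context currently being filled

    def _alloc_cluster(self, anagrp: int) -> str:
        # In the gateway this would register a new Rados cluster context via SPDK.
        name = f"cluster_context_{anagrp}_{len(self.clusters[anagrp])}"
        print(f"Allocated cluster {name=}")
        return name

    def get_cluster(self, anagrp: int) -> str:
        """Return a cluster name for one new bdev in the given ANA group."""
        if anagrp not in self.current_cluster:
            name = self._alloc_cluster(anagrp)
            self.current_cluster[anagrp] = name
            self.clusters[anagrp][name] = 1
        elif self.clusters[anagrp][self.current_cluster[anagrp]] >= self.bdevs_per_cluster:
            # The current context for this group is full; start a new one.
            self.current_cluster.pop(anagrp)
            name = self.get_cluster(anagrp)
        else:
            name = self.current_cluster[anagrp]
            self.clusters[anagrp][name] += 1
        return name


if __name__ == "__main__":
    alloc = ClusterContextAllocator(bdevs_per_cluster=2)
    # Two bdevs in ANA group 1 share one context; the third opens a new one.
    assert alloc.get_cluster(1) == alloc.get_cluster(1) == "cluster_context_1_0"
    assert alloc.get_cluster(1) == "cluster_context_1_1"
    # A different ANA group always gets its own contexts.
    assert alloc.get_cluster(2) == "cluster_context_2_0"
```

With `bdevs_per_cluster=2`, the sketch prints `Allocated cluster name='cluster_context_1_0'` on the first call and only opens `cluster_context_1_1` after the first context holds two bdevs, matching the log messages the updated tests assert on.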