Allocate cluster context per ANA group
Signed-off-by: Alexander Indenbaum <[email protected]>
Alexander Indenbaum committed Feb 13, 2024
1 parent 5f6560e commit 7fe86f3
Showing 4 changed files with 23 additions and 21 deletions.
ceph-nvmeof.conf (1 addition, 1 deletion)
@@ -21,7 +21,7 @@ enable_spdk_discovery_controller = False
 #omap_file_lock_retry_sleep_interval = 5
 #omap_file_update_reloads = 10
 log_level=debug
-bdevs_per_cluster = 1
+bdevs_per_cluster = 8
 #log_files_enabled = True
 #log_files_rotation_enabled = True
 #verbose_log_messages = True
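A note on the new default (not part of the change itself): bdevs_per_cluster now packs up to 8 bdevs of the same ANA group into one registered RBD cluster context, so the number of contexts a group needs grows with its namespace count. A minimal sketch of that arithmetic, assuming the packing behavior implemented in control/grpc.py below; contexts_needed is an illustrative helper, not gateway code:

import math

# Hypothetical helper, for illustration only: how many cluster contexts one ANA
# group ends up with when its bdevs are packed bdevs_per_cluster at a time.
def contexts_needed(bdevs_in_group: int, bdevs_per_cluster: int = 8) -> int:
    return math.ceil(bdevs_in_group / bdevs_per_cluster)

assert contexts_needed(1) == 1   # the first bdev allocates a fresh context
assert contexts_needed(8) == 1   # the eighth bdev still shares it
assert contexts_needed(9) == 2   # the ninth triggers a new allocation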
control/grpc.py (20 additions, 19 deletions)
@@ -12,7 +12,6 @@
 import json
 import uuid
 import random
-import logging
 import os
 import threading
 import errno
@@ -203,9 +202,9 @@ def parse_json_exeption(self, ex):
 
     def _init_cluster_context(self) -> None:
         """Init cluster context management variables"""
-        self.clusters = {}
-        self.current_cluster = None
-        self.bdevs_per_cluster = self.config.getint_with_default("spdk", "bdevs_per_cluster", 1)
+        self.clusters = defaultdict(dict)
+        self.current_cluster = {}
+        self.bdevs_per_cluster = self.config.getint_with_default("spdk", "bdevs_per_cluster", 8)
         if self.bdevs_per_cluster < 1:
             raise Exception(f"invalid configuration: spdk.bdevs_per_cluster_contexts {self.bdevs_per_cluster} < 1")
         self.logger.info(f"NVMeoF bdevs per cluster: {self.bdevs_per_cluster}")
@@ -214,25 +213,25 @@ def _init_cluster_context(self) -> None:
         if self.rados_id == "":
             self.rados_id = None
 
-    def _get_cluster(self) -> str:
+    def _get_cluster(self, anagrp: int) -> str:
         """Returns cluster name, enforcing bdev per cluster context"""
         cluster_name = None
-        if self.current_cluster is None:
-            cluster_name = self._alloc_cluster()
-            self.current_cluster = cluster_name
-            self.clusters[cluster_name] = 1
-        elif self.clusters[self.current_cluster] >= self.bdevs_per_cluster:
-            self.current_cluster = None
-            cluster_name = self._get_cluster()
+        if anagrp not in self.current_cluster:
+            cluster_name = self._alloc_cluster(anagrp)
+            self.current_cluster[anagrp] = cluster_name
+            self.clusters[anagrp][cluster_name] = 1
+        elif self.current_cluster[anagrp] in self.clusters[anagrp] and self.clusters[anagrp][self.current_cluster[anagrp]] >= self.bdevs_per_cluster:
+            self.current_cluster.pop(anagrp)
+            cluster_name = self._get_cluster(anagrp)
         else:
-            cluster_name = self.current_cluster
-            self.clusters[cluster_name] += 1
+            cluster_name = self.current_cluster[anagrp]
+            self.clusters[anagrp][cluster_name] += 1
 
         return cluster_name
 
-    def _alloc_cluster(self) -> str:
+    def _alloc_cluster(self, anagrp: int) -> str:
         """Allocates a new Rados cluster context"""
-        name = f"cluster_context_{len(self.clusters)}"
+        name = f"cluster_context_{anagrp}_{len(self.clusters[anagrp])}"
         nonce = rpc_bdev.bdev_rbd_register_cluster(
             self.spdk_rpc_client,
             name = name,
@@ -254,7 +253,7 @@ def execute_grpc_function(self, func, request, context):
         """
         return self.omap_lock.execute_omap_locking_function(self._grpc_function_with_lock, func, request, context)
 
-    def create_bdev(self, name, uuid, rbd_pool_name, rbd_image_name, block_size, create_image, rbd_image_size):
+    def create_bdev(self, anagrp: int, name, uuid, rbd_pool_name, rbd_image_name, block_size, create_image, rbd_image_size):
         """Creates a bdev from an RBD image."""
 
         if create_image:
@@ -287,7 +286,7 @@ def create_bdev(self, name, uuid, rbd_pool_name, rbd_image_name, block_size, create_image, rbd_image_size):
                 return BdevStatus(status=errno.ENODEV, error_message=f"Failure creating bdev {name}: Can't create RBD image {rbd_image_name}")
 
         try:
-            cluster_name=self._get_cluster()
+            cluster_name=self._get_cluster(anagrp)
             bdev_name = rpc_bdev.bdev_rbd_create(
                 self.spdk_rpc_client,
                 name=name,
@@ -300,6 +299,7 @@ def create_bdev(self, name, uuid, rbd_pool_name, rbd_image_name, block_size, create_image, rbd_image_size):
             self.bdev_cluster[name] = cluster_name
             self.logger.info(f"bdev_rbd_create: {bdev_name}")
         except Exception as ex:
+            self.logger.exception("bdev_rbd_create")
             errmsg = f"bdev_rbd_create {name} failed with:\n{ex}"
             self.logger.error(errmsg)
             resp = self.parse_json_exeption(ex)
@@ -785,7 +785,8 @@ def namespace_add_safe(self, request, context):
             create_image = request.create_image
             if not context:
                 create_image = False
-            ret_bdev = self.create_bdev(bdev_name, request.uuid, request.rbd_pool_name,
+            anagrp = int(request.anagrpid) if request.anagrpid is not None else 0
+            ret_bdev = self.create_bdev(anagrp, bdev_name, request.uuid, request.rbd_pool_name,
                                         request.rbd_image_name, request.block_size, create_image, request.size)
             if ret_bdev.status != 0:
                 errmsg = f"Failure adding namespace {nsid_msg}to {request.subsystem_nqn}: {ret_bdev.error_message}"
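To make the per-group bookkeeping above easier to follow, here is a self-contained sketch of the allocation logic, with the SPDK RPC registration (bdev_rbd_register_cluster) left out; ClusterAllocator and its method names are illustrative and not part of the gateway code:

from collections import defaultdict

# Illustrative sketch only, assuming the packing rules shown in _get_cluster/_alloc_cluster.
class ClusterAllocator:
    def __init__(self, bdevs_per_cluster: int = 8):
        self.bdevs_per_cluster = bdevs_per_cluster
        self.clusters = defaultdict(dict)  # anagrp -> {cluster_name: bdev count}
        self.current_cluster = {}          # anagrp -> cluster context being filled

    def get_cluster(self, anagrp: int) -> str:
        current = self.current_cluster.get(anagrp)
        if current is None or self.clusters[anagrp][current] >= self.bdevs_per_cluster:
            # The current context for this ANA group is missing or full: start a new
            # one, named by group and per-group index as in _alloc_cluster above.
            current = f"cluster_context_{anagrp}_{len(self.clusters[anagrp])}"
            self.current_cluster[anagrp] = current
            self.clusters[anagrp][current] = 0
        self.clusters[anagrp][current] += 1
        return current

alloc = ClusterAllocator(bdevs_per_cluster=2)
assert [alloc.get_cluster(1) for _ in range(3)] == \
    ["cluster_context_1_0", "cluster_context_1_0", "cluster_context_1_1"]
assert alloc.get_cluster(2) == "cluster_context_2_0"  # groups never share contexts

With bdevs_per_cluster = 2 the sketch shows the key property of the change: each ANA group fills its own cluster contexts independently, which is what the test assertions below check via the per-group context names.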
tests/test_cli.py (2 additions)
@@ -233,6 +233,7 @@ def test_add_namespace(self, caplog, gateway):
         caplog.clear()
         cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--uuid", uuid, "--size", "16MiB", "--rbd-create-image", "--force"])
         assert f"Adding namespace 1 to {subsystem}, load balancing group 1: Successful" in caplog.text
+        assert "Allocated cluster name='cluster_context_1_0'" in caplog.text
         caplog.clear()
         cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--size", "36M", "--rbd-create-image", "--force"])
         assert f"Image {pool}/{image2} already exists with a size of 16777216 bytes which differs from the requested size of 37748736 bytes" in caplog.text
@@ -742,6 +743,7 @@ def test_add_namespace_ana(self, caplog, gateway):
         caplog.clear()
         cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--load-balancing-group", anagrpid])
         assert f"Adding namespace {nsid} to {subsystem}, load balancing group {anagrpid}: Successful" in caplog.text
+        assert "Allocated cluster name='cluster_context_2_0'" in caplog.text
         caplog.clear()
         cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", nsid])
         assert f'"load_balancing_group": {anagrpid}' in caplog.text
tests/test_grpc.py (1 deletion)
@@ -35,7 +35,6 @@ def test_create_get_subsys(caplog, config):
 
     for i in range(created_resource_count):
         create_resource_by_index(i)
-        assert "failed" not in caplog.text.lower()
         assert "Failure" not in caplog.text
 
     assert f"{subsystem_prefix}0 with ANA group id 1" in caplog.text
