Allocate cluster context per ANA group
- implement bdev removal
- free the cluster context if no longer used by any bdev

Signed-off-by: Alexander Indenbaum <[email protected]>
Alexander Indenbaum committed Feb 18, 2024
1 parent 4a0a10a commit cf598de
Showing 3 changed files with 48 additions and 20 deletions.
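For readers skimming the diff, the sketch below models the bookkeeping the control/grpc.py changes introduce: each ANA group keeps its own map of Rados cluster contexts, a context is reused until it already serves bdevs_per_cluster bdevs, and it is unregistered once the last bdev using it is deleted. This is an illustrative stand-in, not the gateway's actual code; the register/unregister callables take the place of the SPDK bdev_rbd_register_cluster / bdev_rbd_unregister_cluster RPCs, and the class and helper names are hypothetical.

# Illustrative sketch only: per-ANA-group, refcounted cluster-context bookkeeping.
from collections import defaultdict


class ClusterContextPool:
    def __init__(self, bdevs_per_cluster, register, unregister):
        if bdevs_per_cluster < 1:
            raise ValueError(f"invalid bdevs_per_cluster: {bdevs_per_cluster}")
        self.bdevs_per_cluster = bdevs_per_cluster
        self.register = register        # stand-in for bdev_rbd_register_cluster
        self.unregister = unregister    # stand-in for bdev_rbd_unregister_cluster
        # ANA group id -> {cluster context name -> number of bdevs using it}
        self.clusters = defaultdict(dict)

    def get_cluster(self, anagrp):
        """Return a cluster name for this ANA group, reusing an existing
        context until it already serves bdevs_per_cluster bdevs."""
        for name, count in self.clusters[anagrp].items():
            if count < self.bdevs_per_cluster:
                self.clusters[anagrp][name] = count + 1
                return name
        name = self._alloc_name(anagrp)
        self.register(name)
        self.clusters[anagrp][name] = 1
        return name

    def put_cluster(self, name):
        """Drop one bdev reference; unregister the context at zero."""
        for anagrp, group in self.clusters.items():
            if name in group:
                group[name] -= 1
                if group[name] == 0:
                    self.unregister(name)
                    group.pop(name)
                return
        raise KeyError(f"unknown cluster context {name}")

    def _alloc_name(self, anagrp):
        """Pick the first free cluster_context_<anagrp>_<x> name."""
        x = 0
        while f"cluster_context_{anagrp}_{x}" in self.clusters[anagrp]:
            x += 1
        return f"cluster_context_{anagrp}_{x}"


if __name__ == "__main__":
    pool = ClusterContextPool(2, register=print, unregister=print)
    a = pool.get_cluster(1)   # allocates cluster_context_1_0, refcount 1
    b = pool.get_cluster(1)   # reuses cluster_context_1_0, refcount 2
    c = pool.get_cluster(1)   # group 1 context full -> cluster_context_1_1
    d = pool.get_cluster(2)   # separate ANA group -> cluster_context_2_0
    for name in (a, b, c, d):
        pool.put_cluster(name)  # last reference frees each context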
2 changes: 1 addition & 1 deletion ceph-nvmeof.conf
@@ -21,7 +21,7 @@ enable_spdk_discovery_controller = False
#omap_file_lock_retry_sleep_interval = 5
#omap_file_update_reloads = 10
log_level=debug
bdevs_per_cluster = 1
bdevs_per_cluster = 32
#log_files_enabled = True
#log_files_rotation_enabled = True
#verbose_log_messages = True
64 changes: 45 additions & 19 deletions control/grpc.py
@@ -12,7 +12,6 @@
import json
import uuid
import random
import logging
import os
import threading
import errno
@@ -203,9 +202,8 @@ def parse_json_exeption(self, ex):

def _init_cluster_context(self) -> None:
"""Init cluster context management variables"""
self.clusters = {}
self.current_cluster = None
self.bdevs_per_cluster = self.config.getint_with_default("spdk", "bdevs_per_cluster", 1)
self.clusters = defaultdict(dict)
self.bdevs_per_cluster = self.config.getint_with_default("spdk", "bdevs_per_cluster", 32)
if self.bdevs_per_cluster < 1:
raise Exception(f"invalid configuration: spdk.bdevs_per_cluster_contexts {self.bdevs_per_cluster} < 1")
self.logger.info(f"NVMeoF bdevs per cluster: {self.bdevs_per_cluster}")
@@ -214,25 +212,51 @@ def _init_cluster_context(self) -> None:
if self.rados_id == "":
self.rados_id = None

def _get_cluster(self) -> str:
def _get_cluster(self, anagrp: int) -> str:
"""Returns cluster name, enforcing bdev per cluster context"""
cluster_name = None
if self.current_cluster is None:
cluster_name = self._alloc_cluster()
self.current_cluster = cluster_name
self.clusters[cluster_name] = 1
elif self.clusters[self.current_cluster] >= self.bdevs_per_cluster:
self.current_cluster = None
cluster_name = self._get_cluster()
for name in self.clusters[anagrp]:
if self.clusters[anagrp][name] < self.bdevs_per_cluster:
cluster_name = name
break

if not cluster_name:
cluster_name = self._alloc_cluster(anagrp)
self.clusters[anagrp][cluster_name] = 1
else:
cluster_name = self.current_cluster
self.clusters[cluster_name] += 1
self.clusters[anagrp][cluster_name] += 1

return cluster_name

def _alloc_cluster(self) -> str:
def _put_cluster(self, name: str) -> None:
for anagrp in self.clusters:
if name in self.clusters[anagrp]:
self.clusters[anagrp][name] -= 1
assert self.clusters[anagrp][name] >= 0
# free the cluster context if no longer used by any bdev
if self.clusters[anagrp][name] == 0:
ret = rpc_bdev.bdev_rbd_unregister_cluster(
self.spdk_rpc_client,
name = name
)
self.logger.info(f"Free cluster {name=} {ret=}")
assert ret
self.clusters[anagrp].pop(name)
return
assert False # we should find the cluster in our state

def _alloc_cluster_name(self, anagrp: int) -> str:
"""Allocates a new cluster name for ana group"""
x = 0
while True:
name = f"cluster_context_{anagrp}_{x}"
if name not in self.clusters[anagrp]:
return name
x += 1

def _alloc_cluster(self, anagrp: int) -> str:
"""Allocates a new Rados cluster context"""
name = f"cluster_context_{len(self.clusters)}"
name = self._alloc_cluster_name(anagrp)
nonce = rpc_bdev.bdev_rbd_register_cluster(
self.spdk_rpc_client,
name = name,
@@ -254,7 +278,7 @@ def execute_grpc_function(self, func, request, context):
"""
return self.omap_lock.execute_omap_locking_function(self._grpc_function_with_lock, func, request, context)

def create_bdev(self, name, uuid, rbd_pool_name, rbd_image_name, block_size, create_image, rbd_image_size):
def create_bdev(self, anagrp: int, name, uuid, rbd_pool_name, rbd_image_name, block_size, create_image, rbd_image_size):
"""Creates a bdev from an RBD image."""

if create_image:
@@ -288,7 +312,7 @@ def create_bdev(self, name, uuid, rbd_pool_name, rbd_image_name, block_size, cre
return BdevStatus(status=errno.ENODEV, error_message=f"Failure creating bdev {name}: {errmsg}")

try:
cluster_name=self._get_cluster()
cluster_name=self._get_cluster(anagrp)
bdev_name = rpc_bdev.bdev_rbd_create(
self.spdk_rpc_client,
name=name,
@@ -361,6 +385,7 @@ def delete_bdev(self, bdev_name):
self.spdk_rpc_client,
bdev_name,
)
self._put_cluster(self.bdev_cluster[bdev_name])
self.logger.info(f"delete_bdev {bdev_name}: {ret}")
except Exception as ex:
errmsg = f"Failure deleting bdev {bdev_name}"
@@ -791,7 +816,8 @@ def namespace_add_safe(self, request, context):
create_image = request.create_image
if not context:
create_image = False
ret_bdev = self.create_bdev(bdev_name, request.uuid, request.rbd_pool_name,
anagrp = int(request.anagrpid) if request.anagrpid is not None else 0
ret_bdev = self.create_bdev(anagrp, bdev_name, request.uuid, request.rbd_pool_name,
request.rbd_image_name, request.block_size, create_image, request.size)
if ret_bdev.status != 0:
errmsg = f"Failure adding namespace {nsid_msg}to {request.subsystem_nqn}: {ret_bdev.error_message}"
2 changes: 2 additions & 0 deletions tests/test_cli.py
@@ -233,6 +233,7 @@ def test_add_namespace(self, caplog, gateway):
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--uuid", uuid, "--size", "16MiB", "--rbd-create-image", "--force"])
assert f"Adding namespace 1 to {subsystem}, load balancing group 1: Successful" in caplog.text
assert "Allocated cluster name='cluster_context_1_0'" in caplog.text
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--size", "36M", "--rbd-create-image", "--force"])
assert f"Image {pool}/{image2} already exists with a size of 16777216 bytes which differs from the requested size of 37748736 bytes" in caplog.text
@@ -742,6 +743,7 @@ def test_add_namespace_ana(self, caplog, gateway):
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--load-balancing-group", anagrpid])
assert f"Adding namespace {nsid} to {subsystem}, load balancing group {anagrpid}: Successful" in caplog.text
assert "Allocated cluster name='cluster_context_2_0'" in caplog.text
caplog.clear()
cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", nsid])
assert f'"load_balancing_group": {anagrpid}' in caplog.text
