diff --git a/.githubmap b/.githubmap
index d69f58b4cf768..c5fd6048fd021 100644
--- a/.githubmap
+++ b/.githubmap
@@ -126,3 +126,4 @@ kotreshhr Kotresh Hiremath Ravishankar
 vkmc Victoria Martinez de la Cruz
 gouthampacha Goutham Pacha Ravi
 zdover23 Zac Dover
+ShyamsundarR Shyamsundar R
diff --git a/doc/cephfs/fs-volumes.rst b/doc/cephfs/fs-volumes.rst
index 3eb2a7a8fbae3..4cc22d17dbc81 100644
--- a/doc/cephfs/fs-volumes.rst
+++ b/doc/cephfs/fs-volumes.rst
@@ -291,6 +291,10 @@ Similar to specifying a pool layout when creating a subvolume, pool layout can b
 
   $ ceph fs subvolume snapshot clone <vol_name> <subvol_name> <snap_name> <target_subvol_name> --pool_layout <pool_layout>
 
+Configure maximum number of concurrent clones. The default is set to 4::
+
+  $ ceph config set mgr mgr/volumes/max_concurrent_clones <value>
+
 To check the status of a clone operation use::
 
   $ ceph fs clone status <vol_name> <clone_name> [--group_name <group_name>]
diff --git a/qa/tasks/cephfs/test_volumes.py b/qa/tasks/cephfs/test_volumes.py
index c5d3ab8fefb4a..e45ca4bf09881 100644
--- a/qa/tasks/cephfs/test_volumes.py
+++ b/qa/tasks/cephfs/test_volumes.py
@@ -2767,6 +2767,25 @@ def test_subvolume_snapshot_clone(self):
         # verify trash dir is clean
         self._wait_for_trash_empty()
 
+    def test_subvolume_snapshot_reconf_max_concurrent_clones(self):
+        """
+        Validate 'max_concurrent_clones' config option
+        """
+
+        # get the default number of cloner threads
+        default_max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
+        self.assertEqual(default_max_concurrent_clones, 4)
+
+        # Increase number of cloner threads
+        self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 6)
+        max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
+        self.assertEqual(max_concurrent_clones, 6)
+
+        # Decrease number of cloner threads
+        self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 2)
+        max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
+        self.assertEqual(max_concurrent_clones, 2)
+
     def test_subvolume_snapshot_clone_pool_layout(self):
         subvolume = self._generate_random_subvolume_name()
         snapshot = self._generate_random_snapshot_name()
diff --git a/src/pybind/mgr/volumes/fs/async_cloner.py b/src/pybind/mgr/volumes/fs/async_cloner.py
index ac3b10d6a9c3b..61928ec2d0f42 100644
--- a/src/pybind/mgr/volumes/fs/async_cloner.py
+++ b/src/pybind/mgr/volumes/fs/async_cloner.py
@@ -274,6 +274,9 @@ def __init__(self, volume_client, tp_size):
         }
         super(Cloner, self).__init__(volume_client, "cloner", tp_size)
 
+    def reconfigure_max_concurrent_clones(self, tp_size):
+        super(Cloner, self).reconfigure_max_concurrent_clones("cloner", tp_size)
+
     def is_clone_cancelable(self, clone_state):
         return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))
 
diff --git a/src/pybind/mgr/volumes/fs/async_job.py b/src/pybind/mgr/volumes/fs/async_job.py
index 3bdedb723b9ce..fb7051f47c242 100644
--- a/src/pybind/mgr/volumes/fs/async_job.py
+++ b/src/pybind/mgr/volumes/fs/async_job.py
@@ -27,6 +27,7 @@ def run(self):
         thread_id = threading.currentThread()
         assert isinstance(thread_id, JobThread)
         thread_name = thread_id.getName()
+        log.debug("thread [{0}] starting".format(thread_name))
 
         while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
             vol_job = None
@@ -34,6 +35,10 @@ def run(self):
                 # fetch next job to execute
                 with self.async_job.lock:
                     while True:
+                        if self.should_reconfigure_num_threads():
+                            log.info("thread [{0}] terminating due to reconfigure".format(thread_name))
+                            self.async_job.threads.remove(self)
+                            return
                         vol_job = self.async_job.get_job()
                         if vol_job:
                             break
@@ -62,6 +67,12 @@ def run(self):
                 time.sleep(1)
         log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
         self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))
+        with self.async_job.lock:
+            self.async_job.threads.remove(self)
+
+    def should_reconfigure_num_threads(self):
+        # reconfigure of max_concurrent_clones
+        return len(self.async_job.threads) > self.async_job.nr_concurrent_jobs
 
     def cancel_job(self):
         self.cancel_event.set()
@@ -103,12 +114,28 @@ def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
         # cv for job cancelation
         self.waiting = False
         self.cancel_cv = threading.Condition(self.lock)
+        self.nr_concurrent_jobs = nr_concurrent_jobs
 
         self.threads = []
         for i in range(nr_concurrent_jobs):
             self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(name_pfx, i)))
             self.threads[-1].start()
 
+    def reconfigure_max_concurrent_clones(self, name_pfx, nr_concurrent_jobs):
+        """
+        reconfigure number of cloner threads
+        """
+        with self.lock:
+            self.nr_concurrent_jobs = nr_concurrent_jobs
+            # Decrease in concurrency. Notify threads which are waiting for a job to terminate.
+            if len(self.threads) > nr_concurrent_jobs:
+                self.cv.notifyAll()
+            # Increase in concurrency
+            if len(self.threads) < nr_concurrent_jobs:
+                for i in range(len(self.threads), nr_concurrent_jobs):
+                    self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(name_pfx, time.time(), i)))
+                    self.threads[-1].start()
+
     def get_job(self):
         log.debug("processing {0} volume entries".format(len(self.q)))
         nr_vols = len(self.q)
diff --git a/src/pybind/mgr/volumes/fs/volume.py b/src/pybind/mgr/volumes/fs/volume.py
index 9f2a277ca107a..c4fc4ce52e70c 100644
--- a/src/pybind/mgr/volumes/fs/volume.py
+++ b/src/pybind/mgr/volumes/fs/volume.py
@@ -45,8 +45,7 @@ def __init__(self, mgr):
         super().__init__(mgr)
         # volume specification
         self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
-        # TODO: make thread pool size configurable
-        self.cloner = Cloner(self, 4)
+        self.cloner = Cloner(self, self.mgr.max_concurrent_clones)
         self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
         # on startup, queue purge job for available volumes to kickstart
         # purge for leftover subvolume entries in trash. note that, if the
diff --git a/src/pybind/mgr/volumes/module.py b/src/pybind/mgr/volumes/module.py
index 80b6300cd35ca..e60f8d824c40b 100644
--- a/src/pybind/mgr/volumes/module.py
+++ b/src/pybind/mgr/volumes/module.py
@@ -2,6 +2,7 @@
 import json
 import logging
 import traceback
+import threading
 
 from mgr_module import MgrModule
 import orchestrator
@@ -375,11 +376,28 @@ class Module(orchestrator.OrchestratorClientMixin, MgrModule):
         # volume in the lifetime of this module instance.
     ]
 
+    MODULE_OPTIONS = [
+        {
+            'name': 'max_concurrent_clones',
+            'type': 'int',
+            'default': 4,
+            'desc': 'Number of asynchronous cloner threads',
+        }
+    ]
+
     def __init__(self, *args, **kwargs):
+        self.inited = False
+        # for mypy
+        self.max_concurrent_clones = None
+        self.lock = threading.Lock()
         super(Module, self).__init__(*args, **kwargs)
-        self.vc = VolumeClient(self)
-        self.fs_export = FSExport(self)
-        self.nfs = NFSCluster(self)
+        # Initialize config option members
+        self.config_notify()
+        with self.lock:
+            self.vc = VolumeClient(self)
+            self.fs_export = FSExport(self)
+            self.nfs = NFSCluster(self)
+            self.inited = True
 
     def __del__(self):
         self.vc.shutdown()
@@ -387,6 +405,21 @@ def __del__(self):
     def shutdown(self):
         self.vc.shutdown()
 
+    def config_notify(self):
+        """
+        This method is called whenever one of our config options is changed.
+        """
+        with self.lock:
+            for opt in self.MODULE_OPTIONS:
+                setattr(self,
+                        opt['name'],  # type: ignore
+                        self.get_module_option(opt['name']))  # type: ignore
+                self.log.debug(' mgr option %s = %s',
+                               opt['name'], getattr(self, opt['name']))  # type: ignore
+                if self.inited:
+                    if opt['name'] == "max_concurrent_clones":
+                        self.vc.cloner.reconfigure_max_concurrent_clones(self.max_concurrent_clones)
+
     def handle_command(self, inbuf, cmd):
         handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")
         try:
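
Note (not part of the patch): the reconfigure path above never terminates a cloner thread forcibly. Shrinking only records the new target and notifies idle threads; each surplus thread removes itself from the pool the next time it goes looking for a job, so a clone that is already running always finishes. Below is a minimal, standalone sketch of that resize pattern using only the Python standard library; the names ResizableWorkerPool, submit and resize are illustrative and do not exist in the Ceph tree:

    import threading
    import time
    from collections import deque


    class ResizableWorkerPool(object):
        """Toy pool that resizes the way AsyncJobs does: surplus threads
        retire themselves before picking up their next job."""

        def __init__(self, nr_workers):
            self.lock = threading.Lock()
            self.cv = threading.Condition(self.lock)
            self.q = deque()              # pending jobs (plain callables)
            self.nr_workers = nr_workers  # current target size
            self.threads = []
            for i in range(nr_workers):
                self._spawn(i)

        def _spawn(self, idx):
            t = threading.Thread(target=self._run, name="worker.{0}".format(idx))
            t.daemon = True
            self.threads.append(t)
            t.start()

        def _run(self):
            me = threading.current_thread()
            while True:
                with self.lock:
                    while True:
                        # surplus thread: drop out instead of taking another job
                        if len(self.threads) > self.nr_workers:
                            self.threads.remove(me)
                            return
                        if self.q:
                            job = self.q.popleft()
                            break
                        self.cv.wait()
                job()  # run the job outside the lock

        def submit(self, job):
            with self.lock:
                self.q.append(job)
                self.cv.notify()

        def resize(self, nr_workers):
            with self.lock:
                self.nr_workers = nr_workers
                if len(self.threads) > nr_workers:
                    # wake idle workers so they notice the smaller target and exit
                    self.cv.notify_all()
                elif len(self.threads) < nr_workers:
                    for i in range(len(self.threads), nr_workers):
                        self._spawn(i)


    if __name__ == "__main__":
        pool = ResizableWorkerPool(4)
        for n in range(8):
            pool.submit(lambda n=n: print("job {0} done".format(n)))
        pool.resize(2)      # idle surplus workers retire on their next wakeup
        time.sleep(1)       # give the daemon threads time to drain the queue

One difference from this sketch: the patch stamps regrown thread names with time.time() ("cloner.<timestamp>.<index>") so names stay unique across repeated shrink/grow cycles.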