From 9b2e21ab44d18d227cbb20a8091a1c407a96157a Mon Sep 17 00:00:00 2001 From: Aishwarya Mathuria Date: Tue, 12 Oct 2021 14:27:33 +0000 Subject: [PATCH] cluster: Addition of new tests to run Scrub and Recovery Add 2 new tests similar to the background Recovery test for running Scrub load with client IO and the other one for running Scrub, Recovery, and Client load. The Scrub and client IO test performs the following steps: - Create a pool and image to populate scrub objects (scrub pool) - Create scrub thread - Populate the scrub pool with objects using radosbench - Initiate deep-scrub on the scrub pool - Create a second pool and an image in it to run client IO - Initiate fio job on the second image at the same time the deep-scrub starts In the second test, we have an additional recovery pool that is populated after an OSD is marked down and out. Once the pool is populated we mark the OSD up and in which starts backfill. At the same time, we begin deep-scrub on the scrub pool and client IO. Signed-off-by: Aishwarya Mathuria --- benchmark/fio.py | 30 ++ benchmark/librbdfio.py | 39 ++ client_endpoints/ceph_client_endpoints.py | 11 + client_endpoints/client_endpoints.py | 3 + client_endpoints/librbd_client_endpoints.py | 3 + client_endpoints/rbdfuse_client_endpoints.py | 3 + .../rbdkernel_client_endpoints.py | 3 + client_endpoints/rbdnbd_client_endpoints.py | 3 + client_endpoints/rbdtcmu_client_endpoints.py | 3 + cluster/ceph.py | 356 +++++++++++++++++- 10 files changed, 445 insertions(+), 9 deletions(-) diff --git a/benchmark/fio.py b/benchmark/fio.py index b504b389..7ccfc631 100644 --- a/benchmark/fio.py +++ b/benchmark/fio.py @@ -66,6 +66,11 @@ def initialize_endpoints(self): # Create the recovery image based on test type requested if 'recovery_test' in self.cluster.config and self.recov_test_type == 'background': self.client_endpoints_object.create_recovery_image() + if 'scrubbing_test' in self.cluster.config: + self.client_endpoints_object.create_scrubbing_image() + if 'scrub_recov_test' in self.cluster.config: + self.client_endpoints_object.create_recovery_image() + self.client_endpoints_object.create_scrubbing_image() self.create_endpoints() def create_endpoints(self): @@ -213,6 +218,18 @@ def run(self): # Wait for signal to start client IO self.cluster.wait_start_io() + if 'scrubbing_test' in self.cluster.config: + logger.info('Scrubbing test in config') + scrubbing_callback = self.scrubbing_callback + self.cluster.create_scrubbing_test(self.run_dir, scrubbing_callback) + self.cluster.wait_start_io() + + if 'scrub_recov_test' in self.cluster.config: + logger.info('Scrub+Recov') + scrub_recov_callback = self.scrub_recov_callback + self.cluster.create_scrub_recovery_test(self.run_dir, scrub_recov_callback) + self.cluster.wait_start_io() + monitoring.start(self.run_dir) logger.info('Running fio %s test.', self.mode) @@ -225,6 +242,13 @@ def run(self): # If we were doing recovery, wait until it's done. if 'recovery_test' in self.cluster.config: self.cluster.wait_recovery_done() + # If we were doing scrubbing, wait until it's done. 
+ if 'scrubbing_test' in self.cluster.config: + self.cluster.wait_scrubbing_done() + + if 'scrub_recov_test' in self.cluster.config: + self.cluster.wait_scrub_recovery_done() + monitoring.stop(self.run_dir) @@ -239,6 +263,12 @@ def recovery_callback_blocking(self): def recovery_callback_background(self): logger.info('Recovery thread completed!') + def scrubbing_callback(self): + logger.info('Scrubbing thread completed') + + def scrub_recov_callback(self): + logger.info('Scrub+Recovery thread completed') + def analyze(self, out_dir): logger.info('Convert results to json format.') for client in settings.getnodes('clients').split(','): diff --git a/benchmark/librbdfio.py b/benchmark/librbdfio.py index 84ea0de3..66e487c6 100644 --- a/benchmark/librbdfio.py +++ b/benchmark/librbdfio.py @@ -46,6 +46,8 @@ def __init__(self, archive_dir, cluster, config): self.use_existing_volumes = config.get('use_existing_volumes', False) self.pool_name = config.get("poolname", "cbt-librbdfio") self.recov_pool_name = config.get("recov_pool_name", "cbt-librbdfio-recov") + self.scrub_pool_name = config.get("scrub_pool_name", "cbt-librbdfio-scrub") + self.scrub_pool_profile = config.get("scrub_pool_profile", "default") self.rbdname = config.get('rbdname', '') self.total_procs = self.procs_per_volume * self.volumes_per_client * len(settings.getnodes('clients').split(',')) @@ -80,9 +82,17 @@ def initialize(self): common.sync_files('%s/*' % self.run_dir, self.out_dir) + if 'scrubbing_test' in self.cluster.config: + self.mkscrubimage() + # Create the recovery image based on test type requested if 'recovery_test' in self.cluster.config and self.recov_test_type == 'background': self.mkrecovimage() + + if 'scrub_recov_test' in self.cluster.config: + self.mkrecovimage() + self.mkscrubimage() + self.mkimages() # populate the fio files ps = [] @@ -128,6 +138,17 @@ def run(self): # Wait for a signal from the recovery thread to initiate client IO self.cluster.wait_start_io() + if 'scrubbing_test' in self.cluster.config: + scrubbing_callback = self.scrubbing_callback + self.cluster.create_scrubbing_test(self.run_dir, scrubbing_callback) + self.cluster.wait_start_io() + + if 'scrub_recov_test' in self.cluster.config: + scrub_recov_callback = self.scrub_recov_callback + self.cluster.create_scrub_recovery_test(self.run_dir, scrub_recov_callback) + self.cluster.wait_start_io() + + monitoring.start(self.run_dir) logger.info('Running rbd fio %s test.', self.mode) @@ -142,6 +163,9 @@ def run(self): if 'recovery_test' in self.cluster.config: self.cluster.wait_recovery_done() + if 'scrub_recov_test' in self.cluster.config: + self.cluster.wait_scrub_recovery_done() + monitoring.stop(self.run_dir) # Finally, get the historic ops @@ -210,6 +234,18 @@ def mkrecovimage(self): self.cluster.mkimage('cbt-librbdfio-recov-%s-%d' % (node,volnum), self.vol_size, self.recov_pool_name, self.data_pool, self.vol_object_size) monitoring.stop() + def mkscrubimage(self): + logger.info('Creating scrubbing image...') + monitoring.start("%s/scrub_pool_monitoring" % self.run_dir) + if (self.use_existing_volumes == False): + self.cluster.rmpool(self.scrub_pool_name, self.scrub_pool_profile) + self.cluster.mkpool(self.scrub_pool_name, self.scrub_pool_profile, 'rbd') + for node in common.get_fqdn_list('clients'): + for volnum in range(0, self.volumes_per_client): + node = node.rpartition("@")[2] + self.cluster.mkimage('cbt-librbdfio-scrub-%s-%d' % (node,volnum), self.vol_size, self.scrub_pool_name, self.data_pool, self.vol_object_size) + monitoring.stop() + def 
mkimages(self): monitoring.start("%s/pool_monitoring" % self.run_dir) if (self.use_existing_volumes == False): @@ -231,6 +267,9 @@ def recovery_callback_blocking(self): def recovery_callback_background(self): logger.info('Recovery thread completed!') + def scrubbing_callback(self): + logger.info('Scrubbing thread completed!') + def parse(self, out_dir): for client in settings.getnodes('clients').split(','): host = settings.host_info(client)["host"] diff --git a/client_endpoints/ceph_client_endpoints.py b/client_endpoints/ceph_client_endpoints.py index f61fb9bb..fe4b450f 100644 --- a/client_endpoints/ceph_client_endpoints.py +++ b/client_endpoints/ceph_client_endpoints.py @@ -26,6 +26,8 @@ def __init__(self, cluster, config): self.data_pool_profile = config.get('data_pool_profile', None) self.recov_pool = None self.recov_pool_profile = config.get('recov_pool_profile', 'default') + self.scrub_pool = None + self.scrub_pool_profile = config.get('scrub_pool_profile', 'default') self.order = config.get('order', 22) self.disabled_features = config.get('disabled_features', None) @@ -114,6 +116,15 @@ def create_rbd_recovery(self): rbd_name = '%s-%s' % (self.pool, self.get_rbd_name(node, ep_num)) self.cluster.mkimage(rbd_name, self.endpoint_size, self.pool, self.data_pool, self.order) + def create_rbd_scrubbing(self): + self.pool = '%s-scrub' % self.name + self.cluster.rmpool(self.pool, self.scrub_pool_profile) + self.cluster.mkpool(self.pool, self.scrub_pool_profile, 'rbd') + for node in common.get_fqdn_list('clients'): + for ep_num in range(0, self.endpoints_per_client): + rbd_name = '%s-%s' % (self.pool, self.get_rbd_name(node, ep_num)) + self.cluster.mkimage(rbd_name, self.endpoint_size, self.pool, self.data_pool, self.order) + def mount_rbd(self): for ep_num in range(0, self.endpoints_per_client): dir_name = self.get_dir_name(ep_num) diff --git a/client_endpoints/client_endpoints.py b/client_endpoints/client_endpoints.py index a5b19252..97ed2a06 100644 --- a/client_endpoints/client_endpoints.py +++ b/client_endpoints/client_endpoints.py @@ -45,3 +45,6 @@ def remove(self): def create_recovery_image(self): pass + + def create_scrubbing_image(self): + pass diff --git a/client_endpoints/librbd_client_endpoints.py b/client_endpoints/librbd_client_endpoints.py index 64fd3bd3..57fd3bba 100644 --- a/client_endpoints/librbd_client_endpoints.py +++ b/client_endpoints/librbd_client_endpoints.py @@ -19,3 +19,6 @@ def mount(self): def create_recovery_image(self): self.create_rbd_recovery() + + def create_scrubbing_image(self): + self.create_rbd_scrubbing() diff --git a/client_endpoints/rbdfuse_client_endpoints.py b/client_endpoints/rbdfuse_client_endpoints.py index ce4586f6..fa94a1ae 100644 --- a/client_endpoints/rbdfuse_client_endpoints.py +++ b/client_endpoints/rbdfuse_client_endpoints.py @@ -31,3 +31,6 @@ def map_rbd(self, node, rbd_name): def create_recovery_image(self): self.create_rbd_recovery() + + def create_scrubbing_image(self): + self.create_rbd_scrubbing() diff --git a/client_endpoints/rbdkernel_client_endpoints.py b/client_endpoints/rbdkernel_client_endpoints.py index 66dbd0a7..8a75afb9 100644 --- a/client_endpoints/rbdkernel_client_endpoints.py +++ b/client_endpoints/rbdkernel_client_endpoints.py @@ -22,3 +22,6 @@ def map_rbd(self, node, rbd_name): def create_recovery_image(self): self.create_rbd_recovery() + + def create_scrubbing_image(self): + self.create_rbd_scrubbing() diff --git a/client_endpoints/rbdnbd_client_endpoints.py b/client_endpoints/rbdnbd_client_endpoints.py index 
22a05a98..93abc3be 100644 --- a/client_endpoints/rbdnbd_client_endpoints.py +++ b/client_endpoints/rbdnbd_client_endpoints.py @@ -16,3 +16,6 @@ def map_rbd(self, node, rbd_name): def create_recovery_image(self): self.create_rbd_recovery() + + def create_scrubbing_image(self): + self.create_rbd_scrubbing() diff --git a/client_endpoints/rbdtcmu_client_endpoints.py b/client_endpoints/rbdtcmu_client_endpoints.py index 0ec2c0e7..236e69cd 100644 --- a/client_endpoints/rbdtcmu_client_endpoints.py +++ b/client_endpoints/rbdtcmu_client_endpoints.py @@ -23,3 +23,6 @@ def map_rbd(self, node, rbd_name): def create_recovery_image(self): self.create_rbd_recovery() + + def create_scrubbing_image(self): + self.create_rbd_scrubbing() diff --git a/cluster/ceph.py b/cluster/ceph.py index 93fc9f32..d23897a4 100644 --- a/cluster/ceph.py +++ b/cluster/ceph.py @@ -148,6 +148,13 @@ def __init__(self, config): self.prefill_recov_time = 0 self.recov_pool_name = '' + #Scrubbing tests + self.scrub_enabled = config.get('enable_scrub', False) + self.prefill_scrub_objects = 0 + self.prefill_scrub_object_size = 0 + self.prefill_scrub_time = 0 + self.scrub_pool_name = '' + def initialize(self): # Reset the rulesets self.ruleset_map = {} @@ -194,7 +201,9 @@ def initialize(self): monitoring.stop() # Disable scrub and wait for any scrubbing to complete - self.disable_scrub() + logger.info("Scrub enabled is %s", self.scrub_enabled) + if not self.scrub_enabled: + self.disable_scrub() if self.disable_bal: self.disable_balancer() @@ -532,6 +541,12 @@ def start_rgw(self): def disable_scrub(self): common.pdsh(settings.getnodes('head'), "ceph osd set noscrub; ceph osd set nodeep-scrub").communicate() + def disable_recovery(self): + common.pdsh(settings.getnodes('head'), "ceph osd set norecover; ceph osd set nobackfill").communicate() + + def enable_recovery(self): + common.pdsh(settings.getnodes('head'), "ceph osd unset norecover; ceph osd unset nobackfill").communicate() + def disable_balancer(self): common.pdsh(settings.getnodes('head'), "ceph balancer off").communicate() @@ -548,7 +563,6 @@ def check_health(self, check_list=None, logfile=None, recstatsfile=None): if recstatsfile: header = "Time, Num Deg Objs, Total Deg Objs" stdout, stderr = common.pdsh(settings.getnodes('head'), 'echo %s >> %s' % (header, recstatsfile)).communicate() - while True: stdout, stderr = common.pdsh(settings.getnodes('head'), '%s -c %s health %s' % (self.ceph_cmd, self.tmp_conf, logline)).communicate() self.log_recovery_stats(recstatsfile) @@ -563,6 +577,44 @@ def check_health(self, check_list=None, logfile=None, recstatsfile=None): return ret + def log_scrubbing_stats(self, scrubstatsfile=None, pgid=None): + if not scrubstatsfile: + return + fmtjson = "--format=json" + PG_MAP = "pg_map" + PG_STATS = "pg_stats" + separator = " " + PG_ID = "pgid" + NUM_OBJECTS = "num_objects" + LAST_SCRUB_STAMP = "last_scrub_stamp" + STAT_SUM = "stat_sum" + SCRUB_DURATION = "last_scrub_duration" + OBJECTS_SCRUBBED = "objects_scrubbed" + stdout, stderr = common.pdsh(settings.getnodes('head'), '%s -c %s pg dump %s' % (self.ceph_cmd, self.tmp_conf, fmtjson)).communicate() + stdout = stdout.split(':', 1)[1] + stdout = stdout.strip() + try: + jsondata = json.loads(stdout) + except ValueError as e: + logger.error(str(e)) + return + scrubstats = [] + scrubstats.append(str(time.time())) + if PG_STATS in jsondata[PG_MAP]: + no_pgs = len(jsondata[PG_MAP][PG_STATS]) + for i in range(0, no_pgs): + if str(jsondata[PG_MAP][PG_STATS][i][PG_ID]) == pgid: + 
scrubstats.append(str(jsondata[PG_MAP][PG_STATS][i][PG_ID])) + scrubstats.append(str(jsondata[PG_MAP][PG_STATS][i][STAT_SUM][NUM_OBJECTS])) + scrubstats.append(str(jsondata[PG_MAP][PG_STATS][i][SCRUB_DURATION])) + scrubstats.append(str(jsondata[PG_MAP][PG_STATS][i][OBJECTS_SCRUBBED])) + + if len(scrubstats): + print(scrubstatsfile) + message = separator.join(scrubstats) + stdout, stderr = common.pdsh(settings.getnodes('head'), 'echo -e %s >> %s' % (message, scrubstatsfile)).communicate() + + def log_recovery_stats(self, recstatsfile=None): if not recstatsfile: return @@ -623,15 +675,39 @@ def check_backfill(self, check_list=None, logfile=None, recstatsfile=None): time.sleep(1) return ret - def check_scrub(self): + def check_scrub(self, scrubstatsfile=None): logger.info('Waiting until Scrubbing completes...') + fmtjson = '--format=json' + SCRUB_DURATION = "last_scrub_duration" + PG_STATS = "pg_stats" + PG_ID = "pgid" + pgs_scrubbed = [] while True: - stdout, stderr = common.pdsh(settings.getnodes('head'), '%s -c %s pg dump | cut -f 16 | grep "0.000000" | wc -l' % (self.ceph_cmd, self.tmp_conf)).communicate() - if " 0\n" in stdout: - break - else: - logger.info(stdout) - time.sleep(1) + stdout, stderr = common.pdsh(settings.getnodes('head'), '%s pg ls-by-pool %s %s' %(self.ceph_cmd, self.scrub_pool_name, fmtjson)).communicate() + stdout = stdout.split(':', 1)[1] + stdout = stdout.strip() + scrubbed_pgs = 0 + try: + jsondata = json.loads(stdout) + except ValueError as e: + logger.error(str(e)) + return 0 + logger.info('PG STATS present') + for i in range(0, len(jsondata[PG_STATS])): + if jsondata[PG_STATS][i][SCRUB_DURATION] == 0: + time.sleep(1) + else: + scrubbed_pgs += 1 + logger.info(scrubbed_pgs) + logger.info(jsondata[PG_STATS][i][SCRUB_DURATION]) + logger.info('Scrub done for: ') + logger.info(jsondata[PG_STATS][i][PG_ID]) + if jsondata[PG_STATS][i][PG_ID] not in pgs_scrubbed: + pgs_scrubbed.append(jsondata[PG_STATS][i][PG_ID]) + self.log_scrubbing_stats(scrubstatsfile, str(jsondata[PG_STATS][i][PG_ID])) + if scrubbed_pgs == len(jsondata[PG_STATS]): + logger.info('Scrubbing is complete') + return 1 def dump_config(self, run_dir): common.pdsh(settings.getnodes('osds'), 'sudo %s -c %s daemon osd.0 config show > %s/ceph_settings.out' % (self.ceph_cmd, self.tmp_conf, run_dir)).communicate() @@ -654,6 +730,21 @@ def create_recovery_test(self, run_dir, callback, test_type='blocking'): self.rt = RecoveryTestThreadBackground(rt_config, self, callback, self.stoprequest, self.haltrequest, self.startiorequest) self.rt.start() + def create_scrubbing_test(self, run_dir, callback): + ''' + Only background type currently + ''' + st_config = self.config.get("scrubbing_test", {}) + st_config['run_dir'] = run_dir + self.st = ScrubbingTestThreadBackground(st_config, self, callback, self.stoprequest, self.haltrequest, self.startiorequest) + self.st.start() + + def create_scrub_recovery_test(self, run_dir, callback): + config = self.config.get("scrub_recov_test", {}) + config['run_dir'] = run_dir + self.srt = ScrubRecoveryThreadBackground(config, self, callback, self.stoprequest, self.haltrequest, self.startiorequest) + self.srt.start() + def wait_start_io(self): logger.info("Waiting for signal to start client io...") self.startiorequest.wait() @@ -672,6 +763,32 @@ def wait_recovery_done(self): break self.rt.join(1) + def maybe_populate_scrubbing_pool(self): + if self.prefill_scrub_objects > 0 or self.prefill_scrub_time > 0: + logger.info('prefilling %s %sbyte objects into scrubbing pool %s' % 
(self.prefill_scrub_objects, self.prefill_scrub_object_size, self.scrub_pool_name)) + common.pdsh(settings.getnodes('head'), 'sudo %s -p %s bench %s write -b %s --max-objects %s --no-cleanup' % (self.rados_cmd, self.scrub_pool_name, self.prefill_scrub_time, self.prefill_scrub_object_size, self.prefill_scrub_objects)).communicate() + #self.check_health() + + def initiate_scrubbing(self): + logger.info("Initiating scrub on pool %s" % self.scrub_pool_name) + common.pdsh(settings.getnodes('head'), '%s osd pool deep-scrub %s' % (self.ceph_cmd, self.scrub_pool_name)).communicate() + + def wait_scrubbing_done(self): + self.stoprequest.set() + while True: + threads = threading.enumerate() + if len(threads) == 1: + break + self.st.join(1) + + def wait_scrub_recovery_done(self): + self.stoprequest.set() + while True: + threads = threading.enumerate() + if len(threads) == 1: + break + self.srt.join(1) + def check_pg_autoscaler(self, timeout=-1, logfile=None): ret = 0 if not timeout: @@ -772,6 +889,14 @@ def mkpool(self, name, profile_name, application, base_name=None): if self.prefill_recov_objects > 0: self.recov_pool_name = name + scrub_pool = profile.get('scrub_pool', False) + if scrub_pool: + self.prefill_scrub_objects = profile.get('prefill_scrub_objects', 0) + self.prefill_scrub_object_size = profile.get('prefill_scrub_object_size', 0) + self.prefill_scrub_time = profile.get('prefill_scrub_time', 0) + if self.prefill_scrub_objects > 0: + self.scrub_pool_name = name + if replication and replication == 'erasure': common.pdsh(settings.getnodes('head'), 'sudo %s -c %s osd pool create %s %d %d erasure %s' % (self.ceph_cmd, self.tmp_conf, name, pg_size, pgp_size, erasure_profile), continue_if_error=False).communicate() @@ -1167,3 +1292,216 @@ def run(self): self.states[self.state]() common.pdsh(settings.getnodes('head'), self.logcmd('Exiting recovery test thread. Last state was: %s' % self.state)).communicate() + +class ScrubbingTestThreadBackground(threading.Thread): + def __init__(self, config, cluster, callback, stoprequest, haltrequest, startiorequest): + threading.Thread.__init__(self) + self.config = config + self.cluster = cluster + self.callback = callback + self.state = 'pre' + self.states = {'pre': self.pre, 'osdout': self.osdout, 'osdin':self.osdin, + 'post': self.post, 'done': self.done} + self.startiorequest = startiorequest + self.stoprequest = stoprequest + self.haltrequest = haltrequest + self.outhealthtries = 0 + self.inhealthtries = 0 + self.maxhealthtries = 60 + self.health_checklist = ["peering", "recovery_wait", "stuck", "inactive", "unclean", "recovery"] + self.ceph_cmd = self.cluster.ceph_cmd + self.lasttime = time.time() + + def logcmd(self, message): + return 'echo "[`date`] %s" >> %s/scrubbing.log' % (message, self.config.get('run_dir')) + + def pre(self): + pre_time = self.config.get("pre_time", 60) + common.pdsh(settings.getnodes('head'), self.logcmd('Starting Scrubbing Test Thread, waiting %s seconds.' 
% pre_time)).communicate() + time.sleep(pre_time) + self.state = 'osdout' + + def osdout(self): + scrub_log = "%s/scrub.log" % self.config.get('run_dir') + scrub_stats_log = "%s/scrub_stats.log" % self.config.get('run_dir') + ret = self.cluster.check_health(self.health_checklist, None, None) + + common.pdsh(settings.getnodes('head'), self.logcmd("ret: %s" % ret)).communicate() + + self.cluster.maybe_populate_scrubbing_pool() + common.pdsh(settings.getnodes('head'), self.logcmd("osdout state - Sleeping for 10 secs after populating scrubbing pool.")).communicate() + time.sleep(10) + self.lasttime = time.time() + self.state = "osdin" + + def osdin(self): + scrub_stats_log = "%s/scrub_stats.log" % self.config.get('run_dir') + self.startiorequest.set() + self.cluster.initiate_scrubbing() + ret = self.cluster.check_scrub(scrub_stats_log) + if ret == 1: + self.state = "post" + + def post(self): + if self.stoprequest.isSet(): + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but stoprequest is set, finishing now.')).communicate() + self.haltrequest.set() + return + + if self.config.get("repeat", False): + # reset counters + self.outhealthtries = 0 + self.inhealthtries = 0 + + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but repeat is set. Moving to "osdout" state.')).communicate() + self.state = "osdout" + return + + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, finishing up...')).communicate() + self.state = "done" + + def done(self): + common.pdsh(settings.getnodes('head'), self.logcmd("Done. Calling parent callback function.")).communicate() + self.callback() + self.haltrequest.set() + + def join(self, timeout=None): + common.pdsh(settings.getnodes('head'), self.logcmd('Received notification that parent is finished and waiting.')).communicate() + super(ScrubbingTestThreadBackground, self).join(timeout) + + def run(self): + self.haltrequest.clear() + self.stoprequest.clear() + self.startiorequest.clear() + while not self.haltrequest.isSet(): + self.states[self.state]() + common.pdsh(settings.getnodes('head'), self.logcmd('Exiting scrubbing test thread. Last state was: %s' % self.state)).communicate() + + +class ScrubRecoveryThreadBackground(threading.Thread): + def __init__(self, config, cluster, callback, stoprequest, haltrequest, startiorequest): + threading.Thread.__init__(self) + self.config = config + self.cluster = cluster + self.callback = callback + self.state = 'pre' + self.states = {'pre': self.pre, 'markdown': self.markdown, 'osdout': self.osdout, 'osdin':self.osdin, + 'post': self.post, 'done': self.done} + self.startiorequest = startiorequest + self.stoprequest = stoprequest + self.haltrequest = haltrequest + self.outhealthtries = 0 + self.inhealthtries = 0 + self.maxhealthtries = 60 + self.health_checklist = ["peering", "recovery_wait", "stuck", "inactive", "unclean", "recovery"] + self.ceph_cmd = self.cluster.ceph_cmd + self.lasttime = time.time() + + def logcmd(self, message): + return 'echo "[`date`] %s" >> %s/scrub_recov.log' % (message, self.config.get('run_dir')) + + def pre(self): + pre_time = self.config.get("pre_time", 60) + common.pdsh(settings.getnodes('head'), self.logcmd('Starting Scrub+Recovery Test Thread, waiting %s seconds.' 
% pre_time)).communicate() + time.sleep(pre_time) + lcmd = self.logcmd("Setting the ceph osd noup flag") + common.pdsh(settings.getnodes('head'), '%s -c %s osd set noup;%s' % (self.ceph_cmd, self.cluster.tmp_conf, lcmd)).communicate() + self.state = 'markdown' + + def markdown(self): + for osdnum in self.config.get('osds'): + lcmd = self.logcmd("Marking OSD %s down." % osdnum) + common.pdsh(settings.getnodes('head'), '%s -c %s osd down %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate() + lcmd = self.logcmd("Marking OSD %s out." % osdnum) + common.pdsh(settings.getnodes('head'), '%s -c %s osd out %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate() + common.pdsh(settings.getnodes('head'), self.logcmd('Waiting for the cluster to break and heal')).communicate() + self.lasttime = time.time() + self.state = 'osdout' + + + def osdout(self): + reclog = "%s/recovery.log" % self.config.get('run_dir') + recstatslog = "%s/recovery_stats.log" % self.config.get('run_dir') + ret = self.cluster.check_health(self.health_checklist, reclog, recstatslog) + + common.pdsh(settings.getnodes('head'), self.logcmd("ret: %s" % ret)).communicate() + + if ret == 0: + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster never went unhealthy.')).communicate() + else: + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster appears to have healed.')).communicate() + rectime = str(time.time() - self.lasttime) + common.pdsh(settings.getnodes('head'), 'echo Time: %s >> %s' % (rectime, recstatslog)).communicate() + common.pdsh(settings.getnodes('head'), self.logcmd('Time: %s' % rectime)).communicate() + + # Populate the recovery pool + self.cluster.maybe_populate_recovery_pool() + + common.pdsh(settings.getnodes('head'), self.logcmd("osdout state - Sleeping for 10 secs after populating recovery pool.")).communicate() + time.sleep(10) + lcmd = self.logcmd("Unsetting the ceph osd noup flag") + self.cluster.disable_recovery() + common.pdsh(settings.getnodes('head'), '%s -c %s osd unset noup;%s' % (self.ceph_cmd, self.cluster.tmp_conf, lcmd)).communicate() + for osdnum in self.config.get('osds'): + lcmd = self.logcmd("Marking OSD %s up." % osdnum) + common.pdsh(settings.getnodes('head'), '%s -c %s osd up %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate() + lcmd = self.logcmd("Marking OSD %s in." 
% osdnum) + common.pdsh(settings.getnodes('head'), '%s -c %s osd in %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate() + self.lasttime = time.time() + # Populate the scrub pool + logger.info("Sleep before scrub populate") + time.sleep(10) + self.cluster.maybe_populate_scrubbing_pool() + self.state = "osdin" + + + def osdin(self): + #Start scrub + self.startiorequest.set() + self.cluster.initiate_scrubbing() + self.cluster.enable_recovery() + recstatslog = "%s/recovery_backfill_stats.log" % self.config.get('run_dir') + scrub_stats_log = "%s/scrub_stats.log" % self.config.get('run_dir') + backfill = threading.Thread(target=self.cluster.check_backfill, args=(self.health_checklist, "%s/recovery.log" % self.config.get('run_dir'), recstatslog,)) + scrub_check = threading.Thread(target=self.cluster.check_scrub, args=(scrub_stats_log,)) + backfill.start() + scrub_check.start() + backfill.join() + scrub_check.join() + self.state = "post" + + + def post(self): + if self.stoprequest.isSet(): + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but stoprequest is set, finishing now.')).communicate() + self.haltrequest.set() + return + + if self.config.get("repeat", False): + # reset counters + self.outhealthtries = 0 + self.inhealthtries = 0 + + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but repeat is set. Moving to "markdown" state.')).communicate() + self.state = "markdown" + return + + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, finishing up...')).communicate() + self.state = "done" + + def done(self): + common.pdsh(settings.getnodes('head'), self.logcmd("Done. Calling parent callback function.")).communicate() + self.callback() + self.haltrequest.set() + + def join(self, timeout=None): + common.pdsh(settings.getnodes('head'), self.logcmd('Received notification that parent is finished and waiting.')).communicate() + super(ScrubRecoveryThreadBackground, self).join(timeout) + + def run(self): + self.haltrequest.clear() + self.stoprequest.clear() + self.startiorequest.clear() + while not self.haltrequest.isSet(): + self.states[self.state]() + common.pdsh(settings.getnodes('head'), self.logcmd('Exiting scrub+recovery test thread. Last state was: %s' % self.state)).communicate()
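
For reviewers who want to exercise the new tests, the configuration keys the patch reads are scattered across benchmark/librbdfio.py, cluster/ceph.py and the new thread classes; the sketch below collects them in one place. It is assembled from the config.get()/profile.get() calls in the diff and written as plain Python dicts rather than a full CBT YAML job file, and the concrete values (object counts, sizes, OSD ids) are illustrative assumptions, not recommendations.

# Hypothetical summary of the keys read by this patch; values are placeholders.

cluster_section = {
    # Presence of the key enables the test in fio.py / librbdfio.py run().
    'scrubbing_test': {
        'pre_time': 60,            # seconds the thread waits before prefilling (default 60)
        'repeat': False,           # loop back to 'osdout' after a pass instead of finishing
    },
    'scrub_recov_test': {
        'pre_time': 60,
        'osds': [0, 1],            # OSDs the thread marks down/out and later up/in
        'repeat': False,
    },
    'enable_scrub': True,          # skip 'ceph osd set noscrub/nodeep-scrub' in initialize()
}

scrub_pool_profile = {
    # Read in Ceph.mkpool(); marks the pool that radosbench prefills and deep-scrub targets.
    'scrub_pool': True,
    'prefill_scrub_objects': 10000,        # rados bench --max-objects
    'prefill_scrub_object_size': 4194304,  # rados bench -b (object size in bytes)
    'prefill_scrub_time': 60,              # rados bench write duration (seconds)
}

librbdfio_section = {
    # Read in benchmark/librbdfio.py when creating the scrub pool and images.
    'scrub_pool_name': 'cbt-librbdfio-scrub',
    'scrub_pool_profile': 'default',
}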
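
The completion check in Ceph.check_scrub() boils down to polling "ceph pg ls-by-pool <scrub pool> --format=json" until every PG in the pool reports a non-zero last_scrub_duration, logging per-PG stats along the way. Below is a minimal standalone sketch of that loop, with the pdsh plumbing (and the "hostname:" prefix it adds to stdout) and the scrub_stats.log bookkeeping stripped out, assuming the ceph CLI is usable locally.

import json
import subprocess
import time

def wait_for_pool_scrub(pool, poll_interval=1.0):
    # Poll until every PG in `pool` has a non-zero last_scrub_duration.
    while True:
        out = subprocess.check_output(
            ['ceph', 'pg', 'ls-by-pool', pool, '--format=json'], text=True)
        pg_stats = json.loads(out)['pg_stats']
        scrubbed = [pg for pg in pg_stats if pg['last_scrub_duration'] != 0]
        if pg_stats and len(scrubbed) == len(pg_stats):
            print('Scrubbing is complete')
            return
        time.sleep(poll_interval)

The real method additionally appends (pgid, num_objects, last_scrub_duration, objects_scrubbed) for each newly scrubbed PG to scrub_stats.log via log_scrubbing_stats().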
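
One coordination detail that is easy to miss in the diff: the background thread, not the benchmark, decides when client IO starts. After the pre_time wait and the radosbench prefill, the scrubbing thread's osdin state sets startiorequest, which releases the benchmark's wait_start_io() so fio begins at the same moment deep-scrub is initiated; when the benchmark finishes it calls wait_scrubbing_done(), which sets stoprequest and joins the thread. A minimal sketch of that handshake, under the assumption that the Ceph/pdsh work can be replaced by sleeps:

import threading
import time

startiorequest = threading.Event()   # thread -> benchmark: start client IO now
stoprequest = threading.Event()      # benchmark -> thread: client IO has finished
haltrequest = threading.Event()      # terminates the thread's state loop

class ScrubThreadSketch(threading.Thread):
    def run(self):
        state = 'pre'
        states = {'pre': self.pre, 'osdin': self.osdin, 'post': self.post}
        while not haltrequest.is_set():
            state = states[state]()

    def pre(self):
        time.sleep(0.2)              # stands in for pre_time + pool prefill
        return 'osdin'

    def osdin(self):
        startiorequest.set()         # release client IO, then start deep-scrub
        time.sleep(0.2)              # stands in for the check_scrub() polling loop
        return 'post'

    def post(self):
        if not stoprequest.is_set():
            print('scrub finished; calling parent callback')
        haltrequest.set()
        return 'post'

t = ScrubThreadSketch()
t.start()
startiorequest.wait()                # Ceph.wait_start_io()
print('running fio while deep-scrub runs')
time.sleep(0.5)                      # stands in for the fio run
stoprequest.set()                    # Ceph.wait_scrubbing_done()
t.join()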