From 2071234111488e107eb53fccb8ed92e4b38d9ffa Mon Sep 17 00:00:00 2001 From: Aishwarya Mathuria Date: Tue, 30 Nov 2021 18:43:54 +0000 Subject: [PATCH] cluster: Addition of new tests to run Scrub and Recovery Add 2 new tests similar to the background Recovery test for running Scrub load with client IO and the other one for running Scrub, Recovery, and Client load. The Scrub and client IO test performs the following steps: - Create a pool and image to populate scrub objects (scrub pool) - Create scrub thread - Populate the scrub pool with objects using radosbench - Initiate deep-scrub on the scrub pool - Create a second pool and an image in it to run client IO - Initiate fio job on the second image at the same time the deep-scrub starts In the second test, we have an additional recovery pool that is populated after an OSD is marked down and out. Once the pool is populated we mark the OSD up and in which starts backfill. At the same time, we begin deep-scrub on the scrub pool and client IO. Signed-off-by: Aishwarya Mathuria --- benchmark/fio.py | 16 ++ benchmark/librbdfio.py | 16 +- client_endpoints/ceph_client_endpoints.py | 2 +- .../cephfsfuse_client_endpoints.py | 3 - .../cephfskernel_client_endpoints.py | 3 - cluster/ceph.py | 188 +++++++++++++++--- 6 files changed, 197 insertions(+), 31 deletions(-) diff --git a/benchmark/fio.py b/benchmark/fio.py index 4565224a..7ccfc631 100644 --- a/benchmark/fio.py +++ b/benchmark/fio.py @@ -68,6 +68,9 @@ def initialize_endpoints(self): self.client_endpoints_object.create_recovery_image() if 'scrubbing_test' in self.cluster.config: self.client_endpoints_object.create_scrubbing_image() + if 'scrub_recov_test' in self.cluster.config: + self.client_endpoints_object.create_recovery_image() + self.client_endpoints_object.create_scrubbing_image() self.create_endpoints() def create_endpoints(self): @@ -221,6 +224,12 @@ def run(self): self.cluster.create_scrubbing_test(self.run_dir, scrubbing_callback) self.cluster.wait_start_io() + if 'scrub_recov_test' in self.cluster.config: + logger.info('Scrub+Recov') + scrub_recov_callback = self.scrub_recov_callback + self.cluster.create_scrub_recovery_test(self.run_dir, scrub_recov_callback) + self.cluster.wait_start_io() + monitoring.start(self.run_dir) logger.info('Running fio %s test.', self.mode) @@ -237,6 +246,10 @@ def run(self): if 'scrubbing_test' in self.cluster.config: self.cluster.wait_scrubbing_done() + if 'scrub_recov_test' in self.cluster.config: + self.cluster.wait_scrub_recovery_done() + + monitoring.stop(self.run_dir) # Finally, get the historic ops @@ -253,6 +266,9 @@ def recovery_callback_background(self): def scrubbing_callback(self): logger.info('Scrubbing thread completed') + def scrub_recov_callback(self): + logger.info('Scrub+Recovery thread completed') + def analyze(self, out_dir): logger.info('Convert results to json format.') for client in settings.getnodes('clients').split(','): diff --git a/benchmark/librbdfio.py b/benchmark/librbdfio.py index c562d07d..66e487c6 100644 --- a/benchmark/librbdfio.py +++ b/benchmark/librbdfio.py @@ -88,6 +88,11 @@ def initialize(self): # Create the recovery image based on test type requested if 'recovery_test' in self.cluster.config and self.recov_test_type == 'background': self.mkrecovimage() + + if 'scrub_recov_test' in self.cluster.config: + self.mkrecovimage() + self.mkscrubimage() + self.mkimages() # populate the fio files ps = [] @@ -135,9 +140,15 @@ def run(self): if 'scrubbing_test' in self.cluster.config: scrubbing_callback = self.scrubbing_callback - self.cluster.create_scrub_test(self.run_dir, scrubbing_callback) + self.cluster.create_scrubbing_test(self.run_dir, scrubbing_callback) self.cluster.wait_start_io() + if 'scrub_recov_test' in self.cluster.config: + scrub_recov_callback = self.scrub_recov_callback + self.cluster.create_scrub_recovery_test(self.run_dir, scrub_recov_callback) + self.cluster.wait_start_io() + + monitoring.start(self.run_dir) logger.info('Running rbd fio %s test.', self.mode) @@ -152,6 +163,9 @@ def run(self): if 'recovery_test' in self.cluster.config: self.cluster.wait_recovery_done() + if 'scrub_recov_test' in self.cluster.config: + self.cluster.wait_scrub_recovery_done() + monitoring.stop(self.run_dir) # Finally, get the historic ops diff --git a/client_endpoints/ceph_client_endpoints.py b/client_endpoints/ceph_client_endpoints.py index 7c0da9c9..fe4b450f 100644 --- a/client_endpoints/ceph_client_endpoints.py +++ b/client_endpoints/ceph_client_endpoints.py @@ -123,7 +123,7 @@ def create_rbd_scrubbing(self): for node in common.get_fqdn_list('clients'): for ep_num in range(0, self.endpoints_per_client): rbd_name = '%s-%s' % (self.pool, self.get_rbd_name(node, ep_num)) - self.cluster.mkimage(rbd_name, self.get_endpoint_size, self.pool, self.data_pool, self.order) + self.cluster.mkimage(rbd_name, self.endpoint_size, self.pool, self.data_pool, self.order) def mount_rbd(self): for ep_num in range(0, self.endpoints_per_client): diff --git a/client_endpoints/cephfsfuse_client_endpoints.py b/client_endpoints/cephfsfuse_client_endpoints.py index 4a9b0d0d..09757a0b 100644 --- a/client_endpoints/cephfsfuse_client_endpoints.py +++ b/client_endpoints/cephfsfuse_client_endpoints.py @@ -15,6 +15,3 @@ def mount_fs_helper(self, node, dir_name): def create_recovery_image(self): self.create_rbd_recovery() - - def create_scrubbing_image(self): - self.create_rbd_scrubbing() diff --git a/client_endpoints/cephfskernel_client_endpoints.py b/client_endpoints/cephfskernel_client_endpoints.py index bd07d18b..bfd1b7b9 100644 --- a/client_endpoints/cephfskernel_client_endpoints.py +++ b/client_endpoints/cephfskernel_client_endpoints.py @@ -15,6 +15,3 @@ def mount_fs_helper(self, node, dir_name): def create_recovery_image(self): self.create_rbd_recovery() - - def create_scrubbing_image(self): - self.create_rbd_scrubbing() diff --git a/cluster/ceph.py b/cluster/ceph.py index b6e2f6d5..2d57abfa 100644 --- a/cluster/ceph.py +++ b/cluster/ceph.py @@ -541,10 +541,16 @@ def start_rgw(self): def disable_scrub(self): common.pdsh(settings.getnodes('head'), "ceph osd set noscrub; ceph osd set nodeep-scrub").communicate() + def disable_recovery(self): + common.pdsh(settings.getnodes('head'), "ceph osd set norecover; ceph osd set nobackfill").communicate() + + def enable_recovery(self): + common.pdsh(settings.getnodes('head'), "ceph osd unset norecover; ceph osd unset nobackfill").communicate() + def disable_balancer(self): common.pdsh(settings.getnodes('head'), "ceph balancer off").communicate() - def check_health(self, check_list=None, logfile=None, recstatsfile=None, scrubstatsfile=None): + def check_health(self, check_list=None, logfile=None, recstatsfile=None): # Wait for a defined amount of time in case ceph health is delayed time.sleep(self.health_wait) logline = "" @@ -557,15 +563,9 @@ def check_health(self, check_list=None, logfile=None, recstatsfile=None, scrubst if recstatsfile: header = "Time, Num Deg Objs, Total Deg Objs" stdout, stderr = common.pdsh(settings.getnodes('head'), 'echo %s >> %s' % (header, recstatsfile)).communicate() - ''' - if scrubstatsfile: - header = "PG ID, Num Objs" - stdout, stderr = common.pdsh(settings.getnodes('head'), 'echo %s >> %s' % (header, scrubstatsfile)).communicate() - ''' while True: stdout, stderr = common.pdsh(settings.getnodes('head'), '%s -c %s health %s' % (self.ceph_cmd, self.tmp_conf, logline)).communicate() self.log_recovery_stats(recstatsfile) - #self.log_scrubbing_stats(scrubstatsfile) if check_list and not any(x in stdout for x in check_list): break if "HEALTH_OK" in stdout: @@ -577,7 +577,7 @@ def check_health(self, check_list=None, logfile=None, recstatsfile=None, scrubst return ret - def log_scrubbing_stats(self, scrubstatsfile=None): + def log_scrubbing_stats(self, scrubstatsfile=None, pgid=None): if not scrubstatsfile: return fmtjson = "--format=json" @@ -588,7 +588,7 @@ def log_scrubbing_stats(self, scrubstatsfile=None): NUM_OBJECTS = "num_objects" LAST_SCRUB_STAMP = "last_scrub_stamp" STAT_SUM = "stat_sum" - SCRUB_DURATION = "scrub_duration" + SCRUB_DURATION = "last_scrub_duration" OBJECTS_SCRUBBED = "objects_scrubbed" stdout, stderr = common.pdsh(settings.getnodes('head'), '%s -c %s pg dump %s' % (self.ceph_cmd, self.tmp_conf, fmtjson)).communicate() stdout = stdout.split(':', 1)[1] @@ -599,17 +599,16 @@ def log_scrubbing_stats(self, scrubstatsfile=None): logger.error(str(e)) return scrubstats = [] + scrubstats.append(str(time.time())) if PG_STATS in jsondata[PG_MAP]: no_pgs = len(jsondata[PG_MAP][PG_STATS]) for i in range(0, no_pgs): - print(jsondata[PG_MAP][PG_STATS][i][SCRUB_DURATION]) - if jsondata[PG_MAP][PG_STATS][i][SCRUB_DURATION] != 0: + if str(jsondata[PG_MAP][PG_STATS][i][PG_ID]) == pgid: scrubstats.append(str(jsondata[PG_MAP][PG_STATS][i][PG_ID])) scrubstats.append(str(jsondata[PG_MAP][PG_STATS][i][STAT_SUM][NUM_OBJECTS])) scrubstats.append(str(jsondata[PG_MAP][PG_STATS][i][SCRUB_DURATION])) scrubstats.append(str(jsondata[PG_MAP][PG_STATS][i][OBJECTS_SCRUBBED])) - if len(scrubstats): print(scrubstatsfile) message = separator.join(scrubstats) @@ -679,8 +678,10 @@ def check_backfill(self, check_list=None, logfile=None, recstatsfile=None): def check_scrub(self, scrubstatsfile=None): logger.info('Waiting until Scrubbing completes...') fmtjson = '--format=json' - SCRUB_DURATION = "scrub_duration" + SCRUB_DURATION = "last_scrub_duration" PG_STATS = "pg_stats" + PG_ID = "pgid" + pgs_scrubbed = [] while True: stdout, stderr = common.pdsh(settings.getnodes('head'), '%s pg ls-by-pool %s %s' %(self.ceph_cmd, self.scrub_pool_name, fmtjson)).communicate() stdout = stdout.split(':', 1)[1] @@ -694,16 +695,18 @@ def check_scrub(self, scrubstatsfile=None): logger.info('PG STATS present') for i in range(0, len(jsondata[PG_STATS])): if jsondata[PG_STATS][i][SCRUB_DURATION] == 0: - logger.info('Scrubbing in progress') - logger.info(jsondata[PG_STATS][i][SCRUB_DURATION]) time.sleep(1) else: scrubbed_pgs += 1 logger.info(scrubbed_pgs) logger.info(jsondata[PG_STATS][i][SCRUB_DURATION]) - if scrubbed_pgs == 64: + logger.info('Scrub done for: ') + logger.info(jsondata[PG_STATS][i][PG_ID]) + if jsondata[PG_STATS][i][PG_ID] not in pgs_scrubbed: + pgs_scrubbed.append(jsondata[PG_STATS][i][PG_ID]) + self.log_scrubbing_stats(scrubstatsfile, str(jsondata[PG_STATS][i][PG_ID])) + if scrubbed_pgs == len(jsondata[PG_STATS]): logger.info('Scrubbing is complete') - self.log_scrubbing_stats(scrubstatsfile) return 1 def dump_config(self, run_dir): @@ -736,6 +739,12 @@ def create_scrubbing_test(self, run_dir, callback): self.st = ScrubbingTestThreadBackground(st_config, self, callback, self.stoprequest, self.haltrequest, self.startiorequest) self.st.start() + def create_scrub_recovery_test(self, run_dir, callback): + config = self.config.get("scrub_recov_test", {}) + config['run_dir'] = run_dir + self.srt = ScrubRecoveryThreadBackground(config, self, callback, self.stoprequest, self.haltrequest, self.startiorequest) + self.srt.start() + def wait_start_io(self): logger.info("Waiting for signal to start client io...") self.startiorequest.wait() @@ -758,7 +767,7 @@ def maybe_populate_scrubbing_pool(self): if self.prefill_scrub_objects > 0 or self.prefill_scrub_time > 0: logger.info('prefilling %s %sbyte objects into scrubbing pool %s' % (self.prefill_scrub_objects, self.prefill_scrub_object_size, self.scrub_pool_name)) common.pdsh(settings.getnodes('head'), 'sudo %s -p %s bench %s write -b %s --max-objects %s --no-cleanup' % (self.rados_cmd, self.scrub_pool_name, self.prefill_scrub_time, self.prefill_scrub_object_size, self.prefill_scrub_objects)).communicate() - self.check_health() + #self.check_health() def initiate_scrubbing(self): logger.info("Initiating scrub on pool %s" % self.scrub_pool_name) @@ -772,6 +781,14 @@ def wait_scrubbing_done(self): break self.st.join(1) + def wait_scrub_recovery_done(self): + self.stoprequest.set() + while True: + threads = threading.enumerate() + if len(threads) == 1: + break + self.srt.join(1) + def check_pg_autoscaler(self, timeout=-1, logfile=None): ret = 0 if not timeout: @@ -1299,25 +1316,21 @@ def logcmd(self, message): return 'echo "[`date`] %s" >> %s/scrubbing.log' % (message, self.config.get('run_dir')) def pre(self): - logger.info("Scrub PRE") pre_time = self.config.get("pre_time", 60) common.pdsh(settings.getnodes('head'), self.logcmd('Starting Scrubbing Test Thread, waiting %s seconds.' % pre_time)).communicate() time.sleep(pre_time) self.state = 'osdout' def osdout(self): - logger.info("Scrub OSDOUT") scrub_log = "%s/scrub.log" % self.config.get('run_dir') scrub_stats_log = "%s/scrub_stats.log" % self.config.get('run_dir') - ret = self.cluster.check_health(self.health_checklist, None, None, scrub_stats_log) + ret = self.cluster.check_health(self.health_checklist, None, None) common.pdsh(settings.getnodes('head'), self.logcmd("ret: %s" % ret)).communicate() - #if ret == 0: self.cluster.maybe_populate_scrubbing_pool() common.pdsh(settings.getnodes('head'), self.logcmd("osdout state - Sleeping for 10 secs after populating scrubbing pool.")).communicate() time.sleep(10) - #self.cluster.initiate_scrubbing() self.lasttime = time.time() self.state = "osdin" @@ -1363,3 +1376,132 @@ def run(self): while not self.haltrequest.isSet(): self.states[self.state]() common.pdsh(settings.getnodes('head'), self.logcmd('Exiting scrubbing test thread. Last state was: %s' % self.state)).communicate() + + +class ScrubRecoveryThreadBackground(threading.Thread): + def __init__(self, config, cluster, callback, stoprequest, haltrequest, startiorequest): + threading.Thread.__init__(self) + self.config = config + self.cluster = cluster + self.callback = callback + self.state = 'pre' + self.states = {'pre': self.pre, 'markdown': self.markdown, 'osdout': self.osdout, 'osdin':self.osdin, + 'post': self.post, 'done': self.done} + self.startiorequest = startiorequest + self.stoprequest = stoprequest + self.haltrequest = haltrequest + self.outhealthtries = 0 + self.inhealthtries = 0 + self.maxhealthtries = 60 + self.health_checklist = ["peering", "recovery_wait", "stuck", "inactive", "unclean", "recovery"] + self.ceph_cmd = self.cluster.ceph_cmd + self.lasttime = time.time() + + def logcmd(self, message): + return 'echo "[`date`] %s" >> %s/scrub_recov.log' % (message, self.config.get('run_dir')) + + def pre(self): + pre_time = self.config.get("pre_time", 60) + common.pdsh(settings.getnodes('head'), self.logcmd('Starting Scrub+Recovery Test Thread, waiting %s seconds.' % pre_time)).communicate() + time.sleep(pre_time) + lcmd = self.logcmd("Setting the ceph osd noup flag") + common.pdsh(settings.getnodes('head'), '%s -c %s osd set noup;%s' % (self.ceph_cmd, self.cluster.tmp_conf, lcmd)).communicate() + self.state = 'markdown' + + def markdown(self): + for osdnum in self.config.get('osds'): + lcmd = self.logcmd("Marking OSD %s down." % osdnum) + common.pdsh(settings.getnodes('head'), '%s -c %s osd down %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate() + lcmd = self.logcmd("Marking OSD %s out." % osdnum) + common.pdsh(settings.getnodes('head'), '%s -c %s osd out %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate() + common.pdsh(settings.getnodes('head'), self.logcmd('Waiting for the cluster to break and heal')).communicate() + self.lasttime = time.time() + self.state = 'osdout' + + + def osdout(self): + reclog = "%s/recovery.log" % self.config.get('run_dir') + recstatslog = "%s/recovery_stats.log" % self.config.get('run_dir') + ret = self.cluster.check_health(self.health_checklist, reclog, recstatslog) + + common.pdsh(settings.getnodes('head'), self.logcmd("ret: %s" % ret)).communicate() + + if ret == 0: + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster never went unhealthy.')).communicate() + else: + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster appears to have healed.')).communicate() + rectime = str(time.time() - self.lasttime) + common.pdsh(settings.getnodes('head'), 'echo Time: %s >> %s' % (rectime, recstatslog)).communicate() + common.pdsh(settings.getnodes('head'), self.logcmd('Time: %s' % rectime)).communicate() + + # Populate the recovery pool + self.cluster.maybe_populate_recovery_pool() + + common.pdsh(settings.getnodes('head'), self.logcmd("osdout state - Sleeping for 10 secs after populating recovery pool.")).communicate() + time.sleep(10) + lcmd = self.logcmd("Unsetting the ceph osd noup flag") + common.pdsh(settings.getnodes('head'), '%s -c %s osd unset noup;%s' % (self.ceph_cmd, self.cluster.tmp_conf, lcmd)).communicate() + for osdnum in self.config.get('osds'): + lcmd = self.logcmd("Marking OSD %s up." % osdnum) + common.pdsh(settings.getnodes('head'), '%s -c %s osd up %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate() + lcmd = self.logcmd("Marking OSD %s in." % osdnum) + common.pdsh(settings.getnodes('head'), '%s -c %s osd in %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate() + self.lasttime = time.time() + # Populate the scrub pool + logger.info("Sleep before scrub populate") + time.sleep(10) + self.cluster.disable_recovery() + self.cluster.maybe_populate_scrubbing_pool() + self.state = "osdin" + + + def osdin(self): + #Start scrub + self.startiorequest.set() + self.cluster.initiate_scrubbing() + self.cluster.enable_recovery() + recstatslog = "%s/recovery_backfill_stats.log" % self.config.get('run_dir') + scrub_stats_log = "%s/scrub_stats.log" % self.config.get('run_dir') + backfill = threading.Thread(target=self.cluster.check_backfill, args=(self.health_checklist, "%s/recovery.log" % self.config.get('run_dir'), recstatslog,)) + scrub_check = threading.Thread(target=self.cluster.check_scrub, args=(scrub_stats_log,)) + backfill.start() + scrub_check.start() + backfill.join() + scrub_check.join() + self.state = "post" + + + def post(self): + if self.stoprequest.isSet(): + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but stoprequest is set, finishing now.')).communicate() + self.haltrequest.set() + return + + if self.config.get("repeat", False): + # reset counters + self.outhealthtries = 0 + self.inhealthtries = 0 + + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but repeat is set. Moving to "markdown" state.')).communicate() + self.state = "markdown" + return + + common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, finishing up...')).communicate() + self.state = "done" + + def done(self): + common.pdsh(settings.getnodes('head'), self.logcmd("Done. Calling parent callback function.")).communicate() + self.callback() + self.haltrequest.set() + + def join(self, timeout=None): + common.pdsh(settings.getnodes('head'), self.logcmd('Received notification that parent is finished and waiting.')).communicate() + super(ScrubRecoveryThreadBackground, self).join(timeout) + + def run(self): + self.haltrequest.clear() + self.stoprequest.clear() + self.startiorequest.clear() + while not self.haltrequest.isSet(): + self.states[self.state]() + common.pdsh(settings.getnodes('head'), self.logcmd('Exiting scrub+recovery test thread. Last state was: %s' % self.state)).communicate()