diff --git a/benchmark/fio.py b/benchmark/fio.py
index b504b389..7ccfc631 100644
--- a/benchmark/fio.py
+++ b/benchmark/fio.py
@@ -66,6 +66,11 @@ def initialize_endpoints(self):
         # Create the recovery image based on test type requested
         if 'recovery_test' in self.cluster.config and self.recov_test_type == 'background':
             self.client_endpoints_object.create_recovery_image()
+        if 'scrubbing_test' in self.cluster.config:
+            self.client_endpoints_object.create_scrubbing_image()
+        if 'scrub_recov_test' in self.cluster.config:
+            self.client_endpoints_object.create_recovery_image()
+            self.client_endpoints_object.create_scrubbing_image()
         self.create_endpoints()
 
     def create_endpoints(self):
@@ -213,6 +218,18 @@ def run(self):
             # Wait for signal to start client IO
             self.cluster.wait_start_io()
 
+        if 'scrubbing_test' in self.cluster.config:
+            logger.info('Scrubbing test in config')
+            scrubbing_callback = self.scrubbing_callback
+            self.cluster.create_scrubbing_test(self.run_dir, scrubbing_callback)
+            self.cluster.wait_start_io()
+
+        if 'scrub_recov_test' in self.cluster.config:
+            logger.info('Scrub+Recov')
+            scrub_recov_callback = self.scrub_recov_callback
+            self.cluster.create_scrub_recovery_test(self.run_dir, scrub_recov_callback)
+            self.cluster.wait_start_io()
+
         monitoring.start(self.run_dir)
 
         logger.info('Running fio %s test.', self.mode)
@@ -225,6 +242,13 @@
 
         # If we were doing recovery, wait until it's done.
         if 'recovery_test' in self.cluster.config:
             self.cluster.wait_recovery_done()
+        # If we were doing scrubbing, wait until it's done.
+        if 'scrubbing_test' in self.cluster.config:
+            self.cluster.wait_scrubbing_done()
+
+        if 'scrub_recov_test' in self.cluster.config:
+            self.cluster.wait_scrub_recovery_done()
+
         monitoring.stop(self.run_dir)
 
@@ -239,6 +263,12 @@ def recovery_callback_blocking(self):
     def recovery_callback_background(self):
         logger.info('Recovery thread completed!')
 
+    def scrubbing_callback(self):
+        logger.info('Scrubbing thread completed')
+
+    def scrub_recov_callback(self):
+        logger.info('Scrub+Recovery thread completed')
+
     def analyze(self, out_dir):
         logger.info('Convert results to json format.')
         for client in settings.getnodes('clients').split(','):
diff --git a/benchmark/librbdfio.py b/benchmark/librbdfio.py
index 84ea0de3..66e487c6 100644
--- a/benchmark/librbdfio.py
+++ b/benchmark/librbdfio.py
@@ -46,6 +46,8 @@ def __init__(self, archive_dir, cluster, config):
         self.use_existing_volumes = config.get('use_existing_volumes', False)
         self.pool_name = config.get("poolname", "cbt-librbdfio")
         self.recov_pool_name = config.get("recov_pool_name", "cbt-librbdfio-recov")
+        self.scrub_pool_name = config.get("scrub_pool_name", "cbt-librbdfio-scrub")
+        self.scrub_pool_profile = config.get("scrub_pool_profile", "default")
         self.rbdname = config.get('rbdname', '')
 
         self.total_procs = self.procs_per_volume * self.volumes_per_client * len(settings.getnodes('clients').split(','))
@@ -80,9 +82,17 @@ def initialize(self):
         common.sync_files('%s/*' % self.run_dir, self.out_dir)
 
+        if 'scrubbing_test' in self.cluster.config:
+            self.mkscrubimage()
+
         # Create the recovery image based on test type requested
         if 'recovery_test' in self.cluster.config and self.recov_test_type == 'background':
             self.mkrecovimage()
+
+        if 'scrub_recov_test' in self.cluster.config:
+            self.mkrecovimage()
+            self.mkscrubimage()
+
         self.mkimages()
 
         # populate the fio files
         ps = []
@@ -128,6 +138,17 @@ def run(self):
             # Wait for a signal from the recovery thread to initiate client IO
             self.cluster.wait_start_io()
 
+        if 'scrubbing_test' in self.cluster.config:
+            scrubbing_callback = self.scrubbing_callback
+            self.cluster.create_scrubbing_test(self.run_dir, scrubbing_callback)
+            self.cluster.wait_start_io()
+
+        if 'scrub_recov_test' in self.cluster.config:
+            scrub_recov_callback = self.scrub_recov_callback
+            self.cluster.create_scrub_recovery_test(self.run_dir, scrub_recov_callback)
+            self.cluster.wait_start_io()
+
+
         monitoring.start(self.run_dir)
 
         logger.info('Running rbd fio %s test.', self.mode)
@@ -142,6 +163,9 @@
         if 'recovery_test' in self.cluster.config:
             self.cluster.wait_recovery_done()
 
+        if 'scrub_recov_test' in self.cluster.config:
+            self.cluster.wait_scrub_recovery_done()
+
         monitoring.stop(self.run_dir)
 
         # Finally, get the historic ops
@@ -210,6 +234,18 @@ def mkrecovimage(self):
                     self.cluster.mkimage('cbt-librbdfio-recov-%s-%d' % (node,volnum), self.vol_size, self.recov_pool_name, self.data_pool, self.vol_object_size)
         monitoring.stop()
 
+    def mkscrubimage(self):
+        logger.info('Creating scrubbing image...')
+        monitoring.start("%s/scrub_pool_monitoring" % self.run_dir)
+        if (self.use_existing_volumes == False):
+            self.cluster.rmpool(self.scrub_pool_name, self.scrub_pool_profile)
+            self.cluster.mkpool(self.scrub_pool_name, self.scrub_pool_profile, 'rbd')
+            for node in common.get_fqdn_list('clients'):
+                for volnum in range(0, self.volumes_per_client):
+                    node = node.rpartition("@")[2]
+                    self.cluster.mkimage('cbt-librbdfio-scrub-%s-%d' % (node,volnum), self.vol_size, self.scrub_pool_name, self.data_pool, self.vol_object_size)
+        monitoring.stop()
+
     def mkimages(self):
         monitoring.start("%s/pool_monitoring" % self.run_dir)
         if (self.use_existing_volumes == False):
@@ -231,6 +267,12 @@ def recovery_callback_blocking(self):
     def recovery_callback_background(self):
         logger.info('Recovery thread completed!')
 
+    def scrubbing_callback(self):
+        logger.info('Scrubbing thread completed!')
+
+    def scrub_recov_callback(self):
+        logger.info('Scrub+Recovery thread completed!')
+
     def parse(self, out_dir):
         for client in settings.getnodes('clients').split(','):
             host = settings.host_info(client)["host"]
diff --git a/client_endpoints/ceph_client_endpoints.py b/client_endpoints/ceph_client_endpoints.py
index f61fb9bb..fe4b450f 100644
--- a/client_endpoints/ceph_client_endpoints.py
+++ b/client_endpoints/ceph_client_endpoints.py
@@ -26,6 +26,8 @@ def __init__(self, cluster, config):
         self.data_pool_profile = config.get('data_pool_profile', None)
         self.recov_pool = None
         self.recov_pool_profile = config.get('recov_pool_profile', 'default')
+        self.scrub_pool = None
+        self.scrub_pool_profile = config.get('scrub_pool_profile', 'default')
         self.order = config.get('order', 22)
         self.disabled_features = config.get('disabled_features', None)
 
@@ -114,6 +116,15 @@ def create_rbd_recovery(self):
                 rbd_name = '%s-%s' % (self.pool, self.get_rbd_name(node, ep_num))
                 self.cluster.mkimage(rbd_name, self.endpoint_size, self.pool, self.data_pool, self.order)
 
+    def create_rbd_scrubbing(self):
+        self.pool = '%s-scrub' % self.name
+        self.cluster.rmpool(self.pool, self.scrub_pool_profile)
+        self.cluster.mkpool(self.pool, self.scrub_pool_profile, 'rbd')
+        for node in common.get_fqdn_list('clients'):
+            for ep_num in range(0, self.endpoints_per_client):
+                rbd_name = '%s-%s' % (self.pool, self.get_rbd_name(node, ep_num))
+                self.cluster.mkimage(rbd_name, self.endpoint_size, self.pool, self.data_pool, self.order)
+
     def mount_rbd(self):
         for ep_num in range(0, self.endpoints_per_client):
             dir_name = self.get_dir_name(ep_num)
diff --git a/client_endpoints/client_endpoints.py b/client_endpoints/client_endpoints.py
index a5b19252..97ed2a06 100644
--- a/client_endpoints/client_endpoints.py
+++ b/client_endpoints/client_endpoints.py
@@ -45,3 +45,6 @@ def remove(self):
 
     def create_recovery_image(self):
         pass
+
+    def create_scrubbing_image(self):
+        pass
diff --git a/client_endpoints/librbd_client_endpoints.py b/client_endpoints/librbd_client_endpoints.py
index 64fd3bd3..57fd3bba 100644
--- a/client_endpoints/librbd_client_endpoints.py
+++ b/client_endpoints/librbd_client_endpoints.py
@@ -19,3 +19,6 @@ def mount(self):
 
     def create_recovery_image(self):
         self.create_rbd_recovery()
+
+    def create_scrubbing_image(self):
+        self.create_rbd_scrubbing()
diff --git a/client_endpoints/rbdfuse_client_endpoints.py b/client_endpoints/rbdfuse_client_endpoints.py
index ce4586f6..fa94a1ae 100644
--- a/client_endpoints/rbdfuse_client_endpoints.py
+++ b/client_endpoints/rbdfuse_client_endpoints.py
@@ -31,3 +31,6 @@ def map_rbd(self, node, rbd_name):
 
     def create_recovery_image(self):
         self.create_rbd_recovery()
+
+    def create_scrubbing_image(self):
+        self.create_rbd_scrubbing()
diff --git a/client_endpoints/rbdkernel_client_endpoints.py b/client_endpoints/rbdkernel_client_endpoints.py
index 66dbd0a7..8a75afb9 100644
--- a/client_endpoints/rbdkernel_client_endpoints.py
+++ b/client_endpoints/rbdkernel_client_endpoints.py
@@ -22,3 +22,6 @@ def map_rbd(self, node, rbd_name):
 
     def create_recovery_image(self):
         self.create_rbd_recovery()
+
+    def create_scrubbing_image(self):
+        self.create_rbd_scrubbing()
diff --git a/client_endpoints/rbdnbd_client_endpoints.py b/client_endpoints/rbdnbd_client_endpoints.py
index 22a05a98..93abc3be 100644
--- a/client_endpoints/rbdnbd_client_endpoints.py
+++ b/client_endpoints/rbdnbd_client_endpoints.py
@@ -16,3 +16,6 @@ def map_rbd(self, node, rbd_name):
 
     def create_recovery_image(self):
         self.create_rbd_recovery()
+
+    def create_scrubbing_image(self):
+        self.create_rbd_scrubbing()
diff --git a/client_endpoints/rbdtcmu_client_endpoints.py b/client_endpoints/rbdtcmu_client_endpoints.py
index 0ec2c0e7..236e69cd 100644
--- a/client_endpoints/rbdtcmu_client_endpoints.py
+++ b/client_endpoints/rbdtcmu_client_endpoints.py
@@ -23,3 +23,6 @@ def map_rbd(self, node, rbd_name):
 
     def create_recovery_image(self):
         self.create_rbd_recovery()
+
+    def create_scrubbing_image(self):
+        self.create_rbd_scrubbing()
diff --git a/cluster/ceph.py b/cluster/ceph.py
index 93fc9f32..d23897a4 100644
--- a/cluster/ceph.py
+++ b/cluster/ceph.py
@@ -148,6 +148,13 @@ def __init__(self, config):
         self.prefill_recov_time = 0
         self.recov_pool_name = ''
 
+        #Scrubbing tests
+        self.scrub_enabled = config.get('enable_scrub', False)
+        self.prefill_scrub_objects = 0
+        self.prefill_scrub_object_size = 0
+        self.prefill_scrub_time = 0
+        self.scrub_pool_name = ''
+
     def initialize(self):
         # Reset the rulesets
         self.ruleset_map = {}
@@ -194,7 +201,9 @@ def initialize(self):
         monitoring.stop()
 
         # Disable scrub and wait for any scrubbing to complete
-        self.disable_scrub()
+        logger.info("Scrub enabled is %s", self.scrub_enabled)
+        if not self.scrub_enabled:
+            self.disable_scrub()
 
         if self.disable_bal:
             self.disable_balancer()
@@ -532,6 +541,12 @@ def start_rgw(self):
     def disable_scrub(self):
         common.pdsh(settings.getnodes('head'), "ceph osd set noscrub; ceph osd set nodeep-scrub").communicate()
 
+    def disable_recovery(self):
+        common.pdsh(settings.getnodes('head'), "ceph osd set norecover; ceph osd set nobackfill").communicate()
+
+    def enable_recovery(self):
+        common.pdsh(settings.getnodes('head'), "ceph osd unset norecover; ceph osd unset nobackfill").communicate()
+
     def disable_balancer(self):
         common.pdsh(settings.getnodes('head'), "ceph balancer off").communicate()
 
@@ -548,7 +563,6 @@ def check_health(self, check_list=None, logfile=None, recstatsfile=None):
         if recstatsfile:
             header = "Time, Num Deg Objs, Total Deg Objs"
             stdout, stderr = common.pdsh(settings.getnodes('head'), 'echo %s >> %s' % (header, recstatsfile)).communicate()
-
         while True:
             stdout, stderr = common.pdsh(settings.getnodes('head'), '%s -c %s health %s' % (self.ceph_cmd, self.tmp_conf, logline)).communicate()
             self.log_recovery_stats(recstatsfile)
@@ -563,6 +577,44 @@
             time.sleep(1)
         return ret
 
+    def log_scrubbing_stats(self, scrubstatsfile=None, pgid=None):
+        if not scrubstatsfile:
+            return
+        fmtjson = "--format=json"
+        PG_MAP = "pg_map"
+        PG_STATS = "pg_stats"
+        separator = " "
+        PG_ID = "pgid"
+        NUM_OBJECTS = "num_objects"
+        LAST_SCRUB_STAMP = "last_scrub_stamp"
+        STAT_SUM = "stat_sum"
+        SCRUB_DURATION = "last_scrub_duration"
+        OBJECTS_SCRUBBED = "objects_scrubbed"
+        stdout, stderr = common.pdsh(settings.getnodes('head'), '%s -c %s pg dump %s' % (self.ceph_cmd, self.tmp_conf, fmtjson)).communicate()
+        stdout = stdout.split(':', 1)[1]
+        stdout = stdout.strip()
+        try:
+            jsondata = json.loads(stdout)
+        except ValueError as e:
+            logger.error(str(e))
+            return
+        scrubstats = []
+        scrubstats.append(str(time.time()))
+        if PG_STATS in jsondata[PG_MAP]:
+            no_pgs = len(jsondata[PG_MAP][PG_STATS])
+            for i in range(0, no_pgs):
+                if str(jsondata[PG_MAP][PG_STATS][i][PG_ID]) == pgid:
+                    scrubstats.append(str(jsondata[PG_MAP][PG_STATS][i][PG_ID]))
+                    scrubstats.append(str(jsondata[PG_MAP][PG_STATS][i][STAT_SUM][NUM_OBJECTS]))
+                    scrubstats.append(str(jsondata[PG_MAP][PG_STATS][i][SCRUB_DURATION]))
+                    scrubstats.append(str(jsondata[PG_MAP][PG_STATS][i][OBJECTS_SCRUBBED]))
+
+        if len(scrubstats):
+            print(scrubstatsfile)
+            message = separator.join(scrubstats)
+            stdout, stderr = common.pdsh(settings.getnodes('head'), 'echo -e %s >> %s' % (message, scrubstatsfile)).communicate()
+
     def log_recovery_stats(self, recstatsfile=None):
         if not recstatsfile:
             return
@@ -623,15 +675,39 @@ def check_backfill(self, check_list=None, logfile=None, recstatsfile=None):
             time.sleep(1)
         return ret
 
-    def check_scrub(self):
+    def check_scrub(self, scrubstatsfile=None):
         logger.info('Waiting until Scrubbing completes...')
+        fmtjson = '--format=json'
+        SCRUB_DURATION = "last_scrub_duration"
+        PG_STATS = "pg_stats"
+        PG_ID = "pgid"
+        pgs_scrubbed = []
         while True:
-            stdout, stderr = common.pdsh(settings.getnodes('head'), '%s -c %s pg dump | cut -f 16 | grep "0.000000" | wc -l' % (self.ceph_cmd, self.tmp_conf)).communicate()
-            if " 0\n" in stdout:
-                break
-            else:
-                logger.info(stdout)
-            time.sleep(1)
+            stdout, stderr = common.pdsh(settings.getnodes('head'), '%s pg ls-by-pool %s %s' %(self.ceph_cmd, self.scrub_pool_name, fmtjson)).communicate()
+            stdout = stdout.split(':', 1)[1]
+            stdout = stdout.strip()
+            scrubbed_pgs = 0
+            try:
+                jsondata = json.loads(stdout)
+            except ValueError as e:
+                logger.error(str(e))
+                return 0
+            logger.info('PG STATS present')
+            for i in range(0, len(jsondata[PG_STATS])):
+                if jsondata[PG_STATS][i][SCRUB_DURATION] == 0:
+                    time.sleep(1)
+                else:
+                    scrubbed_pgs += 1
+                    logger.info(scrubbed_pgs)
+                    logger.info(jsondata[PG_STATS][i][SCRUB_DURATION])
+                    logger.info('Scrub done for: ')
+                    logger.info(jsondata[PG_STATS][i][PG_ID])
+                    if jsondata[PG_STATS][i][PG_ID] not in pgs_scrubbed:
+                        pgs_scrubbed.append(jsondata[PG_STATS][i][PG_ID])
+                        self.log_scrubbing_stats(scrubstatsfile, str(jsondata[PG_STATS][i][PG_ID]))
+            if scrubbed_pgs == len(jsondata[PG_STATS]):
+                logger.info('Scrubbing is complete')
+                return 1
 
     def dump_config(self, run_dir):
         common.pdsh(settings.getnodes('osds'), 'sudo %s -c %s daemon osd.0 config show > %s/ceph_settings.out' % (self.ceph_cmd, self.tmp_conf, run_dir)).communicate()
@@ -654,6 +730,21 @@ def create_recovery_test(self, run_dir, callback, test_type='blocking'):
             self.rt = RecoveryTestThreadBackground(rt_config, self, callback, self.stoprequest, self.haltrequest, self.startiorequest)
         self.rt.start()
 
+    def create_scrubbing_test(self, run_dir, callback):
+        '''
+        Only background type currently
+        '''
+        st_config = self.config.get("scrubbing_test", {})
+        st_config['run_dir'] = run_dir
+        self.st = ScrubbingTestThreadBackground(st_config, self, callback, self.stoprequest, self.haltrequest, self.startiorequest)
+        self.st.start()
+
+    def create_scrub_recovery_test(self, run_dir, callback):
+        config = self.config.get("scrub_recov_test", {})
+        config['run_dir'] = run_dir
+        self.srt = ScrubRecoveryThreadBackground(config, self, callback, self.stoprequest, self.haltrequest, self.startiorequest)
+        self.srt.start()
+
     def wait_start_io(self):
         logger.info("Waiting for signal to start client io...")
         self.startiorequest.wait()
@@ -672,6 +763,32 @@ def wait_recovery_done(self):
                 break
             self.rt.join(1)
 
+    def maybe_populate_scrubbing_pool(self):
+        if self.prefill_scrub_objects > 0 or self.prefill_scrub_time > 0:
+            logger.info('prefilling %s %sbyte objects into scrubbing pool %s' % (self.prefill_scrub_objects, self.prefill_scrub_object_size, self.scrub_pool_name))
+            common.pdsh(settings.getnodes('head'), 'sudo %s -p %s bench %s write -b %s --max-objects %s --no-cleanup' % (self.rados_cmd, self.scrub_pool_name, self.prefill_scrub_time, self.prefill_scrub_object_size, self.prefill_scrub_objects)).communicate()
+            #self.check_health()
+
+    def initiate_scrubbing(self):
+        logger.info("Initiating scrub on pool %s" % self.scrub_pool_name)
+        common.pdsh(settings.getnodes('head'), '%s osd pool deep-scrub %s' % (self.ceph_cmd, self.scrub_pool_name)).communicate()
+
+    def wait_scrubbing_done(self):
+        self.stoprequest.set()
+        while True:
+            threads = threading.enumerate()
+            if len(threads) == 1:
+                break
+            self.st.join(1)
+
+    def wait_scrub_recovery_done(self):
+        self.stoprequest.set()
+        while True:
+            threads = threading.enumerate()
+            if len(threads) == 1:
+                break
+            self.srt.join(1)
+
     def check_pg_autoscaler(self, timeout=-1, logfile=None):
         ret = 0
         if not timeout:
@@ -772,6 +889,14 @@ def mkpool(self, name, profile_name, application, base_name=None):
             if self.prefill_recov_objects > 0:
                 self.recov_pool_name = name
 
+        scrub_pool = profile.get('scrub_pool', False)
+        if scrub_pool:
+            self.prefill_scrub_objects = profile.get('prefill_scrub_objects', 0)
+            self.prefill_scrub_object_size = profile.get('prefill_scrub_object_size', 0)
+            self.prefill_scrub_time = profile.get('prefill_scrub_time', 0)
+            if self.prefill_scrub_objects > 0:
+                self.scrub_pool_name = name
+
         if replication and replication == 'erasure':
             common.pdsh(settings.getnodes('head'), 'sudo %s -c %s osd pool create %s %d %d erasure %s' % (self.ceph_cmd, self.tmp_conf, name, pg_size, pgp_size, erasure_profile),
                         continue_if_error=False).communicate()
@@ -1167,3 +1292,216 @@ def run(self):
             self.states[self.state]()
         common.pdsh(settings.getnodes('head'), self.logcmd('Exiting recovery test thread. Last state was: %s' % self.state)).communicate()
+
+class ScrubbingTestThreadBackground(threading.Thread):
+    def __init__(self, config, cluster, callback, stoprequest, haltrequest, startiorequest):
+        threading.Thread.__init__(self)
+        self.config = config
+        self.cluster = cluster
+        self.callback = callback
+        self.state = 'pre'
+        self.states = {'pre': self.pre, 'osdout': self.osdout, 'osdin':self.osdin,
+                       'post': self.post, 'done': self.done}
+        self.startiorequest = startiorequest
+        self.stoprequest = stoprequest
+        self.haltrequest = haltrequest
+        self.outhealthtries = 0
+        self.inhealthtries = 0
+        self.maxhealthtries = 60
+        self.health_checklist = ["peering", "recovery_wait", "stuck", "inactive", "unclean", "recovery"]
+        self.ceph_cmd = self.cluster.ceph_cmd
+        self.lasttime = time.time()
+
+    def logcmd(self, message):
+        return 'echo "[`date`] %s" >> %s/scrubbing.log' % (message, self.config.get('run_dir'))
+
+    def pre(self):
+        pre_time = self.config.get("pre_time", 60)
+        common.pdsh(settings.getnodes('head'), self.logcmd('Starting Scrubbing Test Thread, waiting %s seconds.' % pre_time)).communicate()
+        time.sleep(pre_time)
+        self.state = 'osdout'
+
+    def osdout(self):
+        scrub_log = "%s/scrub.log" % self.config.get('run_dir')
+        scrub_stats_log = "%s/scrub_stats.log" % self.config.get('run_dir')
+        ret = self.cluster.check_health(self.health_checklist, None, None)
+
+        common.pdsh(settings.getnodes('head'), self.logcmd("ret: %s" % ret)).communicate()
+
+        self.cluster.maybe_populate_scrubbing_pool()
+        common.pdsh(settings.getnodes('head'), self.logcmd("osdout state - Sleeping for 10 secs after populating scrubbing pool.")).communicate()
+        time.sleep(10)
+        self.lasttime = time.time()
+        self.state = "osdin"
+
+    def osdin(self):
+        scrub_stats_log = "%s/scrub_stats.log" % self.config.get('run_dir')
+        self.startiorequest.set()
+        self.cluster.initiate_scrubbing()
+        ret = self.cluster.check_scrub(scrub_stats_log)
+        if ret == 1:
+            self.state = "post"
+
+    def post(self):
+        if self.stoprequest.isSet():
+            common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but stoprequest is set, finishing now.')).communicate()
+            self.haltrequest.set()
+            return
+
+        if self.config.get("repeat", False):
+            # reset counters
+            self.outhealthtries = 0
+            self.inhealthtries = 0
+
+            common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but repeat is set. Moving to "osdout" state.')).communicate()
+            self.state = "osdout"
+            return
+
+        common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, finishing up...')).communicate()
+        self.state = "done"
+
+    def done(self):
+        common.pdsh(settings.getnodes('head'), self.logcmd("Done. Calling parent callback function.")).communicate()
+        self.callback()
+        self.haltrequest.set()
+
+    def join(self, timeout=None):
+        common.pdsh(settings.getnodes('head'), self.logcmd('Received notification that parent is finished and waiting.')).communicate()
+        super(ScrubbingTestThreadBackground, self).join(timeout)
+
+    def run(self):
+        self.haltrequest.clear()
+        self.stoprequest.clear()
+        self.startiorequest.clear()
+        while not self.haltrequest.isSet():
+            self.states[self.state]()
+        common.pdsh(settings.getnodes('head'), self.logcmd('Exiting scrubbing test thread. Last state was: %s' % self.state)).communicate()
+
+
+class ScrubRecoveryThreadBackground(threading.Thread):
+    def __init__(self, config, cluster, callback, stoprequest, haltrequest, startiorequest):
+        threading.Thread.__init__(self)
+        self.config = config
+        self.cluster = cluster
+        self.callback = callback
+        self.state = 'pre'
+        self.states = {'pre': self.pre, 'markdown': self.markdown, 'osdout': self.osdout, 'osdin':self.osdin,
+                       'post': self.post, 'done': self.done}
+        self.startiorequest = startiorequest
+        self.stoprequest = stoprequest
+        self.haltrequest = haltrequest
+        self.outhealthtries = 0
+        self.inhealthtries = 0
+        self.maxhealthtries = 60
+        self.health_checklist = ["peering", "recovery_wait", "stuck", "inactive", "unclean", "recovery"]
+        self.ceph_cmd = self.cluster.ceph_cmd
+        self.lasttime = time.time()
+
+    def logcmd(self, message):
+        return 'echo "[`date`] %s" >> %s/scrub_recov.log' % (message, self.config.get('run_dir'))
+
+    def pre(self):
+        pre_time = self.config.get("pre_time", 60)
+        common.pdsh(settings.getnodes('head'), self.logcmd('Starting Scrub+Recovery Test Thread, waiting %s seconds.' % pre_time)).communicate()
+        time.sleep(pre_time)
+        lcmd = self.logcmd("Setting the ceph osd noup flag")
+        common.pdsh(settings.getnodes('head'), '%s -c %s osd set noup;%s' % (self.ceph_cmd, self.cluster.tmp_conf, lcmd)).communicate()
+        self.state = 'markdown'
+
+    def markdown(self):
+        for osdnum in self.config.get('osds'):
+            lcmd = self.logcmd("Marking OSD %s down." % osdnum)
+            common.pdsh(settings.getnodes('head'), '%s -c %s osd down %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate()
+            lcmd = self.logcmd("Marking OSD %s out." % osdnum)
+            common.pdsh(settings.getnodes('head'), '%s -c %s osd out %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate()
+        common.pdsh(settings.getnodes('head'), self.logcmd('Waiting for the cluster to break and heal')).communicate()
+        self.lasttime = time.time()
+        self.state = 'osdout'
+
+
+    def osdout(self):
+        reclog = "%s/recovery.log" % self.config.get('run_dir')
+        recstatslog = "%s/recovery_stats.log" % self.config.get('run_dir')
+        ret = self.cluster.check_health(self.health_checklist, reclog, recstatslog)
+
+        common.pdsh(settings.getnodes('head'), self.logcmd("ret: %s" % ret)).communicate()
+
+        if ret == 0:
+            common.pdsh(settings.getnodes('head'), self.logcmd('Cluster never went unhealthy.')).communicate()
+        else:
+            common.pdsh(settings.getnodes('head'), self.logcmd('Cluster appears to have healed.')).communicate()
+            rectime = str(time.time() - self.lasttime)
+            common.pdsh(settings.getnodes('head'), 'echo Time: %s >> %s' % (rectime, recstatslog)).communicate()
+            common.pdsh(settings.getnodes('head'), self.logcmd('Time: %s' % rectime)).communicate()
+
+        # Populate the recovery pool
+        self.cluster.maybe_populate_recovery_pool()
+
+        common.pdsh(settings.getnodes('head'), self.logcmd("osdout state - Sleeping for 10 secs after populating recovery pool.")).communicate()
+        time.sleep(10)
+        lcmd = self.logcmd("Unsetting the ceph osd noup flag")
+        self.cluster.disable_recovery()
+        common.pdsh(settings.getnodes('head'), '%s -c %s osd unset noup;%s' % (self.ceph_cmd, self.cluster.tmp_conf, lcmd)).communicate()
+        for osdnum in self.config.get('osds'):
+            lcmd = self.logcmd("Marking OSD %s up." % osdnum)
+            common.pdsh(settings.getnodes('head'), '%s -c %s osd up %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate()
+            lcmd = self.logcmd("Marking OSD %s in." % osdnum)
+            common.pdsh(settings.getnodes('head'), '%s -c %s osd in %s;%s' % (self.ceph_cmd, self.cluster.tmp_conf, osdnum, lcmd)).communicate()
+        self.lasttime = time.time()
+        # Populate the scrub pool
+        logger.info("Sleep before scrub populate")
+        time.sleep(10)
+        self.cluster.maybe_populate_scrubbing_pool()
+        self.state = "osdin"
+
+
+    def osdin(self):
+        #Start scrub
+        self.startiorequest.set()
+        self.cluster.initiate_scrubbing()
+        self.cluster.enable_recovery()
+        recstatslog = "%s/recovery_backfill_stats.log" % self.config.get('run_dir')
+        scrub_stats_log = "%s/scrub_stats.log" % self.config.get('run_dir')
+        backfill = threading.Thread(target=self.cluster.check_backfill, args=(self.health_checklist, "%s/recovery.log" % self.config.get('run_dir'), recstatslog,))
+        scrub_check = threading.Thread(target=self.cluster.check_scrub, args=(scrub_stats_log,))
+        backfill.start()
+        scrub_check.start()
+        backfill.join()
+        scrub_check.join()
+        self.state = "post"
+
+
+    def post(self):
+        if self.stoprequest.isSet():
+            common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but stoprequest is set, finishing now.')).communicate()
+            self.haltrequest.set()
+            return
+
+        if self.config.get("repeat", False):
+            # reset counters
+            self.outhealthtries = 0
+            self.inhealthtries = 0
+
+            common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, but repeat is set. Moving to "markdown" state.')).communicate()
+            self.state = "markdown"
+            return
+
+        common.pdsh(settings.getnodes('head'), self.logcmd('Cluster is healthy, finishing up...')).communicate()
+        self.state = "done"
+
+    def done(self):
+        common.pdsh(settings.getnodes('head'), self.logcmd("Done. Calling parent callback function.")).communicate()
+        self.callback()
+        self.haltrequest.set()
+
+    def join(self, timeout=None):
+        common.pdsh(settings.getnodes('head'), self.logcmd('Received notification that parent is finished and waiting.')).communicate()
+        super(ScrubRecoveryThreadBackground, self).join(timeout)
+
+    def run(self):
+        self.haltrequest.clear()
+        self.stoprequest.clear()
+        self.startiorequest.clear()
+        while not self.haltrequest.isSet():
+            self.states[self.state]()
+        common.pdsh(settings.getnodes('head'), self.logcmd('Exiting scrub+recovery test thread. Last state was: %s' % self.state)).communicate()
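
Usage note: the new paths are driven entirely by the CBT job file. The YAML below is a minimal sketch of how the keys introduced in this change might be wired together, following the layout CBT already uses for recovery_test; every host name, pool-profile name, and numeric value is an illustrative assumption rather than something taken from this diff.

    cluster:
      user: ceph
      head: "head.example.com"
      clients: ["client1.example.com"]
      osds: ["osd1.example.com"]
      enable_scrub: true                     # new flag: skip setting noscrub/nodeep-scrub during initialize()
      pool_profiles:
        rbd:
          pg_size: 128
          pgp_size: 128
          replication: 3
        scrubpool:
          pg_size: 64
          pgp_size: 64
          replication: 3
          scrub_pool: true                   # marks the pool whose PGs check_scrub() polls
          prefill_scrub_objects: 16384       # passed to rados bench --max-objects
          prefill_scrub_object_size: 65536   # passed to rados bench -b (bytes)
          prefill_scrub_time: 300            # rados bench duration (seconds)
      scrubbing_test:                        # background scrubbing test
        pre_time: 60
        repeat: false
      # scrub_recov_test:                    # alternative: combined scrub + recovery test
      #   pre_time: 60
      #   osds: [0, 1]                       # OSD ids marked down/out, then up/in
    benchmarks:
      librbdfio:
        pool_profile: rbd
        scrub_pool_profile: scrubpool
        scrub_pool_name: cbt-librbdfio-scrub
        mode: randwrite
        op_size: 4096
        time: 300

As written, the benchmarks treat scrubbing_test and scrub_recov_test independently, so listing both keys would start both background threads; a given run would normally configure only one of the two.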