Segment compact and repair commands to improve performance (OpenTSDB#1433)

* Add repair wrapper
* store as object
* Move log message
* Fix the rest of the logging
* Don't try to repair deleted metrics and better logging
* Try again immediately, rather than after a full run
* Actually track failed metrics
* Chunk repairs into 1 hour increments to reduce 'timeout' condition
* Better chunking and some comments on weird code blocks
* Return failed metrics correctly
* Only log chunks when they take a long time
* Split repairs even further (30 minutes by default)
* Improve logging
* More updates to repair tool
* Track finished metrics between runs...
* Only write success when success happens
* Better warnings around slow chunks
* Multiprocessing of metrics (speeds things up)
* flake8 fixes
* Global counter to track progress
* Configurable threads and track failed metrics
* Don't require compaction
* 'uid metrics' command doesn't reliably exit, use 'uid grep metrics' instead
* Split compact and repair commands, as they seem to cause scanner problems in HBase 2.0
* Slightly better logging and default to hour chunks (better compact?)
* Actually kill subprocesses when timeout is reached
* flake8 fixes
* Allow for time chunks larger than 60, but still ensure alignment
* Simplify to list comprehension
* More threads, increasing timeouts for bigger metrics, and actually kill timed out subprocesses
* Nicer log message and fix bad setting
* Better logging

Signed-off-by: Chris Larsen <[email protected]>
johnseekins authored and manolama committed Dec 11, 2018
1 parent dc31503 commit ed56c5a
166 changes: 117 additions & 49 deletions tools/repair-tsd
@@ -2,11 +2,15 @@

from subprocess import Popen, PIPE, TimeoutExpired, check_output
from random import shuffle
from multiprocessing import Pool, Value
from pprint import pformat
import signal
import os
from multiprocessing import Pool, Value, cpu_count
import copy
import time
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
import logging
import math

log = logging.getLogger("repair-tsd")
log.setLevel(logging.INFO)
@@ -19,22 +23,40 @@ metric_count = Value('i', 0)
failed_count = Value('i', 0)


def get_large_divisors(num):
"""
Get "large" divisors of num
i.e. only numbers that, when multiplied, are > than num
e.g. 60 -> 10, 12, 15, 20, 30, 60
* Because while 1, 2, 3, 4, 6 are all divisors, they are too small
when mutiplied together
:param int num: The number to check
:returns: all large divisors
:rtype: list
"""
return [int(num / i) for i in range(1, int(math.sqrt(num)) + 1)
if num % i == 0 and i * i != num]
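
# A quick worked example of the helper above (illustrative, not part of the
# commit): for num = 60, i runs from 1 to 7; 1 through 6 all divide 60, so
# the comprehension returns the complementary "large" halves.
assert get_large_divisors(60) == [60, 30, 20, 15, 12, 10]
# 6 * 6 == 36, so 6 itself is excluded by the i * i != num guard:
assert get_large_divisors(36) == [36, 18, 12, 9]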


def get_metrics(args):
"""
Collect all metrics from OpenTSDB
:returns: all metrics
:rtype: list
"""
time_chunk = args.get("time_chunk", 15)
time_chunk = args.get("time_chunk", 60)
multiplier = int(60 / time_chunk)
time_range = args.get("time_range", 48)
tsd_path = args.get("tsd_path", "/usr/share/opentsdb/bin/tsdb")
cfg_path = args.get("cfg_path", "/etc/opentsdb/opentsdb.conf")
use_sudo = args.get("use_sudo", False)
sudo_user = args.get("sudo_user", "opentsdb")
base = "{} fsck --config={}".format(tsd_path, cfg_path)
check_cmd = "{} uid --config={} metrics".format(tsd_path, cfg_path)
check_cmd = "{} uid --config={} grep metrics".format(tsd_path, cfg_path)
if use_sudo:
base = "sudo -u {} {}".format(sudo_user, base)
check_cmd = "sudo -u {} {}".format(sudo_user, check_cmd)
@@ -46,9 +68,9 @@ def get_metrics(args):
for m in results[0].decode().split("\n") if m]
metriclist = [m for m in metriclist if m and m != "\x00"]
metricobj = {"time_chunk": time_chunk,
"timeout": int(time_chunk * 60),
"timeout": int((time_chunk * 60) / 2),
"retries": args.get("retries", 1),
"compact": args.get("compact", 1),
"compact": args.get("compact", False),
"multiplier": multiplier,
"metriccount": len(metriclist),
"chunk_count": time_range * multiplier,
@@ -64,6 +86,62 @@
return metrics
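
# To make the chunk arithmetic above concrete, an illustrative sketch (not
# part of the commit) using the defaults from cli_opts below:
def _example_chunk_math(time_range=48, time_chunk=60):
    multiplier = int(60 / time_chunk)      # chunks per hour -> 1
    chunk_count = time_range * multiplier  # total chunks per metric -> 48
    timeout = int((time_chunk * 60) / 2)   # seconds per chunk attempt -> 1800
    return multiplier, chunk_count, timeout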


def _process_metric_chunk(metric, chunk, x, cmd, timeout):
"""
Actually run the cli command to repair this chunk.
we segment this so the calls to both the compact and
regular runs can be somewhat consistently managed
:param str metric: name of the metric we're processing
:param int chunk: Which time segment we're processing
:param int x: which attempt for the time segment we're on
:param str cmd: The actual command we'll run
:param int timeout: How long the command is allowed to run
:returns: whether the command was successful
:rtype: bool
"""
log.debug("Running command: {}".format(cmd))
metricproc = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE,
preexec_fn=os.setsid)
try:
results, err = metricproc.communicate(timeout=timeout)
except TimeoutExpired:
msg = "{}: chunk {} failed (timeout: {}) (run {})".format(metric,
chunk,
timeout,
x)
log.warning(msg)
try:
os.killpg(os.getpgid(metricproc.pid), signal.SIGTERM)
except Exception as e:
log.warning("Couldn't kill subprocess :: {}".format(e))
return False
except Exception as e:
log.error("{} general exception :: {}".format(metric, e))
try:
os.killpg(os.getpgid(metricproc.pid), signal.SIGTERM)
except Exception as e:
log.warning("Couldn't kill subprocess :: {}".format(e))
return False
results = [r for r in results.decode().split("\n") if r][-26:]
final_results = []
"""
We'll only collect results that are non-0
since we're not super interested in stuff that didn't change.
"""
for r in results:
# Strip the timestamp from the log line
line = r.split(" ")[6:]
try:
if int(line[-1]) != 0:
final_results.append(" ".join(line))
except Exception:
final_results.append(" ".join(line))
result_str = "\n".join(final_results)
log.debug("{} results:\n{}".format(metric, result_str))
return True
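
# Minimal sketch (illustrative, not part of the commit) of the setsid/killpg
# timeout pattern used above: the child is started as its own process-group
# leader, so the shell *and* anything it spawns can be signalled together.
def _example_kill_on_timeout():
    # "sleep 60" stands in for the long-running fsck command
    proc = Popen("sleep 60", shell=True, stdout=PIPE, stderr=PIPE,
                 preexec_fn=os.setsid)
    try:
        proc.communicate(timeout=5)
    except TimeoutExpired:
        # proc.kill() would only signal the shell; killpg reaches the group
        os.killpg(os.getpgid(proc.pid), signal.SIGTERM)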


def repair_metric_chunk(metricobj, chunk):
"""
Repair one 'chunk' of data for a metric
@@ -81,47 +159,26 @@
timestr = "{}m-ago {}m-ago".format((chunk + 1) * time_chunk,
chunk * time_chunk)
cmd = "{} {} sum".format(base, timestr)
fullcmd = "{} {} --delete-bad-compacts --delete-bad-rows \
--delete-bad-values --delete-unknown-columns \
--delete-orphans --resolve-duplicates --fix".format(cmd,
metric)
ccmd = "{} {} --fix --compact".format(cmd, metric)
"""
Even though we're chunking, it's worth trying things more than once
"""
for x in range(1, metricobj["retries"] + 2):
log.debug("Repair try {} for {}".format(x, timestr))
if compact:
fullcmd = "{} {} --fix-all --compact".format(cmd, metric)
else:
fullcmd = "{} {} --fix-all".format(cmd, metric)
log.debug("Full command: {}".format(fullcmd))
metricproc = Popen(fullcmd, shell=True, stdout=PIPE, stderr=PIPE)
try:
results, err = metricproc.communicate(timeout=timeout)
except TimeoutExpired:
log.warning("{}: chunk {} failed in window (run {})".format(metric,
chunk,
x))
if not _process_metric_chunk(metric, chunk, x, fullcmd, timeout * x):
continue
except Exception as e:
log.error("{} general exception :: {}".format(metric,
e))
else:
results = [r for r in results.decode().split("\n") if r][-26:]
final_results = []
"""
We'll only collect results that are non-0
since we're not super interested in stuff that didn't change.
"""
for r in results:
# Strip the timestamp from the log line
line = r.split(" ")[6:]
try:
if int(line[-1]) != 0:
final_results.append(" ".join(line))
except Exception:
final_results.append(" ".join(line))
result_str = "\n".join(final_results)
log.debug("{} results:\n{}".format(metric, result_str))
if chunk % 20 == 0:
log.debug("Chunk {} of {} finished".format(chunk, chunk_count))
return None
if compact:
if not _process_metric_chunk(metric, chunk, x, ccmd, timeout * x):
continue
if chunk % 20 == 0:
log.info("{} -> Chunk {} of {} finished".format(metric,
chunk,
chunk_count))
return None
else:
log.error("Failed to completely repair {}".format(metric))
return metric
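
# For reference (illustrative, not part of the commit): how a chunk index
# maps onto the fsck scan window built above, for 60-minute chunks.
def _example_chunk_windows():
    for chunk in (0, 1, 47):
        # chunk 0 -> "60m-ago 0m-ago", chunk 47 -> "2880m-ago 2820m-ago"
        print("{}m-ago {}m-ago".format((chunk + 1) * 60, chunk * 60))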
@@ -134,7 +191,7 @@ def process_metric(metricobj):
metric = metricobj["metric"]
chunk_count = metricobj["chunk_count"]
try:
check_output("{} {}".format(metricobj["check_cmd"], metric),
check_output("{} \"^{}$\"".format(metricobj["check_cmd"], metric),
shell=True)
except Exception:
log.warning("{} doesn't exist! Skipping...".format(metric))
@@ -150,8 +207,8 @@
runtime = time.time() - start_time
with metric_count.get_lock():
metric_count.value += 1
line = "{} repair took {} seconds".format(metric,
int(runtime))
line = "COMPLETE: {} repair took {} seconds".format(metric,
int(runtime))
line += " ({} of {} metrics complete)".format(metric_count.value,
metricobj["metriccount"])
line += " ({} failed)".format(failed_count.value)
@@ -162,7 +219,6 @@ def process_metrics(metric_list, threads):
threads = Pool(threads)
failed_metrics = threads.map(process_metric, metric_list)
failed_metrics = [m for m in failed_metrics if m]
log.warning("Failed metrics: {}".format(failed_metrics))
return failed_metrics


@@ -173,7 +229,7 @@ def cli_opts():
help="Show debug information")
parser.add_argument("--time-range", default="48",
help="How many hours of time we collect to repair")
parser.add_argument("--time-chunk", default="15",
parser.add_argument("--time-chunk", default="60",
help="How many minutes of data to scan per chunk")
parser.add_argument("--retries", default="1",
help="How many times we should try failed metrics")
@@ -192,7 +248,7 @@
parser.add_argument("--shuffle", action="store_true",
default=False,
help="Mix up incoming metric order")
parser.add_argument("--threads", default="4",
parser.add_argument("--threads", default="{}".format(int(cpu_count() / 2)),
help="Total number of metrics to process at once")
parser.add_argument("--sudo-user", default="opentsdb",
help="User to switch to...")
@@ -201,6 +257,7 @@

def main():
args = cli_opts()
chunks = get_large_divisors(60)
if args.debug:
log.setLevel(logging.DEBUG)
try:
@@ -217,8 +274,12 @@
log.error("Invalid thread count {} :: {}".format(args.threads, e))
try:
time_chunk = int(args.time_chunk)
if 60 % time_chunk != 0:
raise ArithmeticError
if time_chunk < 60:
if time_chunk not in chunks:
raise ArithmeticError
if time_chunk > 60:
if not any(n for n in chunks if time_chunk % n == 0):
raise ArithmeticError
except Exception as e:
log.error("Invalid time chunk {} :: {}".format(args.time_chunk, e))

@@ -232,7 +293,14 @@
"shuffle": args.shuffle,
"compact": args.compact,
"retries": retries})
process_metrics(metric_list, threads)
stime = time.time()
failed = process_metrics(metric_list, threads)
etime = time.time()
log.info("Processed {} metrics in [{}] seconds".format(len(metric_list),
etime - stime))
if len(failed) > 0:
log.warning("{} failed metrics:\n{}".format(len(failed),
pformat(sorted(failed))))


if __name__ == "__main__":
