Skip to content

Commit

Permalink
Merge remote-tracking branch 'OFFICIAL/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
manolama committed Nov 21, 2018
2 parents f35f2e8 + e4da2c0 commit 15944f6
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 50 deletions.
28 changes: 28 additions & 0 deletions NEWS
Original file line number Diff line number Diff line change
@@ -1,5 +1,33 @@
OpenTSDB - User visible changes.

* Version 2.3.1 (2018-04-21)

Noteworthy Changes:
- When setting up aggregators, advance to the first data point equal to or greater
than the query start timestamp. This helps with calendar downsampling intervals.
- Add support to the Nagios check script for downsampling fill policies.

Bug Fixes:
- Fix expression calculation by avoiding double execution and checking both
output types for boolean values.
- Fixing missing tools scripts in builds.
- Default HBase 1.2.5 in the OSX install script
- Upgrade AsyncBigtable to 0.3.1
- Log query stats when a channel is closed unexpectedly.
- Add the Java 8 path in the debian init script and remove Java 6.
- Pass the column family name to the get requests in the compaction scheduler.
- Fix a comparison issue in the UI on group by tags.
- Filter annotation queries by the starting timestamp, excluding those in a row that
began before the query start time.
- Tiny stab at purging backticks from Gnuplot scripts.
- Remove the `final` annotation from the meta classes so they can be extended.
- Fix the javacc maven plugin version.
- Fix the literal or filter to allow single character filters.
- Fix query start stats logging to use the ms instead of nano time.
- Move Jackson and Netty to newer versions for security reasons.
- Upgrade to AsyncHBase 1.8.2 for compatibility with HBase 1.3 and 2.0
- Fix the Highest Current calculation to handle empty time series.
- Change the cache hits counters to longs.

* Version 2.3.0 (2016-12-31)

Expand Down
5 changes: 0 additions & 5 deletions src/core/IncomingDataPoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,6 @@ public final String getTSUID() {
return tsuid;
}

/** @param moretags the hashmap of kv pair to add */
public final void addTags(HashMap<String, String> moretags) {
this.tags.putAll(moretags);
}

/** @param metric the metric to set */
public final void setMetric(String metric) {
this.metric = metric;
Expand Down
10 changes: 0 additions & 10 deletions src/tsd/AbstractHttpQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,6 @@ public Map<String, String> getHeaders() {
return headers;
}

/**
* Return the value of the given HTTP Header
* first match wins
* @return Header value as string
*/
public String getHeaderValue(final String headerName) {
if (headerName == null) { return null; }
return request.headers().get(headerName);
}

/** @param stats The stats object to mark after writing is complete */
public void setStats(final QueryStats stats) {
this.stats = stats;
Expand Down
18 changes: 0 additions & 18 deletions src/tsd/PutDataPointRpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ public void execute(final TSDB tsdb, final HttpQuery query)
throw new BadRequestException("No datapoints found in content");
}

final HashMap<String, String> query_tags = new HashMap<String, String>();
final boolean show_details = query.hasQueryStringParam("details");
final boolean show_summary = query.hasQueryStringParam("summary");
final boolean synchronous = query.hasQueryStringParam("sync");
Expand All @@ -139,18 +138,6 @@ public void execute(final TSDB tsdb, final HttpQuery query)
int queued = 0;
final List<Deferred<Boolean>> deferreds = synchronous ?
new ArrayList<Deferred<Boolean>>(dps.size()) : null;

if (tsdb.getConfig().enable_header_tag()) {
LOG.debug("Looking for tag header " + tsdb.getConfig().get_name_header_tag());
final String header_tag_value = query.getHeaderValue(tsdb.getConfig().get_name_header_tag()) ;
if (header_tag_value != null) {
LOG.debug(" header found with value:" + header_tag_value);
Tags.parse(query_tags, header_tag_value);
} else {
LOG.debug(" no such header in request");
}
}

for (final IncomingDataPoint dp : dps) {

/** Handles passing a data point to the storage exception handler if
Expand Down Expand Up @@ -183,11 +170,6 @@ public String toString() {
}

try {
/** Add additionnal tags from HTTP header */
if ( (query_tags != null) && (query_tags.size() > 0) ) {
dp.addTags(query_tags);
}

if (dp.getMetric() == null || dp.getMetric().isEmpty()) {
if (show_details) {
details.add(this.getHttpDetails("Metric name was empty", dp));
Expand Down
17 changes: 0 additions & 17 deletions src/utils/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,6 @@ public class Config {
/** tsd.storage.fix_duplicates */
private boolean fix_duplicates = false;

/** tsd.http.header_tag */
private String http_header_tag = null;

/** tsd.http.request.max_chunk */
private int max_chunked_requests = 4096;

Expand Down Expand Up @@ -231,16 +228,6 @@ public int scanner_maxNumRows() {
return scanner_max_num_rows;
}

/** @return whether or not additional http header tag is allowed */
public boolean enable_header_tag() {
return http_header_tag != null ;
}

/** @return the lookup value for additional http header tag */
public String get_name_header_tag() {
return http_header_tag ;
}

/** @return whether or not chunked requests are supported */
public boolean enable_chunked_requests() {
return enable_chunked_requests;
Expand Down Expand Up @@ -548,7 +535,6 @@ protected void setDefaults() {
default_map.put("tsd.core.stats_with_port", "false");
default_map.put("tsd.http.show_stack_trace", "true");
default_map.put("tsd.http.query.allow_delete", "false");
default_map.put("tsd.http.header_tag", "");
default_map.put("tsd.http.request.enable_chunked", "false");
default_map.put("tsd.http.request.max_chunk", "4096");
default_map.put("tsd.http.request.cors_domains", "");
Expand Down Expand Up @@ -666,9 +652,6 @@ protected void loadStaticVariables() {
if (this.hasProperty("tsd.http.request.max_chunk")) {
max_chunked_requests = this.getInt("tsd.http.request.max_chunk");
}
if (this.hasProperty("tsd.http.header_tag")) {
http_header_tag = this.getString("tsd.http.header_tag");
}
enable_tree_processing = this.getBoolean("tsd.core.tree.enable_processing");
fix_duplicates = this.getBoolean("tsd.storage.fix_duplicates");
scanner_max_num_rows = this.getInt("tsd.storage.hbase.scanner.maxNumRows");
Expand Down
206 changes: 206 additions & 0 deletions tools/repair-tsd
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
#!/usr/bin/env python3

from subprocess import Popen, PIPE, TimeoutExpired, check_output
from random import shuffle
import time
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
import logging
import pprint

# Module-level logger: INFO by default, raised to DEBUG by --debug in main().
log = logging.getLogger("repair-tsd")
log.setLevel(logging.INFO)
_handler = logging.StreamHandler()
_handler.setFormatter(
    logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s'))
log.addHandler(_handler)


class TSDRepair(object):
    """Drives ``tsdb fsck`` over every metric, in small time chunks.

    Configuration is supplied as a plain dict (built in main()); every
    key is optional and falls back to the defaults below.
    """

    def __init__(self, args):
        # Minutes of data each fsck invocation covers.
        self.time_chunk = args.get("time_chunk", 15)
        # One-second-per-minute-of-data timeout for each chunk.
        self.timeout = int(self.time_chunk * 60)
        self.retries = args.get("retries", 1)
        # Chunks per hour; main() validates that time_chunk divides 60.
        self.multiplier = int(60 / self.time_chunk)
        # Total hours to repair, expressed as a chunk count.
        self.time_range = args.get("time_range", 48)
        self.chunk_count = self.time_range * self.multiplier
        self.tsd_path = args.get("tsd_path", "/usr/share/opentsdb/bin/tsdb")
        self.cfg_path = args.get("cfg_path", "/etc/opentsdb/opentsdb.conf")
        # Progress file listing metrics already repaired; read by
        # _get_metrics() and appended to by _repair_metric_chunk().
        # BUG FIX: this was previously assigned inside _get_metrics() from a
        # nonexistent global `args`, so it always raised NameError (swallowed
        # by the bare except) and progress tracking silently never worked.
        self.store_path = args.get("store_path", "/tmp/opentsdb.list")
        self.use_sudo = args.get("use_sudo", False)
        self.sudo_user = args.get("sudo_user", "opentsdb")
        self.log = logging.getLogger("repair-tsd")
        self.base = "{} fsck --config={}".format(self.tsd_path, self.cfg_path)
        self.check_cmd = "{} uid --config={} metrics".format(self.tsd_path,
                                                             self.cfg_path)
        if self.use_sudo:
            self.base = "sudo -u {} {}".format(self.sudo_user, self.base)
            self.check_cmd = "sudo -u {} {}".format(self.sudo_user,
                                                    self.check_cmd)

    def _get_metrics(self):
        """
        Collect all metrics from OpenTSDB, skipping any already recorded
        as finished in the progress file.

        :returns: metric names, shuffled so reruns spread the load
        :rtype: list
        """
        try:
            with open(self.store_path, 'r') as f_in:
                finished_metrics = [m for m in f_in.read().split('\n') if m]
        except Exception:
            # Progress file missing or unreadable: treat as a fresh run.
            finished_metrics = []
        cmd = '{} uid --config={} grep metrics ".*"'.format(self.tsd_path,
                                                            self.cfg_path)
        proc = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)
        results = proc.communicate()
        # Output lines look like "metrics <name>: <uid>" -- grab the name.
        metrics = [m.split(" ")[1].strip(":")
                   for m in results[0].decode().split("\n") if m]
        metrics = [m for m in metrics if m and m != "\x00" and
                   m not in finished_metrics]
        shuffle(metrics)
        self.log.info("There are {} metrics to process".format(len(metrics)))
        return metrics

    def _repair_metric_chunk(self, metric, chunk):
        """
        Repair one time chunk of data for a metric.

        :returns: None on success, the metric name on failure
        """
        self.log.debug("Running chunk {} for {}".format(chunk, metric))
        if chunk < 2:
            timestr = "{}m-ago".format(self.time_chunk)
        else:
            timestr = "{}m-ago {}m-ago".format((chunk + 1) * self.time_chunk,
                                               chunk * self.time_chunk)
        cmd = "{} {} sum".format(self.base, timestr)
        # Even though we're chunking, it's worth trying more than once.
        for x in range(1, self.retries + 2):
            self.log.debug("Repair try {} for {}".format(x, timestr))
            fullcmd = "{} {} --fix-all --compact".format(cmd, metric)
            self.log.debug("Full command: {}".format(fullcmd))
            metricproc = Popen(fullcmd, shell=True, stdout=PIPE, stderr=PIPE)
            try:
                results, err = metricproc.communicate(timeout=self.timeout)
            except TimeoutExpired:
                self.log.debug("{} failed to complete in window (run {})".format(metric, x))
                continue
            except Exception as e:
                self.log.error("{} general exception :: {}".format(metric, e))
            else:
                # Keep only the tail of fsck's summary output.
                results = [r for r in results.decode().split("\n") if r][-26:]
                final_results = []
                # Only collect results that are non-0, since unchanged
                # counters are not interesting.
                for r in results:
                    # Strip the timestamp from the log line.
                    line = r.split(" ")[6:]
                    try:
                        if int(line[-1]) != 0:
                            final_results.append(" ".join(line))
                    except Exception:
                        # Non-numeric tail: keep the line verbatim.
                        final_results.append(" ".join(line))
                result_str = "\n".join(final_results)
                self.log.debug("{} results:\n{}".format(metric, result_str))
                # Log progress at INFO every 20 chunks to limit noise.
                if chunk % 20 == 0:
                    self.log.info("Chunk {} of {} finished".format(chunk, self.chunk_count))
                else:
                    self.log.debug("Chunk {} of {} finished".format(chunk, self.chunk_count))
                # Record the metric as done; best-effort by design.
                try:
                    with open(self.store_path, 'a') as f_out:
                        f_out.write("{}\n".format(metric))
                except Exception:
                    pass
                return None
        else:
            self.log.error("Failed to completely repair {}".format(metric))
            return metric

    def process_metrics(self):
        """
        Run fsck on every known metric over the configured time range.

        :returns: metrics that could not be fully repaired
        :rtype: list
        """
        failed_metrics = []
        metrics = self._get_metrics()
        for index, metric in enumerate(metrics):
            # Verify the metric UID still resolves before repairing it.
            try:
                check_output("{} {}".format(self.check_cmd, metric),
                             shell=True)
            except Exception:
                # Consistency fix: use the instance logger, not the global.
                self.log.warning("{} doesn't exist! Skipping...".format(metric))
                continue
            logline = "{} ({} of {})".format(metric, index + 1, len(metrics))
            logline += " ({} failed) in {} chunks".format(len(failed_metrics),
                                                          self.chunk_count)
            self.log.info(logline)
            start_time = time.time()
            # BUG FIX: failed_metrics was reassigned each iteration, so
            # failures from earlier metrics were dropped; accumulate instead.
            chunk_failures = [self._repair_metric_chunk(metric, x)
                              for x in range(1, self.chunk_count + 1)]
            failed_metrics.extend(m for m in chunk_failures if m)
            runtime = time.time() - start_time
            self.log.info("{} repair took {} seconds".format(metric,
                                                             int(runtime)))
        self.log.info("Failed metrics: {}".format(failed_metrics))
        return failed_metrics


def cli_opts():
    """Build and parse the command-line options for the repair tool.

    :returns: parsed argparse.Namespace (all numeric options are strings;
              main() converts and validates them)
    """
    parser = ArgumentParser(description="Repair all OpenTSDB metrics",
                            formatter_class=ArgumentDefaultsHelpFormatter)
    parser.add_argument("--debug", action="store_true", default=False,
                        help="Show debug information")
    parser.add_argument("--time-range", default="48",
                        help="How many hours of time we collect to repair")
    parser.add_argument("--time-chunk", default="15",
                        help="How many minutes of data to scan per chunk")
    parser.add_argument("--retries", default="1",
                        help="How many times we should try failed metrics")
    parser.add_argument("--tsd-path", default="/usr/share/opentsdb/bin/tsdb",
                        help="Path to the OpenTSDB CLI binary")
    parser.add_argument("--cfg-path", default="/etc/opentsdb/opentsdb.conf",
                        help="Path to OpenTSDB config")
    # BUG FIX: help text was a copy-paste of the config-path help.
    parser.add_argument("--store-path", default="/opentsdb-fsck.list",
                        help="Path to the progress file tracking repaired metrics")
    parser.add_argument("--use-sudo", action="store_true",
                        default=False,
                        help="switch user when running repairs?")
    parser.add_argument("--sudo-user", default="opentsdb",
                        help="User to switch to...")
    return parser.parse_args()


def main():
    """Parse CLI options, validate them, and run the repair pass."""
    args = cli_opts()
    if args.debug:
        log.setLevel(logging.DEBUG)
    # BUG FIX: the old code logged an error on invalid input but kept
    # going with the variable undefined, causing a NameError at the
    # TSDRepair(...) call below. Exit with a clear message instead.
    try:
        time_range = int(args.time_range)
    except ValueError as e:
        log.error("Invalid time range {} :: {}".format(args.time_range, e))
        raise SystemExit(1)
    try:
        retries = int(args.retries)
    except ValueError as e:
        log.error("Invalid retry number {} :: {}".format(args.retries, e))
        raise SystemExit(1)
    try:
        time_chunk = int(args.time_chunk)
        # Chunks must tile an hour evenly or the chunk math drifts.
        if 60 % time_chunk != 0:
            raise ArithmeticError
    except Exception as e:
        # BUG FIX: the message previously echoed args.retries here.
        log.error("Invalid time chunk {} :: {}".format(args.time_chunk, e))
        raise SystemExit(1)

    repair_tool = TSDRepair({"time_range": time_range,
                             "use_sudo": args.use_sudo,
                             "sudo_user": args.sudo_user,
                             "time_chunk": time_chunk,
                             "tsd_path": args.tsd_path,
                             "cfg_path": args.cfg_path,
                             "store_path": args.store_path,
                             "retries": retries})
    repair_tool.process_metrics()


if __name__ == "__main__":
    main()

0 comments on commit 15944f6

Please sign in to comment.