Skip to content

Commit

Permalink
STORM-3186: Customizable configuration for metric reporting interval
Browse files Browse the repository at this point in the history
  • Loading branch information
jainrish committed Sep 30, 2019
1 parent a2a2028 commit 379604d
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 2 deletions.
6 changes: 6 additions & 0 deletions storm-server/src/main/java/org/apache/storm/DaemonConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ public class DaemonConfig implements Validated {
@IsString
public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_DOMAIN = "storm.daemon.metrics.reporter.plugin.domain";

/**
* We report the metrics with this interval period.
*/
@IsString
public static final String STORM_DAEMON_METRICS_REPORTER_INTERVAL_SECS = "storm.daemon.metrics.reporter.interval.secs";

/**
* A specify csv reporter directory for CvsPreparableReporter daemon metrics reporter.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.storm.DaemonConfig;
import org.apache.storm.daemon.metrics.ClientMetricsUtils;
import org.apache.storm.utils.ObjectReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsolePreparableReporter implements PreparableReporter {
private static final Logger LOG = LoggerFactory.getLogger(ConsolePreparableReporter.class);
ConsoleReporter reporter = null;
Integer reportingIntervalPeriod = null;

@Override
public void prepare(MetricRegistry metricsRegistry, Map<String, Object> topoConf) {
Expand All @@ -46,13 +50,14 @@ public void prepare(MetricRegistry metricsRegistry, Map<String, Object> topoConf
builder.convertDurationsTo(durationUnit);
}
reporter = builder.build();
reportingIntervalPeriod = ObjectReader.getInt(topoConf.get(DaemonConfig.STORM_DAEMON_METRICS_REPORTER_INTERVAL_SECS), 10);
}

@Override
public void start() {
if (reporter != null) {
LOG.debug("Starting...");
reporter.start(10, TimeUnit.SECONDS);
reporter.start(reportingIntervalPeriod, TimeUnit.SECONDS);
} else {
throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.storm.DaemonConfig;
import org.apache.storm.daemon.metrics.ClientMetricsUtils;
import org.apache.storm.daemon.metrics.MetricsUtils;
import org.apache.storm.utils.ObjectReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CsvPreparableReporter implements PreparableReporter {
private static final Logger LOG = LoggerFactory.getLogger(CsvPreparableReporter.class);
CsvReporter reporter = null;
Integer reportingIntervalPeriod = null;

@Override
public void prepare(MetricRegistry metricsRegistry, Map<String, Object> topoConf) {
Expand All @@ -49,13 +53,14 @@ public void prepare(MetricRegistry metricsRegistry, Map<String, Object> topoConf

File csvMetricsDir = MetricsUtils.getCsvLogDir(topoConf);
reporter = builder.build(csvMetricsDir);
reportingIntervalPeriod = ObjectReader.getInt(topoConf.get(DaemonConfig.STORM_DAEMON_METRICS_REPORTER_INTERVAL_SECS), 10);
}

@Override
public void start() {
if (reporter != null) {
LOG.debug("Starting...");
reporter.start(10, TimeUnit.SECONDS);
reporter.start(reportingIntervalPeriod, TimeUnit.SECONDS);
} else {
throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
}
Expand Down

0 comments on commit 379604d

Please sign in to comment.