Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge branch 'main' into election
Browse files Browse the repository at this point in the history
  • Loading branch information
meetshah777 authored Mar 18, 2021
2 parents abcbc45 + 3575243 commit feeaee9
Show file tree
Hide file tree
Showing 15 changed files with 235 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,31 @@
import org.apache.logging.log4j.Logger;

public abstract class PerformanceAnalyzerMetricsCollector implements Runnable {
enum State {
HEALTHY,

// This collector could not complete between two runs of
// ScheduledMetricCollectorsExecutor. First occurrence of
// this is considered a warning.
SLOW,

// A collector is muted if it failed to complete between two runs of
// ScheduledMetricCollectorsExecutor. A muted collector is skipped.
MUTED
}

private static final Logger LOG = LogManager.getLogger(PerformanceAnalyzerMetricsCollector.class);
private int timeInterval;
private long startTime;
private String collectorName;
protected StringBuilder value;
protected State state;

protected PerformanceAnalyzerMetricsCollector(int timeInterval, String collectorName) {
this.timeInterval = timeInterval;
this.collectorName = collectorName;
this.value = new StringBuilder();
this.state = State.HEALTHY;
}

private AtomicBoolean bInProgress = new AtomicBoolean(false);
Expand Down Expand Up @@ -76,4 +91,12 @@ public void run() {
public StringBuilder getValue() {
return value;
}

public State getState() {
return state;
}

public void setState(State state) {
this.state = state;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@

package com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
Expand All @@ -32,6 +36,9 @@ public class ScheduledMetricCollectorsExecutor extends Thread {
private boolean paEnabled = false;
private int minTimeIntervalToSleep = Integer.MAX_VALUE;
private Map<PerformanceAnalyzerMetricsCollector, Long> metricsCollectors;

public static final String COLLECTOR_THREAD_POOL_NAME = "pa-collectors-th";

private ThreadPoolExecutor metricsCollectorsTP;

public ScheduledMetricCollectorsExecutor(
Expand Down Expand Up @@ -62,14 +69,20 @@ public void addScheduledMetricCollector(PerformanceAnalyzerMetricsCollector task
}

public void run() {
Thread.currentThread().setName(this.getClass().getSimpleName());
if (metricsCollectorsTP == null) {
ThreadFactory taskThreadFactory = new ThreadFactoryBuilder()
.setNameFormat(COLLECTOR_THREAD_POOL_NAME)
.setDaemon(true)
.build();
metricsCollectorsTP =
new ThreadPoolExecutor(
collectorThreadCount,
collectorThreadCount,
COLLECTOR_THREAD_KEEPALIVE_SECS,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(metricsCollectors.size()));
new ArrayBlockingQueue<>(metricsCollectors.size()),
taskThreadFactory);
}

long prevStartTimestamp = System.currentTimeMillis();
Expand All @@ -94,11 +107,20 @@ public void run() {
metricsCollectors.entrySet()) {
if (entry.getValue() <= currentTime) {
PerformanceAnalyzerMetricsCollector collector = entry.getKey();
if (collector.getState() == PerformanceAnalyzerMetricsCollector.State.MUTED) {
StatsCollector.instance().logException(StatExceptionCode.COLLECTORS_MUTED);
continue;
}
metricsCollectors.put(collector, entry.getValue() + collector.getTimeInterval());
if (!collector.inProgress()) {
collector.setStartTime(currentTime);
metricsCollectorsTP.execute(collector);
} else {
if (collector.getState() == PerformanceAnalyzerMetricsCollector.State.HEALTHY) {
collector.setState(PerformanceAnalyzerMetricsCollector.State.SLOW);
} else if (collector.getState() == PerformanceAnalyzerMetricsCollector.State.SLOW) {
collector.setState(PerformanceAnalyzerMetricsCollector.State.MUTED);
}
LOG.info(
"Collector {} is still in progress, so skipping this Interval",
collector.getCollectorName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,18 @@ public enum StatExceptionCode {
TOTAL_ERROR("TotalError"),
METRICS_WRITE_ERROR("MetricsWriteError"),
METRICS_REMOVE_ERROR("MetricsRemoveError"),
// Tracks the number of VM attach/dataDump or detach failures.
JVM_ATTACH_ERROR("JvmAttachErrror"),
// This error is thrown if the java_pid file is missing.
JVM_ATTACH_ERROR_JAVA_PID_FILE_MISSING("JvmAttachErrorJavaPidFileMissing"),
// The lock could not be acquired within the timeout.
JVM_ATTACH_LOCK_ACQUISITION_FAILED("JvmAttachLockAcquisitionFailed"),
// ThreadState could not be found for an ES thread in the critical ES path.
NO_THREAD_STATE_INFO("NoThreadStateInfo"),
// This metric indicates that we successfully completed a thread-dump. Likewise,
// an omission of this should indicate that the thread taking the dump got stuck.
JVM_THREAD_DUMP_SUCCESSFUL("JvmThreadDumpSuccessful"),
COLLECTORS_MUTED("CollectorsMutedCount"),
MASTER_METRICS_ERROR("MasterMetricsError"),
DISK_METRICS_ERROR("DiskMetricsError"),
THREAD_IO_ERROR("ThreadIOError"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.jvm;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.OSMetricsGeneratorFactory;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.ScheduledMetricCollectorsExecutor;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatExceptionCode;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatsCollector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.core.Util;
Expand All @@ -32,9 +33,10 @@
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import sun.tools.attach.HotSpotVirtualMachine;
Expand All @@ -50,11 +52,15 @@ public class ThreadList {
static final Logger LOGGER = LogManager.getLogger(ThreadList.class);
static final int samplingInterval =
MetricsConfiguration.CONFIG_MAP.get(ThreadList.class).samplingInterval;

// This value controls how often we do the thread dump.
private static final long minRunInterval = samplingInterval;
private static final ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
private static final Pattern linePattern = Pattern.compile("\"([^\"]*)\"");
private static long lastRunTime = 0;

private static Lock vmAttachLock = new ReentrantLock();

public static class ThreadState {
public long javaTid;
public long nativeTid;
Expand Down Expand Up @@ -104,37 +110,53 @@ public String toString() {
}
}

/**
* This is called from OSMetricsCollector#collectMetrics. So this is not called
* in the critical path of ES request handling. Even for the collector thread,
* we do a timed wait to acquire this lock and move on if we could not get it.
* @return A hashmap of threadId to threadState.
*/
public static Map<Long, ThreadState> getNativeTidMap() {
synchronized (ThreadList.class) {
if (System.currentTimeMillis() > lastRunTime + minRunInterval) {
runThreadDump(pid, new String[0]);
if (vmAttachLock.tryLock()) {
try {
// Thread dumps are expensive and therefore we make sure that at least
// minRunInterval milliseconds have elapsed between two attempts.
if (System.currentTimeMillis() > lastRunTime + minRunInterval) {
runThreadDump(pid, new String[0]);
}
} finally {
vmAttachLock.unlock();
}
// - sending a copy so that if runThreadDump next iteration clears it; caller still has the
// state at the call time
// - not too expensive as this is only being called from Scheduled Collectors (only once in
// few seconds)
return new HashMap<>(nativeTidMap);
} else {
StatsCollector.instance().logException(StatExceptionCode.JVM_ATTACH_LOCK_ACQUISITION_FAILED);
}

// - sending a copy so that if runThreadDump next iteration clears it; caller still has the
// state at the call time
// - not too expensive as this is only being called from Scheduled Collectors (only once in
// few seconds)
return new HashMap<>(nativeTidMap);
}

/**
* This method is called from the critical bulk and search paths which PA
* intercepts. This method used to try to do a thread dump if it could not
* find the information about the thread in question. The thread dump is an
* expensive operation and can stall see VirtualMachineImpl#VirtualMachineImpl()
* for jdk-11 u06. We don't want the ES threads to pay the price. We skip this
* iteration and then hopefully in the next call to getNativeTidMap(), the
* OSMetricsCollector#collectMetrics will fill the jTidMap. This transfers the
* responsibility from the ES threads to the PA collector threads.
*
* @param threadId The threadId of the current thread.
* @return If we have successfully captured the ThreadState, then we emit it or Null otherwise.
*/
public static ThreadState getThreadState(long threadId) {
ThreadState retVal = jTidMap.get(threadId);

if (retVal != null) {
return retVal;
}

synchronized (ThreadList.class) {
retVal = jTidMap.get(threadId);

if (retVal != null) {
return retVal;
}

runThreadDump(pid, new String[0]);
if (retVal == null) {
StatsCollector.instance().logException(StatExceptionCode.NO_THREAD_STATE_INFO);
}

return jTidMap.get(threadId);
return retVal;
}

// Attach to pid and perform a thread dump
Expand All @@ -143,31 +165,28 @@ private static void runAttachDump(String pid, String[] args) {
try {
vm = VirtualMachine.attach(pid);
} catch (Exception ex) {
LOGGER.debug(
"Error in Attaching to VM with exception: {} with ExceptionCode: {}",
() -> ex.toString(),
() -> StatExceptionCode.JVM_ATTACH_ERROR.toString());
StatsCollector.instance().logException(StatExceptionCode.JVM_ATTACH_ERROR);
if (ex.getMessage().contains("java_pid")) {
StatsCollector.instance().logException(StatExceptionCode.JVM_ATTACH_ERROR_JAVA_PID_FILE_MISSING);
} else {
StatsCollector.instance().logException(StatExceptionCode.JVM_ATTACH_ERROR);
}
// If the thread dump failed then we clean up the old map. So, next time when the collection
// happens as it would after a bootup.
oldNativeTidMap.clear();
return;
}

try (InputStream in = ((HotSpotVirtualMachine) vm).remoteDataDump(args); ) {
createMap(in);
} catch (Exception ex) {
LOGGER.debug(
"Cannot list threads with exception: {} with ExceptionCode: {}",
() -> ex.toString(),
() -> StatExceptionCode.JVM_ATTACH_ERROR.toString());
StatsCollector.instance().logException(StatExceptionCode.JVM_ATTACH_ERROR);
oldNativeTidMap.clear();
}

try {
vm.detach();
StatsCollector.instance().logException(StatExceptionCode.JVM_THREAD_DUMP_SUCCESSFUL);
} catch (Exception ex) {
LOGGER.debug(
"Failed in VM Detach with exception: {} with ExceptionCode: {}",
() -> ex.toString(),
() -> StatExceptionCode.JVM_ATTACH_ERROR.toString());
StatsCollector.instance().logException(StatExceptionCode.JVM_ATTACH_ERROR);
}
}
Expand Down Expand Up @@ -236,6 +255,10 @@ private static void parseThreadInfo(final ThreadInfo info) {
}

static void runThreadDump(String pid, String[] args) {
String currentThreadName = Thread.currentThread().getName();
assert currentThreadName.startsWith(ScheduledMetricCollectorsExecutor.COLLECTOR_THREAD_POOL_NAME)
|| currentThreadName.equals(ScheduledMetricCollectorsExecutor.class.getSimpleName()) :
String.format("Thread dump called from a non os collector thread: %s", currentThreadName);
jTidNameMap.clear();
oldNativeTidMap.putAll(nativeTidMap);
nativeTidMap.clear();
Expand All @@ -244,8 +267,11 @@ static void runThreadDump(String pid, String[] args) {

// TODO: make this map update atomic
Util.invokePrivileged(() -> runAttachDump(pid, args));
runMXDump();

// oldNativeTidMap gets cleared if the attach Fails, so that the
// metrics collection starts as it would after a restart.
if (!oldNativeTidMap.isEmpty()) {
runMXDump();
}
lastRunTime = System.currentTimeMillis();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ public enum MetricName {
MASTER_PENDING,
ELECTION_TERM,
MOUNTED_PARTITION_METRICS,
CLUSTER_APPLIER_SERVICE,
ADMISSION_CONTROL_METRICS,
SHARD_INDEXING_PRESSURE
SHARD_INDEXING_PRESSURE,
}

// we don't store node details as a metric on reader side database. We
Expand Down Expand Up @@ -824,20 +825,41 @@ public static class Constants {
}
}

public enum ElectionTermValue implements MetricValue {
ELECTION_TERM(Constants.ELECTION_TERM_VALUE);
public enum ClusterApplierServiceStatsValue implements MetricValue {
CLUSTER_APPLIER_SERVICE_LATENCY(ClusterApplierServiceStatsValue.Constants.CLUSTER_APPLIER_SERVICE_LATENCY),
CLUSTER_APPLIER_SERVICE_FAILURE(ClusterApplierServiceStatsValue.Constants.CLUSTER_APPLIER_SERVICE_FAILURE);

private final String value;

ElectionTermValue(String value) {
ClusterApplierServiceStatsValue(String value) {
this.value = value;
}

@Override
public String toString() {
return this.value;
}

public static class Constants {
public static final String CLUSTER_APPLIER_SERVICE_LATENCY = "ClusterApplierService_Latency";
public static final String CLUSTER_APPLIER_SERVICE_FAILURE = "ClusterApplierService_Failure";
}
}

public enum ElectionTermValue implements MetricValue {
ELECTION_TERM(Constants.ELECTION_TERM_VALUE);

private final String value;

ElectionTermValue(String value) {
this.value = value;
}

@Override
public String toString() {
return value;
}

public static class Constants {
public static final String ELECTION_TERM_VALUE = "Election_Term";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class PerformanceAnalyzerMetrics {
public static final String sMasterTaskPath = "master_task";
public static final String sFaultDetection = "fault_detection";
public static final String sElectionTermPath = "election_term";
public static final String sClusterApplierService = "cluster_applier_service";
public static final String sHttpPath = "http";
public static final String sOSPath = "os_metrics";
public static final String sHeapPath = "heap_metrics";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,16 @@ public class MetricsModel {
new MetricAttributes(
MetricUnits.COUNT.toString(), AllMetrics.ShardStateDimension.values()));

allMetricsInitializer.put(
AllMetrics.ClusterApplierServiceStatsValue.CLUSTER_APPLIER_SERVICE_LATENCY.toString(),
new MetricAttributes(
MetricUnits.MILLISECOND.toString(), EmptyDimension.values()));

allMetricsInitializer.put(
AllMetrics.ClusterApplierServiceStatsValue.CLUSTER_APPLIER_SERVICE_FAILURE.toString(),
new MetricAttributes(
MetricUnits.COUNT.toString(), EmptyDimension.values()));

allMetricsInitializer.put(
AdmissionControlValue.REJECTION_COUNT.toString(),
new MetricAttributes(MetricUnits.COUNT.toString(), AdmissionControlDimension.values())
Expand Down
Loading

0 comments on commit feeaee9

Please sign in to comment.