diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/framework/metrics/ReaderMetrics.java b/src/main/java/org/opensearch/performanceanalyzer/rca/framework/metrics/ReaderMetrics.java index 4c9fc5a04..ec6ce0fd9 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/framework/metrics/ReaderMetrics.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/framework/metrics/ReaderMetrics.java @@ -86,7 +86,13 @@ public enum ReaderMetrics implements MeasurementSet { "FaultDetectionMetricsEmitterExecutionTime", "millis", StatsType.LATENCIES, - Statistics.SUM); + Statistics.SUM), + SEARCH_BACK_PRESSURE_METRICS_EMITTER_EXECUTION_TIME( + "SearchBackPressureMetricsEmitterExecutionTime", + "millis", + StatsType.LATENCIES, + Statistics.SUM), + ; /** What we want to appear as the metric name. */ private String name; diff --git a/src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java b/src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java index 5a225a0ba..c376e2bd6 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java +++ b/src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java @@ -749,6 +749,136 @@ public static void emitGarbageCollectionInfo( ReaderMetrics.GC_INFO_EMITTER_EXECUTION_TIME, mFinalT - mCurrT); } + public static void emitSearchBackPressureMetrics( + MetricsDB metricsDB, + SearchBackPressureMetricsSnapShot searchBackPressureMetricsSnapShot) { + long mCurrT = System.currentTimeMillis(); + Result searchbp_records = searchBackPressureMetricsSnapShot.fetchAll(); + + // String SEARCHBP_MODE_DIM = "searchbp_mode"; + String SEARCHBP_TYPE_DIM = + AllMetrics.SearchBackPressureStatsValue.SEARCHBP_TYPE_DIM.toString(); + String SEARCHBP_TABLE_NAME = + AllMetrics.SearchBackPressureStatsValue.SEARCHBP_TABLE_NAME.toString(); + + List dims = + new ArrayList() { + { + this.add(SEARCHBP_TYPE_DIM); + } + }; + + // stats type in sqlitedb is similar to: + // stats_type_name | sum | avg | min | max + List stats_types = + new ArrayList() { + { + // Shard/Task Stats Cancellation Count + // searchbp_shard_stats_cancellationCount|0.0|0.0|0.0|0.0 + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT + .toString()); + // searchbp_task_stats_cancellationCount|0.0|0.0|0.0|0.0 + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_CANCELLATIONCOUNT + .toString()); + // Shard Stats Resource Heap / CPU Usage + // searchbp_shard_stats_resource_heap_usage_cancellationCount|0.0|0.0|0.0|0.0 + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT + .toString()); + // searchbp_shard_stats_resource_heap_usage_currentMax|0.0|0.0|0.0|0.0 + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX + .toString()); + // searchbp_shard_stats_resource_heap_usage_rollingAvg|0.0|0.0|0.0|0.0 + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG + .toString()); + // searchbp_shard_stats_resource_cpu_usage_cancellationCount|0.0|0.0|0.0|0.0 + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT + .toString()); + // searchbp_shard_stats_resource_cpu_usage_currentMax|0.0|0.0|0.0|0.0 + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTMAX + .toString()); + // searchbp_shard_stats_resource_cpu_usage_currentAvg|0.0|0.0|0.0|0.0 + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTAVG + .toString()); + // Task Stats Resource Heap / CPU Usage + // searchbp_task_stats_resource_heap_usage_cancellationCount|0.0|0.0|0.0|0.0 + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT + .toString()); + // searchbp_task_stats_resource_heap_usage_currentMax|0.0|0.0|0.0|0.0 + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX + .toString()); + // searchbp_task_stats_resource_heap_usage_rollingAvg|0.0|0.0|0.0|0.0 + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG + .toString()); + // searchbp_task_stats_resource_cpu_usage_cancellationCount|0.0|0.0|0.0|0.0 + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT + .toString()); + // searchbp_task_stats_resource_cpu_usage_currentMax|0.0|0.0|0.0|0.0 + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTMAX + .toString()); + // searchbp_task_stats_resource_cpu_usage_currentAvg|0.0|0.0|0.0|0.0 + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTAVG + .toString()); + } + }; + + metricsDB.createMetric(new Metric<>(SEARCHBP_TABLE_NAME, 0d), dims); + + BatchBindStep handle = metricsDB.startBatchPut(new Metric<>(SEARCHBP_TABLE_NAME, 0d), dims); + + for (Record record : searchbp_records) { + for (String stats_type : stats_types) { + Optional tmpStatsObj = Optional.ofNullable(record.get(stats_type)); + + handle.bind( + stats_type, + // the rest are agg fields: sum, avg, min, max which don't make sense for + // searchbackpressure + tmpStatsObj.map(o -> Long.parseLong(o.toString())).orElse(0L), + tmpStatsObj.map(o -> Long.parseLong(o.toString())).orElse(0L), + tmpStatsObj.map(o -> Long.parseLong(o.toString())).orElse(0L), + tmpStatsObj.map(o -> Long.parseLong(o.toString())).orElse(0L)); + } + } + + handle.execute(); + + long mFinalT = System.currentTimeMillis(); + LOG.debug( + "Total time taken for writing Search Back Pressure info into metricsDB: {}", + mFinalT - mCurrT); + ServiceMetrics.READER_METRICS_AGGREGATOR.updateStat( + ReaderMetrics.SEARCH_BACK_PRESSURE_METRICS_EMITTER_EXECUTION_TIME, + mFinalT - mCurrT); + } + public static void emitAdmissionControlMetrics( MetricsDB metricsDB, AdmissionControlSnapshot snapshot) { long mCurrT = System.currentTimeMillis(); diff --git a/src/main/java/org/opensearch/performanceanalyzer/reader/ReaderMetricsProcessor.java b/src/main/java/org/opensearch/performanceanalyzer/reader/ReaderMetricsProcessor.java index 512c52f6d..3b446d95e 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/reader/ReaderMetricsProcessor.java +++ b/src/main/java/org/opensearch/performanceanalyzer/reader/ReaderMetricsProcessor.java @@ -70,6 +70,7 @@ public class ReaderMetricsProcessor implements Runnable { clusterManagerThrottlingMetricsMap; private NavigableMap shardStateMetricsMap; private NavigableMap admissionControlMetricsMap; + private NavigableMap searchBackPressureMetricsMap; private static final int MAX_DATABASES = 2; private static final int OS_SNAPSHOTS = 4; @@ -81,6 +82,7 @@ public class ReaderMetricsProcessor implements Runnable { private static final int GC_INFO_SNAPSHOTS = 4; private static final int CLUSTER_MANAGER_THROTTLING_SNAPSHOTS = 2; private static final int AC_SNAPSHOTS = 2; + private static final int SEARCH_BP_SNAPSHOTS = 4; private final String rootLocation; private final AppContext appContext; @@ -125,6 +127,8 @@ public ReaderMetricsProcessor( gcInfoMap = new TreeMap<>(); clusterManagerThrottlingMetricsMap = new TreeMap<>(); admissionControlMetricsMap = new TreeMap<>(); + searchBackPressureMetricsMap = new TreeMap<>(); + this.rootLocation = rootLocation; this.configOverridesApplier = new ConfigOverridesApplier(); @@ -268,6 +272,7 @@ public void trimOldSnapshots() throws Exception { trimMap(gcInfoMap, GC_INFO_SNAPSHOTS); trimMap(clusterManagerThrottlingMetricsMap, CLUSTER_MANAGER_THROTTLING_SNAPSHOTS); trimMap(admissionControlMetricsMap, AC_SNAPSHOTS); + trimMap(searchBackPressureMetricsMap, SEARCH_BP_SNAPSHOTS); for (NavigableMap snap : nodeMetricsMap.values()) { // do the same thing as OS_SNAPSHOTS. Eventually MemoryDBSnapshot @@ -397,6 +402,7 @@ private void emitMetrics(long currWindowStartTime) throws Exception { emitAdmissionControlMetrics(prevWindowStartTime, metricsDB); emitClusterManagerMetrics(prevWindowStartTime, metricsDB); emitClusterManagerThrottlingMetrics(prevWindowStartTime, metricsDB); + emitSearchBackPressureMetrics(prevWindowStartTime, metricsDB); metricsDB.commit(); metricsDBMap.put(prevWindowStartTime, metricsDB); @@ -594,6 +600,19 @@ private void emitClusterManagerThrottlingMetrics( } } + private void emitSearchBackPressureMetrics(long prevWindowStartTime, MetricsDB metricsDB) + throws Exception { + if (searchBackPressureMetricsMap.containsKey(prevWindowStartTime)) { + SearchBackPressureMetricsSnapShot prevSearchBPSnapShot = + searchBackPressureMetricsMap.get(prevWindowStartTime); + MetricsEmitter.emitSearchBackPressureMetrics(metricsDB, prevSearchBPSnapShot); + } else { + LOG.debug( + "Search Back Pressure snapshot does not exist for the previous window. " + + "Not emitting metrics."); + } + } + /** * OS, Request, Http and cluster_manager first aligns the currentTimeStamp with a 5 second * interval. In the current format, a file (previously a directory) is written every 5 seconds. @@ -679,6 +698,9 @@ is ready so it starts to read that file (go back two windows and EventProcessor admissionControlProcessor = AdmissionControlProcessor.build( currWindowStartTime, conn, admissionControlMetricsMap); + EventProcessor searchBackPressureMetricsProcessor = + SearchBackPressureMetricsProcessor.buildSearchBackPressureMetricsProcessor( + currWindowStartTime, conn, searchBackPressureMetricsMap); // The event dispatcher dispatches events to each of the registered event processors. // In addition to event processing each processor has an initialize/finalize function that @@ -702,6 +724,7 @@ is ready so it starts to read that file (go back two windows and eventDispatcher.registerEventProcessor(faultDetectionProcessor); eventDispatcher.registerEventProcessor(garbageCollectorInfoProcessor); eventDispatcher.registerEventProcessor(admissionControlProcessor); + eventDispatcher.registerEventProcessor(searchBackPressureMetricsProcessor); eventDispatcher.initializeProcessing( currWindowStartTime, currWindowStartTime + MetricsConfiguration.SAMPLING_INTERVAL); diff --git a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java new file mode 100644 index 000000000..6e9547754 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessor.java @@ -0,0 +1,193 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.reader; + + +import java.sql.Connection; +import java.util.ArrayList; +import java.util.Map; +import java.util.NavigableMap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jooq.BatchBindStep; +import org.opensearch.performanceanalyzer.commons.event_process.Event; +import org.opensearch.performanceanalyzer.commons.event_process.EventProcessor; +import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics; +import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics; +import org.opensearch.performanceanalyzer.commons.util.JsonConverter; + +public class SearchBackPressureMetricsProcessor implements EventProcessor { + + private static final Logger LOG = + LogManager.getLogger(SearchBackPressureMetricsProcessor.class); + + // instance of SearchBackPressureMetricsSnapShot to interact with the backend db + private SearchBackPressureMetricsSnapShot searchBackPressureMetricsSnapShot; + + // entry point for batch queries + private BatchBindStep handle; + + // normally starTime and endTime are gapped by 5 seconds (default sampling interval) + private long startTime; + private long endTime; + + private SearchBackPressureMetricsProcessor( + SearchBackPressureMetricsSnapShot searchBackPressureMetricsSnapShot) { + this.searchBackPressureMetricsSnapShot = searchBackPressureMetricsSnapShot; + } + + /* + * if current SnapShotMap has the snapshot for currentWindowStartTime, use the snapshot to build the processor + * else create a new Instance of SearchBackPressureMetricsSnapShot to initialize the processor + */ + static SearchBackPressureMetricsProcessor buildSearchBackPressureMetricsProcessor( + long currentWindowStartTime, + Connection connection, + NavigableMap + searchBackPressureSnapshotNavigableMap) { + // if current metrics is in searchBackPressureSnapshotNavigableMap map + if (searchBackPressureSnapshotNavigableMap.get(currentWindowStartTime) == null) { + SearchBackPressureMetricsSnapShot searchBackPressureMetricsSnapShot = + new SearchBackPressureMetricsSnapShot(connection, currentWindowStartTime); + searchBackPressureSnapshotNavigableMap.put( + currentWindowStartTime, searchBackPressureMetricsSnapShot); + return new SearchBackPressureMetricsProcessor(searchBackPressureMetricsSnapShot); + } + return new SearchBackPressureMetricsProcessor( + searchBackPressureSnapshotNavigableMap.get(currentWindowStartTime)); + } + + @Override + public void initializeProcessing(long startTime, long endTime) { + this.startTime = startTime; + this.endTime = endTime; + this.handle = searchBackPressureMetricsSnapShot.startBatchPut(); + } + + @Override + public void finalizeProcessing() { + if (handle.size() > 0) { + handle.execute(); + } + } + + @Override + public boolean shouldProcessEvent(Event event) { + return event.key.contains(PerformanceAnalyzerMetrics.sSearchBackPressureMetricsPath); + } + + @Override + public void commitBatchIfRequired() { + if (handle.size() >= BATCH_LIMIT) { + handle.execute(); + handle = searchBackPressureMetricsSnapShot.startBatchPut(); + } + } + + // Handler method for incoming events + private void handleSearchBackPressureEvent(String eventValue) { + String[] lines = eventValue.split(System.lineSeparator()); + if (lines.length < 2) { + throw new RuntimeException("Missing SearchBackPressure Metrics payload and timestamp."); + } + + // Parse metrics payload + parseJsonLine(lines[1]); + } + + private void parseJsonLine(final String jsonString) { + Map map = JsonConverter.createMapFrom(jsonString); + + if (map.isEmpty()) { + throw new RuntimeException("Missing SearchBackPressure Metrics payload."); + } + + // A list of dims to be collected + ArrayList required_searchbp_dims = + new ArrayList() { + { + // Shard/Task Stats Cancellation Count + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_CANCELLATIONCOUNT + .toString()); + + // Shard Stats Resource Heap / CPU Usage + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTMAX + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTAVG + .toString()); + + // Task Stats Resource Heap / CPU Usage + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTMAX + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTAVG + .toString()); + } + }; + + Object[] bindVals = new Object[required_searchbp_dims.size()]; + int idx = 0; + for (String dimension : required_searchbp_dims) { + bindVals[idx++] = map.get(dimension); + } + + handle.bind(bindVals); + } + + @Override + public void processEvent(Event event) { + // Handler method for incoming event + handleSearchBackPressureEvent(event.value); + + // commit Batch queries is overflow the limit + commitBatchIfRequired(); + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java new file mode 100644 index 000000000..bcf5c09ad --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShot.java @@ -0,0 +1,188 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.reader; + + +import java.sql.Connection; +import java.util.ArrayList; +import java.util.List; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jooq.BatchBindStep; +import org.jooq.DSLContext; +import org.jooq.Field; +import org.jooq.Record; +import org.jooq.Result; +import org.jooq.SQLDialect; +import org.jooq.impl.DSL; +import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics; + +/* + * SearchBackPressure cluster/node-level RCA would consume these data in the snapshots and determine whether the search back pressure service + * has cancelled too much/ too less requests, by comparing with predefined threshold. + */ +public class SearchBackPressureMetricsSnapShot implements Removable { + + // Logger for current class + private static final Logger LOG = LogManager.getLogger(SearchBackPressureMetricsSnapShot.class); + + // entry point to interact with SQLite db + private final DSLContext create; + + /* + * This is a tmp table created to populate searchbp stats + * table name is the search_back_pressure_ + windowStartTime + */ + private final String tableName; + + /* columns are the key metrics to be collected (e.g. shar-level search back pressure cancellation count) + */ + private List> columns; + + // Create a table with specifed fields (columns) + public SearchBackPressureMetricsSnapShot(Connection conn, Long windowStartTime) { + this.create = DSL.using(conn, SQLDialect.SQLITE); + this.tableName = "search_back_pressure_" + windowStartTime; + + // Add the ControllerName, searchbp_mode columns in the table + this.columns = + new ArrayList>() { + { + // Shard/Task Stats Cancellation Count + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT + .toString()), + Integer.class)); + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_CANCELLATIONCOUNT + .toString()), + Integer.class)); + + // Shard Stats Resource Heap / CPU Usage + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT + .toString()), + Integer.class)); + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX + .toString()), + Integer.class)); + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG + .toString()), + Long.class)); + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT + .toString()), + Integer.class)); + + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTMAX + .toString()), + Integer.class)); + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTAVG + .toString()), + Long.class)); + + // Task Stats Resource Heap / CPU Usage + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT + .toString()), + Integer.class)); + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX + .toString()), + Integer.class)); + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG + .toString()), + Long.class)); + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT + .toString()), + Integer.class)); + + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTMAX + .toString()), + Integer.class)); + this.add( + DSL.field( + DSL.name( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTAVG + .toString()), + Long.class)); + } + }; + + // create table with columns specified + create.createTable(tableName).columns(columns).execute(); + } + + public DSLContext getDSLContext() { + return create; + } + + public BatchBindStep startBatchPut() { + // Add dummy values because jooq requires this to support multiple bind statements with + // single insert query + List dummyValues = new ArrayList<>(); + for (int i = 0; i < columns.size(); i++) { + dummyValues.add(null); + } + return create.batch(create.insertInto(DSL.table(this.tableName)).values(dummyValues)); + } + + public Result fetchAll() { + return create.select().from(DSL.table(tableName)).fetch(); + } + + @Override + public void remove() throws Exception { + create.dropTable(DSL.table(tableName)).execute(); + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessorTest.java b/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessorTest.java new file mode 100644 index 000000000..575712a2d --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsProcessorTest.java @@ -0,0 +1,161 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.reader; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.NavigableMap; +import java.util.TreeMap; +import org.jooq.Record; +import org.jooq.Result; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.opensearch.performanceanalyzer.commons.event_process.Event; +import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics; +import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics; + +public class SearchBackPressureMetricsProcessorTest { + private static final String DB_URL = "jdbc:sqlite:"; + // private static final String TEST_MEM_POOL = "testMemPool"; + // private static final String COLLECTOR_NAME = "testCollectorName"; + private static final String SEARCH_BACK_PRESSURE_STATS_KEY = "search_back_pressure_stats"; + private SearchBackPressureMetricsProcessor searchBackPressureMetricsProcessor; + private long currTimeStamp; + + private NavigableMap searchBackPressureStatsMap; + Connection conn; + + // mock SearchBackPressureStatsCollector to test Event processing + private static final String SERIALIZED_EVENT = + "{\"searchbp_shard_stats_cancellationCount\":2," + + "\"searchbp_shard_stats_limitReachedCount\":2," + + "\"searchbp_shard_stats_resource_heap_usage_cancellationCount\":3," + + "\"searchbp_shard_stats_resource_heap_usage_currentMax\":3," + + "\"searchbp_shard_stats_resource_heap_usage_rollingAvg\":3," + + "\"searchbp_shard_stats_resource_cpu_usage_cancellationCount\":5," + + "\"searchbp_shard_stats_resource_cpu_usage_currentMax\":5," + + "\"searchbp_shard_stats_resource_cpu_usage_currentAvg\":5," + + "\"searchbp_shard_stats_resource_elaspedtime_usage_cancellationCount\":2," + + "\"searchbp_shard_stats_resource_elaspedtime_usage_currentMax\":2," + + "\"searchbp_shard_stats_resource_elaspedtime_usage_currentAvg\":2," + + "\"searchbp_task_stats_cancellationCount\":0," + + "\"searchbp_task_stats_limitReachedCount\":0," + + "\"searchbp_task_stats_resource_heap_usage_cancellationCount\":0," + + "\"searchbp_task_stats_resource_heap_usage_currentMax\":0," + + "\"searchbp_task_stats_resource_heap_usage_rollingAvg\":0," + + "\"searchbp_task_stats_resource_cpu_usage_cancellationCount\":0," + + "\"searchbp_task_stats_resource_cpu_usage_currentMax\":0," + + "\"searchbp_task_stats_resource_cpu_usage_currentAvg\":0," + + "\"searchbp_task_stats_resource_elaspedtime_usage_cancellationCount\":0," + + "\"searchbp_task_stats_resource_elaspedtime_usage_currentMax\":0," + + "\"searchbp_task_stats_resource_elaspedtime_usage_currentAvg\":0," + + "\"searchbp_mode\":\"MONITOR_ONLY\"," + + "\"searchbp_nodeid\":\"FgNAAAQQQDSROABCDEFHTX\"}"; + + @Before + public void setup() throws Exception { + Class.forName("org.sqlite.JDBC"); + System.setProperty("java.io.tmpdir", "/tmp"); + conn = DriverManager.getConnection(DB_URL); + this.currTimeStamp = System.currentTimeMillis(); + this.searchBackPressureStatsMap = new TreeMap<>(); + this.searchBackPressureMetricsProcessor = + searchBackPressureMetricsProcessor.buildSearchBackPressureMetricsProcessor( + currTimeStamp, conn, searchBackPressureStatsMap); + } + + // Test valid case of the handleSearchBackPressureEvent() + @Test + public void testSearchBackPressureProcessEvent() throws Exception { + // Create a SearchBackPressureEvent + Event testEvent = buildTestSearchBackPressureStatsEvent(); + + // Test the SearchBackPressureMetricsSnapShot + searchBackPressureMetricsProcessor.initializeProcessing( + this.currTimeStamp, System.currentTimeMillis()); + assertTrue(searchBackPressureMetricsProcessor.shouldProcessEvent(testEvent)); + + searchBackPressureMetricsProcessor.processEvent(testEvent); + searchBackPressureMetricsProcessor.finalizeProcessing(); + + SearchBackPressureMetricsSnapShot currSnapshot = + searchBackPressureStatsMap.get(this.currTimeStamp); + Result result = currSnapshot.fetchAll(); + assertEquals(1, result.size()); + + // SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG value is 3L according to the + // SERIALIZED_EVENT, should EQUAL + Assert.assertEquals( + 3L, + result.get(0) + .get( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG + .toString())); + // SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT value is 0L according to the + // SERIALIZED_EVENT, should EQUAL + Assert.assertEquals( + 0, + result.get(0) + .get( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT + .toString())); + + // SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT value is 0L according to the + // SERIALIZED_EVENT, should NOT EQUAL + Assert.assertNotEquals( + 2, + result.get(0) + .get( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT + .toString())); + } + + @Test + public void testEmptySearchBackPressureProcessEvent() throws Exception { + // Create a SearchBackPressureEvent + Event testEvent = buildEmptyTestSearchBackPressureStatsEvent(); + + // Test the SearchBackPressureMetricsSnapShot + searchBackPressureMetricsProcessor.initializeProcessing( + this.currTimeStamp, System.currentTimeMillis()); + assertTrue(searchBackPressureMetricsProcessor.shouldProcessEvent(testEvent)); + + try { + searchBackPressureMetricsProcessor.processEvent(testEvent); + Assert.assertFalse( + "Negative scenario test: Should catch a RuntimeException and skip this test", + true); + } catch (RuntimeException ex) { + // should catch the exception and the previous assertion should not be executed + } + } + + private Event buildTestSearchBackPressureStatsEvent() { + StringBuilder str = new StringBuilder(); + str.append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds()) + .append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor); + + str.append(SERIALIZED_EVENT).append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor); + return new Event( + SEARCH_BACK_PRESSURE_STATS_KEY, str.toString(), System.currentTimeMillis()); + } + + private Event buildEmptyTestSearchBackPressureStatsEvent() { + StringBuilder str = new StringBuilder(); + str.append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds()) + .append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor); + + return new Event( + SEARCH_BACK_PRESSURE_STATS_KEY, str.toString(), System.currentTimeMillis()); + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShotTest.java b/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShotTest.java new file mode 100644 index 000000000..2e88aa574 --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/reader/SearchBackPressureMetricsSnapShotTest.java @@ -0,0 +1,136 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.reader; + +import static org.junit.Assert.assertEquals; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.ArrayList; +import org.jooq.BatchBindStep; +import org.jooq.Record; +import org.jooq.Result; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics; + +public class SearchBackPressureMetricsSnapShotTest { + private static final String DB_URL = "jdbc:sqlite:"; + private Connection conn; + SearchBackPressureMetricsSnapShot snapshot; + + ArrayList required_searchbp_dims = + new ArrayList() { + { + // Shard/Task Stats Cancellation Count + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_CANCELLATIONCOUNT + .toString()); + + // Shard Stats Resource Heap / CPU Usage + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTMAX + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTAVG + .toString()); + + // Task Stats Resource Heap / CPU Usage + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTMAX + .toString()); + this.add( + AllMetrics.SearchBackPressureStatsValue + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTAVG + .toString()); + } + }; + + @Before + public void setup() throws Exception { + Class.forName("org.sqlite.JDBC"); + System.setProperty("java.io.tmpdir", "/tmp"); + conn = DriverManager.getConnection(DB_URL); + snapshot = new SearchBackPressureMetricsSnapShot(conn, System.currentTimeMillis()); + } + + @Test + public void testReadSearchBackPressureMetricsSnapshot() throws Exception { + final BatchBindStep handle = snapshot.startBatchPut(); + insertIntoTable(handle); + + final Result result = snapshot.fetchAll(); + + assertEquals(1, result.size()); + // for 14 (length of required_searchbp_dims) fields, each assign a value from 0 to 13 + // test each field and verify the result + for (int i = 0; i < required_searchbp_dims.size(); i++) { + Assert.assertEquals( + AllMetrics.SearchBackPressureStatsValue.SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT + .toString() + + " should be " + + String.valueOf(i), + i, + ((Number) result.get(0).get(required_searchbp_dims.get(i))).intValue()); + } + } + + @After + public void tearDown() throws Exception { + conn.close(); + } + + private void insertIntoTable(BatchBindStep handle) { + Object[] bindVals = new Object[required_searchbp_dims.size()]; + for (int i = 0; i < required_searchbp_dims.size(); i++) { + bindVals[i] = Integer.valueOf(i); + } + + handle.bind(bindVals).execute(); + } +}