This repository was archived by the owner on Aug 2, 2022. It is now read-only.

Only emit modify cache actions if cache's actual usage is above lower bound #562

Open · wants to merge 1 commit into base: main
@@ -177,6 +177,7 @@ public static final class Builder {

private Long currentCacheMaxSizeInBytes;
private Long desiredCacheMaxSizeInBytes;
private Long currentCacheActualSizeInByte;
private Long heapMaxSizeInBytes;
private final long upperBoundInBytes;
private final long lowerBoundInBytes;
@@ -197,6 +198,8 @@ private Builder(

this.currentCacheMaxSizeInBytes = NodeConfigCacheReaderUtil.readCacheMaxSizeInBytes(
appContext.getNodeConfigCache(), esNode, cacheType);
this.currentCacheActualSizeInByte = NodeConfigCacheReaderUtil.readCacheActualSizeInBytes(
appContext.getNodeConfigCache(), esNode, cacheType);
this.heapMaxSizeInBytes = NodeConfigCacheReaderUtil.readHeapMaxSizeInBytes(
appContext.getNodeConfigCache(), esNode);
this.desiredCacheMaxSizeInBytes = null;
@@ -261,6 +264,12 @@ public ModifyCacheMaxSizeAction build() {
// Ensure desired cache max size is within thresholds
desiredCacheMaxSizeInBytes = Math.max(Math.min(desiredCacheMaxSizeInBytes, upperBoundInBytes), lowerBoundInBytes);

// Ensure that we do not issue an action to lower the cache size if the actual cache
// usage is already below the target lower bound at each bucket level
if (!this.isIncrease && this.currentCacheActualSizeInByte != null
&& this.currentCacheActualSizeInByte <= lowerBoundInBytes) {
this.canUpdate = false;
}
return new ModifyCacheMaxSizeAction(esNode, cacheType, appContext,
desiredCacheMaxSizeInBytes, currentCacheMaxSizeInBytes, coolOffPeriodInMillis, canUpdate);
}
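To make the new guard concrete, here is a standalone sketch (not the repository's API) of the clamp-then-suppress arithmetic in build(); the heap size, bounds, and usage figures are all hypothetical.

// Standalone sketch of the clamp-then-suppress logic in build(); all numbers are made up.
public class SuppressDownsizeSketch {
    public static void main(String[] args) {
        final long heapMaxSizeInBytes = 10L * 1024 * 1024 * 1024;           // assume a 10 GB heap
        final long lowerBoundInBytes = (long) (0.02 * heapMaxSizeInBytes);  // assume a 2% lower bound
        final long upperBoundInBytes = (long) (0.40 * heapMaxSizeInBytes);  // assume a 40% upper bound
        final Long actualSizeInBytes = (long) (0.01 * heapMaxSizeInBytes);  // node reports 1% usage
        final boolean isIncrease = false;                                   // the decider wants to shrink
        boolean canUpdate = true;

        // Clamp the desired cache max size into [lowerBound, upperBound], as build() does.
        long desired = (long) (0.015 * heapMaxSizeInBytes);
        desired = Math.max(Math.min(desired, upperBoundInBytes), lowerBoundInBytes);

        // Actual usage (1% of heap) is already at or below the lower bound (2%),
        // so the downsize action is suppressed rather than emitted.
        if (!isIncrease && actualSizeInBytes != null && actualSizeInBytes <= lowerBoundInBytes) {
            canUpdate = false;
        }
        System.out.println("desired=" + desired + ", canUpdate=" + canUpdate); // canUpdate=false
    }
}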
@@ -83,4 +83,23 @@ public static double readDataFromSqlResult(Result<Record> result, Field<String>
}
return ret;
}

/**
 * Sums up the SUM field across all tuples in the SQL result.
 * @param records the records returned by the SQL query
 * @return the sum, or Double.NaN if the result is null
 */
public static Double readSumFromSqlResult(final Result<Record> records) {
if (records == null) {
LOG.error("sql result is null");
return Double.NaN;
}
double size = 0;
// since the flow unit data is aggregated by index, summing the size across indices
if (records.size() > 0) {
size = records.stream().mapToDouble(
record -> record.getValue(MetricsDB.SUM, Double.class)).sum();
}
return size;
}
}
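For reference, a hedged usage sketch of the new helper. It assumes MetricsDB.SUM resolves to the column name "sum" (an assumption; check the constant) and builds an in-memory jOOQ Result the way the index-aggregated size metric would, one row per index.

import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.Record;
import org.jooq.Result;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;

public class ReadSumFromSqlResultSketch {
    public static void main(String[] args) {
        DSLContext ctx = DSL.using(SQLDialect.DEFAULT);
        // Assumption: MetricsDB.SUM is the column name "sum".
        Field<Double> sum = DSL.field("sum", Double.class);

        // One row per index, as produced by the size metric aggregated by index name.
        Result<Record> records = ctx.newResult(new Field<?>[] {sum});
        Record indexA = ctx.newRecord(new Field<?>[] {sum});
        indexA.set(sum, 1024.0);
        Record indexB = ctx.newRecord(new Field<?>[] {sum});
        indexB.set(sum, 2048.0);
        records.add(indexA);
        records.add(indexB);

        // SQLParsingUtil.readSumFromSqlResult(records) would return 3072.0 here;
        // SQLParsingUtil.readSumFromSqlResult(null) returns Double.NaN and logs an error.
    }
}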
@@ -89,6 +89,9 @@ public class ResourceUtil {
public static final Resource FIELD_DATA_CACHE_MAX_SIZE = Resource.newBuilder()
.setResourceEnum(ResourceEnum.FIELD_DATA_CACHE)
.setMetricEnum(MetricEnum.CACHE_MAX_SIZE).build();
public static final Resource FIELD_DATA_CACHE_ACTUAL_SIZE = Resource.newBuilder()
.setResourceEnum(ResourceEnum.FIELD_DATA_CACHE)
.setMetricEnum(MetricEnum.CACHE_ACTUAL_SIZE).build();
public static final Resource SHARD_REQUEST_CACHE_EVICTION = Resource.newBuilder()
.setResourceEnum(ResourceEnum.SHARD_REQUEST_CACHE)
.setMetricEnum(MetricEnum.CACHE_EVICTION).build();
@@ -98,6 +101,9 @@ public class ResourceUtil {
public static final Resource SHARD_REQUEST_CACHE_MAX_SIZE = Resource.newBuilder()
.setResourceEnum(ResourceEnum.SHARD_REQUEST_CACHE)
.setMetricEnum(MetricEnum.CACHE_MAX_SIZE).build();
public static final Resource SHARD_REQUEST_CACHE_ACTUAL_SIZE = Resource.newBuilder()
.setResourceEnum(ResourceEnum.SHARD_REQUEST_CACHE)
.setMetricEnum(MetricEnum.CACHE_ACTUAL_SIZE).build();

/**
* Read the resourceType name from the ResourceType object
@@ -239,20 +239,12 @@ public void construct() {
cacheMaxSize.addTag(TAG_LOCUS, LOCUS_DATA_MASTER_NODE);
addLeaf(cacheMaxSize);

- NodeConfigCollector nodeConfigCollector = new NodeConfigCollector(RCA_PERIOD, queueCapacity, cacheMaxSize, heapMax);
- nodeConfigCollector.addTag(TAG_LOCUS, LOCUS_DATA_MASTER_NODE);
- nodeConfigCollector.addAllUpstreams(Arrays.asList(queueCapacity, cacheMaxSize, heapMax));
- NodeConfigClusterCollector nodeConfigClusterCollector = new NodeConfigClusterCollector(nodeConfigCollector);
- nodeConfigClusterCollector.addTag(TAG_LOCUS, LOCUS_MASTER_NODE);
- nodeConfigClusterCollector.addAllUpstreams(Collections.singletonList(nodeConfigCollector));
- nodeConfigClusterCollector.addTag(TAG_AGGREGATE_UPSTREAM, LOCUS_DATA_NODE);

// Field Data Cache RCA
Metric fieldDataCacheEvictions = new Cache_FieldData_Eviction(EVALUATION_INTERVAL_SECONDS);
fieldDataCacheEvictions.addTag(TAG_LOCUS, LOCUS_DATA_MASTER_NODE);
addLeaf(fieldDataCacheEvictions);

- Metric fieldDataCacheSizeGroupByOperation = new AggregateMetric(EVALUATION_INTERVAL_SECONDS,
+ AggregateMetric fieldDataCacheSizeGroupByOperation = new AggregateMetric(EVALUATION_INTERVAL_SECONDS,
Cache_FieldData_Size.NAME,
AggregateFunction.SUM,
MetricsDB.MAX, ShardStatsDerivedDimension.INDEX_NAME.toString());
@@ -278,7 +270,7 @@ public void construct() {
shardRequestHits.addTag(TAG_LOCUS, LOCUS_DATA_MASTER_NODE);
addLeaf(shardRequestHits);

- Metric shardRequestCacheSizeGroupByOperation = new AggregateMetric(EVALUATION_INTERVAL_SECONDS,
+ AggregateMetric shardRequestCacheSizeGroupByOperation = new AggregateMetric(EVALUATION_INTERVAL_SECONDS,
Cache_Request_Size.NAME,
AggregateFunction.SUM,
MetricsDB.MAX, ShardStatsDerivedDimension.INDEX_NAME.toString());
@@ -298,6 +290,16 @@ public void construct() {
shardRequestCacheClusterRca.addAllUpstreams(Collections.singletonList(shardRequestCacheNodeRca));
shardRequestCacheClusterRca.addTag(TAG_AGGREGATE_UPSTREAM, LOCUS_DATA_NODE);

+ // node config collector
+ NodeConfigCollector nodeConfigCollector = new NodeConfigCollector(RCA_PERIOD, queueCapacity, cacheMaxSize, heapMax,
+     fieldDataCacheSizeGroupByOperation, shardRequestCacheSizeGroupByOperation);
+ nodeConfigCollector.addTag(TAG_LOCUS, LOCUS_DATA_MASTER_NODE);
+ nodeConfigCollector.addAllUpstreams(Arrays.asList(queueCapacity, cacheMaxSize, heapMax));
+ NodeConfigClusterCollector nodeConfigClusterCollector = new NodeConfigClusterCollector(nodeConfigCollector);
+ nodeConfigClusterCollector.addTag(TAG_LOCUS, LOCUS_MASTER_NODE);
+ nodeConfigClusterCollector.addAllUpstreams(Collections.singletonList(nodeConfigCollector));
+ nodeConfigClusterCollector.addTag(TAG_AGGREGATE_UPSTREAM, LOCUS_DATA_NODE);

// Cache Health Decider
CacheHealthDecider cacheHealthDecider = new CacheHealthDecider(
EVALUATION_INTERVAL_SECONDS, 12, fieldDataCacheClusterRca, shardRequestCacheClusterRca, highHeapUsageClusterRca);
@@ -33,6 +33,8 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.metrics.ThreadPool_QueueCapacity;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.persist.SQLParsingUtil;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.metric.AggregateMetric;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cache.CacheUtil;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import java.util.HashMap;
import org.apache.logging.log4j.LogManager;
@@ -48,17 +50,23 @@ public class NodeConfigCollector extends EsConfigNode {
private final ThreadPool_QueueCapacity threadPool_queueCapacity;
private final Cache_Max_Size cacheMaxSize;
private final Heap_Max heapMaxSize;
private final AggregateMetric fieldDataCacheSizeGroupByAggregation;
private final AggregateMetric shardRequestCacheSizeGroupByAggregation;
private final int rcaPeriod;
private int counter;
private final HashMap<Resource, Double> configResult;

public NodeConfigCollector(int rcaPeriod,
ThreadPool_QueueCapacity threadPool_queueCapacity,
Cache_Max_Size cacheMaxSize,
- Heap_Max heapMaxSize) {
+ Heap_Max heapMaxSize,
+ AggregateMetric fieldDataCacheSizeGroupByAggregation,
+ AggregateMetric shardRequestCacheSizeGroupByAggregation) {
Comment on lines +63 to +64

Contributor:
Why did we choose to add the FieldData and Request cache usage to the Node Config Cache? Both of these will keep changing frequently and will require frequent cache updates. Why not use the Cache_FieldData_Size and Cache_Request_Size metrics available to us?

Contributor (Author):
Because the cache modify action is generated on the master node, it needs to collect the cache size usage from each data node. We currently use the NodeConfigCache as the temporary storage layer for those node-level metrics. Reading Cache_FieldData_Size on the master would only give the cache usage on the master itself, which is not what we want.

this.threadPool_queueCapacity = threadPool_queueCapacity;
this.cacheMaxSize = cacheMaxSize;
this.heapMaxSize = heapMaxSize;
this.fieldDataCacheSizeGroupByAggregation = fieldDataCacheSizeGroupByAggregation;
this.shardRequestCacheSizeGroupByAggregation = shardRequestCacheSizeGroupByAggregation;
this.rcaPeriod = rcaPeriod;
this.counter = 0;
this.configResult = new HashMap<>();
@@ -84,6 +92,16 @@ private void collectCacheMaxSize(MetricFlowUnit cacheMaxSize) {
collectAndPublishMetric(ResourceUtil.SHARD_REQUEST_CACHE_MAX_SIZE, shardRequestCacheMaxSize);
}

private void collectFieldDataCacheActualSize(MetricFlowUnit fieldDataCacheSizeGroupByOperation) {
final double fieldDataCacheActualSize = SQLParsingUtil.readSumFromSqlResult(fieldDataCacheSizeGroupByOperation.getData());
collectAndPublishMetric(ResourceUtil.FIELD_DATA_CACHE_ACTUAL_SIZE, fieldDataCacheActualSize);
}

private void collectShardRequestCacheActualSize(MetricFlowUnit shardRequestCacheSizeGroupByOperation) {
final double shardRequestCacheActualSize = SQLParsingUtil.readSumFromSqlResult(shardRequestCacheSizeGroupByOperation.getData());
collectAndPublishMetric(ResourceUtil.SHARD_REQUEST_CACHE_ACTUAL_SIZE, shardRequestCacheActualSize);
}

private void collectHeapStats(MetricFlowUnit heapMax) {
// total maximum heap size
final double heapMaxSize = SQLParsingUtil.readDataFromSqlResult(heapMax.getData(),
Expand Down Expand Up @@ -140,6 +158,18 @@ public NodeConfigFlowUnit operate() {
}
collectHeapStats(flowUnit);
}
for (MetricFlowUnit flowUnit : fieldDataCacheSizeGroupByAggregation.getFlowUnits()) {
if (flowUnit.isEmpty()) {
continue;
}
collectFieldDataCacheActualSize(flowUnit);
}
for (MetricFlowUnit flowUnit : shardRequestCacheSizeGroupByAggregation.getFlowUnits()) {
if (flowUnit.isEmpty()) {
continue;
}
collectShardRequestCacheActualSize(flowUnit);
}

if (counter == rcaPeriod) {
counter = 0;
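The review thread above explains why the collector publishes these values instead of the master reading Cache_FieldData_Size directly. A conceptual sketch of that flow, with hypothetical names rather than the repository's API:

import java.util.HashMap;
import java.util.Map;

// Conceptual sketch (hypothetical names, not the repository's API): each data node
// publishes its summed cache usage; the master stores it per node and reads it back
// when building a ModifyCacheMaxSizeAction for that node.
public class NodeConfigFlowSketch {
    // Stand-in for the NodeConfigCache: (nodeId + resource) -> latest reported value.
    private static final Map<String, Long> nodeConfigCache = new HashMap<>();

    // Data-node side: a NodeConfigCollector-style publish of a node-level metric.
    static void publish(String nodeId, String resource, long valueInBytes) {
        nodeConfigCache.put(nodeId + "/" + resource, valueInBytes);
    }

    // Master side: the action builder reads the data node's value, not its own local metric.
    static Long read(String nodeId, String resource) {
        return nodeConfigCache.get(nodeId + "/" + resource);
    }

    public static void main(String[] args) {
        publish("data-node-1", "FIELD_DATA_CACHE_ACTUAL_SIZE", 128L * 1024 * 1024);
        System.out.println(read("data-node-1", "FIELD_DATA_CACHE_ACTUAL_SIZE")); // 134217728
    }
}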
@@ -20,6 +20,7 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Metric;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.persist.SQLParsingUtil;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -33,27 +34,20 @@ public class CacheUtil {
public static final long MB_TO_BYTES = KB_TO_BYTES * 1024;
public static final long GB_TO_BYTES = MB_TO_BYTES * 1024;

- public static Double getTotalSizeInKB(final Metric cacheSizeGroupByOperation) {
-     double totalSizeInKB = 0;
+ public static Double getTotalSizeInKB(final Metric cacheSizeGroupByOperation) throws IllegalArgumentException {
+     double totalSizeInByte = 0;

      if (cacheSizeGroupByOperation.getFlowUnits().size() > 0) {
          // we expect the Metric to have single flow unit since it is consumed locally
          MetricFlowUnit flowUnit = cacheSizeGroupByOperation.getFlowUnits().get(0);
          if (flowUnit.isEmpty() || flowUnit.getData() == null) {
-             return totalSizeInKB;
-         }
-
-         // since the flow unit data is aggregated by index, summing the size across indices
-         if (flowUnit.getData().size() > 0) {
-             Result<Record> records = flowUnit.getData();
-             double size = records.stream().mapToDouble(
-                 record -> record.getValue(MetricsDB.SUM, Double.class)).sum();
-             totalSizeInKB += getSizeInKB(size);
+             return totalSizeInByte;
          }
+         totalSizeInByte = SQLParsingUtil.readSumFromSqlResult(flowUnit.getData());
      }

-     if (!Double.isNaN(totalSizeInKB)) {
-         return totalSizeInKB;
+     if (!Double.isNaN(totalSizeInByte)) {
+         return totalSizeInByte / ((double) KB_TO_BYTES);
      } else {
          throw new IllegalArgumentException("invalid value: {} in getTotalSizeInKB" + Float.NaN);
      }
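One subtlety of the refactor: readSumFromSqlResult returns bytes, so getTotalSizeInKB now converts on the way out instead of converting each per-index value. A minimal sketch of that unit handling, with a made-up 3 MB figure:

// Minimal sketch of the byte-to-KB conversion performed by getTotalSizeInKB.
public class SizeConversionSketch {
    public static void main(String[] args) {
        final long KB_TO_BYTES = 1024;
        double totalSizeInBytes = 3 * 1024 * 1024;                      // SQL sum, in bytes (made up)
        double totalSizeInKB = totalSizeInBytes / (double) KB_TO_BYTES; // 3072.0 KB
        System.out.println(totalSizeInKB);
    }
}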
@@ -55,6 +55,19 @@ public static Long readCacheMaxSizeInBytes(
return null;
}

public static Long readCacheActualSizeInBytes(
final NodeConfigCache nodeConfigCache, final NodeKey esNode, final ResourceEnum cacheType) {
try {
if (cacheType.equals(ResourceEnum.FIELD_DATA_CACHE)) {
return (long) nodeConfigCache.get(esNode, ResourceUtil.FIELD_DATA_CACHE_ACTUAL_SIZE);
}
return (long) nodeConfigCache.get(esNode, ResourceUtil.SHARD_REQUEST_CACHE_ACTUAL_SIZE);
} catch (final IllegalArgumentException e) {
LOG.error("Exception while reading cache actual size from Node Config Cache", e);
}
return null;
}

public static Long readHeapMaxSizeInBytes(
final NodeConfigCache nodeConfigCache, final NodeKey esNode) {
try {
1 change: 1 addition & 0 deletions src/main/proto/inter_node_rpc_service.proto
@@ -98,6 +98,7 @@ enum MetricEnum {
CACHE_EVICTION = 10 [(additional_fields).name = "cache eviction", (additional_fields).description = "cache eviction count"];
CACHE_HIT = 11 [(additional_fields).name = "cache hit", (additional_fields).description = "cache hit count"];
CACHE_MAX_SIZE = 12 [(additional_fields).name = "cache max size", (additional_fields).description = "max cache size in bytes"];
CACHE_ACTUAL_SIZE = 13 [(additional_fields).name = "cache actual size", (additional_fields).description = "cache actual size in bytes"];

// Heap
HEAP_MAX = 16 [(additional_fields).name = "heap max", (additional_fields).description = "max heap size in bytes"];
@@ -139,4 +139,33 @@ public void testNoAvailableAction() {
List<Action> actions = LevelOneActionBuilder.newBuilder(node, testAppContext, rcaConf).build();
Assert.assertEquals(0, actions.size());
}

@Test
public void testSuppressActionWhenCacheUsageIsLow() {
final double fielddataCacheSizeInPercent = 0.3;
final double shardRequestCacheSizeInPercent = 0.04;
dummyCache.put(node, ResourceUtil.FIELD_DATA_CACHE_MAX_SIZE,
(long)(heapMaxSizeInBytes * fielddataCacheSizeInPercent));
dummyCache.put(node, ResourceUtil.SHARD_REQUEST_CACHE_MAX_SIZE,
(long)(heapMaxSizeInBytes * shardRequestCacheSizeInPercent));
final double fielddataCacheUsageInPercent = 0.02;
final double shardRequestCacheUsageInPercent = 0.02;
dummyCache.put(node, ResourceUtil.FIELD_DATA_CACHE_ACTUAL_SIZE,
(long)(heapMaxSizeInBytes * fielddataCacheUsageInPercent));
dummyCache.put(node, ResourceUtil.SHARD_REQUEST_CACHE_ACTUAL_SIZE,
(long)(heapMaxSizeInBytes * shardRequestCacheUsageInPercent));
List<Action> actions = LevelOneActionBuilder.newBuilder(node, testAppContext, rcaConf).build();
deciderActionParser.addActions(actions);

Assert.assertEquals(1, actions.size());
long expectedCacheSize;
long currentCacheSize;
ModifyCacheMaxSizeAction requestCacheAction = deciderActionParser.readCacheAction(ResourceEnum.SHARD_REQUEST_CACHE);
Assert.assertNotNull(requestCacheAction);
expectedCacheSize =
(long) ((shardRequestCacheSizeInPercent - shardRequestCacheStepSize) * heapMaxSizeInBytes);
currentCacheSize = (long) (shardRequestCacheSizeInPercent * heapMaxSizeInBytes);
Assert.assertEquals(expectedCacheSize, requestCacheAction.getDesiredCacheMaxSizeInBytes(), 10);
Assert.assertEquals(currentCacheSize, requestCacheAction.getCurrentCacheMaxSizeInBytes(), 10);
}
}
@@ -25,6 +25,7 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.configs.CacheActionConfig;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.configs.QueueActionConfig;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.configs.jvm.LevelThreeActionBuilderConfig;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.jvm.old_gen.LevelOneActionBuilder;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.jvm.old_gen.LevelThreeActionBuilder;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.test_utils.DeciderActionParserUtil;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.ResourceEnum;
@@ -198,4 +199,38 @@ public void testDownSizeActionableResources() {
Assert.assertTrue(cacheClearAction.isActionable());
Assert.assertEquals(2, cacheClearAction.impactedNodes().size());
}

@Test
public void testSuppressActionWhenCacheUsageIsLow() {
final double fielddataCacheSizeInPercent = 0.3;
final double shardRequestCacheSizeInPercent = 0.04;
final int writeQueueSize = QueueActionConfig.DEFAULT_WRITE_QUEUE_LOWER_BOUND;
final int searchQueueSize = 2000;
dummyCache.put(node, ResourceUtil.FIELD_DATA_CACHE_MAX_SIZE,
(long)(heapMaxSizeInBytes * fielddataCacheSizeInPercent));
dummyCache.put(node, ResourceUtil.SHARD_REQUEST_CACHE_MAX_SIZE,
(long)(heapMaxSizeInBytes * shardRequestCacheSizeInPercent));
final double fielddataCacheUsageInPercent = 0.02;
final double shardRequestCacheUsageInPercent = 0.02;
dummyCache.put(node, ResourceUtil.FIELD_DATA_CACHE_ACTUAL_SIZE,
(long)(heapMaxSizeInBytes * fielddataCacheUsageInPercent));
dummyCache.put(node, ResourceUtil.SHARD_REQUEST_CACHE_ACTUAL_SIZE,
(long)(heapMaxSizeInBytes * shardRequestCacheUsageInPercent));
dummyCache.put(node, ResourceUtil.WRITE_QUEUE_CAPACITY, writeQueueSize);
dummyCache.put(node, ResourceUtil.SEARCH_QUEUE_CAPACITY, searchQueueSize);
List<Action> actions = LevelThreeActionBuilder.newBuilder(node, testAppContext, rcaConf).build();
deciderActionParser.addActions(actions);

Assert.assertEquals(3, actions.size());
long expectedCacheSize;
long currentCacheSize;
ModifyCacheMaxSizeAction requestCacheAction = deciderActionParser.readCacheAction(ResourceEnum.SHARD_REQUEST_CACHE);
Assert.assertNotNull(requestCacheAction);
expectedCacheSize = (long) (CacheActionConfig.DEFAULT_SHARD_REQUEST_CACHE_LOWER_BOUND * heapMaxSizeInBytes);
currentCacheSize = (long) (shardRequestCacheSizeInPercent * heapMaxSizeInBytes);
Assert.assertEquals(expectedCacheSize, requestCacheAction.getDesiredCacheMaxSizeInBytes(), 10);
Assert.assertEquals(currentCacheSize, requestCacheAction.getCurrentCacheMaxSizeInBytes(), 10);
ModifyCacheMaxSizeAction fielddataCacheAction = deciderActionParser.readCacheAction(ResourceEnum.FIELD_DATA_CACHE);
Assert.assertNull(fielddataCacheAction);
}
}