Skip to content

Commit

Permalink
Add search request timeouts for correlations workflows (#893)
Browse files Browse the repository at this point in the history
* Reinstating more leaks plugged-in for correlations workflows

Signed-off-by: Megha Goyal <[email protected]>

* Add search timeouts to all correlation searches

Signed-off-by: Megha Goyal <[email protected]>

* Fix logging and exception messages

Signed-off-by: Megha Goyal <[email protected]>

* Change search timeout to 30 seconds

Signed-off-by: Megha Goyal <[email protected]>

---------

Signed-off-by: Megha Goyal <[email protected]>
  • Loading branch information
goyamegh authored and riysaxen-amzn committed Mar 18, 2024
1 parent 63d266a commit 1f630c1
Showing 3 changed files with 78 additions and 53 deletions.
Original file line number Diff line number Diff line change
@@ -9,7 +9,10 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.join.ScoreMode;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.commons.alerting.model.DocLevelQuery;
import org.opensearch.core.action.ActionListener;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequest;
@@ -118,6 +121,7 @@ private void generateAutoCorrelations(Detector detector, Finding finding) throws
searchRequest.indices(DetectorMonitorConfig.getAllFindingsIndicesPattern(logTypeName));
searchRequest.source(sourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
mSearchRequest.add(searchRequest);
}

@@ -199,6 +203,8 @@ private void onAutoCorrelations(Detector detector, Finding finding, Map<String,
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(CorrelationRule.CORRELATION_RULE_INDEX);
searchRequest.source(searchSourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));

client.search(searchRequest, ActionListener.wrap(response -> {
if (response.isTimedOut()) {
@@ -253,7 +259,8 @@ private void getValidDocuments(String detectorType, List<String> indices, List<C
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(indices.toArray(new String[]{}));
searchRequest.source(searchSourceBuilder);

searchRequest.preference(Preference.PRIMARY_FIRST.type());
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
validCorrelationRules.add(rule);
mSearchRequest.add(searchRequest);
}
@@ -351,6 +358,8 @@ private void searchFindingsByTimestamp(String detectorType, Map<String, List<Cor
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(DetectorMonitorConfig.getAllFindingsIndicesPattern(categoryToQueries.getKey()));
searchRequest.source(searchSourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
mSearchRequest.add(searchRequest);
categoryToQueriesPairs.add(new Pair<>(categoryToQueries.getKey(), categoryToQueries.getValue()));
}
@@ -413,7 +422,8 @@ private void searchDocsWithFilterKeys(String detectorType, Map<String, DocSearch
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(docSearchCriteria.getValue().indices.toArray(new String[]{}));
searchRequest.source(searchSourceBuilder);

searchRequest.preference(Preference.PRIMARY_FIRST.type());
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
categories.add(docSearchCriteria.getKey());
mSearchRequest.add(searchRequest);
}
@@ -473,7 +483,8 @@ private void getCorrelatedFindings(String detectorType, Map<String, List<String>
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(DetectorMonitorConfig.getAllFindingsIndicesPattern(relatedDocIds.getKey()));
searchRequest.source(searchSourceBuilder);

searchRequest.preference(Preference.PRIMARY_FIRST.type());
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
categories.add(relatedDocIds.getKey());
mSearchRequest.add(searchRequest);
}
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@
import org.opensearch.securityanalytics.transport.TransportCorrelateFindingAction;
import org.opensearch.securityanalytics.util.CorrelationIndices;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -92,6 +93,7 @@ public void insertCorrelatedFindings(String detectorType, Finding finding, Strin
request.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP);
request.source(searchSourceBuilder);
request.preference(Preference.PRIMARY_FIRST.type());
request.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));

mSearchRequest.add(request);
}
@@ -193,6 +195,12 @@ public void insertCorrelatedFindings(String detectorType, Finding finding, Strin
}

public void insertOrphanFindings(String detectorType, Finding finding, float timestampFeature, Map<String, CustomLogType> logTypes) {
if (logTypes.get(detectorType) == null ) {
log.debug("Missing detector type {} in the log types index for finding id {}. Keys in the index: {}",
detectorType, finding.getId(), Arrays.toString(logTypes.keySet().toArray()));
onFailure(new OpenSearchStatusException("insertOrphanFindings null log types for detector type: " + detectorType, RestStatus.INTERNAL_SERVER_ERROR));
}

SearchRequest searchRequest = getSearchMetadataIndexRequest(detectorType, finding, logTypes);
Map<String, Object> tags = logTypes.get(detectorType).getTags();
String correlationId = tags.get("correlation_id").toString();
@@ -248,7 +256,8 @@ public void insertOrphanFindings(String detectorType, Finding finding, float tim
onFailure(ex);
}
} else {
onFailure(new OpenSearchStatusException(indexResponse.toString(), RestStatus.INTERNAL_SERVER_ERROR));
onFailure(new OpenSearchStatusException("Indexing failed with response {} ",
indexResponse.status(), indexResponse.toString()));
}
}, this::onFailure));
} else {
@@ -294,7 +303,8 @@ public void insertOrphanFindings(String detectorType, Finding finding, float tim
onFailure(ex);
}
} else {
onFailure(new OpenSearchStatusException(indexResponse.toString(), RestStatus.INTERNAL_SERVER_ERROR));
onFailure(new OpenSearchStatusException("Indexing failed with response {} ",
indexResponse.status(), indexResponse.toString()));
}
}, this::onFailure));
} else {
@@ -321,6 +331,7 @@ public void insertOrphanFindings(String detectorType, Finding finding, float tim
request.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP);
request.source(searchSourceBuilder);
request.preference(Preference.PRIMARY_FIRST.type());
request.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));

client.search(request, ActionListener.wrap(searchResponse -> {
if (searchResponse.isTimedOut()) {
@@ -405,6 +416,9 @@ public void insertOrphanFindings(String detectorType, Finding finding, float tim
} catch (Exception ex) {
onFailure(ex);
}
} else {
onFailure(new OpenSearchStatusException("Indexing failed with response {} ",
indexResponse.status(), indexResponse.toString()));
}
}, this::onFailure));
} catch (Exception ex) {
@@ -430,7 +444,7 @@ private void indexCorrelatedFindings(XContentBuilder builder) {
if (response.status().equals(RestStatus.CREATED)) {
correlateFindingAction.onOperation();
} else {
onFailure(new OpenSearchStatusException(response.toString(), RestStatus.INTERNAL_SERVER_ERROR));
onFailure(new OpenSearchStatusException("Indexing failed with response {} ", response.status(), response.toString()));
}
}, this::onFailure));
}
@@ -452,6 +466,7 @@ private SearchRequest getSearchMetadataIndexRequest(String detectorType, Finding
searchRequest.indices(CorrelationIndices.CORRELATION_METADATA_INDEX);
searchRequest.source(searchSourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
return searchRequest;
}

Loading

0 comments on commit 1f630c1

Please sign in to comment.