From 9d7a4dd9cf485082eb674ae5e226b49c65b9943b Mon Sep 17 00:00:00 2001 From: Chase <62891993+engechas@users.noreply.github.com> Date: Thu, 4 Apr 2024 11:55:55 -0700 Subject: [PATCH] Allow detectors to be stopped if underlying workflow is deleted. Don't allow them to then be started/editted (#810) (#959) * Allow detectors to be stopped if underlying workflow is deleted. Don't allow them to then be started * Add copyright headers --------- Signed-off-by: Chase Engelbrecht --- .../securityanalytics/model/Detector.java | 4 + .../TransportDeleteDetectorAction.java | 57 +++++------- .../TransportIndexDetectorAction.java | 30 ++++++- .../util/ExceptionChecker.java | 26 ++++++ .../util/ThrowableCheckingPredicates.java | 34 ++++++++ .../resthandler/DetectorRestApiIT.java | 44 ++++++++++ .../util/ExceptionCheckerTests.java | 86 +++++++++++++++++++ .../ThrowableCheckingPredicatesTests.java | 43 ++++++++++ 8 files changed, 286 insertions(+), 38 deletions(-) create mode 100644 src/main/java/org/opensearch/securityanalytics/util/ExceptionChecker.java create mode 100644 src/main/java/org/opensearch/securityanalytics/util/ThrowableCheckingPredicates.java create mode 100644 src/test/java/org/opensearch/securityanalytics/util/ExceptionCheckerTests.java create mode 100644 src/test/java/org/opensearch/securityanalytics/util/ThrowableCheckingPredicatesTests.java diff --git a/src/main/java/org/opensearch/securityanalytics/model/Detector.java b/src/main/java/org/opensearch/securityanalytics/model/Detector.java index 46d3457a2..693aa65e6 100644 --- a/src/main/java/org/opensearch/securityanalytics/model/Detector.java +++ b/src/main/java/org/opensearch/securityanalytics/model/Detector.java @@ -572,6 +572,10 @@ public void setAlertsHistoryIndexPattern(String alertsHistoryIndexPattern) { this.alertsHistoryIndexPattern = alertsHistoryIndexPattern; } + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + public void setEnabledTime(Instant enabledTime) { this.enabledTime = enabledTime; } diff --git a/src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java b/src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java index 2371f84c0..bd821b6d6 100644 --- a/src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java +++ b/src/main/java/org/opensearch/securityanalytics/transport/TransportDeleteDetectorAction.java @@ -42,9 +42,11 @@ import org.opensearch.securityanalytics.model.Detector; import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings; import org.opensearch.securityanalytics.util.DetectorIndices; +import org.opensearch.securityanalytics.util.ExceptionChecker; import org.opensearch.securityanalytics.util.MonitorService; import org.opensearch.securityanalytics.util.RuleTopicIndices; import org.opensearch.securityanalytics.util.SecurityAnalyticsException; +import org.opensearch.securityanalytics.util.ThrowableCheckingPredicates; import org.opensearch.securityanalytics.util.WorkflowService; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -61,6 +63,11 @@ public class TransportDeleteDetectorAction extends HandledTransportAction { private static final Logger log = LogManager.getLogger(TransportDeleteDetectorAction.class); + private static final List ACCEPTABLE_ENTITY_MISSING_THROWABLE_MATCHERS = List.of( + ThrowableCheckingPredicates.MONITOR_NOT_FOUND, + ThrowableCheckingPredicates.WORKFLOW_NOT_FOUND, + ThrowableCheckingPredicates.ALERTING_CONFIG_INDEX_NOT_FOUND + ); private final Client client; @@ -83,9 +90,13 @@ public class TransportDeleteDetectorAction extends HandledTransportAction responses) { @Override public void onFailure(Exception e) { - if (isOnlyWorkflowOrMonitorOrIndexMissingExceptionThrownByGroupedActionListener(e, detector.getId())) { + if (exceptionChecker.doesGroupedActionListenerExceptionMatch(e, ACCEPTABLE_ENTITY_MISSING_THROWABLE_MATCHERS)) { + logAcceptableEntityMissingException(e, detector.getId()); deleteDetectorFromConfig(detector.getId(), request.getRefreshPolicy()); } else { log.error(String.format(Locale.ROOT, "Failed to delete detector %s", detector.getId()), e); @@ -243,7 +256,8 @@ private void deleteWorkflow(Detector detector, ActionListener actionListener) { - if (isOnlyWorkflowOrMonitorOrIndexMissingExceptionThrownByGroupedActionListener(deleteWorkflowException, detectorId)) { + if (exceptionChecker.doesGroupedActionListenerExceptionMatch(deleteWorkflowException, ACCEPTABLE_ENTITY_MISSING_THROWABLE_MATCHERS)) { + logAcceptableEntityMissingException(deleteWorkflowException, detectorId); actionListener.onResponse(new AcknowledgedResponse(true)); } else { actionListener.onFailure(deleteWorkflowException); @@ -305,39 +319,12 @@ private void finishHim(String detectorId, Exception t) { } })); } - - private boolean isOnlyWorkflowOrMonitorOrIndexMissingExceptionThrownByGroupedActionListener( - Exception ex, - String detectorId - ) { - // grouped action listener listens on mutliple listeners but throws only one exception. If multiple - // listeners fail the other exceptions are added as suppressed exceptions to the first failure. - int len = ex.getSuppressed().length; - for (int i = 0; i <= len; i++) { - Throwable e = i == len ? ex : ex.getSuppressed()[i]; - if (isMonitorNotFoundException(e) || isWorkflowNotFoundException(e) || isAlertingConfigIndexNotFoundException(e)) { - log.error( - String.format(Locale.ROOT, "Workflow, monitor, or jobs index already deleted." + - " Proceeding with detector %s deletion", detectorId), - e); - } else { - return false; - } - } - return true; - } - } - - private boolean isMonitorNotFoundException(final Throwable e) { - return e.getMessage().matches("(.*)Monitor(.*) is not found(.*)"); - } - - private boolean isWorkflowNotFoundException(final Throwable e) { - return e.getMessage().matches("(.*)Workflow(.*) not found(.*)"); } - private boolean isAlertingConfigIndexNotFoundException(final Throwable e) { - return e.getMessage().contains("Configured indices are not found: [.opendistro-alerting-config]"); + private void logAcceptableEntityMissingException(final Exception e, final String detectorId) { + final String errorMsg = String.format(Locale.ROOT, "Workflow, monitor, or jobs index already deleted." + + " Proceeding with detector %s deletion", detectorId); + log.error(errorMsg, e); } private void setEnabledWorkflowUsage(boolean enabledWorkflowUsage) { diff --git a/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java b/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java index c27cc14da..0a49ffdf6 100644 --- a/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java +++ b/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java @@ -98,11 +98,13 @@ import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings; import org.opensearch.securityanalytics.util.DetectorIndices; import org.opensearch.securityanalytics.util.DetectorUtils; +import org.opensearch.securityanalytics.util.ExceptionChecker; import org.opensearch.securityanalytics.util.IndexUtils; import org.opensearch.securityanalytics.util.MonitorService; import org.opensearch.securityanalytics.util.RuleIndices; import org.opensearch.securityanalytics.util.RuleTopicIndices; import org.opensearch.securityanalytics.util.SecurityAnalyticsException; +import org.opensearch.securityanalytics.util.ThrowableCheckingPredicates; import org.opensearch.securityanalytics.util.WorkflowService; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -156,6 +158,8 @@ public class TransportIndexDetectorAction extends HandledTransportAction return new IndexMonitorRequest(monitorId, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, refreshPolicy, restMethod, monitor, null); } + private void handleUpsertWorkflowFailure(final Exception e, final ActionListener> listener, + final Detector detector, final List monitorsToBeDeleted, + final RefreshPolicy refreshPolicy, final List updatedMonitors) { + if (exceptionChecker.doesGroupedActionListenerExceptionMatch(e, List.of(ThrowableCheckingPredicates.WORKFLOW_NOT_FOUND))) { + if (detector.getEnabled()) { + final String errorMessage = String.format("Underlying workflow associated with detector %s not found. " + + "Delete and recreate the detector to restore functionality.", detector.getName()); + log.error(errorMessage); + listener.onFailure(new SecurityAnalyticsException(errorMessage, RestStatus.BAD_REQUEST, e)); + } else { + log.error("Underlying workflow associated with detector {} not found. Proceeding to disable detector.", detector.getName()); + deleteMonitorStep(monitorsToBeDeleted, refreshPolicy, updatedMonitors, listener); + } + } else { + log.error("Failed to update the workflow"); + listener.onFailure(e); + } + } + /** * Creates doc level monitor which generates per document alerts for the findings of the bucket level delegate monitors in a workflow. * This monitor has match all query applied to generate the alerts per each finding doc. diff --git a/src/main/java/org/opensearch/securityanalytics/util/ExceptionChecker.java b/src/main/java/org/opensearch/securityanalytics/util/ExceptionChecker.java new file mode 100644 index 000000000..eb94abd1e --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/util/ExceptionChecker.java @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.securityanalytics.util; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Stream; + +public class ExceptionChecker { + + public boolean doesGroupedActionListenerExceptionMatch(final Exception ex, final List exceptionMatchers) { + // grouped action listener listens on multiple listeners but throws only one exception. If multiple + // listeners fail the other exceptions are added as suppressed exceptions to the first failure. + return Stream.concat(Arrays.stream(ex.getSuppressed()), Stream.of(ex)) + .allMatch(throwable -> doesExceptionMatch(throwable, exceptionMatchers)); + } + + private boolean doesExceptionMatch(final Throwable throwable, final List exceptionMatchers) { + return exceptionMatchers.stream() + .map(ThrowableCheckingPredicates::getMatcherPredicate) + .anyMatch(matcher -> matcher.test(throwable)); + } + +} diff --git a/src/main/java/org/opensearch/securityanalytics/util/ThrowableCheckingPredicates.java b/src/main/java/org/opensearch/securityanalytics/util/ThrowableCheckingPredicates.java new file mode 100644 index 000000000..6282cab5d --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/util/ThrowableCheckingPredicates.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.securityanalytics.util; + +import java.util.function.Predicate; + +public enum ThrowableCheckingPredicates { + MONITOR_NOT_FOUND(ThrowableCheckingPredicates::isMonitorNotFoundException), + WORKFLOW_NOT_FOUND(ThrowableCheckingPredicates::isWorkflowNotFoundException), + ALERTING_CONFIG_INDEX_NOT_FOUND(ThrowableCheckingPredicates::isAlertingConfigIndexNotFoundException); + + private final Predicate matcherPredicate; + ThrowableCheckingPredicates(final Predicate matcherPredicate) { + this.matcherPredicate = matcherPredicate; + } + + public Predicate getMatcherPredicate() { + return this.matcherPredicate; + } + + private static boolean isMonitorNotFoundException(final Throwable e) { + return e.getMessage().matches("(.*)Monitor(.*) is not found(.*)"); + } + + public static boolean isWorkflowNotFoundException(final Throwable e) { + return e.getMessage().matches("(.*)Workflow(.*) not found(.*)"); + } + + public static boolean isAlertingConfigIndexNotFoundException(final Throwable e) { + return e.getMessage().contains("Configured indices are not found: [.opendistro-alerting-config]"); + } +} diff --git a/src/test/java/org/opensearch/securityanalytics/resthandler/DetectorRestApiIT.java b/src/test/java/org/opensearch/securityanalytics/resthandler/DetectorRestApiIT.java index 3ed83c85e..6045f4439 100644 --- a/src/test/java/org/opensearch/securityanalytics/resthandler/DetectorRestApiIT.java +++ b/src/test/java/org/opensearch/securityanalytics/resthandler/DetectorRestApiIT.java @@ -893,6 +893,50 @@ public void testUpdateADetectorWithIndexNotExists() throws IOException { } } + public void testDisableEnableADetectorWithWorkflowNotExists() throws IOException { + final String index = createTestIndex(randomIndex(), windowsIndexMapping()); + + // Execute CreateMappingsAction to add alias mapping for index + final Request createMappingRequest = new Request("POST", SecurityAnalyticsPlugin.MAPPER_BASE_URI); + // both req params and req body are supported + createMappingRequest.setJsonEntity( + "{ \"index_name\":\"" + index + "\"," + + " \"rule_topic\":\"" + randomDetectorType() + "\", " + + " \"partial\":true" + + "}" + ); + + final Response createMappingResponse = client().performRequest(createMappingRequest); + assertEquals(HttpStatus.SC_OK, createMappingResponse.getStatusLine().getStatusCode()); + + final Detector detector = randomDetector(getRandomPrePackagedRules()); + final Response createResponse = makeRequest(client(), "POST", SecurityAnalyticsPlugin.DETECTOR_BASE_URI, Collections.emptyMap(), toHttpEntity(detector)); + Assert.assertEquals("Create detector failed", RestStatus.CREATED, restStatus(createResponse)); + + final Map createResponseAsMap = asMap(createResponse); + final String detectorId = createResponseAsMap.get("_id").toString(); + + final Map detectorSourceAsMap = getDetectorSourceAsMap(detectorId); + final String workflowId = ((List) detectorSourceAsMap.get("workflow_ids")).get(0); + + final Response deleteWorkflowResponse = deleteAlertingWorkflow(workflowId); + assertEquals(200, deleteWorkflowResponse.getStatusLine().getStatusCode()); + entityAsMap(deleteWorkflowResponse); + + detector.setEnabled(false); + Response updateResponse = makeRequest(client(), "PUT", SecurityAnalyticsPlugin.DETECTOR_BASE_URI + "/" + detectorId, Collections.emptyMap(), toHttpEntity(detector)); + Assert.assertEquals(200, updateResponse.getStatusLine().getStatusCode()); + + try { + detector.setEnabled(true); + makeRequest(client(), "PUT", SecurityAnalyticsPlugin.DETECTOR_BASE_URI + "/" + detectorId, Collections.emptyMap(), toHttpEntity(detector)); + } catch (ResponseException ex) { + Assert.assertEquals(400, ex.getResponse().getStatusLine().getStatusCode()); + Assert.assertEquals(true, ex.getMessage().contains(String.format("Underlying workflow associated with detector %s not found. " + + "Delete and recreate the detector to restore functionality.", detector.getName()))); + } + } + @SuppressWarnings("unchecked") public void testDeletingADetector_single_ruleTopicIndex() throws IOException { String index = createTestIndex(randomIndex(), windowsIndexMapping()); diff --git a/src/test/java/org/opensearch/securityanalytics/util/ExceptionCheckerTests.java b/src/test/java/org/opensearch/securityanalytics/util/ExceptionCheckerTests.java new file mode 100644 index 000000000..02c82c4d1 --- /dev/null +++ b/src/test/java/org/opensearch/securityanalytics/util/ExceptionCheckerTests.java @@ -0,0 +1,86 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.securityanalytics.util; + +import org.junit.Assert; +import org.junit.Before; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +public class ExceptionCheckerTests extends OpenSearchTestCase { + + private ExceptionChecker exceptionChecker; + + @Before + public void setup() { + exceptionChecker = new ExceptionChecker(); + } + + public void testExceptionMatches() { + final Exception e = new Exception("Monitor xyz is not found"); + + final boolean result = exceptionChecker.doesGroupedActionListenerExceptionMatch(e, List.of(ThrowableCheckingPredicates.MONITOR_NOT_FOUND)); + Assert.assertTrue(result); + } + + public void testExceptionDoesNotMatch() { + final Exception e = new Exception(UUID.randomUUID().toString()); + + final boolean result = exceptionChecker.doesGroupedActionListenerExceptionMatch(e, List.of(ThrowableCheckingPredicates.MONITOR_NOT_FOUND)); + Assert.assertFalse(result); + } + + public void testExceptionMatches_WithSuppressed() { + final Exception e = new Exception("Monitor xyz is not found"); + e.addSuppressed(new Exception("Monitor xyz is not found")); + + final boolean result = exceptionChecker.doesGroupedActionListenerExceptionMatch(e, List.of(ThrowableCheckingPredicates.MONITOR_NOT_FOUND)); + Assert.assertTrue(result); + } + + public void testExceptionDoesNotMatch_WithSuppressed() { + final Exception e = new Exception(UUID.randomUUID().toString()); + e.addSuppressed(new Exception(UUID.randomUUID().toString())); + + final boolean result = exceptionChecker.doesGroupedActionListenerExceptionMatch(e, List.of(ThrowableCheckingPredicates.MONITOR_NOT_FOUND)); + Assert.assertFalse(result); + } + + public void testExceptionDoesNotMatch_SuppressedDoesntMatch() { + final Exception e = new Exception("Monitor xyz is not found"); + e.addSuppressed(new Exception(UUID.randomUUID().toString())); + + final boolean result = exceptionChecker.doesGroupedActionListenerExceptionMatch(e, List.of(ThrowableCheckingPredicates.MONITOR_NOT_FOUND)); + Assert.assertFalse(result); + } + + public void testExceptionDoesNotMatch_TopLevelDoesntMatch() { + final Exception e = new Exception(UUID.randomUUID().toString()); + e.addSuppressed(new Exception("Monitor xyz is not found")); + + final boolean result = exceptionChecker.doesGroupedActionListenerExceptionMatch(e, List.of(ThrowableCheckingPredicates.MONITOR_NOT_FOUND)); + Assert.assertFalse(result); + } + + public void testExceptionDoesNotMatch_EmptyPredicates() { + final Exception e = new Exception("Monitor xyz is not found"); + + final boolean result = exceptionChecker.doesGroupedActionListenerExceptionMatch(e, Collections.emptyList()); + Assert.assertFalse(result); + } + + public void testExceptionDoesNotMatch_MultiplePredicates() { + final Exception e = new Exception("Monitor xyz is not found"); + + final boolean result = exceptionChecker.doesGroupedActionListenerExceptionMatch(e, List.of( + ThrowableCheckingPredicates.WORKFLOW_NOT_FOUND, + ThrowableCheckingPredicates.MONITOR_NOT_FOUND + )); + Assert.assertTrue(result); + } +} diff --git a/src/test/java/org/opensearch/securityanalytics/util/ThrowableCheckingPredicatesTests.java b/src/test/java/org/opensearch/securityanalytics/util/ThrowableCheckingPredicatesTests.java new file mode 100644 index 000000000..842adac23 --- /dev/null +++ b/src/test/java/org/opensearch/securityanalytics/util/ThrowableCheckingPredicatesTests.java @@ -0,0 +1,43 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.securityanalytics.util; + +import org.junit.Assert; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.UUID; + +public class ThrowableCheckingPredicatesTests extends OpenSearchTestCase { + + public void testWorkflowNotFound_Success() { + final Exception e = new Exception("Workflow " + UUID.randomUUID() + " not found"); + Assert.assertTrue(ThrowableCheckingPredicates.WORKFLOW_NOT_FOUND.getMatcherPredicate().test(e)); + } + + public void testWorkflowNotFound_Failure() { + final Exception e = new Exception(UUID.randomUUID().toString()); + Assert.assertFalse(ThrowableCheckingPredicates.WORKFLOW_NOT_FOUND.getMatcherPredicate().test(e)); + } + + public void testMonitorNotFound_Success() { + final Exception e = new Exception("Monitor " + UUID.randomUUID() + " is not found"); + Assert.assertTrue(ThrowableCheckingPredicates.MONITOR_NOT_FOUND.getMatcherPredicate().test(e)); + } + + public void testMonitorNotFound_Failure() { + final Exception e = new Exception(UUID.randomUUID().toString()); + Assert.assertFalse(ThrowableCheckingPredicates.MONITOR_NOT_FOUND.getMatcherPredicate().test(e)); + } + + public void testAlertingConfigIndexNotFound_Success() { + final Exception e = new Exception(UUID.randomUUID() + "Configured indices are not found: [.opendistro-alerting-config]"); + Assert.assertTrue(ThrowableCheckingPredicates.ALERTING_CONFIG_INDEX_NOT_FOUND.getMatcherPredicate().test(e)); + } + + public void testAlertingConfigIndexNotFound_Failure() { + final Exception e = new Exception(UUID.randomUUID().toString()); + Assert.assertFalse(ThrowableCheckingPredicates.ALERTING_CONFIG_INDEX_NOT_FOUND.getMatcherPredicate().test(e)); + } +}