diff --git a/data-prepper-plugins/opensearch/build.gradle b/data-prepper-plugins/opensearch/build.gradle index 00814b40c7..13ea63adbe 100644 --- a/data-prepper-plugins/opensearch/build.gradle +++ b/data-prepper-plugins/opensearch/build.gradle @@ -11,7 +11,6 @@ dependencies { implementation project(':data-prepper-plugins:buffer-common') implementation project(':data-prepper-plugins:common') implementation project(':data-prepper-plugins:failures-common') - implementation project(':data-prepper-plugins:rule-engine') implementation libs.opensearch.client implementation libs.opensearch.rhlc implementation libs.opensearch.java diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java index 8f9450c267..6cefdc3490 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java @@ -15,7 +15,6 @@ import org.opensearch.client.opensearch.core.bulk.BulkResponseItem; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PluginSetting; -import org.opensearch.dataprepper.plugins.processor.model.event.EventWrapper; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.AccumulatingBulkRequest; import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedBulkOperation; import org.opensearch.rest.RestStatus; @@ -24,7 +23,6 @@ import java.io.IOException; import java.time.Duration; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -187,23 +185,21 @@ private void incrementErrorCounters(final Exception e) { } } - public List execute(final AccumulatingBulkRequest bulkRequest) throws InterruptedException { - final List eventWrappers = new ArrayList<>(); - + public void execute(final AccumulatingBulkRequest bulkRequest) throws InterruptedException { final Backoff backoff = Backoff.exponential(INITIAL_DELAY_MS, MAXIMUM_DELAY_MS).withMaxAttempts(maxRetries); BulkOperationRequestResponse operationResponse; BulkResponse response = null; AccumulatingBulkRequest request = bulkRequest; int attempt = 1; do { - operationResponse = handleRetry(request, response, attempt, eventWrappers); + operationResponse = handleRetry(request, response, attempt); if (operationResponse != null) { final long delayMillis = backoff.nextDelayMillis(attempt++); request = operationResponse.getBulkRequest(); response = operationResponse.getResponse(); if (delayMillis < 0) { RuntimeException e = new RuntimeException(String.format("Number of retries reached the limit of max retries (configured value %d)", maxRetries)); - handleFailures(request, null, e, eventWrappers); + handleFailures(request, null, e); break; } // Wait for backOff duration @@ -214,8 +210,6 @@ public List execute(final AccumulatingBulkRequest bulkRequest) thr } } } while (operationResponse != null); - - return eventWrappers; } public boolean canRetry(final BulkResponse response) { @@ -236,8 +230,7 @@ public static boolean canRetry(final Exception e) { private BulkOperationRequestResponse handleRetriesAndFailures(final AccumulatingBulkRequest bulkRequestForRetry, final int retryCount, final BulkResponse bulkResponse, - final Exception exceptionFromRequest, - final List eventWrappers) { + final Exception exceptionFromRequest) { final boolean doRetry = (Objects.isNull(exceptionFromRequest)) ? canRetry(bulkResponse) : canRetry(exceptionFromRequest); if (!Objects.isNull(bulkResponse) && retryCount == 1) { // first attempt for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) { @@ -260,13 +253,12 @@ private BulkOperationRequestResponse handleRetriesAndFailures(final Accumulating bulkRequestNumberOfRetries.increment(); return new BulkOperationRequestResponse(bulkRequestForRetry, bulkResponse); } else { - handleFailures(bulkRequestForRetry, bulkResponse, exceptionFromRequest, eventWrappers); + handleFailures(bulkRequestForRetry, bulkResponse, exceptionFromRequest); } return null; } - private void handleFailures(final AccumulatingBulkRequest bulkRequest, final BulkResponse bulkResponse, final Throwable failure, - final List eventWrappers) { + private void handleFailures(final AccumulatingBulkRequest bulkRequest, final BulkResponse bulkResponse, final Throwable failure) { if (failure == null) { for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) { // Skip logging the error for version conflicts @@ -274,7 +266,7 @@ private void handleFailures(final AccumulatingBulkRequest eventWrappers) throws InterruptedException { - final AccumulatingBulkRequest bulkRequestForRetry = createBulkRequestForRetry(request, response, eventWrappers); + private BulkOperationRequestResponse handleRetry(final AccumulatingBulkRequest request, final BulkResponse response, int retryCount) throws InterruptedException { + final AccumulatingBulkRequest bulkRequestForRetry = createBulkRequestForRetry(request, response); if (bulkRequestForRetry.getOperationsCount() == 0) { return null; } @@ -294,10 +285,10 @@ private BulkOperationRequestResponse handleRetry(final AccumulatingBulkRequest r bulkResponse = requestFunction.apply(bulkRequestForRetry); } catch (Exception e) { incrementErrorCounters(e); - return handleRetriesAndFailures(bulkRequestForRetry, retryCount, null, e, eventWrappers); + return handleRetriesAndFailures(bulkRequestForRetry, retryCount, null, e); } if (bulkResponse.errors()) { - return handleRetriesAndFailures(bulkRequestForRetry, retryCount, bulkResponse, null, eventWrappers); + return handleRetriesAndFailures(bulkRequestForRetry, retryCount, bulkResponse, null); } else { final int numberOfDocs = bulkRequestForRetry.getOperationsCount(); final boolean firstAttempt = (retryCount == 1); @@ -310,14 +301,13 @@ private BulkOperationRequestResponse handleRetry(final AccumulatingBulkRequest r final BulkResponseItem bulkResponseItem = bulkResponse.items().get(i); bulkOperation.releaseEventHandle(true); - eventWrappers.add(createEventWrapper(bulkOperation, bulkResponseItem)); } } return null; } private AccumulatingBulkRequest createBulkRequestForRetry( - final AccumulatingBulkRequest request, final BulkResponse response, final List eventWrappers) { + final AccumulatingBulkRequest request, final BulkResponse response) { if (response == null) { // first attempt or retry due to Exception return request; @@ -335,7 +325,6 @@ private AccumulatingBulkRequest createBulkReq documentsVersionConflictErrors.increment(); LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason()); bulkOperation.releaseEventHandle(true); - eventWrappers.add(createEventWrapper(bulkOperation, bulkItemResponse)); } else { nonRetryableFailures.add(FailedBulkOperation.builder() .withBulkOperation(bulkOperation) @@ -346,7 +335,6 @@ private AccumulatingBulkRequest createBulkReq } else { sentDocumentsCounter.increment(); bulkOperation.releaseEventHandle(true); - eventWrappers.add(createEventWrapper(bulkOperation, bulkItemResponse)); } index++; } @@ -358,8 +346,7 @@ private AccumulatingBulkRequest createBulkReq } } - private void handleFailures(final AccumulatingBulkRequest accumulatingBulkRequest, final List itemResponses, - final List eventWrappers) { + private void handleFailures(final AccumulatingBulkRequest accumulatingBulkRequest, final List itemResponses) { assert accumulatingBulkRequest.getOperationsCount() == itemResponses.size(); final ImmutableList.Builder failures = ImmutableList.builder(); for (int i = 0; i < itemResponses.size(); i++) { @@ -370,7 +357,6 @@ private void handleFailures(final AccumulatingBulkRequest> { private DlqProvider dlqProvider; private final ConcurrentHashMap> bulkRequestMap; private final ConcurrentHashMap lastFlushTimeMap; - private RuleEngine ruleEngine = null; @DataPrepperPluginConstructor public OpenSearchSink(final PluginSetting pluginSetting, @@ -267,9 +262,6 @@ private void doInitializeInternal() throws IOException { objectMapper = new ObjectMapper(); - final Optional ruleEngineConfig = openSearchSinkConfig.getRuleEngineConfig(); - ruleEngineConfig.ifPresent(engineConfig -> ruleEngine = new RuleEngine(engineConfig, openSearchClient)); - this.initialized = true; LOG.info("Initialized OpenSearch sink"); } @@ -503,10 +495,7 @@ private void flushBatch(AccumulatingBulkRequest accumulatingBulkRequest) { bulkRequestTimer.record(() -> { try { LOG.debug("Sending data to OpenSearch"); - final List eventWrappers = bulkRetryStrategy.execute(accumulatingBulkRequest); - if (ruleEngine != null) { - ruleEngine.doExecute(eventWrappers); - } + bulkRetryStrategy.execute(accumulatingBulkRequest); bulkRequestSizeBytesSummary.record(accumulatingBulkRequest.getEstimatedSizeInBytes()); } catch (final InterruptedException e) { LOG.error("Unexpected Interrupt:", e); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfiguration.java index d71935f371..4ec8f17bc8 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfiguration.java @@ -6,13 +6,9 @@ package org.opensearch.dataprepper.plugins.sink.opensearch; import org.opensearch.dataprepper.model.configuration.PluginSetting; -import org.opensearch.dataprepper.plugins.processor.RuleEngineConfig; -import org.opensearch.dataprepper.plugins.processor.RuleEngineConfigWrapper; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration; import org.opensearch.dataprepper.expression.ExpressionEvaluator; -import java.util.Optional; - import static com.google.common.base.Preconditions.checkNotNull; public class OpenSearchSinkConfiguration { @@ -20,7 +16,6 @@ public class OpenSearchSinkConfiguration { private final ConnectionConfiguration connectionConfiguration; private final IndexConfiguration indexConfiguration; private final RetryConfiguration retryConfiguration; - private final RuleEngineConfigWrapper ruleEngineConfigWrapper; public ConnectionConfiguration getConnectionConfiguration() { return connectionConfiguration; @@ -34,21 +29,15 @@ public RetryConfiguration getRetryConfiguration() { return retryConfiguration; } - public Optional getRuleEngineConfig() { - return Optional.ofNullable(ruleEngineConfigWrapper.getRuleEngineConfig()); - } - private OpenSearchSinkConfiguration( final ConnectionConfiguration connectionConfiguration, final IndexConfiguration indexConfiguration, - final RetryConfiguration retryConfiguration, final RuleEngineConfigWrapper ruleEngineConfigWrapper) { + final RetryConfiguration retryConfiguration) { checkNotNull(connectionConfiguration, "connectionConfiguration cannot be null"); checkNotNull(indexConfiguration, "indexConfiguration cannot be null"); checkNotNull(retryConfiguration, "retryConfiguration cannot be null"); - checkNotNull(ruleEngineConfigWrapper, "ruleEngineConfigWrapper cannot be null"); this.connectionConfiguration = connectionConfiguration; this.indexConfiguration = indexConfiguration; this.retryConfiguration = retryConfiguration; - this.ruleEngineConfigWrapper = ruleEngineConfigWrapper; } public static OpenSearchSinkConfiguration readESConfig(final PluginSetting pluginSetting) { @@ -60,8 +49,7 @@ public static OpenSearchSinkConfiguration readESConfig(final PluginSetting plugi ConnectionConfiguration.readConnectionConfiguration(pluginSetting); final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting, expressionEvaluator); final RetryConfiguration retryConfiguration = RetryConfiguration.readRetryConfig(pluginSetting); - final RuleEngineConfigWrapper ruleEngineConfigWrapper = RuleEngineConfigWrapper.readRuleEngineConfigWrapper(pluginSetting); - return new OpenSearchSinkConfiguration(connectionConfiguration, indexConfiguration, retryConfiguration, ruleEngineConfigWrapper); + return new OpenSearchSinkConfiguration(connectionConfiguration, indexConfiguration, retryConfiguration); } } diff --git a/data-prepper-plugins/rule-engine/build.gradle b/data-prepper-plugins/rule-engine/build.gradle index b36dc360fd..4ec4f91f27 100644 --- a/data-prepper-plugins/rule-engine/build.gradle +++ b/data-prepper-plugins/rule-engine/build.gradle @@ -18,6 +18,9 @@ dependencies { exclude group: 'org.glassfish', module: 'javax.json' } implementation project(':data-prepper-api') + implementation project(':data-prepper-plugins:common') + implementation project(':data-prepper-plugins:aws-plugin-api') + implementation project(':data-prepper-plugins:opensearch') implementation 'javax.inject:javax.inject:1' implementation(libs.spring.core) { exclude group: 'commons-logging', module: 'commons-logging' @@ -32,6 +35,7 @@ dependencies { implementation libs.opensearch.client implementation libs.opensearch.rhlc implementation libs.opensearch.java + implementation 'com.fasterxml.jackson.core:jackson-core' implementation 'com.fasterxml.jackson.core:jackson-databind' implementation libs.commons.lang3 } @@ -57,3 +61,8 @@ jacocoTestCoverageVerification { })) } } + +checkstyleMain { + source = fileTree('src/main/java') +} + diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngine.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngine.java index 80b0d639c2..e3c3acbe0c 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngine.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngine.java @@ -1,126 +1,66 @@ package org.opensearch.dataprepper.plugins.processor; -import org.opensearch.client.opensearch.OpenSearchClient; -import org.opensearch.client.opensearch.core.BulkRequest; -import org.opensearch.client.opensearch.core.BulkResponse; -import org.opensearch.client.opensearch.core.bulk.BulkOperation; -import org.opensearch.client.opensearch.core.bulk.BulkResponseItem; -import org.opensearch.client.opensearch.core.bulk.IndexOperation; -import org.opensearch.dataprepper.plugins.processor.generator.FindingGenerator; -import org.opensearch.dataprepper.plugins.processor.model.event.EventWrapper; -import org.opensearch.dataprepper.plugins.processor.model.findings.Finding; -import org.opensearch.dataprepper.plugins.processor.rules.Rule; -import org.opensearch.dataprepper.plugins.processor.rules.RuleConverter; -import org.opensearch.dataprepper.plugins.processor.rules.RuleFetcher; +import org.opensearch.dataprepper.plugins.processor.evaluator.DefaultRuleEvaluator; +import org.opensearch.dataprepper.plugins.processor.evaluator.RuleEvaluator; +import org.opensearch.dataprepper.plugins.processor.model.mappings.Mapping; +import org.opensearch.dataprepper.plugins.processor.parser.RuleParser; +import org.opensearch.dataprepper.plugins.processor.provider.rules.RuleProvider; +import org.opensearch.dataprepper.plugins.processor.registrar.MappingRegistrar; +import org.opensearch.dataprepper.plugins.processor.registrar.RuleProviderRegistrar; +import org.opensearch.dataprepper.plugins.processor.rules.RuleRefresher; import org.opensearch.dataprepper.plugins.processor.rules.RuleStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; +import java.util.function.Supplier; public class RuleEngine { - private static final String FINDINGS_INDEX = ".opensearch-sap-cloudtrail-findings"; - private static final Logger LOG = LoggerFactory.getLogger(RuleEngine.class); private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor(); - private final RuleStore ruleStore; - private final FindingGenerator findingGenerator; - private final OpenSearchClient openSearchClient; + private final MappingRegistrar mappingRegistrar; + private final RuleProviderRegistrar ruleProviderRegistrar; + private RuleEvaluator ruleEvaluator; - public RuleEngine(final RuleEngineConfig config, final OpenSearchClient openSearchClient) { - ruleStore = new RuleStore(); - findingGenerator = new FindingGenerator(); - this.openSearchClient = openSearchClient; - setupRuleFetching(config, ruleStore); + public RuleEngine() { + mappingRegistrar = new MappingRegistrar(); + ruleProviderRegistrar = new RuleProviderRegistrar(); } - public void doExecute(final Collection eventWrappers) { - if (eventWrappers.isEmpty()) { - return; - } - - final Map> indexToSigmaRules = ruleStore.getRules(); - final Map> allFindings = new HashMap<>(); - eventWrappers.forEach(eventWrapper -> { - final String indexName = eventWrapper.getIndexName(); - final List sigmaRules = indexToSigmaRules.get(indexName); - if (sigmaRules == null || sigmaRules.isEmpty()) { - return; - } - - // TODO - should just model after OpenSearch SAP, have list of detectors, iterate over them - final List ruleMatches = sigmaRules.stream() - .filter(sigmaRule -> sigmaRule.getCondition().test(eventWrapper.getEvent())) - .collect(Collectors.toList()); - - if (!ruleMatches.isEmpty()) { - final Map> findingsIndexToFindings = findingGenerator.generateFindings(eventWrapper, ruleMatches); - findingsIndexToFindings.forEach((findingsIndex, findings) -> { - allFindings.putIfAbsent(findingsIndex, new ArrayList<>()); - allFindings.get(findingsIndex).addAll(findings); - }); - } - }); + public void registerMapping(final String logType, final Supplier mappingSupplier) { + mappingRegistrar.registerMapping(logType, mappingSupplier); + } - indexFindings(allFindings); + public void registerRuleProvider(final String ruleLocation, final Supplier ruleProviderSupplier) { + ruleProviderRegistrar.registerRuleProvider(ruleLocation, ruleProviderSupplier); } - private void indexFindings(final Map> allFindings) { - if (allFindings.isEmpty()) { - LOG.debug("No findings to index"); - return; - } + public RuleEvaluator start(final RuleEngineConfig config) { + final RuleStore ruleStore = new RuleStore(); + setupRuleFetching(config, ruleStore); - final BulkRequest bulkRequest = createBulkRequest(allFindings); - try { - final BulkResponse bulkResponse = openSearchClient.bulk(bulkRequest); - if (bulkResponse.errors()) { - LOG.error("BulkResponse has errors"); - bulkResponse.items().stream() - .filter(bulkResponseItem -> bulkResponseItem.error() != null) - .forEach(bulkResponseItem -> LOG.error("BulkItemError for ID {}: {}", bulkResponseItem.id(), bulkResponseItem.error().reason())); - } - } catch (IOException e) { - LOG.error("Caught exception indexing findings", e); + if (ruleEvaluator == null) { + ruleEvaluator = new DefaultRuleEvaluator(ruleStore); } - } - private BulkRequest createBulkRequest(final Map> allFindings) { - final List allBulkOperations = new ArrayList<>(); - allFindings.forEach((findingsIndex, findings) -> { - final List bulkOperations = findings.stream() - .peek(finding -> LOG.debug("Indexing finding with ID: {}", finding.getId())) - .map(finding -> new IndexOperation.Builder<>() - .id(finding.getId()) - .index(findingsIndex) - .document(finding) - .build()) - .map(idxOp -> new BulkOperation.Builder().index(idxOp).build()) - .collect(Collectors.toList()); - allBulkOperations.addAll(bulkOperations); - }); - - return new BulkRequest.Builder() - .operations(allBulkOperations) - .build(); + return ruleEvaluator; } private void setupRuleFetching(final RuleEngineConfig config, final RuleStore ruleStore) { - final RuleConverter ruleConverter = new RuleConverter(config); - final RuleFetcher ruleFetcher = new RuleFetcher(openSearchClient, ruleStore, ruleConverter); + final RuleProvider ruleProvider = ruleProviderRegistrar.getRuleProvider(config.getRuleLocation()); + LOG.info("Using RuleProvider of type {}", ruleProvider.getClass()); + + final Map mapping = mappingRegistrar.getMapping(config.getLogType(), config.getLogFormat()); + final RuleParser ruleParser = config.getRuleSchema().getParserConstructor().apply(mapping); + + final RuleRefresher ruleRefresher = new RuleRefresher(ruleProvider, ruleParser, ruleStore); SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate( - ruleFetcher, + ruleRefresher, 0L, config.getRuleRefreshInterval().toMillis(), TimeUnit.MILLISECONDS diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngineConfig.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngineConfig.java index b2c106340c..94c02d7a53 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngineConfig.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngineConfig.java @@ -1,85 +1,42 @@ package org.opensearch.dataprepper.plugins.processor; -import org.opensearch.dataprepper.model.configuration.PluginSetting; -import org.opensearch.dataprepper.plugins.processor.model.log.LogFormat; -import org.opensearch.dataprepper.plugins.processor.model.log.LogType; +import org.opensearch.dataprepper.plugins.processor.model.rule.RuleSchema; import java.time.Duration; -import static com.google.common.base.Preconditions.checkNotNull; - public class RuleEngineConfig { - static final long DEFAULT_RULE_REFRESH_INTERVAL_MILLIS = Duration.ofMinutes(1).toMillis(); - static final LogFormat DEFAULT_LOG_FORMAT = LogFormat.NONE; - - public static final String RULE_REFRESH_INTERVAL_MILLIS = "rule_refresh_interval_millis"; - public static final String LOG_TYPE = "log_type"; - public static final String LOG_FORMAT = "log_format"; - private final Duration ruleRefreshInterval; - private final LogType logType; - private final LogFormat logFormat; + private final String logFormat; + private final String logType; + private final RuleSchema ruleSchema; + private final String ruleLocation; + + public RuleEngineConfig(final Duration ruleRefreshInterval, final String logFormat, final String logType, + final RuleSchema ruleSchema, final String ruleLocation) { + this.ruleRefreshInterval = ruleRefreshInterval; + this.logFormat = logFormat; + this.logType = logType; + this.ruleSchema = ruleSchema; + this.ruleLocation = ruleLocation; + } public Duration getRuleRefreshInterval() { return ruleRefreshInterval; } - public LogType getLogType() { - return logType; - } - - public LogFormat getLogFormat() { + public String getLogFormat() { return logFormat; } - public static class Builder { - private Duration ruleRefreshInterval = Duration.ofMillis(DEFAULT_RULE_REFRESH_INTERVAL_MILLIS); - private LogType logType; - private LogFormat logFormat = DEFAULT_LOG_FORMAT; - - public Builder withRuleRefreshInterval(final Duration ruleRefreshInterval) { - checkNotNull(ruleRefreshInterval, "ruleRefreshInterval cannot be null."); - this.ruleRefreshInterval = ruleRefreshInterval; - return this; - } - - public Builder withLogType(final LogType logType) { - checkNotNull(logType, "logType cannot be null."); - this.logType = logType; - return this; - } - - public Builder withLogFormat(final LogFormat logFormat) { - checkNotNull(logFormat, "logFormat cannot be null"); - this.logFormat = logFormat; - return this; - } - - public RuleEngineConfig build() { - return new RuleEngineConfig(this); - } + public String getLogType() { + return logType; } - private RuleEngineConfig(final Builder builder) { - this.ruleRefreshInterval = builder.ruleRefreshInterval; - this.logType = checkNotNull(builder.logType, "logType cannot be null."); - this.logFormat = builder.logFormat; + public RuleSchema getRuleSchema() { + return ruleSchema; } - public static RuleEngineConfig readRuleEngineConfig(final PluginSetting pluginSetting) { - RuleEngineConfig.Builder builder = new RuleEngineConfig.Builder(); - - final Duration ruleRefreshInterval = Duration.ofMillis( - pluginSetting.getLongOrDefault(RULE_REFRESH_INTERVAL_MILLIS, DEFAULT_RULE_REFRESH_INTERVAL_MILLIS) - ); - builder = builder.withRuleRefreshInterval(ruleRefreshInterval); - - final LogType logType = LogType.valueOf(pluginSetting.getStringOrDefault(LOG_TYPE, null).toUpperCase()); - builder = builder.withLogType(logType); - - final LogFormat logFormat = LogFormat.valueOf(pluginSetting.getStringOrDefault(LOG_FORMAT, DEFAULT_LOG_FORMAT.name()).toUpperCase()); - builder = builder.withLogFormat(logFormat); - - return builder.build(); + public String getRuleLocation() { + return ruleLocation; } } diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngineConfigWrapper.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngineConfigWrapper.java deleted file mode 100644 index 35999fa8c5..0000000000 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngineConfigWrapper.java +++ /dev/null @@ -1,49 +0,0 @@ -package org.opensearch.dataprepper.plugins.processor; - -import org.opensearch.dataprepper.model.configuration.PluginSetting; - -import java.util.LinkedHashMap; -import java.util.Map; - -import static com.google.common.base.Preconditions.checkNotNull; - -public class RuleEngineConfigWrapper { - public static final String RULE_ENGINE = "rule_engine"; - - private final RuleEngineConfig ruleEngineConfig; - - public RuleEngineConfig getRuleEngineConfig() { - return ruleEngineConfig; - } - - public static class Builder { - private RuleEngineConfig ruleEngineConfig; - - public Builder withRuleEngineConfig(final RuleEngineConfig ruleEngineConfig) { - checkNotNull(ruleEngineConfig, "ruleEngineConfig cannot be null."); - this.ruleEngineConfig = ruleEngineConfig; - return this; - } - - public RuleEngineConfigWrapper build() { - return new RuleEngineConfigWrapper(this); - } - } - - private RuleEngineConfigWrapper(final Builder builder) { - this.ruleEngineConfig = builder.ruleEngineConfig; - } - - public static RuleEngineConfigWrapper readRuleEngineConfigWrapper(final PluginSetting pluginSetting) { - RuleEngineConfigWrapper.Builder builder = new RuleEngineConfigWrapper.Builder(); - - final Map ruleEngineSettingsMap = (Map) pluginSetting.getAttributeFromSettings(RULE_ENGINE); - if (ruleEngineSettingsMap != null) { - final PluginSetting ruleEngineSettings = new PluginSetting(RULE_ENGINE, ruleEngineSettingsMap); - final RuleEngineConfig ruleEngineConfig = RuleEngineConfig.readRuleEngineConfig(ruleEngineSettings); - builder = builder.withRuleEngineConfig(ruleEngineConfig); - } - - return builder.build(); - } -} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngineProcessor.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngineProcessor.java new file mode 100644 index 0000000000..b75c65672a --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngineProcessor.java @@ -0,0 +1,100 @@ +package org.opensearch.dataprepper.plugins.processor; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.processor.AbstractProcessor; +import org.opensearch.dataprepper.model.processor.Processor; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.processor.converters.OCSFConverter; +import org.opensearch.dataprepper.plugins.processor.evaluator.RuleEvaluator; +import org.opensearch.dataprepper.plugins.processor.model.datatypes.DataType; +import org.opensearch.dataprepper.plugins.processor.model.matches.Match; +import org.opensearch.dataprepper.plugins.processor.provider.rules.RuleProvider; +import org.opensearch.dataprepper.plugins.processor.provider.rules.opensearch.OpenSearchRuleProvider; +import org.opensearch.dataprepper.plugins.sink.opensearch.ConnectionConfiguration; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +@DataPrepperPlugin(name = "rule_engine", pluginType = Processor.class, pluginConfigurationType = RuleEngineProcessorConfig.class) +public class RuleEngineProcessor extends AbstractProcessor, Record> { + private final RuleEvaluator ruleEvaluator; + private final RuleEngineProcessorConfig config; + private final OCSFConverter ocsfConverter; + private final ObjectMapper objectMapper = new ObjectMapper(); + + @DataPrepperPluginConstructor + public RuleEngineProcessor(final PluginMetrics pluginMetrics, + final RuleEngineProcessorConfig config, + final PluginFactory pluginFactory) { + super(pluginMetrics); + + final RuleEngine ruleEngine = new RuleEngine(); + ruleEngine.registerRuleProvider("opensearch", this::getRuleProvider); + this.config = config; + + final RuleEngineConfig ruleEngineConfig = new RuleEngineConfig(config.getRuleRefreshInterval(), config.getLogFormat(), config.getLogType(), + config.getRuleSchema(), config.getRuleLocation()); + ruleEvaluator = ruleEngine.start(ruleEngineConfig); + ocsfConverter = new OCSFConverter(); + } + + @Override + public Collection> doExecute(final Collection> records) { + final Collection dataWithMatches = ruleEvaluator.evaluate(convertToOCSF(records)); + final Collection> matches = convertMatchesToEvents(dataWithMatches); + + if (config.isDropData()) { + return matches; + } + + records.addAll(matches); + return records; + } + + private Collection convertToOCSF(final Collection> records) { + return records.stream() + .map(ocsfConverter::convert) + .peek(ocsf -> ocsf.putMetadataValue("index", "test-index-4")) + .collect(Collectors.toList()); + } + + private List> convertMatchesToEvents(final Collection dataWithMatches) { + return dataWithMatches.stream() + .map(match -> (Map) objectMapper.convertValue(match, new TypeReference<>() {})) + .map(matchesMap -> JacksonLog.builder().withData(matchesMap).build()) + .map(event -> new Record(event)) + .collect(Collectors.toList()); + } + + @Override + public void prepareForShutdown() { + } + + @Override + public boolean isReadyForShutdown() { + return true; + } + + @Override + public void shutdown() { + } + + private RuleProvider getRuleProvider() { + final ConnectionConfiguration connectionConfiguration = new ConnectionConfiguration.Builder(List.of("localhost:9200")) + .withInsecure(true) + .build(); + final OpenSearchClient openSearchClient = connectionConfiguration.createOpenSearchClient(connectionConfiguration.createClient(null), null); + + return new OpenSearchRuleProvider(openSearchClient); + } +} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngineProcessorConfig.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngineProcessorConfig.java new file mode 100644 index 0000000000..ea662a570e --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngineProcessorConfig.java @@ -0,0 +1,61 @@ +package org.opensearch.dataprepper.plugins.processor; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import org.opensearch.dataprepper.plugins.processor.model.log.LogFormat; +import org.opensearch.dataprepper.plugins.processor.model.rule.RuleSchema; + +import java.time.Duration; + +public class RuleEngineProcessorConfig { + static final Duration DEFAULT_RULE_REFRESH_INTERVAL = Duration.ofMinutes(1); + static final LogFormat DEFAULT_LOG_FORMAT = LogFormat.NONE; + + @JsonProperty("rule_refresh_interval") + private Duration ruleRefreshInterval = DEFAULT_RULE_REFRESH_INTERVAL; + + @JsonProperty("log_type") + @NotEmpty + @NotNull + private String logType; + + @JsonProperty("log_format") + private String logFormat = DEFAULT_LOG_FORMAT.getKeyName(); + + @JsonProperty("rule_schema") + @NotNull + private RuleSchema ruleSchema; + + @JsonProperty("rule_location") + @NotEmpty + @NotNull + private String ruleLocation; + + @JsonProperty("drop_data") + private boolean dropData = false; + + public Duration getRuleRefreshInterval() { + return ruleRefreshInterval; + } + + public String getLogType() { + return logType; + } + + public String getLogFormat() { + return logFormat; + } + + public RuleSchema getRuleSchema() { + return ruleSchema; + } + + public String getRuleLocation() { + return ruleLocation; + } + + public boolean isDropData() { + return dropData; + } +} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/converters/OCSFConverter.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/converters/OCSFConverter.java new file mode 100644 index 0000000000..400dec0080 --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/converters/OCSFConverter.java @@ -0,0 +1,15 @@ +package org.opensearch.dataprepper.plugins.processor.converters; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.processor.model.datatypes.OCSF; + +public class OCSFConverter { + public OCSF convert(final Record record) { + final Event event = record.getData(); + final String apiOperation = event.get("/api/operation", String.class); + final String apiServiceName = event.get("/api/service/name", String.class); + + return new OCSF(apiOperation, apiServiceName); + } +} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/evaluator/DefaultRuleEvaluator.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/evaluator/DefaultRuleEvaluator.java new file mode 100644 index 0000000000..9cf7f37026 --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/evaluator/DefaultRuleEvaluator.java @@ -0,0 +1,44 @@ +package org.opensearch.dataprepper.plugins.processor.evaluator; + +import org.opensearch.dataprepper.plugins.processor.model.datatypes.DataType; +import org.opensearch.dataprepper.plugins.processor.model.matches.Match; +import org.opensearch.dataprepper.plugins.processor.rules.Rule; +import org.opensearch.dataprepper.plugins.processor.rules.RuleStore; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +public class DefaultRuleEvaluator implements RuleEvaluator { + private final RuleStore ruleStore; + + public DefaultRuleEvaluator(final RuleStore ruleStore) { + this.ruleStore = ruleStore; + } + + @Override + public Collection evaluate(final Collection data) { + if (data.isEmpty()) { + return Collections.emptyList(); + } + + final List rules = ruleStore.getRules(); + final List matches = new ArrayList<>(); + + data.stream().forEach(item -> { + final List ruleMatches = rules.stream() + // Skip rules that don't apply + .filter(rule -> rule.getEvaluationCondition().test(item)) + .filter(rule -> rule.getRuleCondition().test(item)) + .collect(Collectors.toList()); + + if (ruleMatches.size() > 0) { + matches.add(new Match(item, ruleMatches)); + } + }); + + return matches; + } +} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/evaluator/RuleEvaluator.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/evaluator/RuleEvaluator.java new file mode 100644 index 0000000000..ac1893f484 --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/evaluator/RuleEvaluator.java @@ -0,0 +1,10 @@ +package org.opensearch.dataprepper.plugins.processor.evaluator; + +import org.opensearch.dataprepper.plugins.processor.model.datatypes.DataType; +import org.opensearch.dataprepper.plugins.processor.model.matches.Match; + +import java.util.Collection; + +public interface RuleEvaluator { + Collection evaluate(Collection data); +} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/formats/accessors/FieldAccessor.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/formats/accessors/FieldAccessor.java index c0f156b927..f8deb2c412 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/formats/accessors/FieldAccessor.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/formats/accessors/FieldAccessor.java @@ -1,11 +1,7 @@ package org.opensearch.dataprepper.plugins.processor.formats.accessors; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.plugins.processor.RuleEngineConfig; -import org.opensearch.dataprepper.plugins.processor.mappings.MappingParser; -import org.opensearch.dataprepper.plugins.processor.model.log.LogFormat; -import org.opensearch.dataprepper.plugins.processor.model.log.LogType; +import org.opensearch.dataprepper.plugins.processor.exceptions.MappingException; +import org.opensearch.dataprepper.plugins.processor.model.datatypes.DataType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,52 +10,43 @@ public class FieldAccessor { private static final Logger LOG = LoggerFactory.getLogger(FieldAccessor.class); - private final Map mappings; + private final Map mapping; - public FieldAccessor(final RuleEngineConfig config) { - final ObjectMapper objectMapper = new ObjectMapper(); - final MappingParser mappingParser = new MappingParser(objectMapper); - - final LogType logType = config.getLogType(); - LOG.info("Log type: {}", logType.name()); - final LogFormat logFormat = config.getLogFormat(); - LOG.info("Log format: {}", logFormat.name()); - mappings = mappingParser.parseMappings(logType, logFormat); + public FieldAccessor(final Map mapping) { + this.mapping = mapping; } - public String getStringValue(final Event event, final String fieldName) { - return getValue(event, fieldName, String.class); + public String getStringValue(final DataType event, final String fieldName) { + return getValue(event, convertFieldName(fieldName), String.class); } - public Boolean getBooleanValue(final Event event, final String fieldName) { - return getValue(event, fieldName, Boolean.class); + public Boolean getBooleanValue(final DataType event, final String fieldName) { + return getValue(event, convertFieldName(fieldName), Boolean.class); } - public Integer getIntegerValue(final Event event, final String fieldName) { - return getValue(event, fieldName, Integer.class); + public Integer getIntegerValue(final DataType event, final String fieldName) { + return getValue(event, convertFieldName(fieldName), Integer.class); } - public Float getFloatValue(final Event event, final String fieldName) { - return getValue(event, fieldName, Float.class); + public Float getFloatValue(final DataType event, final String fieldName) { + return getValue(event, convertFieldName(fieldName), Float.class); } - public Object getObjectValue(final Event event, final String fieldName) { - return getValue(event, fieldName, Object.class); + public Object getObjectValue(final DataType event, final String fieldName) { + return getValue(event, convertFieldName(fieldName), Object.class); } - private T getValue(final Event event, final String fieldName, final Class clazz) { - //LOG.info("Field: {}", fieldName); - final String mappedField = convertFieldName(fieldName); - //System.out.println("Mapped field: " + mappedField); - final String jsonPointer = getJsonPointer(mappedField); - //System.out.println("Pointer: "+ jsonPointer); - final T value = event.get(jsonPointer, clazz); - //LOG.info("Value: {}", value); - return value; + private T getValue(final DataType event, final String fieldName, final Class clazz) { + try { + return clazz.cast(event.getValue(fieldName)); + } catch (final ClassCastException e) { + throw new MappingException("Unable to cast field " + fieldName + " to class " + clazz.getName(), e); + } } private String convertFieldName(final String fieldName) { - return mappings.get(fieldName); + final String mappedFieldName = mapping.get(fieldName); + return mappedFieldName == null ? fieldName : mappedFieldName; } // TODO - need flag for this diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/generator/FindingGenerator.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/generator/FindingGenerator.java deleted file mode 100644 index 6a0c5ceb66..0000000000 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/generator/FindingGenerator.java +++ /dev/null @@ -1,54 +0,0 @@ -package org.opensearch.dataprepper.plugins.processor.generator; - -import org.opensearch.dataprepper.plugins.processor.model.event.EventWrapper; -import org.opensearch.dataprepper.plugins.processor.model.findings.DocLevelQuery; -import org.opensearch.dataprepper.plugins.processor.model.findings.Finding; -import org.opensearch.dataprepper.plugins.processor.rules.Rule; - -import java.time.Instant; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.stream.Collectors; - -public class FindingGenerator { - public Map> generateFindings(final EventWrapper eventWrapper, final List ruleMatches) { - final Map> monitorIdToRules = new HashMap<>(); - ruleMatches.forEach(rule -> { - final String monitorId = rule.getMonitorId(); - monitorIdToRules.putIfAbsent(monitorId, new ArrayList<>()); - monitorIdToRules.get(monitorId).add(rule); - }); - - final Map> findingsIndexToFindings = new HashMap<>(); - monitorIdToRules.forEach((monitorId, rules) -> { - final String findingsIndex = rules.get(0).getFindingsIndex(); - - final List queries = rules.stream() - .map(this::createDocLevelQuery) - .collect(Collectors.toList()); - - final Finding finding = new Finding( - UUID.randomUUID().toString(), - List.of(eventWrapper.getDocId()), - List.of(eventWrapper.getDocId()), - monitorId, - rules.get(0).getMonitorName(), - eventWrapper.getIndexName(), - queries, - Instant.now().toEpochMilli() - ); - - findingsIndexToFindings.putIfAbsent(findingsIndex, new ArrayList<>()); - findingsIndexToFindings.get(findingsIndex).add(finding); - }); - - return findingsIndexToFindings; - } - - private DocLevelQuery createDocLevelQuery(final Rule rule) { - return new DocLevelQuery(rule.getId(), rule.getId(), rule.getTags(), rule.getQuery()); - } -} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/mappings/FileMappingProvider.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/mappings/FileMappingProvider.java new file mode 100644 index 0000000000..0d1c6e2e1a --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/mappings/FileMappingProvider.java @@ -0,0 +1,28 @@ +package org.opensearch.dataprepper.plugins.processor.mappings; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.opensearch.dataprepper.plugins.processor.exceptions.MappingException; +import org.opensearch.dataprepper.plugins.processor.model.mappings.Mapping; + +import java.io.IOException; +import java.net.URL; + +public class FileMappingProvider { + private static final String MAPPING_PATH_FORMAT = "mappings/%s"; + + private final ObjectMapper objectMapper; + + public FileMappingProvider() { + this.objectMapper = new ObjectMapper(); + } + + public Mapping getMapping(final String mappingsFile) { + final String relativePath = String.format(MAPPING_PATH_FORMAT, mappingsFile); + final URL mappingsPath = getClass().getClassLoader().getResource(relativePath); + try { + return objectMapper.readValue(mappingsPath, Mapping.class); + } catch (final IOException e) { + throw new MappingException("Exception reading mapping: " + relativePath, e); + } + } +} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/mappings/MappingParser.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/mappings/MappingParser.java deleted file mode 100644 index 05604cca22..0000000000 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/mappings/MappingParser.java +++ /dev/null @@ -1,40 +0,0 @@ -package org.opensearch.dataprepper.plugins.processor.mappings; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.opensearch.dataprepper.plugins.processor.exceptions.MappingException; -import org.opensearch.dataprepper.plugins.processor.model.log.LogFormat; -import org.opensearch.dataprepper.plugins.processor.model.log.LogType; -import org.opensearch.dataprepper.plugins.processor.model.mappings.Mapping; - -import java.io.IOException; -import java.net.URL; -import java.util.Map; -import java.util.stream.Collectors; - -public class MappingParser { - private static final String MAPPING_PATH_FORMAT = "mappings/%s"; - - private final ObjectMapper objectMapper; - - public MappingParser(final ObjectMapper objectMapper) { - this.objectMapper = objectMapper; - } - - public Map parseMappings(final LogType logType, final LogFormat logFormat) { - final Mapping mapping = getMapping(logType); - - return mapping.getMappings().stream() - .map(fieldMapping -> Map.entry(fieldMapping.get(LogFormat.NONE.getKeyName()), fieldMapping.get(logFormat.getKeyName()))) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } - - private Mapping getMapping(final LogType logType) { - final String relativePath = String.format(MAPPING_PATH_FORMAT, logType.getMappingsFile()); - final URL mappingsPath = getClass().getClassLoader().getResource(relativePath); - try { - return objectMapper.readValue(mappingsPath, Mapping.class); - } catch (final IOException e) { - throw new MappingException("Exception reading mapping: " + relativePath, e); - } - } -} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/datatypes/CloudTrail.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/datatypes/CloudTrail.java new file mode 100644 index 0000000000..5fdfb874a0 --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/datatypes/CloudTrail.java @@ -0,0 +1,29 @@ +package org.opensearch.dataprepper.plugins.processor.model.datatypes; + +public class CloudTrail extends DataType { + private final String eventName; + private final String eventSource; + + public CloudTrail(final String eventName, final String eventSource) { + super(); + this.eventName = eventName; + this.eventSource = eventSource; + } + + public String getEventName() { + return eventName; + } + + public String getEventSource() { + return eventSource; + } + + @Override + public Object getValue(final String fieldName) { + switch (fieldName) { + case "eventName": return eventName; + case "eventSource": return eventSource; + default: throw new IllegalArgumentException("Field " + fieldName + " does not exist in class " + getClass().getName()); + } + } +} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/datatypes/DataType.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/datatypes/DataType.java new file mode 100644 index 0000000000..82708ef8d3 --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/datatypes/DataType.java @@ -0,0 +1,25 @@ +package org.opensearch.dataprepper.plugins.processor.model.datatypes; + +import java.util.HashMap; + +public abstract class DataType { + private final HashMap metadata; + + public DataType() { + metadata = new HashMap<>(); + } + + public HashMap getMetadata() { + return metadata; + } + + public abstract Object getValue(final String fieldName); + + public void putMetadataValue(final String metadataFieldName, final String metadataFieldValue) { + metadata.put(metadataFieldName, metadataFieldValue); + } + + public String getMetadataValue(final String metadataFieldName) { + return metadata.get(metadataFieldName); + } +} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/datatypes/OCSF.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/datatypes/OCSF.java new file mode 100644 index 0000000000..b7c2b1638f --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/datatypes/OCSF.java @@ -0,0 +1,21 @@ +package org.opensearch.dataprepper.plugins.processor.model.datatypes; + +public class OCSF extends DataType { + private final String apiOperation; + private final String apiServiceName; + + public OCSF(final String apiOperation, final String apiServiceName) { + super(); + this.apiOperation = apiOperation; + this.apiServiceName = apiServiceName; + } + + @Override + public Object getValue(final String fieldName) { + switch (fieldName) { + case "api.operation": return apiOperation; + case "api.service.name": return apiServiceName; + default: throw new IllegalArgumentException("Field " + fieldName + " does not exist in class " + getClass().getName()); + } + } +} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/event/EventWrapper.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/event/EventWrapper.java deleted file mode 100644 index c00df2a72d..0000000000 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/event/EventWrapper.java +++ /dev/null @@ -1,27 +0,0 @@ -package org.opensearch.dataprepper.plugins.processor.model.event; - -import org.opensearch.dataprepper.model.event.Event; - -public class EventWrapper { - private final String indexName; - private final String docId; - private final Event event; - - public EventWrapper(final String indexName, final String docId, final Event event) { - this.indexName = indexName; - this.docId = docId; - this.event = event; - } - - public String getIndexName() { - return indexName; - } - - public String getDocId() { - return docId; - } - - public Event getEvent() { - return event; - } -} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/findings/DocLevelQuery.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/findings/DocLevelQuery.java deleted file mode 100644 index 83373f3917..0000000000 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/findings/DocLevelQuery.java +++ /dev/null @@ -1,33 +0,0 @@ -package org.opensearch.dataprepper.plugins.processor.model.findings; - -import java.util.List; - -public class DocLevelQuery { - private String id; - private String name; - private List tags; - private String query; - - public DocLevelQuery(final String id, final String name, final List tags, final String query) { - this.id = id; - this.name = name; - this.tags = tags; - this.query = query; - } - - public String getId() { - return id; - } - - public String getName() { - return name; - } - - public List getTags() { - return tags; - } - - public String getQuery() { - return query; - } -} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/findings/Finding.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/findings/Finding.java deleted file mode 100644 index 8fcce3db0d..0000000000 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/findings/Finding.java +++ /dev/null @@ -1,64 +0,0 @@ -package org.opensearch.dataprepper.plugins.processor.model.findings; - -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.util.List; - -public class Finding { - private String id; - @JsonProperty("related_doc_ids") - private List relatedDocIds; - @JsonProperty("correlated_doc_ids") - private List correlatedDocIds; - @JsonProperty("monitor_id") - private String monitorId; - @JsonProperty("monitor_name") - private String monitorName; - private String index; - private List queries; - private long timestamp; - - public Finding(final String id, final List relatedDocIds, final List correlatedDocIds, final String monitorId, - final String monitorName, final String index, final List queries, final long timestamp) { - this.id = id; - this.relatedDocIds = relatedDocIds; - this.correlatedDocIds = correlatedDocIds; - this.monitorId = monitorId; - this.monitorName = monitorName; - this.index = index; - this.queries = queries; - this.timestamp = timestamp; - } - - public String getId() { - return id; - } - - public List getRelatedDocIds() { - return relatedDocIds; - } - - public List getCorrelatedDocIds() { - return correlatedDocIds; - } - - public String getMonitorId() { - return monitorId; - } - - public String getMonitorName() { - return monitorName; - } - - public String getIndex() { - return index; - } - - public List getQueries() { - return queries; - } - - public long getTimestamp() { - return timestamp; - } -} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/mappings/Mapping.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/mappings/Mapping.java index da5617685f..3cc7ec0b6e 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/mappings/Mapping.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/mappings/Mapping.java @@ -1,16 +1,26 @@ package org.opensearch.dataprepper.plugins.processor.model.mappings; +import org.opensearch.dataprepper.plugins.processor.model.log.LogFormat; + import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class Mapping { private List> mappings; - public List> getMappings() { - return mappings; - } - public void setMappings(final List> mappings) { this.mappings = mappings; } + + public Map getFieldMappingForLogFormat(final LogFormat logFormat) { + return getFieldMappingForLogFormat(logFormat.getKeyName()); + } + + public Map getFieldMappingForLogFormat(final String logFormat) { + // TODO - restructure mappings file to be map of raw name to map of other format mappings + return mappings.stream() + .map(fieldMapping -> Map.entry(fieldMapping.get(LogFormat.NONE.getKeyName()), fieldMapping.get(logFormat))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } } diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/matches/Match.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/matches/Match.java new file mode 100644 index 0000000000..2b47976b38 --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/matches/Match.java @@ -0,0 +1,24 @@ +package org.opensearch.dataprepper.plugins.processor.model.matches; + +import org.opensearch.dataprepper.plugins.processor.model.datatypes.DataType; +import org.opensearch.dataprepper.plugins.processor.rules.Rule; + +import java.util.List; + +public class Match { + private final DataType dataType; + private final List ruleMatches; + + public Match(final DataType dataType, final List ruleMatches) { + this.dataType = dataType; + this.ruleMatches = ruleMatches; + } + + public DataType getDataType() { + return dataType; + } + + public List getRuleMatches() { + return ruleMatches; + } +} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/rule/RuleSchema.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/rule/RuleSchema.java new file mode 100644 index 0000000000..ace0e1073a --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/rule/RuleSchema.java @@ -0,0 +1,21 @@ +package org.opensearch.dataprepper.plugins.processor.model.rule; + +import org.opensearch.dataprepper.plugins.processor.parser.RuleParser; +import org.opensearch.dataprepper.plugins.processor.parser.SigmaV1RuleParser; + +import java.util.Map; +import java.util.function.Function; + +public enum RuleSchema { + SIGMA_V1(SigmaV1RuleParser::new); + + private final Function, RuleParser> parserConstructor; + + RuleSchema(final Function, RuleParser> parserConstructor) { + this.parserConstructor = parserConstructor; + } + + public Function, RuleParser> getParserConstructor() { + return parserConstructor; + } +} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/RuleParser.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/RuleParser.java new file mode 100644 index 0000000000..fca8468c8b --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/RuleParser.java @@ -0,0 +1,8 @@ +package org.opensearch.dataprepper.plugins.processor.parser; + +import org.opensearch.dataprepper.plugins.processor.provider.rules.model.RuleData; +import org.opensearch.dataprepper.plugins.processor.rules.Rule; + +public interface RuleParser { + Rule parseRule(RuleData ruleData); +} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/SigmaRuleParser.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/SigmaRuleParser.java deleted file mode 100644 index 2891ecb7a6..0000000000 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/SigmaRuleParser.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.opensearch.dataprepper.plugins.processor.parser; - -import org.opensearch.dataprepper.plugins.processor.parser.objects.SigmaRule; - -public interface SigmaRuleParser { - T parseRule(SigmaRule sigmaRule); -} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/SigmaRulePredicateParser.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/SigmaV1RuleConditionParser.java similarity index 77% rename from data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/SigmaRulePredicateParser.java rename to data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/SigmaV1RuleConditionParser.java index a1827adf47..cfe03cf649 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/SigmaRulePredicateParser.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/SigmaV1RuleConditionParser.java @@ -1,8 +1,7 @@ package org.opensearch.dataprepper.plugins.processor.parser; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.plugins.processor.RuleEngineConfig; import org.opensearch.dataprepper.plugins.processor.formats.accessors.FieldAccessor; +import org.opensearch.dataprepper.plugins.processor.model.datatypes.DataType; import org.opensearch.dataprepper.plugins.processor.parser.condition.ConditionAND; import org.opensearch.dataprepper.plugins.processor.parser.condition.ConditionFieldEqualsValueExpression; import org.opensearch.dataprepper.plugins.processor.parser.condition.ConditionItem; @@ -21,20 +20,20 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; import java.util.function.Predicate; import java.util.stream.Stream; -public class SigmaRulePredicateParser implements SigmaRuleParser> { - private static final Logger LOG = LoggerFactory.getLogger(SigmaRulePredicateParser.class); +public class SigmaV1RuleConditionParser { + private static final Logger LOG = LoggerFactory.getLogger(SigmaV1RuleConditionParser.class); private final FieldAccessor fieldAccessor; - public SigmaRulePredicateParser(final RuleEngineConfig config) { - this.fieldAccessor = new FieldAccessor(config); + public SigmaV1RuleConditionParser(final Map mapping) { + this.fieldAccessor = new FieldAccessor(mapping); } - @Override - public Predicate parseRule(final SigmaRule sigmaRule) { + public Predicate parseRuleCondition(final SigmaRule sigmaRule) { return sigmaRule.getDetection().getParsedConditions().stream() .map(SigmaCondition::parsed) .map(this::parsePredicateFromConditionItem) @@ -44,7 +43,7 @@ public Predicate parseRule(final SigmaRule sigmaRule) { .orElse(x -> false); } - private Predicate parsePredicateFromConditionItem(final ConditionItem conditionItem) { + private Predicate parsePredicateFromConditionItem(final ConditionItem conditionItem) { if (conditionItem instanceof ConditionAND) { return convertAndCondition(conditionItem); } else if (conditionItem instanceof ConditionOR) { @@ -61,28 +60,28 @@ private Predicate parsePredicateFromConditionItem(final ConditionItem con } } - private Predicate convertAndCondition(final ConditionItem condition) { + private Predicate convertAndCondition(final ConditionItem condition) { return getPredicatesFromConditions(condition) .reduce(Predicate::and) // TODO - not sure on this, need to figure out why right would be a string to land here with an empty optional .orElse(x -> true); } - private Predicate convertOrCondition(final ConditionItem condition) { + private Predicate convertOrCondition(final ConditionItem condition) { return getPredicatesFromConditions(condition) .reduce(Predicate::or) // TODO - not sure on this, need to figure out why right would be a string to land here with an empty optional .orElse(x -> true); } - private Predicate convertNotCondition(final ConditionItem condition) { + private Predicate convertNotCondition(final ConditionItem condition) { return getPredicatesFromConditions(condition) .map(Predicate::negate) .findFirst() .orElseThrow(() -> new IllegalArgumentException("Expected exactly on condition for NOT operator")); } - private Stream> getPredicatesFromConditions(final ConditionItem condition) { + private Stream> getPredicatesFromConditions(final ConditionItem condition) { return condition.getArgs().stream() // Filter on is another condition .filter(Either::isLeft) @@ -90,7 +89,7 @@ private Stream> getPredicatesFromConditions(final ConditionItem .map(this::parsePredicateFromConditionItem); } - private Predicate convertFieldEquals(final ConditionFieldEqualsValueExpression condition) { + private Predicate convertFieldEquals(final ConditionFieldEqualsValueExpression condition) { final SigmaType conditionValue = condition.getValue(); if (conditionValue instanceof SigmaString) { @@ -108,36 +107,31 @@ private Predicate convertFieldEquals(final ConditionFieldEqualsValueExpre } } - private Predicate convertStringEquals(final ConditionFieldEqualsValueExpression condition) { + private Predicate convertStringEquals(final ConditionFieldEqualsValueExpression condition) { final SigmaString sigmaString = (SigmaString) condition.getValue(); return event -> { - //LOG.info("Field: {}", condition.getField()); - //LOG.info("Expected value: {}", sigmaString.getOriginal()); - //LOG.info("Event: {}", event.toJsonString()); - final String value = fieldAccessor.getStringValue(event, condition.getField()); - //LOG.info("Value: {}", value); return sigmaString.getOriginal().equals(value); }; } - private Predicate convertBooleanEquals(final ConditionFieldEqualsValueExpression condition) { + private Predicate convertBooleanEquals(final ConditionFieldEqualsValueExpression condition) { final SigmaBool sigmaBool = (SigmaBool) condition.getValue(); return event -> sigmaBool.getBoolean().equals(fieldAccessor.getBooleanValue(event, condition.getField())); } - private Predicate convertIntegerEquals(final ConditionFieldEqualsValueExpression condition) { + private Predicate convertIntegerEquals(final ConditionFieldEqualsValueExpression condition) { final SigmaInteger sigmaInteger = (SigmaInteger) condition.getValue(); return event -> sigmaInteger.getInteger().equals(fieldAccessor.getIntegerValue(event, condition.getField())); } - private Predicate convertFloatEquals(final ConditionFieldEqualsValueExpression condition) { + private Predicate convertFloatEquals(final ConditionFieldEqualsValueExpression condition) { final SigmaFloat sigmaFloat = (SigmaFloat) condition.getValue(); return event -> sigmaFloat.getFloat().equals(fieldAccessor.getFloatValue(event, condition.getField())); } - private Predicate convertNullEquals(final ConditionFieldEqualsValueExpression condition) { + private Predicate convertNullEquals(final ConditionFieldEqualsValueExpression condition) { return event -> fieldAccessor.getObjectValue(event, condition.getField()) == null; } } diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/SigmaV1RuleParser.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/SigmaV1RuleParser.java new file mode 100644 index 0000000000..a0525dab48 --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/SigmaV1RuleParser.java @@ -0,0 +1,35 @@ +package org.opensearch.dataprepper.plugins.processor.parser; + +import org.opensearch.dataprepper.plugins.processor.model.datatypes.DataType; +import org.opensearch.dataprepper.plugins.processor.parser.objects.SigmaRule; +import org.opensearch.dataprepper.plugins.processor.parser.objects.SigmaRuleTag; +import org.opensearch.dataprepper.plugins.processor.provider.rules.model.RuleData; +import org.opensearch.dataprepper.plugins.processor.rules.Rule; +import org.opensearch.dataprepper.plugins.processor.rules.SigmaV1Rule; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +public class SigmaV1RuleParser implements RuleParser { + private final SigmaV1RuleConditionParser conditionParser; + + public SigmaV1RuleParser(final Map mapping) { + this.conditionParser = new SigmaV1RuleConditionParser(mapping); + } + + @Override + public Rule parseRule(final RuleData ruleData) { + final SigmaRule sigmaRule = SigmaRule.fromYaml(ruleData.getRuleAsString(), true); + final Predicate ruleCondition = conditionParser.parseRuleCondition(sigmaRule); + final List tags = new ArrayList<>(); + tags.add(sigmaRule.getLevel().toString()); + tags.add(sigmaRule.getLogSource().getService()); + sigmaRule.getTags().stream() + .map(SigmaRuleTag::toString) + .forEach(tags::add); + + return new SigmaV1Rule(sigmaRule.getTitle(), sigmaRule.getId().toString(), tags, ruleCondition, ruleData.getEvaluationCondition()); + } +} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionAND.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionAND.java index c1a89db7b0..a255b57115 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionAND.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionAND.java @@ -4,7 +4,6 @@ */ package org.opensearch.dataprepper.plugins.processor.parser.condition; -import org.opensearch.dataprepper.plugins.processor.parser.utils.AnyOneOf; import org.opensearch.dataprepper.plugins.processor.parser.utils.Either; import java.util.List; diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionIdentifier.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionIdentifier.java index 904be6f965..a9c49dfc74 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionIdentifier.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionIdentifier.java @@ -7,7 +7,6 @@ import org.opensearch.dataprepper.plugins.processor.parser.exceptions.SigmaConditionError; import org.opensearch.dataprepper.plugins.processor.parser.objects.SigmaDetection; import org.opensearch.dataprepper.plugins.processor.parser.objects.SigmaDetections; -import org.opensearch.dataprepper.plugins.processor.parser.utils.AnyOneOf; import org.opensearch.dataprepper.plugins.processor.parser.utils.Either; import java.util.List; diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionItem.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionItem.java index 2c22a4a03c..84fac36218 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionItem.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionItem.java @@ -7,7 +7,6 @@ import org.opensearch.dataprepper.plugins.processor.parser.exceptions.SigmaConditionError; import org.opensearch.dataprepper.plugins.processor.parser.objects.SigmaDetectionItem; import org.opensearch.dataprepper.plugins.processor.parser.objects.SigmaDetections; -import org.opensearch.dataprepper.plugins.processor.parser.utils.AnyOneOf; import org.opensearch.dataprepper.plugins.processor.parser.utils.Either; import java.util.ArrayList; diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionNOT.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionNOT.java index cdf3affbb4..817dc55250 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionNOT.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionNOT.java @@ -4,7 +4,6 @@ */ package org.opensearch.dataprepper.plugins.processor.parser.condition; -import org.opensearch.dataprepper.plugins.processor.parser.utils.AnyOneOf; import org.opensearch.dataprepper.plugins.processor.parser.utils.Either; import java.util.List; diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionOR.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionOR.java index b71ab5bfc7..02dff0f11f 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionOR.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionOR.java @@ -4,7 +4,6 @@ */ package org.opensearch.dataprepper.plugins.processor.parser.condition; -import org.opensearch.dataprepper.plugins.processor.parser.utils.AnyOneOf; import org.opensearch.dataprepper.plugins.processor.parser.utils.Either; import java.util.List; diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionSelector.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionSelector.java index 6d297ccfee..c815f7ad87 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionSelector.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionSelector.java @@ -7,7 +7,6 @@ import org.opensearch.dataprepper.plugins.processor.parser.exceptions.SigmaConditionError; import org.opensearch.dataprepper.plugins.processor.parser.objects.SigmaDetectionItem; import org.opensearch.dataprepper.plugins.processor.parser.objects.SigmaDetections; -import org.opensearch.dataprepper.plugins.processor.parser.utils.AnyOneOf; import org.opensearch.dataprepper.plugins.processor.parser.utils.Either; import java.util.ArrayList; diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionTraverseVisitor.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionTraverseVisitor.java index 76b6ab341e..a88c78ff79 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionTraverseVisitor.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/condition/ConditionTraverseVisitor.java @@ -6,7 +6,6 @@ import org.opensearch.dataprepper.plugins.processor.parser.exceptions.SigmaConditionError; import org.opensearch.dataprepper.plugins.processor.parser.objects.SigmaCondition; -import org.opensearch.dataprepper.plugins.processor.parser.utils.AnyOneOf; import org.opensearch.dataprepper.plugins.processor.parser.utils.Either; import org.opensearch.dataprepper.plugins.processor.rules.antlr.ConditionBaseVisitor; import org.opensearch.dataprepper.plugins.processor.rules.antlr.ConditionParser; diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/objects/SigmaCondition.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/objects/SigmaCondition.java index 55e5305e76..7dec12a160 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/objects/SigmaCondition.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/objects/SigmaCondition.java @@ -13,7 +13,6 @@ import org.opensearch.dataprepper.plugins.processor.parser.condition.ConditionTraverseVisitor; import org.opensearch.dataprepper.plugins.processor.parser.condition.ConditionValueExpression; import org.opensearch.dataprepper.plugins.processor.parser.exceptions.SigmaConditionError; -import org.opensearch.dataprepper.plugins.processor.parser.utils.AnyOneOf; import org.opensearch.dataprepper.plugins.processor.parser.utils.Either; import org.opensearch.dataprepper.plugins.processor.rules.antlr.ConditionLexer; import org.opensearch.dataprepper.plugins.processor.rules.antlr.ConditionParser; diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/objects/SigmaDetection.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/objects/SigmaDetection.java index 024933219e..ac511d89fe 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/objects/SigmaDetection.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/objects/SigmaDetection.java @@ -14,7 +14,6 @@ import org.opensearch.dataprepper.plugins.processor.parser.exceptions.SigmaModifierError; import org.opensearch.dataprepper.plugins.processor.parser.exceptions.SigmaRegularExpressionError; import org.opensearch.dataprepper.plugins.processor.parser.exceptions.SigmaValueError; -import org.opensearch.dataprepper.plugins.processor.parser.utils.AnyOneOf; import org.opensearch.dataprepper.plugins.processor.parser.utils.Either; import java.util.ArrayList; diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/objects/SigmaDetectionItem.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/objects/SigmaDetectionItem.java index e8ff51d0c9..d4dabc67fa 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/objects/SigmaDetectionItem.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/objects/SigmaDetectionItem.java @@ -20,7 +20,6 @@ import org.opensearch.dataprepper.plugins.processor.parser.types.SigmaNull; import org.opensearch.dataprepper.plugins.processor.parser.types.SigmaType; import org.opensearch.dataprepper.plugins.processor.parser.types.SigmaTypeFacade; -import org.opensearch.dataprepper.plugins.processor.parser.utils.AnyOneOf; import org.opensearch.dataprepper.plugins.processor.parser.utils.Either; import java.util.ArrayList; diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/RuleProvider.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/RuleProvider.java new file mode 100644 index 0000000000..efefdc1798 --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/RuleProvider.java @@ -0,0 +1,14 @@ +package org.opensearch.dataprepper.plugins.processor.provider.rules; + +import org.opensearch.dataprepper.plugins.processor.provider.rules.model.RuleData; + +import java.util.List; + +public interface RuleProvider { + /** + * Gets the data of the rules to be evaluated + * + * @return - a list of RuleData that is used by the RuleParser to generate Rules + */ + List getRules(); +} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/model/RuleData.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/model/RuleData.java new file mode 100644 index 0000000000..9d395a2ef5 --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/model/RuleData.java @@ -0,0 +1,30 @@ +package org.opensearch.dataprepper.plugins.processor.provider.rules.model; + +import org.opensearch.dataprepper.plugins.processor.model.datatypes.DataType; +import org.opensearch.dataprepper.plugins.processor.util.Predicates; + +import java.util.function.Predicate; + +public class RuleData { + private final String ruleAsString; + private final Predicate evaluationCondition; + + public RuleData(final String ruleAsString, final Predicate evaluationCondition) { + this.ruleAsString = ruleAsString; + this.evaluationCondition = evaluationCondition; + } + + // Helper method for always evaluate rules + public RuleData(final String ruleAsString) { + this.ruleAsString = ruleAsString; + evaluationCondition = Predicates.ALWAYS_TRUE.getValue(); + } + + public String getRuleAsString() { + return ruleAsString; + } + + public Predicate getEvaluationCondition() { + return evaluationCondition; + } +} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/OpenSearchRuleProvider.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/OpenSearchRuleProvider.java new file mode 100644 index 0000000000..857633e45e --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/OpenSearchRuleProvider.java @@ -0,0 +1,203 @@ +package org.opensearch.dataprepper.plugins.processor.provider.rules.opensearch; + +import org.apache.commons.lang3.tuple.Pair; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch.core.MgetRequest; +import org.opensearch.client.opensearch.core.MgetResponse; +import org.opensearch.client.opensearch.core.SearchRequest; +import org.opensearch.client.opensearch.core.SearchResponse; +import org.opensearch.client.opensearch.core.mget.MultiGetResponseItem; +import org.opensearch.client.opensearch.core.search.Hit; +import org.opensearch.dataprepper.plugins.processor.exceptions.RuleRefreshException; +import org.opensearch.dataprepper.plugins.processor.model.datatypes.DataType; +import org.opensearch.dataprepper.plugins.processor.provider.rules.model.RuleData; +import org.opensearch.dataprepper.plugins.processor.provider.rules.opensearch.model.Detector; +import org.opensearch.dataprepper.plugins.processor.provider.rules.opensearch.model.DetectorInput; +import org.opensearch.dataprepper.plugins.processor.provider.rules.opensearch.model.DetectorRule; +import org.opensearch.dataprepper.plugins.processor.provider.rules.opensearch.model.DetectorWrapper; +import org.opensearch.dataprepper.plugins.processor.provider.rules.RuleProvider; +import org.opensearch.dataprepper.plugins.processor.provider.rules.opensearch.model.Input; +import org.opensearch.dataprepper.plugins.processor.provider.rules.opensearch.model.RuleWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class OpenSearchRuleProvider implements RuleProvider { + private static final Logger LOG = LoggerFactory.getLogger(OpenSearchRuleProvider.class); + + private static final String DETECTORS_INDEX = ".opensearch-sap-detectors-config"; + private static final String PREPACKAGED_RULES_INDEX = ".opensearch-sap-pre-packaged-rules-config"; + private static final String CUSTOM_RULES_INDEX = ".opensearch-sap-custom-rules-config"; + private static final String INDEX_FIELD_NAME = "index"; + + private final OpenSearchClient openSearchClient; + + public OpenSearchRuleProvider(final OpenSearchClient openSearchClient) { + this.openSearchClient = openSearchClient; + } + + @Override + public List getRules() { + final List detectors = getDetectors(); + final Map, List>> detectorToRuleIds = getDetectorToRuleIds(detectors); + final Map ruleIdToRuleAsString = getRuleIdToRuleAsString(detectorToRuleIds); + + return buildRules(detectorToRuleIds, ruleIdToRuleAsString); + } + + private List getDetectors() { + final SearchRequest listDetectorsRequest = getListDetectorsRequest(); + + try { + final SearchResponse listDetectorsResponse = openSearchClient.search(listDetectorsRequest, DetectorWrapper.class); + return parseDetectors(listDetectorsResponse); + } catch (final Exception e) { + throw new RuleRefreshException("Exception listing detectors", e); + } + } + + // TODO - build API in OpenSearch? + private SearchRequest getListDetectorsRequest() { + return new SearchRequest.Builder() + .index(DETECTORS_INDEX) + .size(10000) // TODO - pagination + .build(); + } + + private List parseDetectors(final SearchResponse listDetectorsResponse) { + return listDetectorsResponse.hits().hits().stream() + .map(Hit::source) + .filter(Objects::nonNull) + .map(DetectorWrapper::getDetector) + .collect(Collectors.toList()); + } + + private Map, List>> getDetectorToRuleIds(final List detectors) { + return detectors.stream() + .map(detector -> { + final List prepackagedRuleIds = getRuleIds(detector, DetectorInput::getPrePackagedRules); + final List customRuleIds = getRuleIds(detector, DetectorInput::getCustomRules); + + return Map.entry(detector, Pair.of(prepackagedRuleIds, customRuleIds)); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + private List getRuleIds(final Detector detector, final Function> ruleGetter) { + return detector.getInputs().stream() + .map(Input::getDetectorInput) + .map(ruleGetter) + .flatMap(Collection::stream) + .map(DetectorRule::getId) + .collect(Collectors.toList()); + } + + private Map getRuleIdToRuleAsString(final Map, List>> detectorToRuleIds) { + final Set prepackagedRuleIds = getRuleIdsSet(detectorToRuleIds, Pair::getLeft); + final Set customRuleIds = getRuleIdsSet(detectorToRuleIds, Pair::getRight); + + final Optional getPrepackagedRulesRequest = getMgetRequest(PREPACKAGED_RULES_INDEX, prepackagedRuleIds); + final Optional getCustomRulesRequest = getMgetRequest(CUSTOM_RULES_INDEX, customRuleIds); + + final HashMap ruleIdToRuleAsString = new HashMap<>(); + ruleIdToRuleAsString.putAll(fetchRules(getPrepackagedRulesRequest)); + ruleIdToRuleAsString.putAll(fetchRules(getCustomRulesRequest)); + + return ruleIdToRuleAsString; + } + + private Set getRuleIdsSet(final Map, List>> detectorToRuleIds, + final Function, List>, List> rulesGetter) { + return detectorToRuleIds.values().stream() + .map(rulesGetter) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + } + + private Optional getMgetRequest(final String index, final Set docIds) { + if (docIds.isEmpty()) { + return Optional.empty(); + } + + final List docIdsList = new ArrayList<>(docIds); + + return Optional.of(new MgetRequest.Builder() + .index(index) + .ids(docIdsList) + .build()); + } + + private Map fetchRules(final Optional optionalGetRulesRequest) { + if (optionalGetRulesRequest.isEmpty()) { + return Collections.emptyMap(); + } + + try { + final MgetResponse getRulesResponse = openSearchClient.mget(optionalGetRulesRequest.get(), RuleWrapper.class); + return parseGetRulesResponse(getRulesResponse); + } catch (final Exception e) { + throw new RuleRefreshException("Exception getting prepackaged rules", e); + } + } + + private Map parseGetRulesResponse(final MgetResponse getRulesResponse) { + return getRulesResponse.docs().stream() + .map(MultiGetResponseItem::result) + .filter(ruleWrapperGetResult -> Objects.nonNull(ruleWrapperGetResult.source())) + .map(ruleWrapperGetResult -> { + final String ruleId = ruleWrapperGetResult.id(); + final String ruleAsString = ruleWrapperGetResult.source().getRule().getRule(); + + return Map.entry(ruleId, ruleAsString); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + private List buildRules(final Map, List>> detectorToRuleIds, + final Map ruleIdToRuleAsString) { + return detectorToRuleIds.entrySet().stream() + .map(mapEntry -> buildDetectorRules(mapEntry.getKey(), mapEntry.getValue(), ruleIdToRuleAsString)) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } + + private List buildDetectorRules(final Detector detector, final Pair, List> ruleIdsPair, + final Map ruleIdToRuleAsString) { + final Predicate evaluationCondition = getDetectorEvaluationCondition(detector); + + return Stream.concat(ruleIdsPair.getLeft().stream(), ruleIdsPair.getRight().stream()) + .map(ruleId -> new RuleData(ruleIdToRuleAsString.get(ruleId), evaluationCondition)) + .collect(Collectors.toList()); + } + + private Predicate getDetectorEvaluationCondition(final Detector detector) { + // TODO - aliases, index patterns, etc + final Set detectorIndices = detector.getInputs().stream() + .map(Input::getDetectorInput) + .map(DetectorInput::getIndices) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + + return dataType -> { + final String index = dataType.getMetadataValue(INDEX_FIELD_NAME); + if (index == null) { + return false; + } + + return detectorIndices.contains(index); + }; + } +} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/detector/Detector.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/model/Detector.java similarity index 92% rename from data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/detector/Detector.java rename to data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/model/Detector.java index b01ff9fe16..373939a0a8 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/detector/Detector.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/model/Detector.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.processor.model.detector; +package org.opensearch.dataprepper.plugins.processor.provider.rules.opensearch.model; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/detector/DetectorInput.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/model/DetectorInput.java similarity index 92% rename from data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/detector/DetectorInput.java rename to data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/model/DetectorInput.java index 190e5901cc..901ab36c42 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/detector/DetectorInput.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/model/DetectorInput.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.processor.model.detector; +package org.opensearch.dataprepper.plugins.processor.provider.rules.opensearch.model; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/detector/DetectorRule.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/model/DetectorRule.java similarity index 76% rename from data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/detector/DetectorRule.java rename to data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/model/DetectorRule.java index f812417dcf..5e8aa8ddd0 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/detector/DetectorRule.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/model/DetectorRule.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.processor.model.detector; +package org.opensearch.dataprepper.plugins.processor.provider.rules.opensearch.model; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/detector/DetectorWrapper.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/model/DetectorWrapper.java similarity index 79% rename from data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/detector/DetectorWrapper.java rename to data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/model/DetectorWrapper.java index 763cb629e4..0f1781ccad 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/detector/DetectorWrapper.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/model/DetectorWrapper.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.processor.model.detector; +package org.opensearch.dataprepper.plugins.processor.provider.rules.opensearch.model; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/detector/Input.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/model/Input.java similarity index 84% rename from data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/detector/Input.java rename to data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/model/Input.java index 605869a3a5..d707584215 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/detector/Input.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/model/Input.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.processor.model.detector; +package org.opensearch.dataprepper.plugins.processor.provider.rules.opensearch.model; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/rule/Rule.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/model/Rule.java similarity index 76% rename from data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/rule/Rule.java rename to data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/model/Rule.java index 2ba6beed8d..013eed4fe7 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/rule/Rule.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/model/Rule.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.processor.model.rule; +package org.opensearch.dataprepper.plugins.processor.provider.rules.opensearch.model; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/rule/RuleWrapper.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/model/RuleWrapper.java similarity index 67% rename from data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/rule/RuleWrapper.java rename to data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/model/RuleWrapper.java index 05da5a11e9..25f451cb47 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/rule/RuleWrapper.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/model/RuleWrapper.java @@ -1,4 +1,4 @@ -package org.opensearch.dataprepper.plugins.processor.model.rule; +package org.opensearch.dataprepper.plugins.processor.provider.rules.opensearch.model; public class RuleWrapper { private Rule rule; diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/registrar/MappingRegistrar.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/registrar/MappingRegistrar.java new file mode 100644 index 0000000000..0837b2accd --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/registrar/MappingRegistrar.java @@ -0,0 +1,46 @@ +package org.opensearch.dataprepper.plugins.processor.registrar; + +import org.opensearch.dataprepper.plugins.processor.mappings.FileMappingProvider; +import org.opensearch.dataprepper.plugins.processor.model.log.LogType; +import org.opensearch.dataprepper.plugins.processor.model.mappings.Mapping; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.function.Supplier; + +public class MappingRegistrar { + private static final Logger LOG = LoggerFactory.getLogger(MappingRegistrar.class); + + private final Map> mappingSuppliers; + + public MappingRegistrar() { + mappingSuppliers = new HashMap<>(); + + final FileMappingProvider mappingProvider = new FileMappingProvider(); + Arrays.stream(LogType.values()).forEach(logType -> + registerMapping(logType.name().toLowerCase(), () -> mappingProvider.getMapping(logType.getMappingsFile()))); + } + + public void registerMapping(final String logType, final Supplier mappingSupplier) { + mappingSuppliers.put(logType, mappingSupplier); + } + + public Map getMapping(final String logType, final String logFormat) { + final Optional> optionalMapping = Optional.of(mappingSuppliers) + .map(mappingSuppliers -> mappingSuppliers.get(logType)) + .map(Supplier::get) + .map(mapping -> mapping.getFieldMappingForLogFormat(logFormat)); + + if (optionalMapping.isEmpty()) { + LOG.warn("No mappings found for log type {} and log format {}. Using empty mappings", logType, logFormat); + return Collections.emptyMap(); + } + + return optionalMapping.get(); + } +} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/registrar/RuleProviderRegistrar.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/registrar/RuleProviderRegistrar.java new file mode 100644 index 0000000000..732d901fd8 --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/registrar/RuleProviderRegistrar.java @@ -0,0 +1,28 @@ +package org.opensearch.dataprepper.plugins.processor.registrar; + +import org.opensearch.dataprepper.plugins.processor.provider.rules.RuleProvider; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; + +public class RuleProviderRegistrar { + private final Map> ruleProviderSuppliers; + + public RuleProviderRegistrar() { + ruleProviderSuppliers = new HashMap<>(); + } + + public void registerRuleProvider(final String ruleLocation, final Supplier ruleProviderSupplier) { + ruleProviderSuppliers.put(ruleLocation, ruleProviderSupplier); + } + + public RuleProvider getRuleProvider(final String ruleLocation) { + final Supplier ruleProviderSupplier = ruleProviderSuppliers.get(ruleLocation); + if (ruleProviderSupplier == null) { + throw new IllegalArgumentException("No RuleProvider registered for location: " + ruleLocation); + } + + return ruleProviderSupplier.get(); + } +} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/DetectorDTO.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/DetectorDTO.java deleted file mode 100644 index eb67a62808..0000000000 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/DetectorDTO.java +++ /dev/null @@ -1,33 +0,0 @@ -package org.opensearch.dataprepper.plugins.processor.rules; - -import java.util.Set; - -public class DetectorDTO { - private Set ruleIds; - private String monitorID; - private String monitorName; - private String findingsIndex; - - public DetectorDTO(final Set ruleIds, final String monitorID, final String monitorName, final String findingsIndex) { - this.ruleIds = ruleIds; - this.monitorID = monitorID; - this.monitorName = monitorName; - this.findingsIndex = findingsIndex; - } - - public Set getRuleIds() { - return ruleIds; - } - - public String getMonitorID() { - return monitorID; - } - - public String getMonitorName() { - return monitorName; - } - - public String getFindingsIndex() { - return findingsIndex; - } -} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/Rule.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/Rule.java index be18cc3af4..c387bc575d 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/Rule.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/Rule.java @@ -1,69 +1,29 @@ package org.opensearch.dataprepper.plugins.processor.rules; -import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.plugins.processor.model.datatypes.DataType; -import java.util.List; import java.util.function.Predicate; -public class Rule { - private final String title; - private final String id; - private final Predicate condition; - private final List tags; - private final String query; - private String monitorId; - private String monitorName; - private String findingsIndex; +public abstract class Rule { + private final Predicate ruleCondition; + private final Predicate evaluationCondition; - public Rule(final String title, final String id, final Predicate condition, final List tags) { - this.title = title; - this.id = id; - this.condition = condition; - this.tags = tags; - this.query = "PLACEHOLDER_QUERY"; + public Rule(final Predicate ruleCondition, final Predicate evaluationCondition) { + this.ruleCondition = ruleCondition; + this.evaluationCondition = evaluationCondition; } - public String getTitle() { - return title; + // Helper for always evaluate rules + public Rule(final Predicate ruleCondition) { + this.ruleCondition = ruleCondition; + this.evaluationCondition = i -> true; } - public String getId() { - return id; + public Predicate getRuleCondition() { + return ruleCondition; } - public Predicate getCondition() { - return condition; - } - - public List getTags() { - return tags; - } - - public String getQuery() { - return query; - } - - public String getMonitorId() { - return monitorId; - } - - public String getMonitorName() { - return monitorName; - } - - public String getFindingsIndex() { - return findingsIndex; - } - - public void setMonitorId(final String monitorId) { - this.monitorId = monitorId; - } - - public void setMonitorName(String monitorName) { - this.monitorName = monitorName; - } - - public void setFindingsIndex(final String findingsIndex) { - this.findingsIndex = findingsIndex; + public Predicate getEvaluationCondition() { + return evaluationCondition; } } diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/RuleConverter.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/RuleConverter.java deleted file mode 100644 index 048ff59111..0000000000 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/RuleConverter.java +++ /dev/null @@ -1,35 +0,0 @@ -package org.opensearch.dataprepper.plugins.processor.rules; - -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.plugins.processor.RuleEngineConfig; -import org.opensearch.dataprepper.plugins.processor.parser.SigmaRulePredicateParser; -import org.opensearch.dataprepper.plugins.processor.parser.objects.SigmaRule; -import org.opensearch.dataprepper.plugins.processor.parser.objects.SigmaRuleTag; - -import java.util.ArrayList; -import java.util.List; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.stream.Collectors; - -public class RuleConverter implements Function { - private final SigmaRulePredicateParser sigmaRulePredicateParser; - - public RuleConverter(final RuleEngineConfig config) { - this.sigmaRulePredicateParser = new SigmaRulePredicateParser(config); - } - - @Override - public Rule apply(final SigmaRule sigmaRule) { - final Predicate predicate = sigmaRulePredicateParser.parseRule(sigmaRule); - - final List tags = new ArrayList<>(); - tags.add(sigmaRule.getLevel().toString()); - tags.add(sigmaRule.getLogSource().getService()); - sigmaRule.getTags().stream() - .map(SigmaRuleTag::toString) - .forEach(tags::add); - - return new Rule(sigmaRule.getTitle(), sigmaRule.getId().toString(), predicate, tags); - } -} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/RuleFetcher.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/RuleFetcher.java deleted file mode 100644 index e169c08951..0000000000 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/RuleFetcher.java +++ /dev/null @@ -1,245 +0,0 @@ -package org.opensearch.dataprepper.plugins.processor.rules; - -import org.opensearch.client.opensearch.OpenSearchClient; -import org.opensearch.client.opensearch.core.MgetRequest; -import org.opensearch.client.opensearch.core.MgetResponse; -import org.opensearch.client.opensearch.core.SearchRequest; -import org.opensearch.client.opensearch.core.SearchResponse; -import org.opensearch.client.opensearch.core.mget.MultiGetResponseItem; -import org.opensearch.client.opensearch.core.search.Hit; -import org.opensearch.dataprepper.plugins.processor.exceptions.RuleRefreshException; -import org.opensearch.dataprepper.plugins.processor.model.detector.Detector; -import org.opensearch.dataprepper.plugins.processor.model.detector.DetectorInput; -import org.opensearch.dataprepper.plugins.processor.model.detector.DetectorRule; -import org.opensearch.dataprepper.plugins.processor.model.detector.DetectorWrapper; -import org.opensearch.dataprepper.plugins.processor.model.detector.Input; -import org.opensearch.dataprepper.plugins.processor.model.rule.RuleWrapper; -import org.opensearch.dataprepper.plugins.processor.parser.objects.SigmaRule; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.function.Function; -import java.util.stream.Collectors; - -public class RuleFetcher implements Runnable { - private static final Logger LOG = LoggerFactory.getLogger(RuleFetcher.class); - - private static final String DETECTORS_INDEX = ".opensearch-sap-detectors-config"; - private static final String PREPACKAGED_RULES_INDEX = ".opensearch-sap-pre-packaged-rules-config"; - private static final String CUSTOM_RULES_INDEX = ".opensearch-sap-custom-rules-config"; - - private final OpenSearchClient openSearchClient; - private final RuleStore sigmaRuleStore; - private final RuleConverter ruleConverter; - - public RuleFetcher(final OpenSearchClient openSearchClient, - final RuleStore sigmaRuleStore, - final RuleConverter ruleConverter) { - this.openSearchClient = openSearchClient; - this.sigmaRuleStore = sigmaRuleStore; - this.ruleConverter = ruleConverter; - } - - @Override - public void run() { - try { - LOG.debug("Starting rule fetch"); - final List detectors = getDetectors(); - LOG.debug("Found detectors: {}", detectors); - - final Map> indexToPrepackagedRules = getIndexToPrepackagedRules(detectors); - final Map> indexToCustomRules = getIndexToCustomRules(detectors); - - final Map> mergedIndexToRules = mergeIndexToRules(List.of(indexToPrepackagedRules, indexToCustomRules)); - sigmaRuleStore.updateRuleStore(mergedIndexToRules); - } catch (final Exception e) { - LOG.error("Caught exception refreshing rules", e); - } - } - - private List getDetectors() { - final SearchRequest listDetectorsRequest = getListDetectorsRequest(); - - try { - final SearchResponse listDetectorsResponse = openSearchClient.search(listDetectorsRequest, DetectorWrapper.class); - return parseDetectors(listDetectorsResponse); - } catch (final Exception e) { - throw new RuleRefreshException("Exception listing detectors", e); - } - } - - // TODO - build API in OpenSearch - private SearchRequest getListDetectorsRequest() { - return new SearchRequest.Builder() - .index(DETECTORS_INDEX) - .size(10000) // TODO - pagination - .build(); - } - - private List parseDetectors(final SearchResponse listDetectorsResponse) { - return listDetectorsResponse.hits().hits().stream() - .map(Hit::source) - .filter(Objects::nonNull) - .map(DetectorWrapper::getDetector) - .collect(Collectors.toList()); - } - - private Map> getIndexToPrepackagedRules(final List detectors) { - final Map> indexToPrepackagedRuleIds = getIndexToDetectorMetadata(detectors, DetectorInput::getPrePackagedRules); - final Set prepackagedRuleIds = indexToPrepackagedRuleIds.values().stream() - .flatMap(Collection::stream) - .map(DetectorDTO::getRuleIds) - .flatMap(Collection::stream) - .collect(Collectors.toSet()); - final Optional getPrepackagedRulesRequest = getMgetRequest(PREPACKAGED_RULES_INDEX, prepackagedRuleIds); - - if (getPrepackagedRulesRequest.isEmpty()) { - // No rules to get - return Collections.emptyMap(); - } - - try { - final MgetResponse getRulesResponse = openSearchClient.mget(getPrepackagedRulesRequest.get(), RuleWrapper.class); - return getIndexToRules(indexToPrepackagedRuleIds, getRulesResponse); - } catch (final Exception e) { - throw new RuleRefreshException("Exception getting prepackaged rules", e); - } - } - - private Map> getIndexToCustomRules(final List detectors) { - final Map> indexToCustomRuleIds = getIndexToDetectorMetadata(detectors, DetectorInput::getCustomRules); - final Set customRuleIds = indexToCustomRuleIds.values().stream() - .flatMap(Collection::stream) - .map(DetectorDTO::getRuleIds) - .flatMap(Collection::stream) - .collect(Collectors.toSet()); - final Optional getCustomRulesRequest = getMgetRequest(CUSTOM_RULES_INDEX, customRuleIds); - - if (getCustomRulesRequest.isEmpty()) { - // No rules to get - return Collections.emptyMap(); - } - - try { - final MgetResponse getRulesResponse = openSearchClient.mget(getCustomRulesRequest.get(), RuleWrapper.class); - return getIndexToRules(indexToCustomRuleIds, getRulesResponse); - } catch (final Exception e) { - throw new RuleRefreshException("Exception getting custom rules", e); - } - } - - private Map> getIndexToDetectorMetadata(final List detectors, final Function> ruleGetter) { - final Map> indexToRuleIds = new HashMap<>(); - detectors.stream() - .forEach(detector -> { - final List inputs = detector.getInputs(); - final List detectorInputs = inputs.stream() - .map(Input::getDetectorInput) - .collect(Collectors.toList()); - - detectorInputs.forEach(detectorInput -> { - final Set ruleIds = getRuleIds(detectorInput, ruleGetter); - final DetectorDTO detectorDTO = new DetectorDTO(ruleIds, detector.getMonitorId().get(0), detector.getName(), detector.getFindingsIndex()); - - detectorInput.getIndices().forEach(index -> { - indexToRuleIds.putIfAbsent(index, new ArrayList<>()); - indexToRuleIds.get(index).add(detectorDTO); - }); - }); - }); - - return indexToRuleIds; - } - - private Set getRuleIds(final DetectorInput detectorInput, final Function> ruleGetter) { - return ruleGetter.apply(detectorInput).stream() - .map(DetectorRule::getId) - .collect(Collectors.toSet()); - } - - private Optional getMgetRequest(final String index, final Set docIds) { - if (docIds.isEmpty()) { - return Optional.empty(); - } - - final List docIdsList = new ArrayList<>(docIds); - - return Optional.of(new MgetRequest.Builder() - .index(index) - .ids(docIdsList) - .build()); - } - - private Map> getIndexToRules(final Map> indexToDetectorMetadata, final MgetResponse getRulesResponse) { - final Map ruleIdToSigmaRule = getRuleIdToSigmaRule(getRulesResponse); - - final Map> indexToRules = new HashMap<>(); - indexToDetectorMetadata.forEach((index, detectorDTOs) -> { - final List rules = detectorDTOs.stream() - .map(detectorDTO -> { - final Set ruleIds = detectorDTO.getRuleIds(); - return ruleIds.stream() - .map(ruleId -> { - final Rule rule = ruleIdToSigmaRule.get(ruleId); - rule.setMonitorId(detectorDTO.getMonitorID()); - rule.setFindingsIndex(detectorDTO.getFindingsIndex()); - rule.setMonitorName(detectorDTO.getMonitorName()); - - return rule; - }) - .collect(Collectors.toList()); - }) - .flatMap(Collection::stream) - .collect(Collectors.toList()); - - if (!rules.isEmpty()) { - indexToRules.put(index, rules); - } - }); - - return indexToRules; - } - - private Map getRuleIdToSigmaRule(final MgetResponse getRulesResponse) { - return getRulesResponse.docs().stream() - .map(MultiGetResponseItem::result) - .filter(ruleWrapperGetResult -> Objects.nonNull(ruleWrapperGetResult.source())) - .map(ruleWrapperGetResult -> { - final String ruleID = ruleWrapperGetResult.id(); - final Rule sigmaRule = parseSigmaRule(ruleWrapperGetResult.source()); - - return Map.entry(ruleID, sigmaRule); - }) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } - - private Rule parseSigmaRule(final RuleWrapper ruleWrapper) { - final String ruleString = ruleWrapper.getRule().getRule(); - final SigmaRule sigmaRule = SigmaRule.fromYaml(ruleString, true); - - return ruleConverter.apply(sigmaRule); - } - - private Map> mergeIndexToRules(final List>> indexToRulesList) { - final Map> indexToRules = new HashMap<>(); - - indexToRulesList.forEach(indexToRulesEntry -> { - indexToRulesEntry.forEach((index, sigmaRules) -> { - indexToRules.putIfAbsent(index, new ArrayList<>()); - indexToRules.get(index).addAll(sigmaRules); - }); - }); - - return indexToRules; - } -} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/RuleRefresher.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/RuleRefresher.java new file mode 100644 index 0000000000..97df850add --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/RuleRefresher.java @@ -0,0 +1,39 @@ +package org.opensearch.dataprepper.plugins.processor.rules; + +import org.opensearch.dataprepper.plugins.processor.parser.RuleParser; +import org.opensearch.dataprepper.plugins.processor.provider.rules.RuleProvider; +import org.opensearch.dataprepper.plugins.processor.provider.rules.model.RuleData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.stream.Collectors; + +public class RuleRefresher implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(RuleRefresher.class); + + private final RuleProvider ruleProvider; + private final RuleParser ruleParser; + private final RuleStore sigmaRuleStore; + + public RuleRefresher(final RuleProvider ruleProvider, + final RuleParser ruleParser, + final RuleStore sigmaRuleStore) { + this.ruleProvider = ruleProvider; + this.ruleParser = ruleParser; + this.sigmaRuleStore = sigmaRuleStore; + } + + @Override + public void run() { + try { + final List ruleData = ruleProvider.getRules(); + final List rules = ruleData.stream() + .map(ruleParser::parseRule) + .collect(Collectors.toList()); + sigmaRuleStore.updateRuleStore(rules); + } catch (final Exception e) { + LOG.error("Caught exception refreshing rules", e); + } + } +} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/RuleStore.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/RuleStore.java index 28036aacbd..fe44f00480 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/RuleStore.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/RuleStore.java @@ -1,22 +1,21 @@ package org.opensearch.dataprepper.plugins.processor.rules; -import java.util.HashMap; +import java.util.ArrayList; import java.util.List; -import java.util.Map; // TODO - does this need locking? public class RuleStore { - private Map> rules; + private List rules; public RuleStore() { - this.rules = new HashMap<>(); + this.rules = new ArrayList<>(); } - public void updateRuleStore(final Map> updatedRules) { + public void updateRuleStore(final List updatedRules) { rules = updatedRules; } - public Map> getRules() { + public List getRules() { return rules; } } diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/SigmaV1Rule.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/SigmaV1Rule.java new file mode 100644 index 0000000000..9255d94371 --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/SigmaV1Rule.java @@ -0,0 +1,32 @@ +package org.opensearch.dataprepper.plugins.processor.rules; + +import org.opensearch.dataprepper.plugins.processor.model.datatypes.DataType; + +import java.util.List; +import java.util.function.Predicate; + +public class SigmaV1Rule extends Rule { + private final String title; + private final String id; + private final List tags; + + public SigmaV1Rule(final String title, final String id, final List tags, final Predicate ruleCondition, + final Predicate evaluationCondition) { + super(ruleCondition, evaluationCondition); + this.title = title; + this.id = id; + this.tags = tags; + } + + public String getTitle() { + return title; + } + + public String getId() { + return id; + } + + public List getTags() { + return tags; + } +} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/util/Predicates.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/util/Predicates.java new file mode 100644 index 0000000000..075f487e68 --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/util/Predicates.java @@ -0,0 +1,19 @@ +package org.opensearch.dataprepper.plugins.processor.util; + +import org.opensearch.dataprepper.plugins.processor.model.datatypes.DataType; + +import java.util.function.Predicate; + +public enum Predicates { + ALWAYS_TRUE(x -> true); + + private final Predicate value; + + Predicates(final Predicate value) { + this.value = value; + } + + public Predicate getValue() { + return value; + } +} diff --git a/data-prepper-plugins/rule-engine/src/test/java/org/opensearch/dataprepper/plugins/processor/parser/SigmaRulePredicateParserTest.java b/data-prepper-plugins/rule-engine/src/test/java/org/opensearch/dataprepper/plugins/processor/parser/SigmaRulePredicateParserTest.java deleted file mode 100644 index cb812e1f0f..0000000000 --- a/data-prepper-plugins/rule-engine/src/test/java/org/opensearch/dataprepper/plugins/processor/parser/SigmaRulePredicateParserTest.java +++ /dev/null @@ -1,114 +0,0 @@ -package org.opensearch.dataprepper.plugins.processor.parser; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.MockitoAnnotations; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.log.JacksonLog; -import org.opensearch.dataprepper.plugins.processor.RuleEngineConfig; -import org.opensearch.dataprepper.plugins.processor.model.log.LogFormat; -import org.opensearch.dataprepper.plugins.processor.model.log.LogType; -import org.opensearch.dataprepper.plugins.processor.parser.objects.SigmaRule; - -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Map; -import java.util.UUID; -import java.util.function.Predicate; - -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class SigmaRulePredicateParserTest { - private static final String RULES_PATH_FORMAT = "src/test/resources/rules/%s"; - private static final String DELETE_IDENTITY_RULE_FILE = "aws_delete_identity.yml"; - private static final String GUARDDUTY_DISRUPTION_RULE_FILE = "aws_guardduty_disruption.yml"; - - private SigmaRulePredicateParser sigmaRulePredicateParser; - - @BeforeEach - void setup() { - MockitoAnnotations.openMocks(this); - - final RuleEngineConfig ruleEngineConfig = new RuleEngineConfig.Builder() - .withLogFormat(LogFormat.NONE) - .withLogType(LogType.CLOUDTRAIL) - .build(); - sigmaRulePredicateParser = new SigmaRulePredicateParser(ruleEngineConfig); - } - - @Test - void parse_SingleFieldEqualsCondition() { - final SigmaRule sigmaRule = getSigmaRule(DELETE_IDENTITY_RULE_FILE); - final Predicate result = sigmaRulePredicateParser.parseRule(sigmaRule); - - assertTrue(result.test(getEvent(Map.of("eventSource", "ses.amazonaws.com")))); - assertFalse(result.test(getEvent(Map.of("eventSource", UUID.randomUUID().toString())))); - } - - @Test - void parse_SingleFieldEqualsCondition_OCSF() { - sigmaRulePredicateParser = new SigmaRulePredicateParser(new RuleEngineConfig.Builder() - .withLogFormat(LogFormat.OCSF) - .withLogType(LogType.CLOUDTRAIL) - .build()); - - final SigmaRule sigmaRule = getSigmaRule(GUARDDUTY_DISRUPTION_RULE_FILE); - final Predicate result = sigmaRulePredicateParser.parseRule(sigmaRule); - - assertTrue(result.test(getEvent(Map.of( - "api.service.name", "guardduty.amazonaws.com", - "api.operation", "CreateIPSet" - )))); - assertFalse(result.test(getEvent(Map.of( - "api.service.name", "guardduty.amazonaws.com", - "api.operation", "DeleteIPSet" - )))); - } - - @Test - void parse_SingleFieldEqualsCondition_Nested() { - sigmaRulePredicateParser = new SigmaRulePredicateParser(new RuleEngineConfig.Builder() - .withLogFormat(LogFormat.OCSF) - .withLogType(LogType.CLOUDTRAIL) - .build()); - - final SigmaRule sigmaRule = getSigmaRule(GUARDDUTY_DISRUPTION_RULE_FILE); - final Predicate result = sigmaRulePredicateParser.parseRule(sigmaRule); - - assertTrue(result.test(getEvent(Map.of( - "api", Map.of( - "service", Map.of( - "name", "guardduty.amazonaws.com" - ), - "operation", "CreateIPSet" - ) - )))); - assertFalse(result.test(getEvent(Map.of( - "api", Map.of( - "service", Map.of( - "name", "guardduty.amazonaws.com" - ), - "operation", "DeleteIPSet" - ) - )))); - } - - private SigmaRule getSigmaRule(final String ruleFile) { - try { - final Path rulePath = Path.of(String.format(RULES_PATH_FORMAT, ruleFile)); - final String ruleString = Files.readString(rulePath, StandardCharsets.UTF_8); - - return SigmaRule.fromYaml(ruleString, true); - } catch (final Exception e) { - throw new RuntimeException("Exception parsing rule: " + ruleFile, e); - } - } - - private Event getEvent(final Map data) { - return JacksonLog.builder() - .withData(data) - .build(); - } -} diff --git a/data-prepper-plugins/rule-engine/src/test/java/org/opensearch/dataprepper/plugins/processor/parser/SigmaV1SigmaV1RuleConditionParserTest.java b/data-prepper-plugins/rule-engine/src/test/java/org/opensearch/dataprepper/plugins/processor/parser/SigmaV1SigmaV1RuleConditionParserTest.java new file mode 100644 index 0000000000..43d31093c0 --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/test/java/org/opensearch/dataprepper/plugins/processor/parser/SigmaV1SigmaV1RuleConditionParserTest.java @@ -0,0 +1,76 @@ +package org.opensearch.dataprepper.plugins.processor.parser; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockitoAnnotations; +import org.opensearch.dataprepper.plugins.processor.model.datatypes.CloudTrail; +import org.opensearch.dataprepper.plugins.processor.model.datatypes.DataType; +import org.opensearch.dataprepper.plugins.processor.model.datatypes.OCSF; +import org.opensearch.dataprepper.plugins.processor.parser.objects.SigmaRule; + +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.function.Predicate; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class SigmaV1SigmaV1RuleConditionParserTest { + private static final String RULES_PATH_FORMAT = "src/test/resources/rules/%s"; + private static final String DELETE_IDENTITY_RULE_FILE = "aws_delete_identity.yml"; + private static final String GUARDDUTY_DISRUPTION_RULE_FILE = "aws_guardduty_disruption.yml"; + + private SigmaV1RuleConditionParser sigmaSigmaV1RuleConditionParser; + + @BeforeEach + void setup() { + MockitoAnnotations.openMocks(this); + sigmaSigmaV1RuleConditionParser = new SigmaV1RuleConditionParser(Collections.emptyMap()); + } + + @Test + void parse_SingleFieldEqualsCondition() { + final SigmaRule sigmaRule = getSigmaRule(DELETE_IDENTITY_RULE_FILE); + final Predicate result = sigmaSigmaV1RuleConditionParser.parseRuleCondition(sigmaRule); + + assertTrue(result.test(getCloudTrail("", "ses.amazonaws.com"))); + assertFalse(result.test(getCloudTrail("", UUID.randomUUID().toString()))); + } + + @Test + void parse_SingleFieldEqualsCondition_OCSF() { + sigmaSigmaV1RuleConditionParser = new SigmaV1RuleConditionParser(Map.of( + "eventName", "api.operation", + "eventSource", "api.service.name" + )); + + final SigmaRule sigmaRule = getSigmaRule(GUARDDUTY_DISRUPTION_RULE_FILE); + final Predicate result = sigmaSigmaV1RuleConditionParser.parseRuleCondition(sigmaRule); + + assertTrue(result.test(getOCSF("CreateIPSet", "guardduty.amazonaws.com"))); + assertFalse(result.test(getOCSF("DeleteIPSet", "guardduty.amazonaws.com"))); + } + + private SigmaRule getSigmaRule(final String ruleFile) { + try { + final Path rulePath = Path.of(String.format(RULES_PATH_FORMAT, ruleFile)); + final String ruleString = Files.readString(rulePath, StandardCharsets.UTF_8); + + return SigmaRule.fromYaml(ruleString, true); + } catch (final Exception e) { + throw new RuntimeException("Exception parsing rule: " + ruleFile, e); + } + } + + private DataType getCloudTrail(final String eventName, final String eventSource) { + return new CloudTrail(eventName, eventSource); + } + + private DataType getOCSF(final String eventName, final String eventSource) { + return new OCSF(eventName, eventSource); + } +}