Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.11] add rollover & archival mechanism for correlation history indices #707

Merged
merged 1 commit into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ public List<Setting<?>> getSettings() {
SecurityAnalyticsSettings.FINDING_HISTORY_INDEX_MAX_AGE,
SecurityAnalyticsSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
SecurityAnalyticsSettings.FINDING_HISTORY_RETENTION_PERIOD,
SecurityAnalyticsSettings.CORRELATION_HISTORY_MAX_DOCS,
SecurityAnalyticsSettings.CORRELATION_HISTORY_INDEX_MAX_AGE,
SecurityAnalyticsSettings.CORRELATION_HISTORY_ROLLOVER_PERIOD,
SecurityAnalyticsSettings.CORRELATION_HISTORY_RETENTION_PERIOD,
SecurityAnalyticsSettings.IS_CORRELATION_INDEX_SETTING,
SecurityAnalyticsSettings.CORRELATION_TIME_WINDOW,
SecurityAnalyticsSettings.DEFAULT_MAPPING_SCHEMA,
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ public class SecurityAnalyticsSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<TimeValue> CORRELATION_HISTORY_ROLLOVER_PERIOD = Setting.positiveTimeSetting(
"plugins.security_analytics.correlation_history_rollover_period",
TimeValue.timeValueHours(12),
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<TimeValue> ALERT_HISTORY_INDEX_MAX_AGE = Setting.positiveTimeSetting(
"plugins.security_analytics.alert_history_max_age",
new TimeValue(30, TimeUnit.DAYS),
Expand All @@ -55,6 +61,12 @@ public class SecurityAnalyticsSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<TimeValue> CORRELATION_HISTORY_INDEX_MAX_AGE = Setting.positiveTimeSetting(
"plugins.security_analytics.correlation_history_max_age",
new TimeValue(30, TimeUnit.DAYS),
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<Long> ALERT_HISTORY_MAX_DOCS = Setting.longSetting(
"plugins.security_analytics.alert_history_max_docs",
1000L,
Expand All @@ -69,6 +81,13 @@ public class SecurityAnalyticsSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic, Setting.Property.Deprecated
);

public static final Setting<Long> CORRELATION_HISTORY_MAX_DOCS = Setting.longSetting(
"plugins.security_analytics.correlation_history_max_docs",
1000L,
0L,
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<TimeValue> ALERT_HISTORY_RETENTION_PERIOD = Setting.positiveTimeSetting(
"plugins.security_analytics.alert_history_retention_period",
new TimeValue(60, TimeUnit.DAYS),
Expand All @@ -81,6 +100,12 @@ public class SecurityAnalyticsSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<TimeValue> CORRELATION_HISTORY_RETENTION_PERIOD = Setting.positiveTimeSetting(
"plugins.security_analytics.correlation_history_retention_period",
new TimeValue(60, TimeUnit.DAYS),
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<TimeValue> REQUEST_TIMEOUT = Setting.positiveTimeSetting(
"plugins.security_analytics.request_timeout",
TimeValue.timeValueSeconds(10),
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ public void onResponse(AcknowledgedResponse response) {
public void onFailure(Exception e) {
onFailures(e);
}
}
},
false
);
} else {
indexCorrelationRule();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ public void onResponse(AcknowledgedResponse response) {
public void onFailure(Exception e) {
onFailures(e);
}
});
}, false
);
} else {
prepareCustomLogTypeIndexing();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,8 @@ public void onResponse(AcknowledgedResponse response) {
public void onFailure(Exception e) {
onFailures(e);
}
}
},
false
);
} else {
prepareDetectorIndexing();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ public void onResponse(AcknowledgedResponse response) {
public void onFailure(Exception e) {
onFailures(e);
}
}
},
false
);
} else {
prepareRuleIndexing();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ void start() {
searchSourceBuilder.fetchSource(true);
searchSourceBuilder.size(10000);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(CorrelationIndices.CORRELATION_INDEX);
searchRequest.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP);
searchRequest.source(searchSourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void onResponse(SearchResponse response) {
scoreSearchSourceBuilder.fetchSource(true);
scoreSearchSourceBuilder.size(1);
SearchRequest scoreSearchRequest = new SearchRequest();
scoreSearchRequest.indices(CorrelationIndices.CORRELATION_INDEX);
scoreSearchRequest.indices(CorrelationIndices.CORRELATION_METADATA_INDEX);
scoreSearchRequest.source(scoreSearchSourceBuilder);
scoreSearchRequest.preference(Preference.PRIMARY_FIRST.type());

Expand All @@ -156,7 +156,7 @@ public void onResponse(SearchResponse response) {
searchSourceBuilder.fetchField("counter");
searchSourceBuilder.size(1);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(CorrelationIndices.CORRELATION_INDEX);
searchRequest.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP);
searchRequest.source(searchSourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());

Expand All @@ -168,11 +168,11 @@ public void onResponse(SearchResponse response) {

for (SearchHit hit: hits) {
long counter = hit.getFields().get("counter").<Long>getValue();
float[] query = new float[101];
for (int i = 0; i < 100; ++i) {
float[] query = new float[3];
for (int i = 0; i < 2; ++i) {
query[i] = (2.0f * ((float) counter) - 50.0f) / 2.0f;
}
query[100] = Long.valueOf((findingTimestamp - scoreTimestamp) / 1000L).floatValue();
query[2] = Long.valueOf((findingTimestamp - scoreTimestamp) / 1000L).floatValue();

CorrelationQueryBuilder correlationQueryBuilder = new CorrelationQueryBuilder("corr_vector", query, noOfNearbyFindings, QueryBuilders.boolQuery()
.mustNot(QueryBuilders.matchQuery(
Expand All @@ -188,7 +188,7 @@ public void onResponse(SearchResponse response) {
searchSourceBuilder.fetchSource(true);
searchSourceBuilder.size(noOfNearbyFindings);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(CorrelationIndices.CORRELATION_INDEX);
searchRequest.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP);
searchRequest.source(searchSourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.core.action.ActionListener;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
Expand All @@ -33,7 +34,13 @@
public class CorrelationIndices {

private static final Logger log = LogManager.getLogger(CorrelationIndices.class);
public static final String CORRELATION_INDEX = ".opensearch-sap-correlation-history";

public static final String CORRELATION_METADATA_INDEX = ".opensearch-sap-correlation-metadata";
public static final String CORRELATION_HISTORY_INDEX_PATTERN = "<.opensearch-sap-correlation-history-{now/d}-1>";

public static final String CORRELATION_HISTORY_INDEX_PATTERN_REGEXP = ".opensearch-sap-correlation-history*";

public static final String CORRELATION_HISTORY_WRITE_INDEX = ".opensearch-sap-correlation-history-write";
public static final long FIXED_HISTORICAL_INTERVAL = 24L * 60L * 60L * 20L * 1000L;

private final Client client;
Expand All @@ -51,16 +58,35 @@ public static String correlationMappings() throws IOException {

public void initCorrelationIndex(ActionListener<CreateIndexResponse> actionListener) throws IOException {
if (!correlationIndexExists()) {
CreateIndexRequest indexRequest = new CreateIndexRequest(CORRELATION_INDEX)
CreateIndexRequest indexRequest = new CreateIndexRequest(CORRELATION_HISTORY_INDEX_PATTERN)
.mapping(correlationMappings())
.settings(Settings.builder().put("index.hidden", true).put("index.correlation", true).build());
indexRequest.alias(new Alias(CORRELATION_HISTORY_WRITE_INDEX));
client.admin().indices().create(indexRequest, actionListener);
} else {
actionListener.onResponse(new CreateIndexResponse(true, true, CORRELATION_HISTORY_INDEX_PATTERN));
}
}

public void initCorrelationMetadataIndex(ActionListener<CreateIndexResponse> actionListener) throws IOException {
if (!correlationMetadataIndexExists()) {
CreateIndexRequest indexRequest = new CreateIndexRequest(CORRELATION_METADATA_INDEX)
.mapping(correlationMappings())
.settings(Settings.builder().put("index.hidden", true).put("index.correlation", true).build());
client.admin().indices().create(indexRequest, actionListener);
} else {
actionListener.onResponse(new CreateIndexResponse(true, true, CORRELATION_METADATA_INDEX));
}
}

public boolean correlationIndexExists() {
ClusterState clusterState = clusterService.state();
return clusterState.getRoutingTable().hasIndex(CORRELATION_INDEX);
return clusterState.metadata().hasAlias(CORRELATION_HISTORY_WRITE_INDEX);
}

public boolean correlationMetadataIndexExists() {
ClusterState clusterState = clusterService.state();
return clusterState.metadata().hasIndex(CORRELATION_METADATA_INDEX);
}

public void setupCorrelationIndex(TimeValue indexTimeout, Long setupTimestamp, ActionListener<BulkResponse> listener) {
Expand All @@ -76,7 +102,7 @@ public void setupCorrelationIndex(TimeValue indexTimeout, Long setupTimestamp, A
builder.field("scoreTimestamp", 0L);
builder.endObject();

IndexRequest indexRequest = new IndexRequest(CorrelationIndices.CORRELATION_INDEX)
IndexRequest indexRequest = new IndexRequest(CORRELATION_METADATA_INDEX)
.source(builder)
.timeout(indexTimeout);

Expand All @@ -85,7 +111,7 @@ public void setupCorrelationIndex(TimeValue indexTimeout, Long setupTimestamp, A
scoreBuilder.field("root", false);
scoreBuilder.endObject();

IndexRequest scoreIndexRequest = new IndexRequest(CorrelationIndices.CORRELATION_INDEX)
IndexRequest scoreIndexRequest = new IndexRequest(CORRELATION_METADATA_INDEX)
.source(scoreBuilder)
.timeout(indexTimeout);

Expand All @@ -100,16 +126,4 @@ public void setupCorrelationIndex(TimeValue indexTimeout, Long setupTimestamp, A
log.error(ex);
}
}

public ClusterIndexHealth correlationIndexHealth() {
ClusterIndexHealth indexHealth = null;

if (correlationIndexExists()) {
IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(CORRELATION_INDEX);
IndexMetadata indexMetadata = clusterService.state().metadata().index(CORRELATION_INDEX);

indexHealth = new ClusterIndexHealth(indexMetadata, indexRoutingTable);
}
return indexHealth;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
*/
package org.opensearch.securityanalytics.util;

import java.util.Optional;
import java.util.SortedMap;

import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.core.action.ActionListener;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.opensearch.action.support.IndicesOptions;
Expand All @@ -24,6 +27,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;

public class IndexUtils {

Expand All @@ -35,6 +39,10 @@ public class IndexUtils {
public static Boolean customRuleIndexUpdated = false;
public static Boolean prePackagedRuleIndexUpdated = false;
public static Boolean correlationIndexUpdated = false;

public static Boolean correlationMetadataIndexUpdated = false;

public static String lastUpdatedCorrelationHistoryIndex = null;
public static Boolean correlationRuleIndexUpdated = false;

public static Boolean customLogTypeIndexUpdated = false;
Expand All @@ -53,6 +61,10 @@ public static void prePackagedRuleIndexUpdated() {

public static void correlationIndexUpdated() { correlationIndexUpdated = true; }

public static void correlationMetadataIndexUpdated() {
correlationMetadataIndexUpdated = true;
}

public static void correlationRuleIndexUpdated() {
correlationRuleIndexUpdated = true;
}
Expand Down Expand Up @@ -112,11 +124,20 @@ public static void updateIndexMapping(
String mapping,
ClusterState clusterState,
IndicesAdminClient client,
ActionListener<AcknowledgedResponse> actionListener
ActionListener<AcknowledgedResponse> actionListener,
boolean alias
) throws IOException {
if (clusterState.metadata().indices().containsKey(index)) {
if (shouldUpdateIndex(clusterState.metadata().index(index), mapping)) {
PutMappingRequest putMappingRequest = new PutMappingRequest(index).source(mapping, XContentType.JSON);
String targetIndex = index;
if (alias) {
targetIndex = IndexUtils.getIndexNameWithAlias(clusterState, index);
}
if (targetIndex.equals(IndexUtils.lastUpdatedCorrelationHistoryIndex)) {
return;
}

if (clusterState.metadata().indices().containsKey(targetIndex)) {
if (shouldUpdateIndex(clusterState.metadata().index(targetIndex), mapping)) {
PutMappingRequest putMappingRequest = new PutMappingRequest(targetIndex).source(mapping, XContentType.JSON);
client.putMapping(putMappingRequest, actionListener);
} else {
actionListener.onResponse(new AcknowledgedResponse(true));
Expand Down Expand Up @@ -176,4 +197,11 @@ public static String getNewIndexByCreationDate(ClusterState state, IndexNameExpr
return getNewestIndexByCreationDate(strings, state);
}

public static String getIndexNameWithAlias(ClusterState clusterState, String alias) {
Optional<Map.Entry<String, IndexMetadata>> entry = clusterState.metadata().indices().entrySet().stream().filter(
stringIndexMetadataEntry -> stringIndexMetadataEntry.getValue().getAliases().containsKey(alias)
).findFirst();
return entry.map(Map.Entry::getKey).orElse(null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ public void initPrepackagedRulesIndex(ActionListener<CreateIndexResponse> create
IndexUtils.updateIndexMapping(
Rule.PRE_PACKAGED_RULES_INDEX,
RuleIndices.ruleMappings(), clusterService.state(), client.admin().indices(),
updateListener
updateListener,
false
);
} else {
countRules(searchListener);
Expand Down
4 changes: 2 additions & 2 deletions src/main/resources/mappings/correlation.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"_meta" : {
"schema_version": 1
"schema_version": 2
},
"properties": {
"root": {
Expand All @@ -17,7 +17,7 @@
},
"corr_vector": {
"type": "sa_vector",
"dimension": 101,
"dimension": 3,
"correlation_ctx": {
"similarityFunction": "EUCLIDEAN",
"parameters": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.opensearch.securityanalytics.model.Detector;
import org.opensearch.securityanalytics.model.Rule;
import org.opensearch.securityanalytics.model.ThreatIntelFeedData;
import org.opensearch.securityanalytics.util.CorrelationIndices;
import org.opensearch.test.rest.OpenSearchRestTestCase;

import java.io.IOException;
Expand Down Expand Up @@ -1485,6 +1486,24 @@ public List<String> getFindingIndices(String detectorType) throws IOException {
return indices;
}

public List<String> getCorrelationHistoryIndices() throws IOException {
Response response = client().performRequest(new Request("GET", "/_cat/indices/" + CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP + "?format=json"));
XContentParser xcp = createParser(XContentType.JSON.xContent(), response.getEntity().getContent());
List<Object> responseList = xcp.list();
List<String> indices = new ArrayList<>();
for (Object o : responseList) {
if (o instanceof Map) {
((Map<?, ?>) o).forEach((BiConsumer<Object, Object>)
(o1, o2) -> {
if (o1.equals("index")) {
indices.add((String) o2);
}
});
}
}
return indices;
}

public void updateClusterSetting(String setting, String value) throws IOException {
String settingJson = "{\n" +
" \"persistent\" : {" +
Expand Down
Loading