Skip to content

Commit

Permalink
Make await security migrations more robust (#109854)
Browse files Browse the repository at this point in the history
* Make await security migrations more robust
  • Loading branch information
jfreden committed Jun 27, 2024
1 parent fc0313f commit 10ad8a6
Show file tree
Hide file tree
Showing 17 changed files with 136 additions and 54 deletions.
2 changes: 0 additions & 2 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ tests:
- class: "org.elasticsearch.xpack.esql.action.AsyncEsqlQueryActionIT"
issue: "https://github.com/elastic/elasticsearch/issues/109944"
method: "testBasicAsyncExecution"
- class: "org.elasticsearch.xpack.security.authz.store.NativePrivilegeStoreCacheTests"
issue: "https://github.com/elastic/elasticsearch/issues/110015"
- class: "org.elasticsearch.action.admin.indices.rollover.RolloverIT"
issue: "https://github.com/elastic/elasticsearch/issues/110034"
method: "testRolloverWithClosedWriteIndex"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_INFERENCE_GOOGLE_VERTEX_AI_EMBEDDINGS_ADDED = def(8_694_00_0);
public static final TransportVersion EVENT_INGESTED_RANGE_IN_CLUSTER_STATE = def(8_695_00_0);
public static final TransportVersion ESQL_ADD_AGGREGATE_TYPE = def(8_696_00_0);
public static final TransportVersion SECURITY_MIGRATIONS_MIGRATION_NEEDED_ADDED = def(8_697_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ private static IndexVersion def(int id, Version luceneVersion) {
public static final IndexVersion SEMANTIC_TEXT_FIELD_TYPE = def(8_507_00_0, Version.LUCENE_9_10_0);
public static final IndexVersion UPGRADE_TO_LUCENE_9_11 = def(8_508_00_0, Version.LUCENE_9_11_0);
public static final IndexVersion UNIQUE_TOKEN_FILTER_POS_FIX = def(8_509_00_0, Version.LUCENE_9_11_0);
public static final IndexVersion ADD_SECURITY_MIGRATION = def(8_510_00_0, Version.LUCENE_9_11_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,46 @@
import java.io.IOException;

import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;

public class SecurityMigrationTaskParams implements PersistentTaskParams {
public static final String TASK_NAME = "security-migration";

private final int migrationVersion;

private final boolean migrationNeeded;

public static final ConstructingObjectParser<SecurityMigrationTaskParams, Void> PARSER = new ConstructingObjectParser<>(
TASK_NAME,
true,
(arr) -> new SecurityMigrationTaskParams((int) arr[0])
(arr) -> new SecurityMigrationTaskParams((int) arr[0], arr[1] == null || (boolean) arr[1])
);

static {
PARSER.declareInt(constructorArg(), new ParseField("migration_version"));
PARSER.declareBoolean(optionalConstructorArg(), new ParseField("migration_needed"));
}

public SecurityMigrationTaskParams(int migrationVersion) {
public SecurityMigrationTaskParams(int migrationVersion, boolean migrationNeeded) {
this.migrationVersion = migrationVersion;
this.migrationNeeded = migrationNeeded;
}

public SecurityMigrationTaskParams(StreamInput in) throws IOException {
this.migrationVersion = in.readInt();
if (in.getTransportVersion().onOrAfter(TransportVersions.SECURITY_MIGRATIONS_MIGRATION_NEEDED_ADDED)) {
this.migrationNeeded = in.readBoolean();
} else {
this.migrationNeeded = true;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeInt(migrationVersion);
if (out.getTransportVersion().onOrAfter(TransportVersions.SECURITY_MIGRATIONS_MIGRATION_NEEDED_ADDED)) {
out.writeBoolean(migrationNeeded);
}
}

@Override
Expand All @@ -64,6 +77,7 @@ public TransportVersion getMinimalSupportedVersion() {
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.field("migration_version", migrationVersion);
builder.field("migration_needed", migrationNeeded);
builder.endObject();
return builder;
}
Expand All @@ -75,4 +89,8 @@ public static SecurityMigrationTaskParams fromXContent(XContentParser parser) {
public int getMigrationVersion() {
return migrationVersion;
}

public boolean isMigrationNeeded() {
return migrationNeeded;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.settings.MockSecureSettings;
Expand All @@ -26,7 +27,9 @@
import org.elasticsearch.license.LicenseSettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.xpack.core.security.authc.support.Hasher;
import org.elasticsearch.xpack.core.security.test.TestRestrictedIndices;
import org.elasticsearch.xpack.security.LocalStateSecurity;
import org.elasticsearch.xpack.security.support.SecurityMigrations;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
Expand All @@ -41,10 +44,9 @@
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

import static org.elasticsearch.persistent.PersistentTasksCustomMetadata.getTaskWithId;
import static org.elasticsearch.test.SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.elasticsearch.xpack.core.security.support.SecurityMigrationTaskParams.TASK_NAME;
import static org.elasticsearch.xpack.security.support.SecurityIndexManager.getMigrationVersionFromIndexMetadata;
import static org.hamcrest.Matchers.hasItem;

/**
Expand Down Expand Up @@ -90,7 +92,12 @@ public void tearDown() throws Exception {
}

private boolean isMigrationComplete(ClusterState state) {
return getTaskWithId(state, TASK_NAME) == null;
IndexMetadata indexMetadata = state.metadata().index(TestRestrictedIndices.INTERNAL_SECURITY_MAIN_INDEX_7);
if (indexMetadata == null) {
// If index doesn't exist, no migration needed
return true;
}
return getMigrationVersionFromIndexMetadata(indexMetadata) == SecurityMigrations.MIGRATIONS_BY_VERSION.lastKey();
}

private void awaitSecurityMigration() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ public void configureApplicationPrivileges() {
assertEquals(6, putPrivilegesResponse.created().values().stream().mapToInt(List::size).sum());
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/109894")
public void testGetPrivilegesUsesCache() {
final Client client = client();

Expand Down Expand Up @@ -205,7 +204,6 @@ public void testPopulationOfCacheWhenLoadingPrivilegesForAllApplications() {
assertEquals(1, new GetPrivilegesRequestBuilder(client).application("app-1").privileges("write").get().privileges().length);
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/109895")
public void testSuffixWildcard() {
final Client client = client();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,8 @@ Collection<Object> createComponents(
this.persistentTasksService.set(persistentTasksService);

systemIndices.getMainIndexManager().addStateListener((oldState, newState) -> {
if (clusterService.state().nodes().isLocalNodeElectedMaster()) {
// Only consider applying migrations if it's the master node and the security index exists
if (clusterService.state().nodes().isLocalNodeElectedMaster() && newState.indexExists()) {
applyPendingSecurityMigrations(newState);
}
});
Expand Down Expand Up @@ -1203,43 +1204,53 @@ Collection<Object> createComponents(
}

private void applyPendingSecurityMigrations(SecurityIndexManager.State newState) {
// If no migrations have been applied and the security index is on the latest version (new index), all migrations can be skipped
if (newState.migrationsVersion == 0 && newState.createdOnLatestVersion) {
submitPersistentMigrationTask(SecurityMigrations.MIGRATIONS_BY_VERSION.lastKey(), false);
return;
}

Map.Entry<Integer, SecurityMigrations.SecurityMigration> nextMigration = SecurityMigrations.MIGRATIONS_BY_VERSION.higherEntry(
newState.migrationsVersion
);

if (nextMigration == null) {
return;
}

// Check if next migration that has not been applied is eligible to run on the current cluster
if (systemIndices.getMainIndexManager().isEligibleSecurityMigration(nextMigration.getValue()) == false) {
if (nextMigration == null || systemIndices.getMainIndexManager().isEligibleSecurityMigration(nextMigration.getValue()) == false) {
// Reset retry counter if all eligible migrations have been applied successfully
nodeLocalMigrationRetryCount.set(0);
} else if (nodeLocalMigrationRetryCount.get() > MAX_SECURITY_MIGRATION_RETRY_COUNT) {
logger.warn("Security migration failed [" + nodeLocalMigrationRetryCount.get() + "] times, restart node to retry again.");
} else if (systemIndices.getMainIndexManager().isReadyForSecurityMigration(nextMigration.getValue())) {
nodeLocalMigrationRetryCount.incrementAndGet();
persistentTasksService.get()
.sendStartRequest(
SecurityMigrationTaskParams.TASK_NAME,
SecurityMigrationTaskParams.TASK_NAME,
new SecurityMigrationTaskParams(newState.migrationsVersion),
null,
ActionListener.wrap((response) -> {
logger.debug("Security migration task submitted");
}, (exception) -> {
// Do nothing if the task is already in progress
if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) {
// Do not count ResourceAlreadyExistsException as failure
nodeLocalMigrationRetryCount.decrementAndGet();
} else {
logger.warn("Submit security migration task failed: " + exception.getCause());
}
})
);
submitPersistentMigrationTask(newState.migrationsVersion);
}
}

private void submitPersistentMigrationTask(int migrationsVersion) {
submitPersistentMigrationTask(migrationsVersion, true);
}

private void submitPersistentMigrationTask(int migrationsVersion, boolean securityMigrationNeeded) {
nodeLocalMigrationRetryCount.incrementAndGet();
persistentTasksService.get()
.sendStartRequest(
SecurityMigrationTaskParams.TASK_NAME,
SecurityMigrationTaskParams.TASK_NAME,
new SecurityMigrationTaskParams(migrationsVersion, securityMigrationNeeded),
null,
ActionListener.wrap((response) -> {
logger.debug("Security migration task submitted");
}, (exception) -> {
// Do nothing if the task is already in progress
if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) {
// Do not count ResourceAlreadyExistsException as failure
nodeLocalMigrationRetryCount.decrementAndGet();
} else {
logger.warn("Submit security migration task failed: " + exception.getCause());
}
})
);
}

private AuthorizationEngine getAuthorizationEngine() {
return findValueFromExtensions("authorization engine", extension -> extension.getAuthorizationEngine(settings));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -55,6 +56,7 @@
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_FORMAT_SETTING;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_VERSION_CREATED;
import static org.elasticsearch.indices.SystemIndexDescriptor.VERSION_META_KEY;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.security.action.UpdateIndexMigrationVersionAction.MIGRATION_VERSION_CUSTOM_DATA_KEY;
Expand Down Expand Up @@ -244,6 +246,19 @@ private SystemIndexDescriptor.MappingsVersion getMinSecurityIndexMappingVersion(
return mappingsVersion == null ? new SystemIndexDescriptor.MappingsVersion(1, 0) : mappingsVersion;
}

/**
* Check if the index was created on the latest index version available in the cluster
*/
private static boolean isCreatedOnLatestVersion(IndexMetadata indexMetadata, ClusterState clusterState) {
final IndexVersion indexVersionCreated = indexMetadata != null
? SETTING_INDEX_VERSION_CREATED.get(indexMetadata.getSettings())
: null;
return indexVersionCreated != null
&& indexVersionCreated.onOrAfter(
IndexVersion.min(IndexVersion.current(), clusterState.nodes().getMaxDataNodeCompatibleIndexVersion())
);
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
Expand All @@ -254,15 +269,15 @@ public void clusterChanged(ClusterChangedEvent event) {
}
final State previousState = state;
final IndexMetadata indexMetadata = resolveConcreteIndex(systemIndexDescriptor.getAliasName(), event.state().metadata());
final Map<String, String> customMetadata = indexMetadata == null ? null : indexMetadata.getCustomData(MIGRATION_VERSION_CUSTOM_KEY);
final boolean createdOnLatestVersion = isCreatedOnLatestVersion(indexMetadata, event.state());
final Instant creationTime = indexMetadata != null ? Instant.ofEpochMilli(indexMetadata.getCreationDate()) : null;
final boolean isIndexUpToDate = indexMetadata == null
|| INDEX_FORMAT_SETTING.get(indexMetadata.getSettings()) == systemIndexDescriptor.getIndexFormat();
Tuple<Boolean, Boolean> available = checkIndexAvailable(event.state());
final boolean indexAvailableForWrite = available.v1();
final boolean indexAvailableForSearch = available.v2();
final boolean mappingIsUpToDate = indexMetadata == null || checkIndexMappingUpToDate(event.state());
final int migrationsVersion = customMetadata == null ? 0 : Integer.parseInt(customMetadata.get(MIGRATION_VERSION_CUSTOM_DATA_KEY));
final int migrationsVersion = getMigrationVersionFromIndexMetadata(indexMetadata);
final SystemIndexDescriptor.MappingsVersion minClusterMappingVersion = getMinSecurityIndexMappingVersion(event.state());
final int indexMappingVersion = loadIndexMappingVersion(systemIndexDescriptor.getAliasName(), event.state());
final String concreteIndexName = indexMetadata == null
Expand Down Expand Up @@ -290,6 +305,7 @@ public void clusterChanged(ClusterChangedEvent event) {
indexAvailableForSearch,
indexAvailableForWrite,
mappingIsUpToDate,
createdOnLatestVersion,
migrationsVersion,
minClusterMappingVersion,
indexMappingVersion,
Expand All @@ -310,6 +326,15 @@ public void clusterChanged(ClusterChangedEvent event) {
}
}

public static int getMigrationVersionFromIndexMetadata(IndexMetadata indexMetadata) {
Map<String, String> customMetadata = indexMetadata == null ? null : indexMetadata.getCustomData(MIGRATION_VERSION_CUSTOM_KEY);
if (customMetadata == null) {
return 0;
}
String migrationVersion = customMetadata.get(MIGRATION_VERSION_CUSTOM_DATA_KEY);
return migrationVersion == null ? 0 : Integer.parseInt(migrationVersion);
}

public void onStateRecovered(Consumer<State> recoveredStateConsumer) {
BiConsumer<State, State> stateChangeListener = (previousState, nextState) -> {
boolean stateJustRecovered = previousState == UNRECOVERED_STATE && nextState != UNRECOVERED_STATE;
Expand Down Expand Up @@ -588,6 +613,7 @@ public static class State {
false,
false,
false,
false,
null,
null,
null,
Expand All @@ -602,6 +628,7 @@ public static class State {
public final boolean indexAvailableForSearch;
public final boolean indexAvailableForWrite;
public final boolean mappingUpToDate;
public final boolean createdOnLatestVersion;
public final Integer migrationsVersion;
// Min mapping version supported by the descriptors in the cluster
public final SystemIndexDescriptor.MappingsVersion minClusterMappingVersion;
Expand All @@ -619,6 +646,7 @@ public State(
boolean indexAvailableForSearch,
boolean indexAvailableForWrite,
boolean mappingUpToDate,
boolean createdOnLatestVersion,
Integer migrationsVersion,
SystemIndexDescriptor.MappingsVersion minClusterMappingVersion,
Integer indexMappingVersion,
Expand All @@ -634,6 +662,7 @@ public State(
this.indexAvailableForWrite = indexAvailableForWrite;
this.mappingUpToDate = mappingUpToDate;
this.migrationsVersion = migrationsVersion;
this.createdOnLatestVersion = createdOnLatestVersion;
this.minClusterMappingVersion = minClusterMappingVersion;
this.indexMappingVersion = indexMappingVersion;
this.concreteIndexName = concreteIndexName;
Expand All @@ -653,6 +682,7 @@ public boolean equals(Object o) {
&& indexAvailableForSearch == state.indexAvailableForSearch
&& indexAvailableForWrite == state.indexAvailableForWrite
&& mappingUpToDate == state.mappingUpToDate
&& createdOnLatestVersion == state.createdOnLatestVersion
&& Objects.equals(indexMappingVersion, state.indexMappingVersion)
&& Objects.equals(migrationsVersion, state.migrationsVersion)
&& Objects.equals(minClusterMappingVersion, state.minClusterMappingVersion)
Expand All @@ -674,6 +704,7 @@ public int hashCode() {
indexAvailableForSearch,
indexAvailableForWrite,
mappingUpToDate,
createdOnLatestVersion,
migrationsVersion,
minClusterMappingVersion,
indexMappingVersion,
Expand Down
Loading

0 comments on commit 10ad8a6

Please sign in to comment.