Skip to content

Commit

Permalink
Add AdministrativeCommand types
Browse files Browse the repository at this point in the history
Motivation:

While mirroring migration job is implemeted #880, read-only mode is
necessary to safely migrate the legacy `mirrors.json` and
`credentials.json` the HEAD revision. As the current read-only mode
rejects all commands, the migration job is also unable to push commits.

A new command type is required to rejects only users requests but allows
administrative cases. I propose to introduce two new command types for
the smooth migration job.

Modifications:

- Add `UpdateServerStatusCommand` so as to toggle the read-only mode for
  the cluster.
- Add `ForcePush` so as to wrap a push `Command` to be executed even in the
  read-only mode.

Result:

`UpdateServerStatusCommand` and `ForcePush` have been added as new
commands to update the cluster status and forcibly push commits in the
read-only mode.
  • Loading branch information
ikhoon committed Nov 28, 2023
1 parent f91a632 commit 00caf43
Show file tree
Hide file tree
Showing 11 changed files with 382 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.MoreObjects;

import com.linecorp.armeria.common.util.Exceptions;
Expand All @@ -36,6 +39,8 @@
*/
public abstract class AbstractCommandExecutor implements CommandExecutor {

private static final Logger logger = LoggerFactory.getLogger(AbstractCommandExecutor.class);

@Nullable
private final Consumer<CommandExecutor> onTakeLeadership;
@Nullable
Expand Down Expand Up @@ -97,8 +102,24 @@ public final void setWritable(boolean writable) {
@Override
public final <T> CompletableFuture<T> execute(Command<T> command) {
requireNonNull(command, "command");
if (!isWritable()) {
throw new IllegalStateException("running in read-only mode");
if (!isWritable() && !(command instanceof AdministrativeCommand)) {
// Reject all commands except for AdministrativeCommand when the replica is in read-only mode.
// AdministrativeCommand is allowed because it is used to change the read-only mode or migrate
// metadata under maintenance mode.
throw new IllegalStateException("running in read-only mode. command: " + command);
}

if (command.type() == CommandType.UPDATE_SERVER_STATUS) {
final UpdateServerStatusCommand command0 = (UpdateServerStatusCommand) command;
final boolean writable0 = command0.writable();
if (writable0 != writable) {
setWritable(writable0);
if (writable0) {
logger.warn("Left read-only mode.");
} else {
logger.warn("Entered read-only mode.");
}
}
}

try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.centraldogma.server.command;

import javax.annotation.Nullable;

import com.linecorp.centraldogma.common.Author;

abstract class AdministrativeCommand<T> extends RootCommand<T> {
AdministrativeCommand(CommandType commandType, @Nullable Long timestamp,
@Nullable Author author) {
super(commandType, timestamp, author);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.linecorp.centraldogma.server.command;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -52,6 +53,8 @@
@Type(value = PushAsIsCommand.class, name = "PUSH"),
@Type(value = CreateSessionCommand.class, name = "CREATE_SESSIONS"),
@Type(value = RemoveSessionCommand.class, name = "REMOVE_SESSIONS"),
@Type(value = UpdateServerStatusCommand.class, name = "UPDATE_SERVER_STATUS"),
@Type(value = ForcePushCommand.class, name = "FORCE_PUSH_COMMAND"),
})
public interface Command<T> {

Expand Down Expand Up @@ -355,6 +358,26 @@ static Command<Void> removeSession(String sessionId) {
return new RemoveSessionCommand(null, null, sessionId);
}

/**
* Returns a new {@link Command} which is used to update the status of the server.
*/
static Command<Void> updateServerStatus(boolean writable) {
return new UpdateServerStatusCommand(null, null, writable);
}

/**
* Returns a new {@link Command} which is used to force-push the push {@link Command} even the server is in
* read-only mode. This command is useful for migrating the repository content during maintenance mode.
*
* <p>Note that {@link CommandType#NORMALIZING_PUSH} and {@link CommandType#PUSH} are allowed as the
* delegate.
*/
static <T> Command<T> forcePush(Command<T> delegate) {
checkArgument(delegate.type() == CommandType.NORMALIZING_PUSH || delegate.type() == CommandType.PUSH,
"delegate: %s (expected: NORMALIZING_PUSH or PUSH)", delegate);
return new ForcePushCommand<>(delegate);
}

/**
* Returns the {@link CommandType} of the command.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ public enum CommandType {
CREATE_SESSION(Void.class),
REMOVE_SESSION(Void.class),
PURGE_PROJECT(Void.class),
PURGE_REPOSITORY(Void.class);
PURGE_REPOSITORY(Void.class),
UPDATE_SERVER_STATUS(Void.class),
// The result type of FORCE_PUSH is Object because it can be any type.
FORCE_PUSH(Object.class);

/**
* The type of an object which is returned as a result after executing the command.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.centraldogma.server.command;

import static java.util.Objects.requireNonNull;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.MoreObjects.ToStringHelper;

/**
* A {@link Command} which is used to force-push {@code delegate} even the server is in read-only mode.
* This command is useful for migrating the repository content during maintenance mode.
*/
public final class ForcePushCommand<T> extends AdministrativeCommand<T> {

private final Command<T> delegate;

@JsonCreator
ForcePushCommand(@JsonProperty("delegate") Command<T> delegate) {
super(CommandType.FORCE_PUSH, requireNonNull(delegate, "delegate").timestamp(), delegate.author());
this.delegate = delegate;
}

/**
* Returns the {@link Command} to be force-pushed.
*/
@JsonProperty("delegate")
public Command<T> delegate() {
return delegate;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ForcePushCommand)) {
return false;
}
final ForcePushCommand<?> that = (ForcePushCommand<?>) o;
return super.equals(that) && delegate.equals(that.delegate);
}

@Override
public int hashCode() {
return super.hashCode() * 31 + delegate.hashCode();
}

@Override
ToStringHelper toStringHelper() {
return super.toStringHelper().add("delegate", delegate);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,15 @@ protected <T> CompletableFuture<T> doExecute(Command<T> command) throws Exceptio
return (CompletableFuture<T>) removeSession((RemoveSessionCommand) command);
}

if (command instanceof UpdateServerStatusCommand) {
return (CompletableFuture<T>) updateServerStatus((UpdateServerStatusCommand) command);
}

if (command instanceof ForcePushCommand) {
//noinspection TailRecursion
return doExecute(((ForcePushCommand<T>) command).delegate());
}

throw new UnsupportedOperationException(command.toString());
}

Expand Down Expand Up @@ -350,4 +359,9 @@ private CompletableFuture<Void> removeSession(RemoveSessionCommand c) {
return null;
});
}

private CompletableFuture<Void> updateServerStatus(UpdateServerStatusCommand c) {
setWritable(c.writable());
return CompletableFuture.completedFuture(null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.centraldogma.server.command;

import java.util.Objects;

import javax.annotation.Nullable;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.MoreObjects.ToStringHelper;

import com.linecorp.centraldogma.common.Author;

/**
* A {@link Command} which is used to update the status of all servers in the cluster.
*/
public final class UpdateServerStatusCommand extends AdministrativeCommand<Void> {

private final boolean writable;

@JsonCreator
UpdateServerStatusCommand(@JsonProperty("timestamp") @Nullable Long timestamp,
@JsonProperty("author") @Nullable Author author,
@JsonProperty("writable") boolean writable) {
super(CommandType.UPDATE_SERVER_STATUS, timestamp, author);
this.writable = writable;
}

/**
* Returns whether the cluster is writable.
*/
@JsonProperty("writable")
public boolean writable() {
return writable;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof UpdateServerStatusCommand)) {
return false;
}
final UpdateServerStatusCommand that = (UpdateServerStatusCommand) o;

return super.equals(that) && writable == that.writable;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), writable);
}

@Override
ToStringHelper toStringHelper() {
return super.toStringHelper()
.add("writable", writable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import com.linecorp.centraldogma.internal.Jackson;
import com.linecorp.centraldogma.internal.Util;
import com.linecorp.centraldogma.server.command.Command;
import com.linecorp.centraldogma.server.command.CommandType;
import com.linecorp.centraldogma.server.command.ForcePushCommand;
import com.linecorp.centraldogma.server.command.NormalizingPushCommand;

public final class ReplicationLog<T> {
Expand Down Expand Up @@ -58,6 +60,9 @@ private static <T> T deserializeResult(
result = null;
}

if (command.type() == CommandType.FORCE_PUSH) {
command = ((ForcePushCommand<T>) command).delegate();
}
final Class<T> resultType = Util.unsafeCast(command.type().resultType());
if (resultType == Void.class) {
if (result != null) {
Expand All @@ -77,7 +82,12 @@ private static <T> T deserializeResult(
: NormalizingPushCommand.class.getSimpleName() + " cannot be replicated.";
this.command = requireNonNull(command, "command");

final Class<?> resultType = command.type().resultType();
final Class<?> resultType;
if (command.type() == CommandType.FORCE_PUSH) {
resultType = ((ForcePushCommand<?>) command).delegate().type().resultType();
} else {
resultType = command.type().resultType();
}
if (resultType == Void.class) {
if (result != null) {
rejectIncompatibleResult(result, Void.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import com.linecorp.centraldogma.server.command.CommandExecutor;
import com.linecorp.centraldogma.server.command.CommandType;
import com.linecorp.centraldogma.server.command.CommitResult;
import com.linecorp.centraldogma.server.command.ForcePushCommand;
import com.linecorp.centraldogma.server.command.NormalizingPushCommand;
import com.linecorp.centraldogma.server.command.RemoveRepositoryCommand;
import com.linecorp.centraldogma.server.metadata.MetadataService;
Expand Down Expand Up @@ -1089,6 +1090,13 @@ private <T> T blockingExecute(Command<T> command) throws Exception {
final CommitResult commitResult = (CommitResult) result;
final Command<Revision> pushAsIsCommand = normalizingPushCommand.asIs(commitResult);
log = new ReplicationLog<>(replicaId(), pushAsIsCommand, commitResult.revision());
} else if (command.type() == CommandType.FORCE_PUSH &&
((ForcePushCommand<?>) command).delegate().type() == CommandType.NORMALIZING_PUSH) {
final NormalizingPushCommand delegated =
(NormalizingPushCommand) ((ForcePushCommand<?>) command).delegate();
final CommitResult commitResult = (CommitResult) result;
final Command<Revision> command0 = Command.forcePush(delegated.asIs(commitResult));
log = new ReplicationLog<>(replicaId(), command0, commitResult.revision());
} else {
log = new ReplicationLog<>(replicaId(), command, result);
}
Expand All @@ -1101,6 +1109,12 @@ private <T> T blockingExecute(Command<T> command) throws Exception {
}
}

private <T> ReplicationLog<?> newReplicationLogForPush(NormalizingPushCommand command, CommitResult
result) {
final Command<Revision> pushAsIsCommand = command.asIs(result);
return new ReplicationLog<>(replicaId(), pushAsIsCommand, result.revision());
}

private void createParentNodes() throws Exception {
if (createdParentNodes) {
return;
Expand Down
Loading

0 comments on commit 00caf43

Please sign in to comment.