Skip to content

Commit

Permalink
RATIS-2094. Avoid corruptions from TransactionContext's stateMachineL…
Browse files Browse the repository at this point in the history
…ogEntry and stateMachineContext. (#1106)
  • Loading branch information
duongkame authored Jun 3, 2024
1 parent 924a0cd commit a2bdd10
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* A reference-counted object can be retained for later use
Expand Down Expand Up @@ -152,6 +153,13 @@ public boolean release() {
};
}

/**
* @return a {@link ReferenceCountedObject} by apply the given function to this object.
*/
default <V> ReferenceCountedObject<V> apply(Function<T, V> function) {
return delegate(function.apply(get()));
}

/**
* Wrap the given value as a {@link ReferenceCountedObject}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,12 @@ public TransactionContext startTransaction(RaftClientRequest request) throws IOE

@Override
public TransactionContext startTransaction(LogEntryProto entry, RaftProtos.RaftPeerRole role) {
ByteString copied = ByteString.copyFrom(entry.getStateMachineLogEntry().getLogData().asReadOnlyByteBuffer());
return TransactionContext.newBuilder()
.setStateMachine(this)
.setLogEntry(entry)
.setServerRole(role)
.setStateMachineContext(getProto(entry))
.setStateMachineContext(getProto(copied))
.build();
}

Expand Down Expand Up @@ -147,14 +148,14 @@ static FileStoreRequestProto getProto(TransactionContext context, LogEntryProto
return proto;
}
}
return getProto(entry);
return getProto(entry.getStateMachineLogEntry().getLogData());
}

static FileStoreRequestProto getProto(LogEntryProto entry) {
static FileStoreRequestProto getProto(ByteString bytes) {
try {
return FileStoreRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData());
return FileStoreRequestProto.parseFrom(bytes);
} catch (InvalidProtocolBufferException e) {
throw new IllegalArgumentException("Failed to parse data, entry=" + entry, e);
throw new IllegalArgumentException("Failed to parse data", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ public interface TransactionContext {
/**
* Returns the data from the {@link StateMachine}
* @return the data from the {@link StateMachine}
* @deprecated access StateMachineLogEntry via {@link TransactionContext#getLogEntryRef()} or
* {@link TransactionContext#getLogEntryUnsafe()}
*/
@Deprecated
StateMachineLogEntryProto getStateMachineLogEntry();

/** Set exception in case of failure. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public TransactionContext startTransaction(RaftClientRequest request) {
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
try {
assertNotNull(trx.getLogEntryUnsafe());
assertNotNull(trx.getStateMachineLogEntry());
assertNotNull(trx.getLogEntryUnsafe().getStateMachineLogEntry());
Object context = trx.getStateMachineContext();
if (isLeader.get()) {
assertNotNull(trx.getClientRequest());
Expand Down

0 comments on commit a2bdd10

Please sign in to comment.