Skip to content

Commit

Permalink
Avoid double loading in Cassandra side of journal
Browse files Browse the repository at this point in the history
Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20244
  • Loading branch information
ifesdjeen committed Jan 22, 2025
1 parent f9ed591 commit e64f5b8
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 63 deletions.
5 changes: 3 additions & 2 deletions accord-core/src/main/java/accord/api/Journal.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand All @@ -33,6 +34,7 @@
import accord.primitives.TxnId;
import accord.topology.Topology;
import accord.utils.PersistentField.Persister;
import accord.utils.async.AsyncChain;
import org.agrona.collections.Int2ObjectHashMap;

/**
Expand Down Expand Up @@ -145,8 +147,7 @@ enum Load
*/
interface Loader
{
void load(Command next, OnDone onDone);
void apply(Command next, OnDone onDone);
AsyncChain<Void> load(TxnId txnId, Supplier<Command> supplier);
}


Expand Down
3 changes: 3 additions & 0 deletions accord-core/src/main/java/accord/impl/CommandChange.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import static accord.local.Cleanup.NO;
import static accord.local.Cleanup.TRUNCATE_WITH_OUTCOME;
import static accord.local.StoreParticipants.Filter.LOAD;
import static accord.primitives.SaveStatus.TruncatedApplyWithOutcome;
import static accord.primitives.Status.Durability.NotDurable;

public class CommandChange
Expand Down Expand Up @@ -414,6 +415,8 @@ private static Command.Truncated truncated(TxnId txnId, SaveStatus status, Statu
case TruncatedApplyWithOutcome:
case TruncatedApplyWithDeps:
case TruncatedApply:
if (status != TruncatedApplyWithOutcome)
result = null;
return Command.Truncated.truncatedApply(txnId, status, durability, participants, executeAt, writes, result, executesAtLeast);
case ErasedOrVestigial:
return Command.Truncated.erasedOrVestigial(txnId, participants);
Expand Down
71 changes: 37 additions & 34 deletions accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -75,6 +76,7 @@
import accord.primitives.Routable.Domain;
import accord.primitives.RoutableKey;
import accord.primitives.Route;
import accord.primitives.SaveStatus;
import accord.primitives.Status;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
Expand All @@ -84,6 +86,7 @@
import accord.utils.Invariants;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResults;
import accord.utils.async.Cancellable;
import org.agrona.collections.ObjectHashSet;

Expand All @@ -104,6 +107,7 @@
import static accord.primitives.Status.Truncated;
import static accord.primitives.Txn.Kind.EphemeralRead;
import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
import static accord.primitives.Txn.Kind.Write;
import static accord.utils.Invariants.illegalState;
import static java.lang.String.format;

Expand Down Expand Up @@ -1160,47 +1164,26 @@ else if (!CommandsForKey.managesExecution(txnId) && command.hasBeen(Status.Stabl
return PreLoadContext.contextFor(txnId);
}

@Override
public void load(Command command, Journal.OnDone onDone)
public void load(Command command)
{
try
{
commandStore.executeInContext(commandStore,
context(command, ASYNC),
safeStore -> loadInternal(command, safeStore));
}
catch (Throwable t)
{
onDone.failure(t);
}

onDone.success();
commandStore.executeInContext(commandStore,
context(command, ASYNC),
safeStore -> loadInternal(command, safeStore));
}

@Override
public void apply(Command command, Journal.OnDone onDone)
public void apply(Command command)
{
try
if (command.txnId().is(Write) && command.saveStatus().compareTo(SaveStatus.Stable) >= 0 && !command.hasBeen(Truncated))
{
PreLoadContext context = context(command, SYNC);
commandStore.unsafeRunIn(() -> {
commandStore.executeInContext(commandStore,
context,
safeStore -> {
applyWrites(command.txnId(), safeStore, (safeCommand, cmd) -> {
unsafeApplyWrites(safeStore, safeCommand, cmd);
});
return null;
commandStore.executeInContext(commandStore,
context(command, SYNC),
safeStore -> {
applyWrites(command.txnId(), safeStore, (safeCommand, cmd) -> {
unsafeApplyWrites(safeStore, safeCommand, cmd);
});
});
return null;
});
}
catch (Throwable t)
{
onDone.failure(t);
return;
}

onDone.success();
}

protected void unsafeApplyWrites(SafeCommandStore safeStore, SafeCommand safeCommand, Command command)
Expand All @@ -1214,6 +1197,26 @@ protected void unsafeApplyWrites(SafeCommandStore safeStore, SafeCommand safeCom
safeStore.notifyListeners(safeCommand, command);
}
}

public AsyncChain<Void> load(TxnId txnId, Supplier<Command> supplier)
{
Command command;
// TODO (required): consider this race condition some more:
// - can we avoid double-applying?
// - is this definitely safe?
if (commandStore.hasCommand(txnId))
{
command = commandStore.command(txnId).value();
}
else
{
command = supplier.get();
load(command);
}

apply(command);
return AsyncResults.success(null);
}
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion accord-core/src/main/java/accord/local/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -1701,7 +1701,7 @@ public static <T extends Command> T validate(T validate)
case Erased:
case WasApply:
Invariants.checkState(writes == null, "Writes exist for %s", validate);
Invariants.checkState(result == null, "Results exist %s", validate);
Invariants.checkState(result == null, "Results exist %s %s", validate, result);
break;
}
}
Expand Down
17 changes: 11 additions & 6 deletions accord-core/src/test/java/accord/impl/basic/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ private static void verifyConsistentRestore(Int2ObjectHashMap<NavigableMap<TxnId
{
if (afterCommand.is(Status.Invalidated))
Invariants.checkState(beforeCommand.hasBeen(Status.Truncated) || !beforeCommand.hasBeen(Status.PreCommitted)
&& store.unsafeGetRedundantBefore().max(beforeCommand.participants().touches(), RedundantBefore.Entry::shardRedundantBefore).compareTo(beforeCommand.txnId()) >= 0);
&& store.unsafeGetRedundantBefore().max(beforeCommand.participants().touches(), RedundantBefore.Entry::shardRedundantBefore).compareTo(beforeCommand.txnId()) >= 0);
continue;
}
if (beforeCommand.hasBeen(Status.Truncated))
Expand All @@ -925,11 +925,16 @@ private static void verifyConsistentRestore(Int2ObjectHashMap<NavigableMap<TxnId
Invariants.checkState(beforeCommand.is(Status.Invalidated) || afterCommand.is(Status.Truncated) || afterCommand.is(Status.Applied));
continue;
}
Invariants.checkState(isConsistent(beforeCommand.saveStatus(), afterCommand.saveStatus())
&& beforeCommand.executeAtOrTxnId().equals(afterCommand.executeAtOrTxnId())
&& beforeCommand.acceptedOrCommitted().equals(afterCommand.acceptedOrCommitted())
&& beforeCommand.promised().equals(afterCommand.promised())
&& beforeCommand.durability().equals(afterCommand.durability()));
Invariants.checkState(isConsistent(beforeCommand.saveStatus(), afterCommand.saveStatus()),
"%s != %s", beforeCommand.saveStatus(), afterCommand.saveStatus());
Invariants.checkState(beforeCommand.executeAtOrTxnId().equals(afterCommand.executeAtOrTxnId()),
"%s != %s", beforeCommand.executeAtOrTxnId(), afterCommand.executeAtOrTxnId());
Invariants.checkState(beforeCommand.acceptedOrCommitted().equals(afterCommand.acceptedOrCommitted()),
"%s != %s", beforeCommand.acceptedOrCommitted(), afterCommand.acceptedOrCommitted());
Invariants.checkState(beforeCommand.promised().equals(afterCommand.promised()),
"%s != %s", beforeCommand.promised(), afterCommand.promised());
Invariants.checkState(beforeCommand.durability().equals(afterCommand.durability()),
"%s != %s", beforeCommand.durability(), afterCommand.durability());
}

if (before.size() > store.unsafeCommands().size())
Expand Down
44 changes: 24 additions & 20 deletions accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@
import accord.primitives.Writes;
import accord.utils.Invariants;
import accord.utils.PersistentField;
import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResult;
import org.agrona.collections.Int2ObjectHashMap;

import static accord.api.Journal.Load.ALL;
import static accord.impl.CommandChange.*;
import static accord.impl.CommandChange.Field;
import static accord.impl.CommandChange.Field.ACCEPTED;
import static accord.impl.CommandChange.Field.DURABILITY;
import static accord.impl.CommandChange.Field.EXECUTES_AT_LEAST;
Expand All @@ -71,12 +73,23 @@
import static accord.impl.CommandChange.Field.SAVE_STATUS;
import static accord.impl.CommandChange.Field.WAITING_ON;
import static accord.impl.CommandChange.Field.WRITES;
import static accord.impl.CommandChange.WaitingOnProvider;
import static accord.impl.CommandChange.anyFieldChanged;
import static accord.impl.CommandChange.getFieldChanged;
import static accord.impl.CommandChange.getFieldIsNull;
import static accord.impl.CommandChange.getFlags;
import static accord.impl.CommandChange.getWaitingOn;
import static accord.impl.CommandChange.nextSetField;
import static accord.impl.CommandChange.setFieldChanged;
import static accord.impl.CommandChange.setFieldIsNull;
import static accord.impl.CommandChange.toIterableSetFields;
import static accord.impl.CommandChange.unsetFieldIsNull;
import static accord.impl.CommandChange.unsetIterableFields;
import static accord.impl.CommandChange.validateFlags;
import static accord.primitives.SaveStatus.Erased;
import static accord.primitives.SaveStatus.ErasedOrVestigial;
import static accord.primitives.SaveStatus.Stable;
import static accord.primitives.Status.Invalidated;
import static accord.primitives.Status.Truncated;
import static accord.primitives.Txn.Kind.Write;
import static accord.utils.Invariants.illegalState;

public class InMemoryJournal implements Journal
Expand Down Expand Up @@ -359,23 +372,14 @@ public void success() {}
{
if (e.getValue().isEmpty()) continue;

// TODO (required): consider this race condition some more:
// - can we avoid double-applying?
// - is this definitely safe?
Command command;
if (!commandStore.hasCommand(e.getKey()))
{
command = reconstruct(e.getValue(), ALL).construct(commandStore.unsafeGetRedundantBefore());
Invariants.checkState(command.saveStatus() != SaveStatus.Uninitialised,
"Found uninitialized command in the log: %s %s", diffEntry.getKey(), e.getValue());
loader.load(command, sync);
}
else
{
command = commandStore.command(e.getKey()).value();
}
if (command.txnId().is(Write) && command.saveStatus().compareTo(Stable) >= 0 && !command.hasBeen(Truncated))
loader.apply(command, sync);
AsyncResult<Void> res = loader.load(e.getKey(),
() -> {
Command command = reconstruct(e.getValue(), ALL).construct(commandStore.unsafeGetRedundantBefore());
Invariants.checkState(command.saveStatus() != SaveStatus.Uninitialised,
"Found uninitialized command in the log: %s %s", diffEntry.getKey(), e.getValue());
return command;
}).beginAsResult();
AsyncChains.getUnchecked(res);
}
}
}
Expand Down

0 comments on commit e64f5b8

Please sign in to comment.