Skip to content

Commit

Permalink
Avoid double loading in Cassandra side of journal
Browse files Browse the repository at this point in the history
Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20244
  • Loading branch information
ifesdjeen committed Feb 1, 2025
1 parent 78ab7ee commit 707b37c
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 71 deletions.
5 changes: 3 additions & 2 deletions accord-core/src/main/java/accord/api/Journal.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand All @@ -33,6 +34,7 @@
import accord.primitives.TxnId;
import accord.topology.Topology;
import accord.utils.PersistentField.Persister;
import accord.utils.async.AsyncChain;
import org.agrona.collections.Int2ObjectHashMap;

/**
Expand Down Expand Up @@ -145,8 +147,7 @@ enum Load
*/
interface Loader
{
void load(Command next, OnDone onDone);
void apply(Command next, OnDone onDone);
AsyncChain<Void> load(TxnId txnId, Supplier<Command> supplier);
}


Expand Down
5 changes: 3 additions & 2 deletions accord-core/src/main/java/accord/impl/AbstractLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@
import accord.local.Commands;
import accord.local.SafeCommand;
import accord.local.SafeCommandStore;
import accord.primitives.SaveStatus;
import accord.primitives.TxnId;

import static accord.local.Cleanup.Input.FULL;
import static accord.primitives.SaveStatus.Applying;
import static accord.primitives.SaveStatus.PreApplied;
import static accord.primitives.Status.Invalidated;
import static accord.primitives.Status.Stable;
import static accord.primitives.Status.Truncated;
import static accord.primitives.Txn.Kind.Write;

public abstract class AbstractLoader implements Journal.Loader
{
Expand Down Expand Up @@ -66,7 +67,7 @@ protected void applyWrites(TxnId txnId, SafeCommandStore safeStore, BiConsumer<S
{
Commands.maybeExecute(safeStore, safeCommand, command, true, true);
}
else if (command.saveStatus().compareTo(Applying) >= 0 && !command.hasBeen(Truncated))
else if (command.txnId().is(Write) && command.saveStatus().compareTo(SaveStatus.Stable) >= 0 && !command.hasBeen(Truncated))
{
apply.accept(safeCommand, command);
}
Expand Down
6 changes: 5 additions & 1 deletion accord-core/src/main/java/accord/impl/CommandChange.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@
import static accord.impl.CommandChange.Field.EXECUTES_AT_LEAST;
import static accord.impl.CommandChange.Field.EXECUTE_AT;
import static accord.impl.CommandChange.Field.FIELDS;
import static accord.impl.CommandChange.Field.MIN_UNIQUE_HLC;
import static accord.impl.CommandChange.Field.PARTIAL_DEPS;
import static accord.impl.CommandChange.Field.PARTIAL_TXN;
import static accord.impl.CommandChange.Field.PARTICIPANTS;
import static accord.impl.CommandChange.Field.PROMISED;
import static accord.impl.CommandChange.Field.RESULT;
import static accord.impl.CommandChange.Field.SAVE_STATUS;
import static accord.impl.CommandChange.Field.MIN_UNIQUE_HLC;
import static accord.impl.CommandChange.Field.WAITING_ON;
import static accord.impl.CommandChange.Field.WRITES;
import static accord.local.Cleanup.NO;
Expand All @@ -76,6 +76,7 @@
import static accord.local.Command.Truncated.vestigial;
import static accord.local.StoreParticipants.Filter.LOAD;
import static accord.primitives.Known.KnownExecuteAt.ApplyAtKnown;
import static accord.primitives.SaveStatus.TruncatedApplyWithOutcome;
import static accord.primitives.Status.Durability.NotDurable;

public class CommandChange
Expand Down Expand Up @@ -427,6 +428,9 @@ private static Command.Truncated truncated(TxnId txnId, SaveStatus status, Statu
case TruncatedApplyWithOutcome:
case TruncatedApplyWithDeps:
case TruncatedApply:
// TODO (expected): Find another way to unset the result for the truncated command
if (status != TruncatedApplyWithOutcome)
result = null;
return truncatedApply(txnId, status, durability, participants, executeAt, writes, result, executesAtLeast);
case Vestigial:
return vestigial(txnId, participants);
Expand Down
74 changes: 36 additions & 38 deletions accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -85,15 +86,16 @@
import accord.utils.Invariants;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResults;
import accord.utils.async.Cancellable;
import org.agrona.collections.ObjectHashSet;

import static accord.local.Cleanup.Input.FULL;
import static accord.local.KeyHistory.ASYNC;
import static accord.local.KeyHistory.SYNC;
import static accord.primitives.Known.KnownRoute.MaybeRoute;
import static accord.primitives.Routable.Domain.Range;
import static accord.primitives.Routables.Slice.Minimal;
import static accord.local.KeyHistory.SYNC;
import static accord.primitives.SaveStatus.Applying;
import static accord.primitives.SaveStatus.Erased;
import static accord.primitives.SaveStatus.Vestigial;
Expand Down Expand Up @@ -1165,47 +1167,23 @@ else if (!CommandsForKey.managesExecution(txnId) && command.hasBeen(Status.Stabl
return txnId;
}

@Override
public void load(Command command, Journal.OnDone onDone)
public void load(Command command)
{
try
{
commandStore.executeInContext(commandStore,
context(command, ASYNC),
safeStore -> loadInternal(command, safeStore));
}
catch (Throwable t)
{
onDone.failure(t);
}

onDone.success();
commandStore.executeInContext(commandStore,
context(command, ASYNC),
safeStore -> loadInternal(command, safeStore));
}

@Override
public void apply(Command command, Journal.OnDone onDone)
public void apply(Command command)
{
try
{
PreLoadContext context = context(command, SYNC);
commandStore.unsafeRunIn(() -> {
commandStore.executeInContext(commandStore,
context,
safeStore -> {
applyWrites(command.txnId(), safeStore, (safeCommand, cmd) -> {
unsafeApplyWrites(safeStore, safeCommand, cmd);
});
return null;
});
});
}
catch (Throwable t)
{
onDone.failure(t);
return;
}

onDone.success();
commandStore.executeInContext(commandStore,
context(command, SYNC),
safeStore -> {
applyWrites(command.txnId(), safeStore, (safeCommand, cmd) -> {
unsafeApplyWrites(safeStore, safeCommand, cmd);
});
return null;
});
}

protected void unsafeApplyWrites(SafeCommandStore safeStore, SafeCommand safeCommand, Command command)
Expand All @@ -1219,6 +1197,26 @@ protected void unsafeApplyWrites(SafeCommandStore safeStore, SafeCommand safeCom
safeStore.notifyListeners(safeCommand, command);
}
}

public AsyncChain<Void> load(TxnId txnId, Supplier<Command> supplier)
{
Command command;
// TODO (required): consider this race condition some more:
// - can we avoid double-applying?
// - is this definitely safe?
if (commandStore.hasCommand(txnId))
{
command = commandStore.command(txnId).value();
}
else
{
command = supplier.get();
load(command);
}

apply(command);
return AsyncResults.success(null);
}
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion accord-core/src/main/java/accord/local/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -1738,7 +1738,7 @@ public static <T extends Command> T validate(T validate)
case Erased:
case WasApply:
Invariants.require(writes == null, "Writes exist for %s", validate);
Invariants.require(result == null, "Results exist %s", validate);
Invariants.require(result == null, "Results exist %s %s", validate, result);
break;
}
}
Expand Down
17 changes: 11 additions & 6 deletions accord-core/src/test/java/accord/impl/basic/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ private static void verifyConsistentRestore(Int2ObjectHashMap<NavigableMap<TxnId
{
if (afterCommand.is(Status.Invalidated))
Invariants.require(beforeCommand.hasBeen(Status.Truncated) || !beforeCommand.hasBeen(Status.PreCommitted)
&& store.unsafeGetRedundantBefore().max(beforeCommand.participants().touches(), RedundantBefore.Entry::shardRedundantBefore).compareTo(beforeCommand.txnId()) >= 0);
&& store.unsafeGetRedundantBefore().max(beforeCommand.participants().touches(), RedundantBefore.Entry::shardRedundantBefore).compareTo(beforeCommand.txnId()) >= 0);
continue;
}
if (beforeCommand.hasBeen(Status.Truncated))
Expand All @@ -925,11 +925,16 @@ private static void verifyConsistentRestore(Int2ObjectHashMap<NavigableMap<TxnId
Invariants.require(beforeCommand.is(Status.Invalidated) || afterCommand.is(Status.Truncated) || afterCommand.is(Status.Applied));
continue;
}
Invariants.require(isConsistent(beforeCommand.saveStatus(), afterCommand.saveStatus())
&& beforeCommand.executeAtOrTxnId().equals(afterCommand.executeAtOrTxnId())
&& beforeCommand.acceptedOrCommitted().equals(afterCommand.acceptedOrCommitted())
&& beforeCommand.promised().equals(afterCommand.promised())
&& beforeCommand.durability().equals(afterCommand.durability()));
Invariants.require(isConsistent(beforeCommand.saveStatus(), afterCommand.saveStatus()),
"%s != %s", beforeCommand.saveStatus(), afterCommand.saveStatus());
Invariants.require(beforeCommand.executeAtOrTxnId().equals(afterCommand.executeAtOrTxnId()),
"%s != %s", beforeCommand.executeAtOrTxnId(), afterCommand.executeAtOrTxnId());
Invariants.require(beforeCommand.acceptedOrCommitted().equals(afterCommand.acceptedOrCommitted()),
"%s != %s", beforeCommand.acceptedOrCommitted(), afterCommand.acceptedOrCommitted());
Invariants.require(beforeCommand.promised().equals(afterCommand.promised()),
"%s != %s", beforeCommand.promised(), afterCommand.promised());
Invariants.require(beforeCommand.durability().equals(afterCommand.durability()),
"%s != %s", beforeCommand.durability(), afterCommand.durability());
}

if (before.size() > store.unsafeCommands().size())
Expand Down
45 changes: 24 additions & 21 deletions accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import accord.api.Journal;
import accord.api.Result;
import accord.impl.CommandChange;
import accord.impl.RetiredSafeCommand;
import accord.impl.InMemoryCommandStore;
import accord.impl.RetiredSafeCommand;
import accord.local.Cleanup;
import accord.local.Command;
import accord.local.Command.WaitingOnWithExecuteAt;
Expand All @@ -57,10 +57,12 @@
import accord.primitives.Writes;
import accord.utils.Invariants;
import accord.utils.PersistentField;
import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResult;
import org.agrona.collections.Int2ObjectHashMap;

import static accord.api.Journal.Load.ALL;
import static accord.impl.CommandChange.*;
import static accord.impl.CommandChange.Field;
import static accord.impl.CommandChange.Field.ACCEPTED;
import static accord.impl.CommandChange.Field.DURABILITY;
import static accord.impl.CommandChange.Field.EXECUTES_AT_LEAST;
Expand All @@ -74,13 +76,23 @@
import static accord.impl.CommandChange.Field.SAVE_STATUS;
import static accord.impl.CommandChange.Field.WAITING_ON;
import static accord.impl.CommandChange.Field.WRITES;
import static accord.impl.CommandChange.WaitingOnProvider;
import static accord.impl.CommandChange.anyFieldChanged;
import static accord.impl.CommandChange.getFlags;
import static accord.impl.CommandChange.isChanged;
import static accord.impl.CommandChange.isNull;
import static accord.impl.CommandChange.nextSetField;
import static accord.impl.CommandChange.setChanged;
import static accord.impl.CommandChange.setFieldIsNull;
import static accord.impl.CommandChange.toIterableSetFields;
import static accord.impl.CommandChange.unsetFieldIsNull;
import static accord.impl.CommandChange.unsetIterable;
import static accord.impl.CommandChange.validateFlags;
import static accord.local.Cleanup.Input.FULL;
import static accord.primitives.SaveStatus.Erased;
import static accord.primitives.SaveStatus.Vestigial;
import static accord.primitives.SaveStatus.Stable;
import static accord.primitives.Status.Invalidated;
import static accord.primitives.Status.Truncated;
import static accord.primitives.Txn.Kind.Write;
import static accord.utils.Invariants.illegalState;

public class InMemoryJournal implements Journal
Expand Down Expand Up @@ -362,23 +374,14 @@ public void success() {}
{
if (e.getValue().isEmpty()) continue;

// TODO (required): consider this race condition some more:
// - can we avoid double-applying?
// - is this definitely safe?
Command command;
if (!commandStore.hasCommand(e.getKey()))
{
command = reconstruct(e.getValue(), ALL).construct(commandStore.unsafeGetRedundantBefore());
Invariants.require(command.saveStatus() != SaveStatus.Uninitialised,
"Found uninitialized command in the log: %s %s", diffEntry.getKey(), e.getValue());
loader.load(command, sync);
}
else
{
command = commandStore.command(e.getKey()).value();
}
if (command.txnId().is(Write) && command.saveStatus().compareTo(Stable) >= 0 && !command.hasBeen(Truncated))
loader.apply(command, sync);
AsyncResult<Void> res = loader.load(e.getKey(),
() -> {
Command command = reconstruct(e.getValue(), ALL).construct(commandStore.unsafeGetRedundantBefore());
Invariants.require(command.saveStatus() != SaveStatus.Uninitialised,
"Found uninitialized command in the log: %s %s", diffEntry.getKey(), e.getValue());
return command;
}).beginAsResult();
AsyncChains.getUnchecked(res);
}
}
}
Expand Down

0 comments on commit 707b37c

Please sign in to comment.