From 707b37c2a6531421d959eab3327faccddde8ed14 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Tue, 21 Jan 2025 18:24:45 +0100 Subject: [PATCH] Avoid double loading in Cassandra side of journal Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20244 --- .../src/main/java/accord/api/Journal.java | 5 +- .../main/java/accord/impl/AbstractLoader.java | 5 +- .../main/java/accord/impl/CommandChange.java | 6 +- .../accord/impl/InMemoryCommandStore.java | 74 +++++++++---------- .../src/main/java/accord/local/Command.java | 2 +- .../test/java/accord/impl/basic/Cluster.java | 17 +++-- .../accord/impl/basic/InMemoryJournal.java | 45 +++++------ 7 files changed, 83 insertions(+), 71 deletions(-) diff --git a/accord-core/src/main/java/accord/api/Journal.java b/accord-core/src/main/java/accord/api/Journal.java index a201653de..68faabcc1 100644 --- a/accord-core/src/main/java/accord/api/Journal.java +++ b/accord-core/src/main/java/accord/api/Journal.java @@ -21,6 +21,7 @@ import java.util.Iterator; import java.util.NavigableMap; import java.util.Objects; +import java.util.function.Supplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -33,6 +34,7 @@ import accord.primitives.TxnId; import accord.topology.Topology; import accord.utils.PersistentField.Persister; +import accord.utils.async.AsyncChain; import org.agrona.collections.Int2ObjectHashMap; /** @@ -145,8 +147,7 @@ enum Load */ interface Loader { - void load(Command next, OnDone onDone); - void apply(Command next, OnDone onDone); + AsyncChain load(TxnId txnId, Supplier supplier); } diff --git a/accord-core/src/main/java/accord/impl/AbstractLoader.java b/accord-core/src/main/java/accord/impl/AbstractLoader.java index 26a4d68f5..78bb5a747 100644 --- a/accord-core/src/main/java/accord/impl/AbstractLoader.java +++ b/accord-core/src/main/java/accord/impl/AbstractLoader.java @@ -26,14 +26,15 @@ import accord.local.Commands; import accord.local.SafeCommand; import accord.local.SafeCommandStore; +import accord.primitives.SaveStatus; import accord.primitives.TxnId; import static accord.local.Cleanup.Input.FULL; -import static accord.primitives.SaveStatus.Applying; import static accord.primitives.SaveStatus.PreApplied; import static accord.primitives.Status.Invalidated; import static accord.primitives.Status.Stable; import static accord.primitives.Status.Truncated; +import static accord.primitives.Txn.Kind.Write; public abstract class AbstractLoader implements Journal.Loader { @@ -66,7 +67,7 @@ protected void applyWrites(TxnId txnId, SafeCommandStore safeStore, BiConsumer= 0 && !command.hasBeen(Truncated)) + else if (command.txnId().is(Write) && command.saveStatus().compareTo(SaveStatus.Stable) >= 0 && !command.hasBeen(Truncated)) { apply.accept(safeCommand, command); } diff --git a/accord-core/src/main/java/accord/impl/CommandChange.java b/accord-core/src/main/java/accord/impl/CommandChange.java index 0ec5d9c98..a11e0781e 100644 --- a/accord-core/src/main/java/accord/impl/CommandChange.java +++ b/accord-core/src/main/java/accord/impl/CommandChange.java @@ -52,13 +52,13 @@ import static accord.impl.CommandChange.Field.EXECUTES_AT_LEAST; import static accord.impl.CommandChange.Field.EXECUTE_AT; import static accord.impl.CommandChange.Field.FIELDS; +import static accord.impl.CommandChange.Field.MIN_UNIQUE_HLC; import static accord.impl.CommandChange.Field.PARTIAL_DEPS; import static accord.impl.CommandChange.Field.PARTIAL_TXN; import static accord.impl.CommandChange.Field.PARTICIPANTS; import static accord.impl.CommandChange.Field.PROMISED; import static accord.impl.CommandChange.Field.RESULT; import static accord.impl.CommandChange.Field.SAVE_STATUS; -import static accord.impl.CommandChange.Field.MIN_UNIQUE_HLC; import static accord.impl.CommandChange.Field.WAITING_ON; import static accord.impl.CommandChange.Field.WRITES; import static accord.local.Cleanup.NO; @@ -76,6 +76,7 @@ import static accord.local.Command.Truncated.vestigial; import static accord.local.StoreParticipants.Filter.LOAD; import static accord.primitives.Known.KnownExecuteAt.ApplyAtKnown; +import static accord.primitives.SaveStatus.TruncatedApplyWithOutcome; import static accord.primitives.Status.Durability.NotDurable; public class CommandChange @@ -427,6 +428,9 @@ private static Command.Truncated truncated(TxnId txnId, SaveStatus status, Statu case TruncatedApplyWithOutcome: case TruncatedApplyWithDeps: case TruncatedApply: + // TODO (expected): Find another way to unset the result for the truncated command + if (status != TruncatedApplyWithOutcome) + result = null; return truncatedApply(txnId, status, durability, participants, executeAt, writes, result, executesAtLeast); case Vestigial: return vestigial(txnId, participants); diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index 2a6e8c251..f494430e6 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -36,6 +36,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; +import java.util.function.Supplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -85,15 +86,16 @@ import accord.utils.Invariants; import accord.utils.async.AsyncChain; import accord.utils.async.AsyncChains; +import accord.utils.async.AsyncResults; import accord.utils.async.Cancellable; import org.agrona.collections.ObjectHashSet; import static accord.local.Cleanup.Input.FULL; import static accord.local.KeyHistory.ASYNC; +import static accord.local.KeyHistory.SYNC; import static accord.primitives.Known.KnownRoute.MaybeRoute; import static accord.primitives.Routable.Domain.Range; import static accord.primitives.Routables.Slice.Minimal; -import static accord.local.KeyHistory.SYNC; import static accord.primitives.SaveStatus.Applying; import static accord.primitives.SaveStatus.Erased; import static accord.primitives.SaveStatus.Vestigial; @@ -1165,47 +1167,23 @@ else if (!CommandsForKey.managesExecution(txnId) && command.hasBeen(Status.Stabl return txnId; } - @Override - public void load(Command command, Journal.OnDone onDone) + public void load(Command command) { - try - { - commandStore.executeInContext(commandStore, - context(command, ASYNC), - safeStore -> loadInternal(command, safeStore)); - } - catch (Throwable t) - { - onDone.failure(t); - } - - onDone.success(); + commandStore.executeInContext(commandStore, + context(command, ASYNC), + safeStore -> loadInternal(command, safeStore)); } - @Override - public void apply(Command command, Journal.OnDone onDone) + public void apply(Command command) { - try - { - PreLoadContext context = context(command, SYNC); - commandStore.unsafeRunIn(() -> { - commandStore.executeInContext(commandStore, - context, - safeStore -> { - applyWrites(command.txnId(), safeStore, (safeCommand, cmd) -> { - unsafeApplyWrites(safeStore, safeCommand, cmd); - }); - return null; - }); - }); - } - catch (Throwable t) - { - onDone.failure(t); - return; - } - - onDone.success(); + commandStore.executeInContext(commandStore, + context(command, SYNC), + safeStore -> { + applyWrites(command.txnId(), safeStore, (safeCommand, cmd) -> { + unsafeApplyWrites(safeStore, safeCommand, cmd); + }); + return null; + }); } protected void unsafeApplyWrites(SafeCommandStore safeStore, SafeCommand safeCommand, Command command) @@ -1219,6 +1197,26 @@ protected void unsafeApplyWrites(SafeCommandStore safeStore, SafeCommand safeCom safeStore.notifyListeners(safeCommand, command); } } + + public AsyncChain load(TxnId txnId, Supplier supplier) + { + Command command; + // TODO (required): consider this race condition some more: + // - can we avoid double-applying? + // - is this definitely safe? + if (commandStore.hasCommand(txnId)) + { + command = commandStore.command(txnId).value(); + } + else + { + command = supplier.get(); + load(command); + } + + apply(command); + return AsyncResults.success(null); + } } @Override diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java index 78186fdd3..84fa92a7a 100644 --- a/accord-core/src/main/java/accord/local/Command.java +++ b/accord-core/src/main/java/accord/local/Command.java @@ -1738,7 +1738,7 @@ public static T validate(T validate) case Erased: case WasApply: Invariants.require(writes == null, "Writes exist for %s", validate); - Invariants.require(result == null, "Results exist %s", validate); + Invariants.require(result == null, "Results exist %s %s", validate, result); break; } } diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java index 2faffc202..cfeb623df 100644 --- a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java @@ -916,7 +916,7 @@ private static void verifyConsistentRestore(Int2ObjectHashMap= 0); + && store.unsafeGetRedundantBefore().max(beforeCommand.participants().touches(), RedundantBefore.Entry::shardRedundantBefore).compareTo(beforeCommand.txnId()) >= 0); continue; } if (beforeCommand.hasBeen(Status.Truncated)) @@ -925,11 +925,16 @@ private static void verifyConsistentRestore(Int2ObjectHashMap store.unsafeCommands().size()) diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java index 4bd676027..9eed23b9a 100644 --- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java +++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java @@ -33,8 +33,8 @@ import accord.api.Journal; import accord.api.Result; import accord.impl.CommandChange; -import accord.impl.RetiredSafeCommand; import accord.impl.InMemoryCommandStore; +import accord.impl.RetiredSafeCommand; import accord.local.Cleanup; import accord.local.Command; import accord.local.Command.WaitingOnWithExecuteAt; @@ -57,10 +57,12 @@ import accord.primitives.Writes; import accord.utils.Invariants; import accord.utils.PersistentField; +import accord.utils.async.AsyncChains; +import accord.utils.async.AsyncResult; import org.agrona.collections.Int2ObjectHashMap; import static accord.api.Journal.Load.ALL; -import static accord.impl.CommandChange.*; +import static accord.impl.CommandChange.Field; import static accord.impl.CommandChange.Field.ACCEPTED; import static accord.impl.CommandChange.Field.DURABILITY; import static accord.impl.CommandChange.Field.EXECUTES_AT_LEAST; @@ -74,13 +76,23 @@ import static accord.impl.CommandChange.Field.SAVE_STATUS; import static accord.impl.CommandChange.Field.WAITING_ON; import static accord.impl.CommandChange.Field.WRITES; +import static accord.impl.CommandChange.WaitingOnProvider; +import static accord.impl.CommandChange.anyFieldChanged; +import static accord.impl.CommandChange.getFlags; +import static accord.impl.CommandChange.isChanged; +import static accord.impl.CommandChange.isNull; +import static accord.impl.CommandChange.nextSetField; +import static accord.impl.CommandChange.setChanged; +import static accord.impl.CommandChange.setFieldIsNull; +import static accord.impl.CommandChange.toIterableSetFields; +import static accord.impl.CommandChange.unsetFieldIsNull; +import static accord.impl.CommandChange.unsetIterable; +import static accord.impl.CommandChange.validateFlags; import static accord.local.Cleanup.Input.FULL; import static accord.primitives.SaveStatus.Erased; import static accord.primitives.SaveStatus.Vestigial; -import static accord.primitives.SaveStatus.Stable; import static accord.primitives.Status.Invalidated; import static accord.primitives.Status.Truncated; -import static accord.primitives.Txn.Kind.Write; import static accord.utils.Invariants.illegalState; public class InMemoryJournal implements Journal @@ -362,23 +374,14 @@ public void success() {} { if (e.getValue().isEmpty()) continue; - // TODO (required): consider this race condition some more: - // - can we avoid double-applying? - // - is this definitely safe? - Command command; - if (!commandStore.hasCommand(e.getKey())) - { - command = reconstruct(e.getValue(), ALL).construct(commandStore.unsafeGetRedundantBefore()); - Invariants.require(command.saveStatus() != SaveStatus.Uninitialised, - "Found uninitialized command in the log: %s %s", diffEntry.getKey(), e.getValue()); - loader.load(command, sync); - } - else - { - command = commandStore.command(e.getKey()).value(); - } - if (command.txnId().is(Write) && command.saveStatus().compareTo(Stable) >= 0 && !command.hasBeen(Truncated)) - loader.apply(command, sync); + AsyncResult res = loader.load(e.getKey(), + () -> { + Command command = reconstruct(e.getValue(), ALL).construct(commandStore.unsafeGetRedundantBefore()); + Invariants.require(command.saveStatus() != SaveStatus.Uninitialised, + "Found uninitialized command in the log: %s %s", diffEntry.getKey(), e.getValue()); + return command; + }).beginAsResult(); + AsyncChains.getUnchecked(res); } } }