Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid double loading in Cassandra side of journal #159

Open
wants to merge 3 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions accord-core/src/main/java/accord/api/Journal.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import accord.primitives.TxnId;
import accord.topology.Topology;
import accord.utils.PersistentField.Persister;
import accord.utils.async.AsyncChain;
import org.agrona.collections.Int2ObjectHashMap;

/**
Expand Down Expand Up @@ -145,8 +146,7 @@ enum Load
*/
interface Loader
{
void load(Command next, OnDone onDone);
void apply(Command next, OnDone onDone);
AsyncChain<Command> load(TxnId txnId);
}


Expand Down
7 changes: 4 additions & 3 deletions accord-core/src/main/java/accord/impl/AbstractLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@
import accord.local.Commands;
import accord.local.SafeCommand;
import accord.local.SafeCommandStore;
import accord.primitives.SaveStatus;
import accord.primitives.TxnId;

import static accord.local.Cleanup.Input.FULL;
import static accord.primitives.SaveStatus.Applying;
import static accord.primitives.SaveStatus.PreApplied;
import static accord.primitives.Status.Invalidated;
import static accord.primitives.Status.Stable;
import static accord.primitives.Status.Truncated;
import static accord.primitives.Txn.Kind.Write;

public abstract class AbstractLoader implements Journal.Loader
{
Expand All @@ -58,15 +59,15 @@ protected Command loadInternal(Command command, SafeCommandStore safeStore)
return safeStore.unsafeGetNoCleanup(txnId).update(safeStore, command);
}

protected void applyWrites(TxnId txnId, SafeCommandStore safeStore, BiConsumer<SafeCommand, Command> apply)
protected void maybeApplyWrites(TxnId txnId, SafeCommandStore safeStore, BiConsumer<SafeCommand, Command> apply)
{
SafeCommand safeCommand = safeStore.unsafeGet(txnId);
Command command = safeCommand.current();
if (command.is(Stable) || command.saveStatus() == PreApplied)
{
Commands.maybeExecute(safeStore, safeCommand, command, true, true);
}
else if (command.saveStatus().compareTo(Applying) >= 0 && !command.hasBeen(Truncated))
else if (command.txnId().is(Write) && command.saveStatus().compareTo(SaveStatus.Stable) >= 0 && !command.hasBeen(Truncated))
{
apply.accept(safeCommand, command);
}
Expand Down
2 changes: 1 addition & 1 deletion accord-core/src/main/java/accord/impl/CommandChange.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@
import static accord.impl.CommandChange.Field.EXECUTES_AT_LEAST;
import static accord.impl.CommandChange.Field.EXECUTE_AT;
import static accord.impl.CommandChange.Field.FIELDS;
import static accord.impl.CommandChange.Field.MIN_UNIQUE_HLC;
import static accord.impl.CommandChange.Field.PARTIAL_DEPS;
import static accord.impl.CommandChange.Field.PARTIAL_TXN;
import static accord.impl.CommandChange.Field.PARTICIPANTS;
import static accord.impl.CommandChange.Field.PROMISED;
import static accord.impl.CommandChange.Field.RESULT;
import static accord.impl.CommandChange.Field.SAVE_STATUS;
import static accord.impl.CommandChange.Field.MIN_UNIQUE_HLC;
import static accord.impl.CommandChange.Field.WAITING_ON;
import static accord.impl.CommandChange.Field.WRITES;
import static accord.local.Cleanup.NO;
Expand Down
89 changes: 39 additions & 50 deletions accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,16 @@

import static accord.local.Cleanup.Input.FULL;
import static accord.local.KeyHistory.ASYNC;
import static accord.local.KeyHistory.SYNC;
import static accord.local.RedundantStatus.Coverage.ALL;
import static accord.primitives.Known.KnownRoute.MaybeRoute;
import static accord.primitives.Routable.Domain.Range;
import static accord.primitives.Routables.Slice.Minimal;
import static accord.local.KeyHistory.SYNC;
import static accord.primitives.SaveStatus.Applying;
import static accord.primitives.SaveStatus.Erased;
import static accord.primitives.SaveStatus.Vestigial;
import static accord.primitives.SaveStatus.NotDefined;
import static accord.primitives.SaveStatus.ReadyToExecute;
import static accord.primitives.SaveStatus.Vestigial;
import static accord.primitives.Status.Applied;
import static accord.primitives.Status.Committed;
import static accord.primitives.Status.Durability.NotDurable;
Expand All @@ -124,12 +124,12 @@ public abstract class InMemoryCommandStore extends CommandStore
protected Timestamp maxRedundant = Timestamp.NONE;

private InMemorySafeStore current;
private final Journal.Loader loader;
private final Journal journal;

public InMemoryCommandStore(int id, NodeCommandStoreService node, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder)
public InMemoryCommandStore(int id, NodeCommandStoreService node, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, Journal journal)
{
super(id, node, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder);
this.loader = new CommandLoader(this);
this.journal = journal;
}

protected boolean canExposeUnloaded()
Expand Down Expand Up @@ -816,9 +816,9 @@ public static class Synchronized extends InMemoryCommandStore
Runnable active = null;
final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();

public Synchronized(int id, NodeCommandStoreService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder)
public Synchronized(int id, NodeCommandStoreService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, Journal journal)
{
super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder);
super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder, journal);
}

private synchronized void maybeRun()
Expand Down Expand Up @@ -909,9 +909,9 @@ public static class SingleThread extends InMemoryCommandStore
private Thread thread; // when run in the executor this will be non-null, null implies not running in this store
private final ExecutorService executor;

public SingleThread(int id, NodeCommandStoreService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder)
public SingleThread(int id, NodeCommandStoreService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, Journal journal)
{
super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder);
super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder, journal);
this.executor = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setName(CommandStore.class.getSimpleName() + '[' + time.id() + ']');
Expand Down Expand Up @@ -992,9 +992,9 @@ public InMemorySafeCommand getInternal(TxnId txnId)
}
}

public Debug(int id, NodeCommandStoreService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder)
public Debug(int id, NodeCommandStoreService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, Journal journal)
{
super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder);
super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder, journal);
}

@Override
Expand Down Expand Up @@ -1120,7 +1120,7 @@ public void clearForTesting()

public Journal.Loader loader()
{
return loader;
return new CommandLoader(this);
}

private static class CommandLoader extends AbstractLoader
Expand Down Expand Up @@ -1148,50 +1148,26 @@ else if (!CommandsForKey.managesExecution(txnId) && command.hasBeen(Status.Stabl
return txnId;
}

@Override
public void load(Command command, Journal.OnDone onDone)
private AsyncChain<Command> load(Command command)
{
try
{
commandStore.executeInContext(commandStore,
context(command, ASYNC),
safeStore -> loadInternal(command, safeStore));
}
catch (Throwable t)
{
onDone.failure(t);
}

onDone.success();
return AsyncChains.success(commandStore.executeInContext(commandStore,
context(command, ASYNC),
(SafeCommandStore safeStore) -> loadInternal(command, safeStore)));
}

@Override
public void apply(Command command, Journal.OnDone onDone)
private AsyncChain<Command> apply(Command command)
{
try
{
PreLoadContext context = context(command, SYNC);
commandStore.unsafeRunIn(() -> {
commandStore.executeInContext(commandStore,
context,
safeStore -> {
applyWrites(command.txnId(), safeStore, (safeCommand, cmd) -> {
unsafeApplyWrites(safeStore, safeCommand, cmd);
});
return null;
});
});
}
catch (Throwable t)
{
onDone.failure(t);
return;
}

onDone.success();
return AsyncChains.success(commandStore.executeInContext(commandStore,
context(command, SYNC),
(SafeCommandStore safeStore) -> {
maybeApplyWrites(command.txnId(), safeStore, (safeCommand, cmd) -> {
unsafeApplyWrites(safeStore, safeCommand, cmd);
});
return command;
}));
}

protected void unsafeApplyWrites(SafeCommandStore safeStore, SafeCommand safeCommand, Command command)
private void unsafeApplyWrites(SafeCommandStore safeStore, SafeCommand safeCommand, Command command)
{
Command.Executed executed = command.asExecuted();
Participants<?> executes = executed.participants().stillExecutes();
Expand All @@ -1202,6 +1178,19 @@ protected void unsafeApplyWrites(SafeCommandStore safeStore, SafeCommand safeCom
safeStore.notifyListeners(safeCommand, command);
}
}

@Override
public AsyncChain<Command> load(TxnId txnId)
{
// TODO (required): consider this race condition some more:
// - can we avoid double-applying?
// - is this definitely safe?
if (commandStore.hasCommand(txnId))
return apply(commandStore.command(txnId).value());

Command command = commandStore.journal.loadCommand(commandStore.id, txnId, commandStore.unsafeGetRedundantBefore(), commandStore.durableBefore());
return load(command).flatMap(this::apply);
}
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion accord-core/src/main/java/accord/local/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -1749,7 +1749,7 @@ public static <T extends Command> T validate(T validate)
case Erased:
case WasApply:
Invariants.require(writes == null, "Writes exist for %s", validate);
Invariants.require(result == null, "Results exist %s", validate);
Invariants.require(result == null, "Results exist %s %s", validate, result);
break;
}
}
Expand Down
3 changes: 2 additions & 1 deletion accord-core/src/main/java/accord/local/CommandStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ CommandStore create(int id,
DataStore store,
ProgressLog.Factory progressLogFactory,
LocalListeners.Factory listenersFactory,
EpochUpdateHolder rangesForEpoch);
EpochUpdateHolder rangesForEpoch,
Journal journal);
}

private static final ThreadLocal<CommandStore> CURRENT_STORE = new ThreadLocal<>();
Expand Down
8 changes: 5 additions & 3 deletions accord-core/src/main/java/accord/local/CommandStores.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ private static class StoreSupplier
private final LocalListeners.Factory listenersFactory;
private final CommandStore.Factory shardFactory;
private final RandomSource random;
private final Journal journal;

StoreSupplier(NodeCommandStoreService time, Agent agent, DataStore store, RandomSource random, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory)
StoreSupplier(NodeCommandStoreService time, Agent agent, DataStore store, RandomSource random, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory, Journal journal)
{
this.time = time;
this.agent = agent;
Expand All @@ -113,11 +114,12 @@ private static class StoreSupplier
this.progressLogFactory = progressLogFactory;
this.listenersFactory = listenersFactory;
this.shardFactory = shardFactory;
this.journal = journal;
}

CommandStore create(int id, EpochUpdateHolder rangesForEpoch)
{
return shardFactory.create(id, time, agent, this.store, progressLogFactory, listenersFactory, rangesForEpoch);
return shardFactory.create(id, time, agent, this.store, progressLogFactory, listenersFactory, rangesForEpoch, journal);
}
}

Expand Down Expand Up @@ -468,7 +470,7 @@ private CommandStores(StoreSupplier supplier, ShardDistributor shardDistributor,
public CommandStores(NodeCommandStoreService time, Agent agent, DataStore store, RandomSource random, Journal journal, ShardDistributor shardDistributor,
ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory)
{
this(new StoreSupplier(time, agent, store, random, progressLogFactory, listenersFactory, shardFactory), shardDistributor, journal);
this(new StoreSupplier(time, agent, store, random, progressLogFactory, listenersFactory, shardFactory, journal), shardDistributor, journal);
}

public Topology local()
Expand Down
4 changes: 2 additions & 2 deletions accord-core/src/test/java/accord/impl/basic/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ public void validate(CommandStore commandStore, Command command, boolean isWrite
List<ReflectionUtils.Difference<?>> diff = ReflectionUtils.recursiveEquals(command, reconstructed);
if (!diff.isEmpty() && command.saveStatus().compareTo(SaveStatus.Erased) >= 0)
diff.removeIf(v -> v.path.equals(".participants."));
Invariants.require(diff.isEmpty(), "Commands did not match: expected %s, given %s on s, diff %s", command, reconstructed, commandStore, new LazyToString(() -> String.join("\n", Iterables.transform(diff, Object::toString))));
Invariants.require(diff.isEmpty(), "Commands did not match: expected %s, given %s on %s, diff %s", command, reconstructed, commandStore, new LazyToString(() -> String.join("\n", Iterables.transform(diff, Object::toString))));
}

@Override
Expand Down Expand Up @@ -922,7 +922,7 @@ private static void verifyConsistentRestore(Int2ObjectHashMap<NavigableMap<TxnId
{
if (afterCommand.is(Status.Invalidated))
Invariants.require(beforeCommand.hasBeen(Status.Truncated) || !beforeCommand.hasBeen(Status.PreCommitted)
&& store.unsafeGetRedundantBefore().max(beforeCommand.participants().touches(), RedundantBefore.Entry::shardRedundantBefore).compareTo(beforeCommand.txnId()) >= 0);
&& store.unsafeGetRedundantBefore().max(beforeCommand.participants().touches(), RedundantBefore.Entry::shardRedundantBefore).compareTo(beforeCommand.txnId()) >= 0);
continue;
}
if (beforeCommand.hasBeen(Status.Truncated))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread
{
private DelayedCommandStores(NodeCommandStoreService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, SimulatedDelayedExecutorService executorService, CacheLoading isLoadedCheck, Journal journal)
{
super(time, agent, store, random, journal, shardDistributor, progressLogFactory, listenersFactory, DelayedCommandStore.factory(executorService, isLoadedCheck, journal));
super(time, agent, store, random, journal, shardDistributor, progressLogFactory, listenersFactory, DelayedCommandStore.factory(executorService, isLoadedCheck));
}

public static CommandStores.Factory factory(PendingQueue pending, CacheLoading isLoadedCheck)
Expand Down Expand Up @@ -178,7 +178,7 @@ public Callable<T> callable()

public DelayedCommandStore(int id, NodeCommandStoreService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, SimulatedDelayedExecutorService executor, CacheLoading cacheLoading, Journal journal)
{
super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder);
super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder, journal);
this.executor = executor;
this.cacheLoading = cacheLoading;
this.journal = journal;
Expand Down Expand Up @@ -255,9 +255,9 @@ protected boolean canExposeUnloaded()
return !cacheLoading.cacheEmpty();
}

private static CommandStore.Factory factory(SimulatedDelayedExecutorService executor, CacheLoading isLoadedCheck, Journal journal)
private static CommandStore.Factory factory(SimulatedDelayedExecutorService executor, CacheLoading isLoadedCheck)
{
return (id, node, agent, store, progressLogFactory, listenersFactory, rangesForEpoch) -> new DelayedCommandStore(id, node, agent, store, progressLogFactory, listenersFactory, rangesForEpoch, executor, isLoadedCheck, journal);
return (id, node, agent, store, progressLogFactory, listenersFactory, rangesForEpoch, journal) -> new DelayedCommandStore(id, node, agent, store, progressLogFactory, listenersFactory, rangesForEpoch, executor, isLoadedCheck, journal);
}

@Override
Expand Down
Loading