Skip to content

Commit

Permalink
Fix indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
ifesdjeen committed Feb 5, 2025
1 parent 6187239 commit 18dac1b
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 30 deletions.
2 changes: 1 addition & 1 deletion accord-core/src/main/java/accord/api/Journal.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ enum Load
*/
interface Loader
{
AsyncChain<Void> load(TxnId txnId);
AsyncChain<Command> load(TxnId txnId);
}


Expand Down
2 changes: 1 addition & 1 deletion accord-core/src/main/java/accord/impl/AbstractLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ protected Command loadInternal(Command command, SafeCommandStore safeStore)
return safeStore.unsafeGetNoCleanup(txnId).update(safeStore, command);
}

protected void applyWrites(TxnId txnId, SafeCommandStore safeStore, BiConsumer<SafeCommand, Command> apply)
protected void maybeApplyWrites(TxnId txnId, SafeCommandStore safeStore, BiConsumer<SafeCommand, Command> apply)
{
SafeCommand safeCommand = safeStore.unsafeGet(txnId);
Command command = safeCommand.current();
Expand Down
45 changes: 18 additions & 27 deletions accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@
import accord.utils.Invariants;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResults;
import accord.utils.async.Cancellable;
import org.agrona.collections.ObjectHashSet;

Expand Down Expand Up @@ -1149,26 +1148,26 @@ else if (!CommandsForKey.managesExecution(txnId) && command.hasBeen(Status.Stabl
return txnId;
}

public void load(Command command)
private AsyncChain<Command> load(Command command)
{
commandStore.executeInContext(commandStore,
context(command, ASYNC),
safeStore -> loadInternal(command, safeStore));
return AsyncChains.success(commandStore.executeInContext(commandStore,
context(command, ASYNC),
(SafeCommandStore safeStore) -> loadInternal(command, safeStore)));
}

public void apply(Command command)
private AsyncChain<Command> apply(Command command)
{
commandStore.executeInContext(commandStore,
context(command, SYNC),
safeStore -> {
applyWrites(command.txnId(), safeStore, (safeCommand, cmd) -> {
unsafeApplyWrites(safeStore, safeCommand, cmd);
});
return null;
});
return AsyncChains.success(commandStore.executeInContext(commandStore,
context(command, SYNC),
(SafeCommandStore safeStore) -> {
maybeApplyWrites(command.txnId(), safeStore, (safeCommand, cmd) -> {
unsafeApplyWrites(safeStore, safeCommand, cmd);
});
return command;
}));
}

protected void unsafeApplyWrites(SafeCommandStore safeStore, SafeCommand safeCommand, Command command)
private void unsafeApplyWrites(SafeCommandStore safeStore, SafeCommand safeCommand, Command command)
{
Command.Executed executed = command.asExecuted();
Participants<?> executes = executed.participants().stillExecutes();
Expand All @@ -1181,24 +1180,16 @@ protected void unsafeApplyWrites(SafeCommandStore safeStore, SafeCommand safeCom
}

@Override
public AsyncChain<Void> load(TxnId txnId)
public AsyncChain<Command> load(TxnId txnId)
{
Command command;
// TODO (required): consider this race condition some more:
// - can we avoid double-applying?
// - is this definitely safe?
if (commandStore.hasCommand(txnId))
{
command = commandStore.command(txnId).value();
}
else
{
command = commandStore.journal.loadCommand(commandStore.id, txnId, commandStore.unsafeGetRedundantBefore(), commandStore.durableBefore());
load(command);
}
return apply(commandStore.command(txnId).value());

apply(command);
return AsyncResults.success(null);
Command command = commandStore.journal.loadCommand(commandStore.id, txnId, commandStore.unsafeGetRedundantBefore(), commandStore.durableBefore());
return load(command).flatMap(this::apply);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ public void replay(CommandStores commandStores)
{
if (e.getValue().isEmpty()) continue;

AsyncResult<Void> res = loader.load(e.getKey()).beginAsResult();
AsyncResult<Command> res = loader.load(e.getKey()).beginAsResult();
AsyncChains.getUnchecked(res);
}
}
Expand Down

0 comments on commit 18dac1b

Please sign in to comment.