Skip to content

Commit

Permalink
Refactor CommandsForKey for efficiency, and to support transitive dependency elision
Browse files Browse the repository at this point in the history

patch by Benedict; reviewed by Aleksey Yeshchenko for CASSANDRA-19310
  • Loading branch information
belliottsmith committed Feb 28, 2024
1 parent 62bd2c6 commit 6b8bef4
Show file tree
Hide file tree
Showing 114 changed files with 3,268 additions and 3,612 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import java.util.Set;
import java.util.function.BiConsumer;

import javax.annotation.Nullable;

import accord.coordinate.tracking.QuorumTracker;
import accord.local.Node;
import accord.local.Node.Id;
import accord.messages.Callback;
import accord.primitives.FullRoute;
import accord.primitives.Txn;
import accord.primitives.Seekables;
import accord.primitives.TxnId;
import accord.topology.Topologies;
import accord.utils.Invariants;
Expand Down Expand Up @@ -95,24 +97,22 @@ else if (tracker.recordSuccess(from) == Success)

final Node node;
final TxnId txnId;
final Txn txn;
final FullRoute<?> route;

private Topologies topologies;
private boolean initialRoundIsDone;
private ExtraEpochs extraEpochs;
private Map<Id, Object> debug = Invariants.debug() ? new LinkedHashMap<>() : null;

AbstractCoordinatePreAccept(Node node, FullRoute<?> route, TxnId txnId, Txn txn)
AbstractCoordinatePreAccept(Node node, FullRoute<?> route, TxnId txnId)
{
this(node, txnId, txn, route, node.topology().withUnsyncedEpochs(route, txnId, txnId));
this(node, route, txnId, node.topology().withUnsyncedEpochs(route, txnId, txnId));
}

AbstractCoordinatePreAccept(Node node, TxnId txnId, Txn txn, FullRoute<?> route, Topologies topologies)
AbstractCoordinatePreAccept(Node node, FullRoute<?> route, @Nullable TxnId txnId, Topologies topologies)
{
this.node = node;
this.txnId = txnId;
this.txn = txn;
this.route = route;
this.topologies = topologies;
}
Expand All @@ -122,6 +122,7 @@ final void start()
contact(topologies.nodes(), topologies, this);
}

// Subclass hook: the keys or ranges this coordination covers; passed to TopologyMismatch.checkForMismatch in onNewEpoch.
abstract Seekables<?, ?> keysOrRanges();
// Subclass hook: contacts the given nodes for the given topologies, with replies delivered to callback; start() invokes it with topologies.nodes(), topologies and this.
abstract void contact(Set<Id> nodes, Topologies topologies, Callback<R> callback);
// Subclass hook for a successful reply from a node; presumably invoked from onSuccess(Id, R) - that body is not visible here, confirm.
abstract void onSuccessInternal(Id from, R reply);
/**
Expand Down Expand Up @@ -163,7 +164,22 @@ public final synchronized void onSuccess(Id from, R reply)
@Override
public final void setFailure(Throwable failure)
{
Invariants.checkState(!initialRoundIsDone || (extraEpochs != null && !extraEpochs.extraRoundIsDone));
super.setFailure(failure);
onFailure(failure);
}

// Attempts to fail this result by delegating to super.tryFailure(failure).
// onFailure(failure) is run only when the superclass call returns true; when it returns false
// this method returns false without invoking onFailure.
@Override
public final boolean tryFailure(Throwable failure)
{
if (!super.tryFailure(failure))
return false;
onFailure(failure);
return true;
}

private void onFailure(Throwable failure)
{
// we may already be complete, as we may receive a failure from a later phase; but it's fine to redundantly mark done
initialRoundIsDone = true;
if (extraEpochs != null)
extraEpochs.extraRoundIsDone = true;
Expand All @@ -178,7 +194,7 @@ else if (failure instanceof Preempted)
else if (failure instanceof Invalidated)
node.agent().metricsEventsListener().onInvalidated(txnId);
}
super.setFailure(failure);

}

final void onPreAcceptedOrNewEpoch()
Expand All @@ -203,7 +219,7 @@ final void onNewEpoch(Topologies prevTopologies, long latestEpoch)
// TODO (desired, efficiency): check if we already have a valid quorum for the future epoch
// (noting that nodes may have adopted new ranges, in which case they should be discounted, and quorums may have changed shape)
node.withEpoch(latestEpoch, () -> {
TopologyMismatch mismatch = TopologyMismatch.checkForMismatch(node.topology().globalForEpoch(latestEpoch), txnId, route.homeKey(), txn.keys());
TopologyMismatch mismatch = TopologyMismatch.checkForMismatch(node.topology().globalForEpoch(latestEpoch), txnId, route.homeKey(), keysOrRanges());
if (mismatch != null)
{
initialRoundIsDone = true;
Expand All @@ -230,23 +246,7 @@ final void onNewEpoch(Topologies prevTopologies, long latestEpoch)
@Override
public final void accept(T success, Throwable failure)
{
if (success != null)
{
trySuccess(success);
}
else
{
if (failure instanceof CoordinationFailed)
{
((CoordinationFailed) failure).set(txnId, route.homeKey());
if (failure instanceof Preempted)
node.agent().metricsEventsListener().onPreempted(txnId);
else if (failure instanceof Timeout)
node.agent().metricsEventsListener().onTimeout(txnId);
else if (failure instanceof Invalidated)
node.agent().metricsEventsListener().onInvalidated(txnId);
}
tryFailure(failure);
}
if (success != null) trySuccess(success);
else tryFailure(failure);
}
}
79 changes: 30 additions & 49 deletions accord-core/src/main/java/accord/coordinate/Barrier.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package accord.coordinate;

import java.util.Objects;
import javax.annotation.Nonnull;

import accord.local.*;
Expand All @@ -30,7 +29,7 @@
import accord.api.Key;
import accord.api.RoutingKey;
import accord.local.SafeCommandStore.TestDep;
import accord.local.SafeCommandStore.TestTimestamp;
import accord.local.SafeCommandStore.TestStartedAt;
import accord.primitives.Routable.Domain;
import accord.primitives.Seekables;
import accord.primitives.SyncPoint;
Expand All @@ -42,6 +41,7 @@

import static accord.local.PreLoadContext.contextFor;
import static accord.primitives.Txn.Kind.Kinds.AnyGloballyVisible;
import static accord.local.SafeCommandStore.TestStatus.IS_STABLE;
import static accord.utils.Invariants.checkArgument;
import static accord.utils.Invariants.checkState;
import static accord.utils.Invariants.illegalState;
Expand Down Expand Up @@ -109,15 +109,7 @@ private void start()
return;
}

if (barrierTxn != null)
{
if (barrierTxn.status.equals(Status.Applied))
{
doBarrierSuccess(barrierTxn.executeAt);
}
// A listener was added to the transaction already
}
else
if (barrierTxn == null)
{
createSyncPoint();
}
Expand All @@ -143,7 +135,8 @@ private void doBarrierSuccess(Timestamp executeAt)

private void createSyncPoint()
{
coordinateSyncPoint = CoordinateSyncPoint.inclusive(node, seekables, barrierType.async);
coordinateSyncPoint = barrierType.async ? CoordinateSyncPoint.inclusive(node, seekables)
: CoordinateSyncPoint.inclusiveAndAwaitQuorum(node, seekables);
coordinateSyncPoint.addCallback((syncPoint, syncPointFailure) -> {
if (syncPointFailure != null)
{
Expand All @@ -153,7 +146,7 @@ private void createSyncPoint()

// Need to wait for the local transaction to finish since coordinate sync point won't wait on anything
// if async was requested or there were no deps found
if (syncPoint.finishedAsync)
if (barrierType.async)
{
TxnId txnId = syncPoint.syncId;
long epoch = txnId.epoch();
Expand Down Expand Up @@ -201,7 +194,7 @@ private ExistingTransactionCheck checkForExistingTransaction()
ExistingTransactionCheck check = new ExistingTransactionCheck();
Key k = seekables.get(0).asKey();
node.commandStores().mapReduceConsume(
contextFor(k, KeyHistory.ALL),
contextFor(k, KeyHistory.COMMANDS),
k.toUnseekable(),
minEpoch,
Long.MAX_VALUE,
Expand All @@ -217,23 +210,13 @@ static class BarrierTxn
@Nonnull
public final Timestamp executeAt;
@Nonnull
public final Status status;
@Nonnull
public final Key key;
public BarrierTxn(@Nonnull TxnId txnId, @Nonnull Timestamp executeAt, @Nonnull Status status, Key key)
public BarrierTxn(@Nonnull TxnId txnId, @Nonnull Timestamp executeAt, Key key)
{
this.txnId = txnId;
this.executeAt = executeAt;
this.status = status;
this.key = key;
}

public BarrierTxn max(BarrierTxn other)
{
if (other == null)
return this;
return status.compareTo(other.status) >= 0 ? this : other;
}
}

/*
Expand All @@ -248,31 +231,29 @@ class ExistingTransactionCheck extends AsyncResults.AbstractResult<BarrierTxn> i
@Override
public BarrierTxn apply(SafeCommandStore safeStore)
{
BarrierTxn found = safeStore.mapReduce(
seekables,
safeStore.ranges().allAfter(minEpoch),
KeyHistory.ALL,
// Barriers are trying to establish that committed transactions are applied before the barrier (or in this case just minEpoch)
// so all existing transaction types should ensure that at this point. An earlier txnid may have an executeAt that is after
// this barrier or the transaction we listen on and that is fine
AnyGloballyVisible,
TestTimestamp.EXECUTES_AFTER,
TxnId.minForEpoch(minEpoch),
TestDep.ANY_DEPS,
null,
Status.Committed,
Status.Applied,
(p1, keyOrRange, txnId, executeAt, status, deps, barrierTxn) -> {
if (keyOrRange.domain() == Domain.Key)
return new BarrierTxn(txnId, executeAt, status, keyOrRange.asKey());
return null;
},
null,
null,
// Take the first one we find, and call it good enough to wait on
Objects::nonNull);
// TODO (required): consider these semantics
BarrierTxn found = safeStore.mapReduceFull(
seekables,
safeStore.ranges().allAfter(minEpoch),
// Barriers are trying to establish that committed transactions are applied before the barrier (or in this case just minEpoch)
// so all existing transaction types should ensure that at this point. An earlier txnid may have an executeAt that is after
// this barrier or the transaction we listen on and that is fine
TxnId.minForEpoch(minEpoch),
AnyGloballyVisible,
TestStartedAt.STARTED_AFTER,
TestDep.ANY_DEPS,
IS_STABLE,
(p1, keyOrRange, txnId, executeAt, barrierTxn) -> {
if (barrierTxn != null)
return barrierTxn;
if (keyOrRange.domain() == Domain.Key)
return new BarrierTxn(txnId, executeAt, keyOrRange.asKey());
return null;
},
null,
null);
// It's not applied so add a listener to find out when it is applied
if (found != null && !found.status.equals(Status.Applied))
if (found != null)
{
safeStore.commandStore().execute(
contextFor(found.txnId),
Expand Down
Loading

0 comments on commit 6b8bef4

Please sign in to comment.