Commit

Fix infinite loop, and notify progress log of sync point durability while waiting to apply

patch by Benedict; reviewed by David for CASSANDRA-20125
belliottsmith committed Dec 4, 2024
1 parent bd0761c commit 50d6ef9
Showing 16 changed files with 290 additions and 66 deletions.
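The durability-notification side of this change follows a pattern visible in the ExecuteSyncPoint.java diff below: each reply indicating the sync point is durable on a node is recorded against a per-node quorum tracker, and once a quorum of distinct nodes has confirmed, the sync point's home shard is informed (InformDurable.informHome) so the progress log learns it is Majority-durable. The following is a minimal, self-contained sketch of that quorum-then-notify pattern; DurableQuorumNotifier and all of its members are hypothetical stand-ins for illustration only, not Accord types (the real implementation uses QuorumIdTracker and InformDurable, as in the diff).

import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical stand-in illustrating the quorum-then-notify pattern; not an Accord class.
final class DurableQuorumNotifier
{
    private final int quorum;                    // e.g. replicas / 2 + 1 for a single shard
    private final Set<String> durableOn = new HashSet<>();
    private final Consumer<String> informHome;   // stands in for InformDurable.informHome(...)
    private boolean notified;

    DurableQuorumNotifier(int quorum, Consumer<String> informHome)
    {
        this.quorum = quorum;
        this.informHome = informHome;
    }

    // Called for each node whose reply shows the sync point is (or already was) durable there;
    // notifies the home shard once, when a quorum of distinct nodes has confirmed.
    void onDurableSuccess(String nodeId)
    {
        if (durableOn.add(nodeId) && durableOn.size() >= quorum && !notified)
        {
            notified = true;
            informHome.accept("Majority");       // durability level reported to the progress log
        }
    }
}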
accord-core/src/main/java/accord/coordinate/CoordinateShardDurable.java
@@ -18,7 +18,7 @@

package accord.coordinate;

import accord.coordinate.ExecuteSyncPoint.ExecuteExclusiveSyncPoint;
import accord.coordinate.ExecuteSyncPoint.ExecuteExclusive;
import accord.coordinate.tracking.AllTracker;
import accord.coordinate.tracking.RequestStatus;
import accord.local.Node;
@@ -32,7 +32,7 @@
import accord.utils.SortedArrays.SortedArrayList;
import accord.utils.async.AsyncResult;

public class CoordinateShardDurable extends ExecuteExclusiveSyncPoint implements Callback<ReadReply>
public class CoordinateShardDurable extends ExecuteExclusive implements Callback<ReadReply>
{
private CoordinateShardDurable(Node node, SyncPoint<Range> exclusiveSyncPoint)
{
accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java
@@ -20,13 +20,16 @@

import java.util.List;

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import accord.coordinate.CoordinationAdapter.Adapters;
import accord.coordinate.CoordinationAdapter.Adapters.SyncPointAdapter;
import accord.local.Node;
import accord.messages.Apply;
import accord.messages.Callback;
import accord.messages.PreAccept.PreAcceptOk;
import accord.primitives.Ballot;
import accord.primitives.Deps;
@@ -176,15 +179,25 @@ public static void sendApply(Node node, Node.Id to, SyncPoint<?> syncPoint)
{
// TODO (required): consider, document and add invariants checking if this topologies is correct in all cases
// (notably ExclusiveSyncPoints should execute in earlier epochs for durability, but not for fetching )
Topologies executes = executes(node, syncPoint.route, syncPoint.syncId);
sendApply(node, to, syncPoint, executes);
sendApply(node, to, syncPoint, executes(node, syncPoint.route, syncPoint.syncId));
}

public static void sendApply(Node node, Node.Id to, SyncPoint<?> syncPoint, Topologies executes)
{
sendApply(node, to, syncPoint, executes, null);
}

public static void sendApply(Node node, Node.Id to, SyncPoint<?> syncPoint, @Nullable Callback<Apply.ApplyReply> callback)
{
sendApply(node, to, syncPoint, executes(node, syncPoint.route, syncPoint.syncId), callback);
}

public static void sendApply(Node node, Node.Id to, SyncPoint<?> syncPoint, Topologies executes, @Nullable Callback<Apply.ApplyReply> callback)
{
TxnId txnId = syncPoint.syncId;
Timestamp executeAt = txnId;
Txn txn = node.agent().emptySystemTxn(txnId.kind(), txnId.domain());
Deps deps = syncPoint.waitFor;
Apply.sendMaximal(node, to, executes, txnId, syncPoint.route(), txn, executeAt, deps, txn.execute(txnId, executeAt, null), txn.result(txnId, executeAt, null), syncPoint.route());
Apply.sendMaximal(node, to, executes, txnId, syncPoint.route(), txn, executeAt, deps, txn.execute(txnId, executeAt, null), txn.result(txnId, executeAt, null), syncPoint.route(), callback);
}
}
accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
@@ -24,7 +24,7 @@

import accord.api.ProtocolModifiers;
import accord.api.Result;
import accord.coordinate.ExecuteSyncPoint.ExecuteBlocking;
import accord.coordinate.ExecuteSyncPoint.ExecuteInclusive;
import accord.coordinate.tracking.FastPathTracker;
import accord.coordinate.tracking.PreAcceptTracker;
import accord.coordinate.tracking.QuorumTracker;
@@ -71,6 +71,11 @@ class DefaultFactory implements Factory
@Override
public <R> CoordinationAdapter<R> get(TxnId txnId, Step step)
{
switch (txnId.kind())
{
case ExclusiveSyncPoint: return (CoordinationAdapter<R>) Adapters.exclusiveSyncPoint();
case SyncPoint: return (CoordinationAdapter<R>) Adapters.inclusiveSyncPoint();
}
switch (step)
{
default: throw new AssertionError("Unhandled step: " + step);
@@ -219,18 +224,17 @@ public void stabilise(Node node, Topologies any, FullRoute<?> route, Ballot ball
@Override
public void execute(Node node, Topologies any, FullRoute<?> route, ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, BiConsumer<? super SyncPoint<U>, Throwable> callback)
{
Topologies all = forExecution(node, route, txnId, executeAt, deps);
persist(node, all, route, txnId, txn, executeAt, deps, txn.execute(txnId, executeAt, null), txn.result(txnId, executeAt, null), callback);
persist(node, null, route, txnId, txn, executeAt, deps, txn.execute(txnId, executeAt, null), txn.result(txnId, executeAt, null), callback);
}

@Override
public void persist(Node node, Topologies any, FullRoute<?> route, Route<?> participants, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, BiConsumer<? super SyncPoint<U>, Throwable> callback)
public void persist(Node node, Topologies ignore, FullRoute<?> route, Route<?> participants, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, BiConsumer<? super SyncPoint<U>, Throwable> callback)
{
Topologies all = forExecution(node, route, txnId, executeAt, deps);

invokeSuccess(node, route, txnId, txn, deps, callback);
new PersistSyncPoint(node, all, txnId, route, txn, executeAt, deps, writes, result)
.start(Apply.FACTORY, Maximal, any, writes, result);
.start(Apply.FACTORY, Maximal, all, writes, result);
}
}

@@ -283,18 +287,6 @@ Topologies forExecution(Node node, FullRoute<?> route, TxnId txnId, Timestamp ex
{
return node.topology().preciseEpochs(route, txnId.epoch(), executeAt.epoch());
}

@Override
public void execute(Node node, Topologies any, FullRoute<?> route, ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, BiConsumer<? super SyncPoint<U>, Throwable> callback)
{
Topologies all = forExecution(node, route, txnId, executeAt, deps);

ExecuteBlocking<U> execute = ExecuteBlocking.atQuorum(node, all, new SyncPoint<>(txnId, deps, (FullRoute<U>) route), executeAt);
execute.start();
addOrExecuteCallback(execute, callback);
}

protected abstract void addOrExecuteCallback(ExecuteBlocking<U> execute, BiConsumer<? super SyncPoint<U>, Throwable> callback);
}

/*
@@ -310,14 +302,6 @@ private static class AsyncInclusiveSyncPointAdapter<U extends Unseekable> extend
protected AsyncInclusiveSyncPointAdapter() {
super();
}

@Override
protected void addOrExecuteCallback(ExecuteBlocking<U> execute, BiConsumer<? super SyncPoint<U>, Throwable> callback)
{
// If this is the async adapter then we want to invoke the callback immediately
// and the caller can wait on the txn locally if they want
callback.accept(execute.syncPoint, null);
}
}

private static class InclusiveSyncPointBlockingAdapter<U extends Unseekable> extends AbstractInclusiveSyncPointAdapter<U>
@@ -329,9 +313,13 @@ protected InclusiveSyncPointBlockingAdapter() {
}

@Override
protected void addOrExecuteCallback(ExecuteBlocking<U> execute, BiConsumer<? super SyncPoint<U>, Throwable> callback)
public void execute(Node node, Topologies any, FullRoute<?> route, ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, BiConsumer<? super SyncPoint<U>, Throwable> callback)
{
Topologies all = forExecution(node, route, txnId, executeAt, deps);

ExecuteInclusive<U> execute = ExecuteInclusive.atQuorum(node, all, new SyncPoint<>(txnId, deps, (FullRoute<U>) route), executeAt);
execute.addCallback(callback);
execute.start();
}

@Override
93 changes: 81 additions & 12 deletions accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
@@ -24,13 +24,17 @@

import accord.api.Result;
import accord.coordinate.CoordinationAdapter.Adapters;
import accord.coordinate.tracking.QuorumIdTracker;
import accord.coordinate.tracking.SimpleTracker;
import accord.coordinate.tracking.QuorumTracker;
import accord.coordinate.tracking.RequestStatus;
import accord.coordinate.tracking.SimpleTracker;
import accord.local.Node;
import accord.messages.Apply;
import accord.messages.ApplyThenWaitUntilApplied;
import accord.messages.Callback;
import accord.messages.InformDurable;
import accord.messages.ReadData;
import accord.messages.ReadData.CommitOrReadNack;
import accord.messages.ReadData.ReadReply;
import accord.messages.WaitUntilApplied;
import accord.primitives.Participants;
@@ -45,25 +49,84 @@
import accord.utils.SortedArrays.SortedArrayList;
import accord.utils.async.AsyncResults.SettableResult;

import static accord.messages.Apply.ApplyReply.Insufficient;
import static accord.messages.ReadData.CommitOrReadNack.Waiting;
import static accord.primitives.Status.Durability.Majority;
import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;

public abstract class ExecuteSyncPoint<U extends Unseekable> extends SettableResult<SyncPoint<U>> implements Callback<ReadReply>
{
public static class SyncPointErased extends Throwable {}

public static class ExecuteBlocking<U extends Unseekable> extends ExecuteSyncPoint<U>
public static class ExecuteInclusive<U extends Unseekable> extends ExecuteSyncPoint<U>
{
private final Timestamp executeAt;
public ExecuteBlocking(Node node, SyncPoint<U> syncPoint, SimpleTracker<?> tracker, Timestamp executeAt)
private final QuorumIdTracker durableTracker;
private Callback<Apply.ApplyReply> insufficientCallback;

public ExecuteInclusive(Node node, SyncPoint<U> syncPoint, SimpleTracker<?> tracker, Timestamp executeAt)
{
super(node, syncPoint, tracker);
Invariants.checkArgument(!syncPoint.syncId.awaitsOnlyDeps());
this.executeAt = executeAt;
this.durableTracker = new QuorumIdTracker(tracker.topologies());
}

public static <U extends Unseekable> ExecuteBlocking<U> atQuorum(Node node, Topologies topologies, SyncPoint<U> syncPoint, Timestamp executeAt)
public static <U extends Unseekable> ExecuteInclusive<U> atQuorum(Node node, Topologies topologies, SyncPoint<U> syncPoint, Timestamp executeAt)
{
return new ExecuteBlocking<>(node, syncPoint, new QuorumTracker(topologies), executeAt);
return new ExecuteInclusive<>(node, syncPoint, new QuorumTracker(topologies), executeAt);
}

@Override
public void onSuccess(Node.Id from, ReadReply reply)
{
if (isDurableReply(reply))
onDurableSuccess(from);

super.onSuccess(from, reply);
}

private void onDurableSuccess(Node.Id from)
{
if (durableTracker.recordSuccess(from) == RequestStatus.Success)
InformDurable.informHome(node, tracker.topologies(), syncPoint.syncId, syncPoint.route, executeAt, Majority);
}

private static boolean isDurableReply(ReadReply reply)
{
if (reply.isOk())
return true;

switch ((CommitOrReadNack) reply)
{
case Waiting:
case Invalid:
case Redundant:
return true;
case Insufficient:
case Rejected:
return false;
}
return false;
}

protected void sendApply(Node.Id to)
{
if (insufficientCallback == null)
{
insufficientCallback = new Callback<>()
{
@Override
public void onSuccess(Node.Id from, Apply.ApplyReply reply)
{
if (reply != Insufficient)
onDurableSuccess(from);
}
@Override public void onFailure(Node.Id from, Throwable failure) {}
@Override public void onCallbackFailure(Node.Id from, Throwable failure) {}
};
}
CoordinateSyncPoint.sendApply(node, to, syncPoint, insufficientCallback);
}

@Override
@@ -79,16 +142,16 @@ public void start()
}
}

public static class ExecuteExclusiveSyncPoint extends ExecuteSyncPoint<Range>
public static class ExecuteExclusive extends ExecuteSyncPoint<Range>
{
private long retryInFutureEpoch;
public ExecuteExclusiveSyncPoint(Node node, SyncPoint<Range> syncPoint, Function<Topologies, SimpleTracker<?>> trackerSupplier)
public ExecuteExclusive(Node node, SyncPoint<Range> syncPoint, Function<Topologies, SimpleTracker<?>> trackerSupplier)
{
super(node, syncPoint, Adapters.exclusiveSyncPoint().forExecution(node, syncPoint.route(), syncPoint.syncId, syncPoint.syncId, syncPoint.waitFor), trackerSupplier);
Invariants.checkArgument(syncPoint.syncId.kind() == ExclusiveSyncPoint);
}

public ExecuteExclusiveSyncPoint(Node node, SyncPoint<Range> syncPoint, Function<Topologies, SimpleTracker<?>> trackerSupplier, SimpleTracker<?> tracker)
public ExecuteExclusive(Node node, SyncPoint<Range> syncPoint, Function<Topologies, SimpleTracker<?>> trackerSupplier, SimpleTracker<?> tracker)
{
super(node, syncPoint, trackerSupplier, tracker);
Invariants.checkArgument(syncPoint.syncId.kind() == ExclusiveSyncPoint);
@@ -116,7 +179,7 @@ protected void onSuccess()
{
if (retryInFutureEpoch > tracker.topologies().currentEpoch())
{
ExecuteExclusiveSyncPoint continuation = new ExecuteExclusiveSyncPoint(node, syncPoint, trackerSupplier, trackerSupplier.apply(node.topology().preciseEpochs(syncPoint.route(), tracker.topologies().currentEpoch(), retryInFutureEpoch)));
ExecuteExclusive continuation = new ExecuteExclusive(node, syncPoint, trackerSupplier, trackerSupplier.apply(node.topology().preciseEpochs(syncPoint.route(), tracker.topologies().currentEpoch(), retryInFutureEpoch)));
continuation.addCallback((success, failure) -> {
if (failure == null) trySuccess(success);
else tryFailure(failure);
@@ -170,12 +233,12 @@ public synchronized void onSuccess(Node.Id from, ReadReply reply)

if (!reply.isOk())
{
switch ((ReadData.CommitOrReadNack)reply)
switch ((CommitOrReadNack)reply)
{
default: throw new AssertionError("Unhandled: " + reply);

case Insufficient:
CoordinateSyncPoint.sendApply(node, from, syncPoint, tracker.topologies());
sendApply(from);
return;

case Redundant:
@@ -184,7 +247,8 @@ public synchronized void onSuccess(Node.Id from, ReadReply reply)

case Invalid:
tryFailure(new Invalidated(syncPoint.syncId, syncPoint.route.homeKey()));
return;

case Waiting:
}
}
else
@@ -200,6 +264,11 @@ protected void onSuccess()
trySuccess(syncPoint);
}

protected void sendApply(Node.Id to)
{
CoordinateSyncPoint.sendApply(node, to, syncPoint);
}

@Override
public synchronized void onFailure(Node.Id from, Throwable failure)
{
@@ -112,7 +112,7 @@ protected void onDone(Success success, Throwable fail)
if (hasMadeProgress(full))
{
if (full.durability.isDurable())
node.send(topologies.forEpoch(txnId.epoch()).forKey(route.homeKey()).nodes, to -> new InformDurable(to, topologies, route, txnId, full.executeAtIfKnown(), full.durability));
InformDurable.informHome(node, topologies, txnId, route, full.executeAtIfKnown(), full.durability);
callback.accept(full.toProgressToken(), null);
}
else
15 changes: 2 additions & 13 deletions accord-core/src/main/java/accord/coordinate/Persist.java
@@ -18,9 +18,6 @@

package accord.coordinate;

import java.util.HashSet;
import java.util.Set;

import accord.api.Result;
import accord.coordinate.tracking.QuorumTracker;
import accord.local.Node;
@@ -37,7 +34,6 @@
import accord.primitives.TxnId;
import accord.primitives.Writes;
import accord.topology.Topologies;
import accord.topology.Topology;
import accord.utils.SortedArrays;

import static accord.coordinate.tracking.RequestStatus.Success;
@@ -55,8 +51,8 @@ public abstract class Persist implements Callback<ApplyReply>
protected final Result result;
protected final FullRoute<?> route;
protected final Topologies topologies;
// TODO (expected): track separate ALL and Quorum, so we can report Universal durability to permit faster GC
protected final QuorumTracker tracker;
protected final Set<Id> persistedOn;
boolean isDone;

protected Persist(Node node, Topologies all, TxnId txnId, Route<?> sendTo, Txn txn, Timestamp executeAt, Deps stableDeps, Writes writes, Result result, FullRoute<?> route)
@@ -72,7 +68,6 @@ protected Persist(Node node, Topologies all, TxnId txnId, Route<?> sendTo, Txn t
this.route = route;
this.topologies = all;
this.tracker = new QuorumTracker(all);
this.persistedOn = new HashSet<>();
}

@Override
Expand All @@ -83,18 +78,12 @@ public void onSuccess(Id from, ApplyReply reply)
default: throw new IllegalStateException();
case Redundant:
case Applied:
persistedOn.add(from);
if (sendTo == route && tracker.recordSuccess(from) == Success)
{
if (!isDone)
{
isDone = true;
Topologies topologies = tracker.topologies();
Topology topology = topologies.forEpoch(txnId.epoch());
int homeShardIndex = topology.indexForKey(route.homeKey());
// we can persist only partially if some shards are already completed; in this case the home shard may not participate
if (homeShardIndex >= 0)
node.send(topology.get(homeShardIndex).nodes, to -> new InformDurable(to, topologies, route, txnId, executeAt, Majority));
InformDurable.informHome(node, topologies, txnId, route, executeAt, Majority);
}
}
break;