improve durability scheduling, copy temporarily for majority deps fetching
belliottsmith committed Oct 14, 2024
1 parent 70e61b4 commit c3a1d8f
Showing 6 changed files with 418 additions and 137 deletions.
@@ -31,7 +31,6 @@

import accord.api.Scheduler;
import accord.coordinate.CoordinateGloballyDurable;
import accord.coordinate.CoordinateShardDurable;
import accord.coordinate.CoordinationFailed;
import accord.coordinate.ExecuteSyncPoint.SyncPointErased;
import accord.local.Node;
@@ -48,9 +47,12 @@
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncResult;

import static accord.coordinate.CoordinateShardDurable.coordinate;
import static accord.coordinate.CoordinateSyncPoint.exclusiveSyncPoint;
import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;

/**
* Helper methods and classes to invoke coordination to propagate information about durability.
@@ -85,15 +87,15 @@ public class CoordinateDurabilityScheduling
private Scheduler.Scheduled scheduled;

/*
* In each round at each node wait this amount of time between initiating new CoordinateShardDurable
* In each cycle, attempt to split the range into this many pieces; if we fail, we increase the number of pieces
*/
private long frequencyMicros = TimeUnit.MILLISECONDS.toMicros(500L);
private int targetShardSplits = 64;

/*
* In each round at each node wait this amount of time between allocating a CoordinateShardDurable txnId
* and coordinating the shard durability
*/
private long txnIdLagMicros = TimeUnit.SECONDS.toMicros(1L);
private long txnIdLagMicros = TimeUnit.SECONDS.toMicros(5L);

/*
* In each round at each node wait this amount of time between allocating a CoordinateShardDurable txnId
@@ -120,6 +122,10 @@ public class CoordinateDurabilityScheduling
*/
private long globalCycleTimeMicros = TimeUnit.SECONDS.toMicros(30);

private long defaultRetryDelayMicros = TimeUnit.SECONDS.toMicros(1);
private long maxRetryDelayMicros = TimeUnit.MINUTES.toMicros(1);
private int maxNumberOfSplits = 1 << 10;

private Topology currentGlobalTopology;
private final Map<Range, ShardScheduler> shardSchedulers = new HashMap<>();
private int globalIndex;
@@ -131,16 +137,15 @@ private class ShardScheduler
{
Shard shard;

// based on ideal number of splits
// time to start a new cycle
int nodeOffset;

int index;
int numberOfSplits, desiredNumberOfSplits;
boolean defunct;
long shardCycleTimeMicros;
int numberOfSplits;
Scheduler.Scheduled scheduled;
long rangeStartedAtMicros;
long cycleStartedAtMicros = -1;
long rangeStartedAtMicros, cycleStartedAtMicros;
long retryDelayMicros = defaultRetryDelayMicros;
boolean defunct;

private ShardScheduler()
{
@@ -150,32 +155,80 @@ synchronized void update(Shard shard, int offset)
{
this.shard = shard;
this.nodeOffset = offset;
this.shardCycleTimeMicros = Math.max(CoordinateDurabilityScheduling.this.shardCycleTimeMicros, shard.rf() * 3L * frequencyMicros);
this.desiredNumberOfSplits = (int) ((shardCycleTimeMicros + frequencyMicros - 1) / frequencyMicros);
if (numberOfSplits == 0 || numberOfSplits < desiredNumberOfSplits)
{
index = offset;
numberOfSplits = desiredNumberOfSplits;
}
if (numberOfSplits == 0 || numberOfSplits < targetShardSplits)
numberOfSplits = targetShardSplits;
}

synchronized void markDefunct()
{
defunct = true;
logger.info("Discarding defunct shard durability scheduler for {}", shard);
}

synchronized void schedule()
synchronized void retryCoordinateDurability(Node node, SyncPoint<Range> exclusiveSyncPoint, int nextIndex)
{
if (defunct)
return;

// TODO (expected): back-off
coordinateShardDurableAfterExclusiveSyncPoint(node, exclusiveSyncPoint, nextIndex);
}

synchronized void restart()
{
if (defunct)
return;

long nowMicros = node.elapsed(MICROSECONDS);
int cyclePosition = (nodeOffset + (((index * shard.rf()) + numberOfSplits - 1) / numberOfSplits)) % shard.rf();
long microsOffset = (cyclePosition * shardCycleTimeMicros) / shard.rf();
long microsOffset = (nodeOffset * shardCycleTimeMicros) / shard.rf();
long scheduleAt = nowMicros - (nowMicros % shardCycleTimeMicros) + microsOffset;
if (nowMicros > scheduleAt)
scheduleAt += shardCycleTimeMicros;

if (numberOfSplits < targetShardSplits)
numberOfSplits = targetShardSplits;

cycleStartedAtMicros = scheduleAt;
scheduleAt(nowMicros, scheduleAt);
}

synchronized void schedule()
{
if (defunct)
return;

long nowMicros = node.elapsed(MICROSECONDS);
long microsOffset = (index * shardCycleTimeMicros) / numberOfSplits;
long scheduleAt = cycleStartedAtMicros + microsOffset;
if (retryDelayMicros > defaultRetryDelayMicros)
{
retryDelayMicros = Math.max(defaultRetryDelayMicros, (long)(0.9 * retryDelayMicros));
}
if (numberOfSplits > targetShardSplits && index % 4 == 0)
{
index /= 4;
numberOfSplits /= 4;
}
scheduleAt(nowMicros, scheduleAt);
}

synchronized void retry()
{
if (defunct)
return;

long nowMicros = node.elapsed(MICROSECONDS);
long scheduleAt = nowMicros + retryDelayMicros;
retryDelayMicros += retryDelayMicros / 2;
if (retryDelayMicros > maxRetryDelayMicros)
{
retryDelayMicros = maxRetryDelayMicros;
}
scheduleAt(nowMicros, scheduleAt);
}

synchronized void scheduleAt(long nowMicros, long scheduleAt)
{
ShardDistributor distributor = node.commandStores().shardDistributor();
Range range;
int nextIndex;
@@ -188,11 +241,13 @@ synchronized void schedule()
nextIndex = i;
}

scheduled = node.scheduler().once(() -> {
Runnable schedule = () -> {
// TODO (required): allocate stale HLC from a reservation of HLCs for this purpose
TxnId syncId = node.nextTxnId(ExclusiveSyncPoint, Domain.Range);
startShardSync(syncId, Ranges.of(range), nextIndex);
}, scheduleAt - nowMicros, MICROSECONDS);
};
if (scheduleAt <= nowMicros) schedule.run();
else scheduled = node.scheduler().once(schedule, scheduleAt - nowMicros, MICROSECONDS);
}

/**
@@ -204,6 +259,7 @@ private void startShardSync(TxnId syncId, Ranges ranges, int nextIndex)
scheduled = node.scheduler().once(() -> node.withEpoch(syncId.epoch(), (ignored, withEpochFailure) -> {
if (withEpochFailure != null)
{
// don't wait on epoch failure - we aren't the cause of any problems
startShardSync(syncId, ranges, nextIndex);
Throwable wrapped = CoordinationFailed.wrap(withEpochFailure);
logger.trace("Exception waiting for epoch before coordinating exclusive sync point for local shard durability, epoch " + syncId.epoch(), wrapped);
@@ -219,10 +275,13 @@ private void startShardSync(TxnId syncId, Ranges ranges, int nextIndex)
{
synchronized (ShardScheduler.this)
{
index *= 2;
numberOfSplits *= 2;
// TODO (required): try to recover or invalidate prior sync point
schedule();
retry();
if (numberOfSplits * 2 <= maxNumberOfSplits)
{
index *= 2;
numberOfSplits *= 2;
}
logger.warn("{}: Exception coordinating ExclusiveSyncPoint for {} durability. Increased numberOfSplits to " + numberOfSplits, syncId, ranges, fail);
}
}
@@ -240,68 +299,92 @@ private void coordinateShardDurableAfterExclusiveSyncPoint(Node node, SyncPoint<
scheduled = node.scheduler().once(() -> {
scheduled = null;
node.commandStores().any().execute(() -> {
CoordinateShardDurable.coordinate(node, exclusiveSyncPoint)
.addCallback((success, fail) -> {
if (fail != null && fail.getClass() != SyncPointErased.class)
{
logger.trace("Exception coordinating local shard durability, will retry immediately", fail);
coordinateShardDurableAfterExclusiveSyncPoint(node, exclusiveSyncPoint, nextIndex);
}
else
{
synchronized (ShardScheduler.this)
{
index = nextIndex;
if (index >= numberOfSplits)
{
index = 0;
long nowMicros = node.elapsed(MICROSECONDS);
String reportTime = "";
if (cycleStartedAtMicros > 0)
reportTime = "in " + MICROSECONDS.toSeconds(nowMicros - cycleStartedAtMicros) + 's';
logger.info("Successfully completed one cycle of durability scheduling for shard {}{}", shard.range, reportTime);
if (numberOfSplits > desiredNumberOfSplits)
numberOfSplits = Math.max(desiredNumberOfSplits, (int)(numberOfSplits * 0.9));
cycleStartedAtMicros = nowMicros;
}
else
{
long nowMicros = node.elapsed(MICROSECONDS);
logger.debug("Successfully coordinated shard durability for range {} in {}s", shard.range, MICROSECONDS.toSeconds(nowMicros - rangeStartedAtMicros));
}

schedule();
}
}
});
coordinate(node, exclusiveSyncPoint)
.addCallback((success, fail) -> {
if (fail != null && fail.getClass() != SyncPointErased.class)
{
logger.debug("Exception coordinating shard durability for {}, will retry", exclusiveSyncPoint.route.toRanges(), fail);
retryCoordinateDurability(node, exclusiveSyncPoint, nextIndex);
}
else
{
try
{
synchronized (ShardScheduler.this)
{
int prevIndex = index;
index = nextIndex;
if (index >= numberOfSplits)
{
index = 0;
long nowMicros = node.elapsed(MICROSECONDS);
long timeTakenSeconds = MICROSECONDS.toSeconds(nowMicros - cycleStartedAtMicros);
long targetTimeSeconds = MILLISECONDS.toSeconds(shardCycleTimeMicros);
logger.info("Successfully completed one cycle of durability scheduling for shard {} in {}s (vs {}s target)", shard.range, timeTakenSeconds, targetTimeSeconds);
restart();
}
else
{
long nowMicros = node.elapsed(MICROSECONDS);
int prevRfCycle = (prevIndex * shard.rf()) / numberOfSplits;
int curRfCycle = (index * shard.rf()) / numberOfSplits;
if (prevRfCycle != curRfCycle)
{
long targetTimeSeconds = MICROSECONDS.toSeconds((index * shardCycleTimeMicros) / numberOfSplits);
long timeTakenSeconds = MICROSECONDS.toSeconds(nowMicros - cycleStartedAtMicros);
logger.info("Successfully completed {}/{} cycle of durability scheduling covering range {}. Completed in {}s (vs {}s target).", curRfCycle, shard.rf(), exclusiveSyncPoint.route.toRanges(), timeTakenSeconds, targetTimeSeconds);
}
else if (logger.isTraceEnabled())
{
logger.trace("Successfully coordinated shard durability for range {} in {}s", shard.range, MICROSECONDS.toSeconds(nowMicros - rangeStartedAtMicros));
}
schedule();
}
}
}
catch (Throwable t)
{
retry();
logger.error("Unexpected exception handling durability scheduling callback; starting from scratch", t);
}
}
});
});
}, durabilityLagMicros, MICROSECONDS);
}

}


public CoordinateDurabilityScheduling(Node node)
{
this.node = node;
}

public void setFrequency(int frequency, TimeUnit units)
public void setTargetShardSplits(int targetShardSplits)
{
this.targetShardSplits = targetShardSplits;
}

public void setDefaultRetryDelay(long retryDelay, TimeUnit units)
{
this.frequencyMicros = Ints.saturatedCast(units.toMicros(frequency));
this.defaultRetryDelayMicros = units.toMicros(retryDelay);
}

public void setTxnIdLag(int txnIdLag, TimeUnit units)
public void setMaxRetryDelay(long retryDelay, TimeUnit units)
{
this.maxRetryDelayMicros = units.toMicros(retryDelay);
}

public void setTxnIdLag(long txnIdLag, TimeUnit units)
{
this.txnIdLagMicros = Ints.saturatedCast(units.toMicros(txnIdLag));
}

public void setDurabilityLag(int durabilityLag, TimeUnit units)
public void setDurabilityLag(long durabilityLag, TimeUnit units)
{
this.durabilityLagMicros = Ints.saturatedCast(units.toMicros(durabilityLag));
}

public void setShardCycleTime(int shardCycleTime, TimeUnit units)
public void setShardCycleTime(long shardCycleTime, TimeUnit units)
{
this.shardCycleTimeMicros = Ints.saturatedCast(units.toMicros(shardCycleTime));
}
@@ -319,7 +402,7 @@ public synchronized void start()
Invariants.checkState(!stop); // cannot currently restart safely
long nowMicros = node.elapsed(MICROSECONDS);
setNextGlobalSyncTime(nowMicros);
scheduled = node.scheduler().recurring(this::run, frequencyMicros, MICROSECONDS);
scheduled = node.scheduler().recurring(this::run, 1L, MINUTES);
}

public void stop()
@@ -351,7 +434,6 @@ private void run()
}
}


private void startGlobalSync()
{
try
@@ -369,7 +451,7 @@ private void startGlobalSync()
}
}

private void updateTopology()
public synchronized void updateTopology()
{
Topology latestGlobal = node.topology().current();
if (latestGlobal == currentGlobalTopology)
@@ -395,7 +477,10 @@ private void updateTopology()
shardSchedulers.put(shard.range, scheduler);
scheduler.update(shard, shard.nodes.find(node.id()));
if (prevScheduler == null)
scheduler.schedule();
{
logger.info("Starting shard durability scheduler for {}", shard);
scheduler.restart();
}
}
prev.forEach((r, s) -> s.markDefunct());
}
@@ -432,6 +517,6 @@ private void setNextGlobalSyncTime(long nowMicros)
if (targetTimeInCurrentRound < nowMicros)
targetTime += totalRoundDuration;

nextGlobalSyncTimeMicros = targetTime - nowMicros;
nextGlobalSyncTimeMicros = targetTime;
}
}
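
To make the new retry behaviour easier to follow, here is a minimal standalone sketch of the backoff and range-splitting policy the diff introduces. It is not the project's API: the class name, the onFailure/onSuccess methods and the main driver are hypothetical, and only the constants (1s default retry delay, 1 minute cap, target of 64 splits, cap of 1 << 10 splits) are taken from the diff above. On failure the retry delay grows by half, capped at the maximum, and the shard is split into twice as many pieces; each success decays the delay back towards the default and periodically collapses the split count back towards the target.

import java.util.concurrent.TimeUnit;

final class DurabilityBackoffSketch
{
    static final long DEFAULT_RETRY_DELAY_MICROS = TimeUnit.SECONDS.toMicros(1);
    static final long MAX_RETRY_DELAY_MICROS = TimeUnit.MINUTES.toMicros(1);
    static final int TARGET_SHARD_SPLITS = 64;
    static final int MAX_NUMBER_OF_SPLITS = 1 << 10;

    long retryDelayMicros = DEFAULT_RETRY_DELAY_MICROS;
    int index;
    int numberOfSplits = TARGET_SHARD_SPLITS;

    // Failure path: lengthen the retry delay by 50% (capped) and, if there is room,
    // double the number of splits so each retried sync point covers a smaller range.
    void onFailure()
    {
        retryDelayMicros = Math.min(MAX_RETRY_DELAY_MICROS, retryDelayMicros + retryDelayMicros / 2);
        if (numberOfSplits * 2 <= MAX_NUMBER_OF_SPLITS)
        {
            index *= 2;
            numberOfSplits *= 2;
        }
    }

    // Success path: decay the retry delay back towards the default by 10% per success,
    // and every fourth sub-range collapse the split count back towards the target.
    void onSuccess()
    {
        if (retryDelayMicros > DEFAULT_RETRY_DELAY_MICROS)
            retryDelayMicros = Math.max(DEFAULT_RETRY_DELAY_MICROS, (long) (0.9 * retryDelayMicros));
        if (numberOfSplits > TARGET_SHARD_SPLITS && index % 4 == 0)
        {
            index /= 4;
            numberOfSplits /= 4;
        }
    }

    public static void main(String[] args)
    {
        DurabilityBackoffSketch sketch = new DurabilityBackoffSketch();
        sketch.onFailure();
        sketch.onFailure();
        System.out.println(sketch.retryDelayMicros + "us delay, " + sketch.numberOfSplits + " splits");
        sketch.onSuccess();
        System.out.println(sketch.retryDelayMicros + "us delay, " + sketch.numberOfSplits + " splits");
    }
}

The asymmetry (grow by 1.5x on failure, decay by 0.9x on success) means the scheduler backs off quickly under sustained failures but returns to the default cadence gradually rather than oscillating.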