From c3a1d8fe4401c6f13ab88b0c4a3c5a3715f5ea41 Mon Sep 17 00:00:00 2001 From: Benedict Elliott Smith Date: Mon, 14 Oct 2024 11:33:01 +0100 Subject: [PATCH] improve durability scheduling, copy temporarily for majority deps fetching --- .../impl/CoordinateDurabilityScheduling.java | 227 +++++++++++----- .../java/accord/impl/MajorityDepsFetcher.java | 251 ++++++++++++++++++ .../accord/impl/progresslog/HomeState.java | 1 - .../main/java/accord/local/CommandStore.java | 64 +---- .../java/accord/local/SafeCommandStore.java | 2 +- .../test/java/accord/impl/basic/Cluster.java | 10 +- 6 files changed, 418 insertions(+), 137 deletions(-) create mode 100644 accord-core/src/main/java/accord/impl/MajorityDepsFetcher.java diff --git a/accord-core/src/main/java/accord/impl/CoordinateDurabilityScheduling.java b/accord-core/src/main/java/accord/impl/CoordinateDurabilityScheduling.java index fd6270898..2d316ed78 100644 --- a/accord-core/src/main/java/accord/impl/CoordinateDurabilityScheduling.java +++ b/accord-core/src/main/java/accord/impl/CoordinateDurabilityScheduling.java @@ -31,7 +31,6 @@ import accord.api.Scheduler; import accord.coordinate.CoordinateGloballyDurable; -import accord.coordinate.CoordinateShardDurable; import accord.coordinate.CoordinationFailed; import accord.coordinate.ExecuteSyncPoint.SyncPointErased; import accord.local.Node; @@ -48,9 +47,12 @@ import accord.utils.async.AsyncChain; import accord.utils.async.AsyncResult; +import static accord.coordinate.CoordinateShardDurable.coordinate; import static accord.coordinate.CoordinateSyncPoint.exclusiveSyncPoint; import static accord.primitives.Txn.Kind.ExclusiveSyncPoint; import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; /** * Helper methods and classes to invoke coordination to propagate information about durability. 
@@ -85,15 +87,15 @@ public class CoordinateDurabilityScheduling private Scheduler.Scheduled scheduled; /* - * In each round at each node wait this amount of time between initiating new CoordinateShardDurable + * In each cycle, attempt to split the range into this many pieces; if we fail, we increase the number of pieces */ - private long frequencyMicros = TimeUnit.MILLISECONDS.toMicros(500L); + private int targetShardSplits = 64; /* * In each round at each node wait this amount of time between allocating a CoordinateShardDurable txnId * and coordinating the shard durability */ - private long txnIdLagMicros = TimeUnit.SECONDS.toMicros(1L); + private long txnIdLagMicros = TimeUnit.SECONDS.toMicros(5L); /* * In each round at each node wait this amount of time between allocating a CoordinateShardDurable txnId @@ -120,6 +122,10 @@ public class CoordinateDurabilityScheduling */ private long globalCycleTimeMicros = TimeUnit.SECONDS.toMicros(30); + private long defaultRetryDelayMicros = TimeUnit.SECONDS.toMicros(1); + private long maxRetryDelayMicros = TimeUnit.MINUTES.toMicros(1); + private int maxNumberOfSplits = 1 << 10; + private Topology currentGlobalTopology; private final Map shardSchedulers = new HashMap<>(); private int globalIndex; @@ -131,16 +137,15 @@ private class ShardScheduler { Shard shard; - // based on ideal number of splits + // time to start a new cycle int nodeOffset; int index; - int numberOfSplits, desiredNumberOfSplits; - boolean defunct; - long shardCycleTimeMicros; + int numberOfSplits; Scheduler.Scheduled scheduled; - long rangeStartedAtMicros; - long cycleStartedAtMicros = -1; + long rangeStartedAtMicros, cycleStartedAtMicros; + long retryDelayMicros = defaultRetryDelayMicros; + boolean defunct; private ShardScheduler() { @@ -150,32 +155,80 @@ synchronized void update(Shard shard, int offset) { this.shard = shard; this.nodeOffset = offset; - this.shardCycleTimeMicros = Math.max(CoordinateDurabilityScheduling.this.shardCycleTimeMicros, shard.rf() * 3L * frequencyMicros); - this.desiredNumberOfSplits = (int) ((shardCycleTimeMicros + frequencyMicros - 1) / frequencyMicros); - if (numberOfSplits == 0 || numberOfSplits < desiredNumberOfSplits) - { - index = offset; - numberOfSplits = desiredNumberOfSplits; - } + if (numberOfSplits == 0 || numberOfSplits < targetShardSplits) + numberOfSplits = targetShardSplits; } synchronized void markDefunct() { defunct = true; + logger.info("Discarding defunct shard durability scheduler for {}", shard); } - synchronized void schedule() + synchronized void retryCoordinateDurability(Node node, SyncPoint exclusiveSyncPoint, int nextIndex) + { + if (defunct) + return; + + // TODO (expected): back-off + coordinateShardDurableAfterExclusiveSyncPoint(node, exclusiveSyncPoint, nextIndex); + } + + synchronized void restart() { if (defunct) return; long nowMicros = node.elapsed(MICROSECONDS); - int cyclePosition = (nodeOffset + (((index * shard.rf()) + numberOfSplits - 1) / numberOfSplits)) % shard.rf(); - long microsOffset = (cyclePosition * shardCycleTimeMicros) / shard.rf(); + long microsOffset = (nodeOffset * shardCycleTimeMicros) / shard.rf(); long scheduleAt = nowMicros - (nowMicros % shardCycleTimeMicros) + microsOffset; if (nowMicros > scheduleAt) scheduleAt += shardCycleTimeMicros; + if (numberOfSplits < targetShardSplits) + numberOfSplits = targetShardSplits; + + cycleStartedAtMicros = scheduleAt; + scheduleAt(nowMicros, scheduleAt); + } + + synchronized void schedule() + { + if (defunct) + return; + + long nowMicros = 
node.elapsed(MICROSECONDS); + long microsOffset = (index * shardCycleTimeMicros) / numberOfSplits; + long scheduleAt = cycleStartedAtMicros + microsOffset; + if (retryDelayMicros > defaultRetryDelayMicros) + { + retryDelayMicros = Math.max(defaultRetryDelayMicros, (long)(0.9 * retryDelayMicros)); + } + if (numberOfSplits > targetShardSplits && index % 4 == 0) + { + index /= 4; + numberOfSplits /=4; + } + scheduleAt(nowMicros, scheduleAt); + } + + synchronized void retry() + { + if (defunct) + return; + + long nowMicros = node.elapsed(MICROSECONDS); + long scheduleAt = nowMicros + retryDelayMicros; + retryDelayMicros += retryDelayMicros / 2; + if (retryDelayMicros > maxRetryDelayMicros) + { + retryDelayMicros = maxRetryDelayMicros; + } + scheduleAt(nowMicros, scheduleAt); + } + + synchronized void scheduleAt(long nowMicros, long scheduleAt) + { ShardDistributor distributor = node.commandStores().shardDistributor(); Range range; int nextIndex; @@ -188,11 +241,13 @@ synchronized void schedule() nextIndex = i; } - scheduled = node.scheduler().once(() -> { + Runnable schedule = () -> { // TODO (required): allocate stale HLC from a reservation of HLCs for this purpose TxnId syncId = node.nextTxnId(ExclusiveSyncPoint, Domain.Range); startShardSync(syncId, Ranges.of(range), nextIndex); - }, scheduleAt - nowMicros, MICROSECONDS); + }; + if (scheduleAt <= nowMicros) schedule.run(); + else scheduled = node.scheduler().once(schedule, scheduleAt - nowMicros, MICROSECONDS); } /** @@ -204,6 +259,7 @@ private void startShardSync(TxnId syncId, Ranges ranges, int nextIndex) scheduled = node.scheduler().once(() -> node.withEpoch(syncId.epoch(), (ignored, withEpochFailure) -> { if (withEpochFailure != null) { + // don't wait on epoch failure - we aren't the cause of any problems startShardSync(syncId, ranges, nextIndex); Throwable wrapped = CoordinationFailed.wrap(withEpochFailure); logger.trace("Exception waiting for epoch before coordinating exclusive sync point for local shard durability, epoch " + syncId.epoch(), wrapped); @@ -219,10 +275,13 @@ private void startShardSync(TxnId syncId, Ranges ranges, int nextIndex) { synchronized (ShardScheduler.this) { - index *= 2; - numberOfSplits *= 2; // TODO (required): try to recover or invalidate prior sync point - schedule(); + retry(); + if (numberOfSplits * 2 <= maxNumberOfSplits) + { + index *= 2; + numberOfSplits *= 2; + } logger.warn("{}: Exception coordinating ExclusiveSyncPoint for {} durability. 
Increased numberOfSplits to " + numberOfSplits, syncId, ranges, fail); } } @@ -240,68 +299,92 @@ private void coordinateShardDurableAfterExclusiveSyncPoint(Node node, SyncPoint< scheduled = node.scheduler().once(() -> { scheduled = null; node.commandStores().any().execute(() -> { - CoordinateShardDurable.coordinate(node, exclusiveSyncPoint) - .addCallback((success, fail) -> { - if (fail != null && fail.getClass() != SyncPointErased.class) - { - logger.trace("Exception coordinating local shard durability, will retry immediately", fail); - coordinateShardDurableAfterExclusiveSyncPoint(node, exclusiveSyncPoint, nextIndex); - } - else - { - synchronized (ShardScheduler.this) - { - index = nextIndex; - if (index >= numberOfSplits) - { - index = 0; - long nowMicros = node.elapsed(MICROSECONDS); - String reportTime = ""; - if (cycleStartedAtMicros > 0) - reportTime = "in " + MICROSECONDS.toSeconds(nowMicros - cycleStartedAtMicros) + 's'; - logger.info("Successfully completed one cycle of durability scheduling for shard {}{}", shard.range, reportTime); - if (numberOfSplits > desiredNumberOfSplits) - numberOfSplits = Math.max(desiredNumberOfSplits, (int)(numberOfSplits * 0.9)); - cycleStartedAtMicros = nowMicros; - } - else - { - long nowMicros = node.elapsed(MICROSECONDS); - logger.debug("Successfully coordinated shard durability for range {} in {}s", shard.range, MICROSECONDS.toSeconds(nowMicros - rangeStartedAtMicros)); - } - - schedule(); - } - } - }); + coordinate(node, exclusiveSyncPoint) + .addCallback((success, fail) -> { + if (fail != null && fail.getClass() != SyncPointErased.class) + { + logger.debug("Exception coordinating shard durability for {}, will retry", exclusiveSyncPoint.route.toRanges(), fail); + retryCoordinateDurability(node, exclusiveSyncPoint, nextIndex); + } + else + { + try + { + synchronized (ShardScheduler.this) + { + int prevIndex = index; + index = nextIndex; + if (index >= numberOfSplits) + { + index = 0; + long nowMicros = node.elapsed(MICROSECONDS); + long timeTakenSeconds = MICROSECONDS.toSeconds(nowMicros - cycleStartedAtMicros); + long targetTimeSeconds = MICROSECONDS.toSeconds(shardCycleTimeMicros); + logger.info("Successfully completed one cycle of durability scheduling for shard {} in {}s (vs {}s target)", shard.range, timeTakenSeconds, targetTimeSeconds); + restart(); + } + else + { + long nowMicros = node.elapsed(MICROSECONDS); + int prevRfCycle = (prevIndex * shard.rf()) / numberOfSplits; + int curRfCycle = (index * shard.rf()) / numberOfSplits; + if (prevRfCycle != curRfCycle) + { + long targetTimeSeconds = MICROSECONDS.toSeconds((index * shardCycleTimeMicros) / numberOfSplits); + long timeTakenSeconds = MICROSECONDS.toSeconds(nowMicros - cycleStartedAtMicros); + logger.info("Successfully completed {}/{} cycle of durability scheduling covering range {}. 
Completed in {}s (vs {}s target).", curRfCycle, shard.rf(), exclusiveSyncPoint.route.toRanges(), timeTakenSeconds, targetTimeSeconds); + } + else if (logger.isTraceEnabled()) + { + logger.trace("Successfully coordinated shard durability for range {} in {}s", shard.range, MICROSECONDS.toSeconds(nowMicros - rangeStartedAtMicros)); + } + schedule(); + } + } + } + catch (Throwable t) + { + retry(); + logger.error("Unexpected exception handling durability scheduling callback; starting from scratch", t); + } + } + }); }); }, durabilityLagMicros, MICROSECONDS); } - } - public CoordinateDurabilityScheduling(Node node) { this.node = node; } - public void setFrequency(int frequency, TimeUnit units) + public void setTargetShardSplits(int targetShardSplits) + { + this.targetShardSplits = targetShardSplits; + } + + public void setDefaultRetryDelay(long retryDelay, TimeUnit units) { - this.frequencyMicros = Ints.saturatedCast(units.toMicros(frequency)); + this.defaultRetryDelayMicros = units.toMicros(retryDelay); } - public void setTxnIdLag(int txnIdLag, TimeUnit units) + public void setMaxRetryDelay(long retryDelay, TimeUnit units) + { + this.maxRetryDelayMicros = units.toMicros(retryDelay); + } + + public void setTxnIdLag(long txnIdLag, TimeUnit units) { this.txnIdLagMicros = Ints.saturatedCast(units.toMicros(txnIdLag)); } - public void setDurabilityLag(int durabilityLag, TimeUnit units) + public void setDurabilityLag(long durabilityLag, TimeUnit units) { this.durabilityLagMicros = Ints.saturatedCast(units.toMicros(durabilityLag)); } - public void setShardCycleTime(int shardCycleTime, TimeUnit units) + public void setShardCycleTime(long shardCycleTime, TimeUnit units) { this.shardCycleTimeMicros = Ints.saturatedCast(units.toMicros(shardCycleTime)); } @@ -319,7 +402,7 @@ public synchronized void start() Invariants.checkState(!stop); // cannot currently restart safely long nowMicros = node.elapsed(MICROSECONDS); setNextGlobalSyncTime(nowMicros); - scheduled = node.scheduler().recurring(this::run, frequencyMicros, MICROSECONDS); + scheduled = node.scheduler().recurring(this::run, 1L, MINUTES); } public void stop() @@ -351,7 +434,6 @@ private void run() } } - private void startGlobalSync() { try @@ -369,7 +451,7 @@ private void startGlobalSync() } } - private void updateTopology() + public synchronized void updateTopology() { Topology latestGlobal = node.topology().current(); if (latestGlobal == currentGlobalTopology) @@ -395,7 +477,10 @@ private void updateTopology() shardSchedulers.put(shard.range, scheduler); scheduler.update(shard, shard.nodes.find(node.id())); if (prevScheduler == null) - scheduler.schedule(); + { + logger.info("Starting shard durability scheduler for {}", shard); + scheduler.restart(); + } } prev.forEach((r, s) -> s.markDefunct()); } @@ -432,6 +517,6 @@ private void setNextGlobalSyncTime(long nowMicros) if (targetTimeInCurrentRound < nowMicros) targetTime += totalRoundDuration; - nextGlobalSyncTimeMicros = targetTime - nowMicros; + nextGlobalSyncTimeMicros = targetTime; } } \ No newline at end of file diff --git a/accord-core/src/main/java/accord/impl/MajorityDepsFetcher.java b/accord-core/src/main/java/accord/impl/MajorityDepsFetcher.java new file mode 100644 index 000000000..0c3bd23ea --- /dev/null +++ b/accord-core/src/main/java/accord/impl/MajorityDepsFetcher.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.impl; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.api.Scheduler; +import accord.coordinate.CollectCalculatedDeps; +import accord.coordinate.CoordinationFailed; +import accord.local.CommandStore; +import accord.local.Node; +import accord.local.ShardDistributor; +import accord.primitives.FullRoute; +import accord.primitives.Range; +import accord.primitives.Ranges; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; + +import static accord.local.KeyHistory.COMMANDS; +import static accord.local.PreLoadContext.contextFor; +import static java.util.concurrent.TimeUnit.MICROSECONDS; + +/** + * Copied from CoordinateDurabilityScheduling + * TODO (required): deprecate in favour of piggy-backing on exclusive sync points + */ +public class MajorityDepsFetcher +{ + private static final Logger logger = LoggerFactory.getLogger(MajorityDepsFetcher.class); + + private final Node node; + + private int targetShardSplits = 64; + private long defaultRetryDelayMicros = TimeUnit.SECONDS.toMicros(1); + private long maxRetryDelayMicros = TimeUnit.MINUTES.toMicros(1); + private int maxNumberOfSplits = 1 << 10; + + private final Map shardSchedulers = new HashMap<>(); + + private class ShardScheduler + { + final CommandStore commandStore; + final Range range; + + long epoch; + boolean defunct; + + int index; + int numberOfSplits; + Scheduler.Scheduled scheduled; + long retryDelayMicros = defaultRetryDelayMicros; + + private ShardScheduler(CommandStore commandStore, Range range, long epoch) + { + this.commandStore = commandStore; + this.range = range; + this.numberOfSplits = targetShardSplits; + this.epoch = epoch; + } + + void markDefunct() + { + this.defunct = true; + } + + void schedule() + { + synchronized (MajorityDepsFetcher.this) + { + if (defunct) + return; + + long nowMicros = node.elapsed(MICROSECONDS); + if (retryDelayMicros > defaultRetryDelayMicros) + retryDelayMicros = Math.max(defaultRetryDelayMicros, (long) (0.9 * retryDelayMicros)); + scheduleAt(nowMicros, nowMicros); + } + } + + void retry() + { + synchronized (MajorityDepsFetcher.this) + { + if (defunct) + return; + + long nowMicros = node.elapsed(MICROSECONDS); + long scheduleAt = nowMicros + retryDelayMicros; + retryDelayMicros += retryDelayMicros / 2; + if (retryDelayMicros > maxRetryDelayMicros) + { + retryDelayMicros = maxRetryDelayMicros; + } + if (numberOfSplits * 2 <= maxNumberOfSplits) + { + index *= 2; + numberOfSplits *= 2; + } + scheduleAt(nowMicros, scheduleAt); + } + } + + void scheduleAt(long nowMicros, long scheduleAt) + { + synchronized (MajorityDepsFetcher.this) + { + ShardDistributor distributor = node.commandStores().shardDistributor(); + Range range; + int nextIndex; + { + int i 
= index; + Range selectRange = null; + while (selectRange == null) + selectRange = distributor.splitRange(this.range, index, ++i, numberOfSplits); + range = selectRange; + nextIndex = i; + } + + Runnable schedule = () -> start(Ranges.of(range), nextIndex); + if (scheduleAt <= nowMicros) schedule.run(); + else scheduled = node.scheduler().once(schedule, scheduleAt - nowMicros, MICROSECONDS); + } + } + + /** + * Fetch the dependencies witnessed by a majority for this sub-range, as at the start of the target epoch, + * and register them with the owning command store as historical transactions + */ + private void start(Ranges ranges, int nextIndex) + { + TxnId id = TxnId.fromValues(epoch - 1, 0, node.id()); + Timestamp before = Timestamp.minForEpoch(epoch); + + node.withEpoch(id.epoch(), (ignored, withEpochFailure) -> { + if (withEpochFailure != null) + { + // don't wait on epoch failure - we aren't the cause of any problems + start(ranges, nextIndex); + Throwable wrapped = CoordinationFailed.wrap(withEpochFailure); + logger.trace("Exception waiting for epoch before coordinating exclusive sync point for local shard durability, epoch " + id.epoch(), wrapped); + node.agent().onUncaughtException(wrapped); + return; + } + scheduled = null; + FullRoute route = (FullRoute) node.computeRoute(id, ranges); + logger.debug("Fetching deps to sync epoch {} for range {}", epoch, ranges); + CollectCalculatedDeps.withCalculatedDeps(node, id, route, route, before, (deps, fail) -> { + if (fail != null) + { + logger.warn("Failed to fetch deps for syncing epoch {} for {}", epoch, ranges, fail); + retry(); + } + else + { + // TODO (correctness) : PreLoadContext only works with Seekables, which doesn't allow mixing Keys and Ranges... But Deps has both Keys AND Ranges! + // ATM all known implementations store ranges in-memory, but this will not be true soon, so this will need to be addressed + commandStore.execute(contextFor(null, deps.keyDeps.keys(), COMMANDS), safeStore -> { + safeStore.registerHistoricalTransactions(deps); + }).begin((success, fail2) -> { + if (fail2 != null) + { + retry(); + logger.warn("Failed to apply deps for syncing epoch {} for range {}", epoch, ranges, fail2); + } + else + { + try + { + synchronized (MajorityDepsFetcher.this) + { + index = nextIndex; + if (index >= numberOfSplits) + { + logger.info("Successfully fetched majority deps for {} at epoch {}", range, epoch); + defunct = true; + shardSchedulers.remove(range, ShardScheduler.this); + } + else + { + schedule(); + } + } + } + catch (Throwable t) + { + retry(); + logger.error("Unexpected exception handling durability scheduling callback; starting from scratch", t); + } + } + }); + } + }); + }); + } + } + + public MajorityDepsFetcher(Node node) + { + this.node = node; + } + + public void setTargetShardSplits(int targetShardSplits) + { + this.targetShardSplits = targetShardSplits; + } + + public void setDefaultRetryDelay(long retryDelay, TimeUnit units) + { + this.defaultRetryDelayMicros = units.toMicros(retryDelay); + } + + public void setMaxRetryDelay(long retryDelay, TimeUnit units) + { + this.maxRetryDelayMicros = units.toMicros(retryDelay); + } + + public synchronized void fetchMajorityDeps(CommandStore commandStore, Range range, long epoch) + { + ShardScheduler scheduler = shardSchedulers.get(range); + if (scheduler != null) + { + if (scheduler.epoch >= epoch) + return; + scheduler.markDefunct(); + } + scheduler = new ShardScheduler(commandStore, range, epoch); + // track the active scheduler so a later request for the same range can be de-duplicated by epoch + shardSchedulers.put(range, scheduler); + scheduler.schedule(); + } + +} \ No newline at end of file diff --git 
a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java index 261aca052..c71864502 100644 --- a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java +++ b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java @@ -114,7 +114,6 @@ final void clearHomeRetryCounter() encodedState &= ~shiftedMask; } - void atLeast(SafeCommandStore safeStore, DefaultProgressLog instance, CoordinatePhase newPhase, Progress newProgress) { if (phase() == Done) diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index eda91d8cc..a50777bfb 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -21,11 +21,11 @@ import accord.api.LocalListeners; import accord.api.ProgressLog; import accord.api.DataStore; -import accord.coordinate.CollectCalculatedDeps; import javax.annotation.Nullable; import accord.api.Agent; +import accord.impl.MajorityDepsFetcher; import accord.local.CommandStores.RangesForEpoch; import accord.primitives.Range; import accord.primitives.Routables; @@ -44,8 +44,6 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; @@ -57,15 +55,12 @@ import org.slf4j.LoggerFactory; import accord.primitives.Deps; -import accord.primitives.FullRoute; import accord.primitives.Ranges; import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.utils.async.AsyncResults; import static accord.api.ConfigurationService.EpochReady.DONE; -import static accord.local.KeyHistory.COMMANDS; -import static accord.local.PreLoadContext.contextFor; import static accord.local.PreLoadContext.empty; import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT; import static accord.primitives.Routables.Slice.Minimal; @@ -479,62 +474,13 @@ protected Supplier sync(Node node, Ranges ranges, long epoch) }; } + private MajorityDepsFetcher fetcher; // TODO (required, correctness): replace with a simple wait on suitable exclusive sync point(s) private void fetchMajorityDeps(AsyncResults.SettableResult coordination, Node node, long epoch, Ranges ranges) { - AtomicInteger index = new AtomicInteger(); - fetchMajorityDeps(coordination, node, epoch, ranges, index); - } - - // this is temporary until we rely on RX - static final int FETCH_SLICES = Invariants.debug() ? 
1 : 64; - private void fetchMajorityDeps(AsyncResults.SettableResult coordination, Node node, long epoch, Ranges ranges, AtomicInteger index) - { - TxnId id = TxnId.fromValues(epoch - 1, 0, node.id()); - Timestamp before = Timestamp.minForEpoch(epoch); - int rangeIndex = index.get() / FETCH_SLICES; - int subRangeIndex = index.get() % FETCH_SLICES; - int nextIndex; - Range nextRange; - { - int nextSubRangeIndex = subRangeIndex; - Range selectRange = null; - while (selectRange == null) - selectRange = node.commandStores().shardDistributor().splitRange(ranges.get(rangeIndex), subRangeIndex, ++nextSubRangeIndex, FETCH_SLICES); - nextRange = selectRange; - nextIndex = rangeIndex + nextSubRangeIndex; - } - FullRoute route = node.computeRoute(id, Ranges.of(nextRange)); - logger.debug("Fetching deps to sync epoch {} for range {}", epoch, nextRange); - CollectCalculatedDeps.withCalculatedDeps(node, id, route, route, before, (deps, fail) -> { - if (fail != null) - { - logger.warn("Failed to fetch deps for syncing epoch {} for range {}", epoch, nextRange, fail); - node.scheduler().once(() -> fetchMajorityDeps(coordination, node, epoch, ranges, index), 1L, TimeUnit.MINUTES); - } - else - { - // TODO (correctness) : PreLoadContext only works with Seekables, which doesn't allow mixing Keys and Ranges... But Deps has both Keys AND Ranges! - // ATM all known implementations store ranges in-memory, but this will not be true soon, so this will need to be addressed - execute(contextFor(null, deps.txnIds(), deps.keyDeps.keys(), COMMANDS), safeStore -> { - safeStore.registerHistoricalTransactions(deps); - }).begin((success, fail2) -> { - if (fail2 != null) - { - logger.warn("Failed to apply deps for syncing epoch {} for range {}", epoch, nextRange, fail2); - node.scheduler().once(() -> fetchMajorityDeps(coordination, node, epoch, ranges, index), 1L, TimeUnit.MINUTES); - node.agent().onUncaughtException(fail2); - } - else - { - int prev = index.getAndSet(nextIndex); - Invariants.checkState(rangeIndex * FETCH_SLICES + subRangeIndex == prev); - if (nextIndex >= ranges.size() * FETCH_SLICES) coordination.setSuccess(null); - else fetchMajorityDeps(coordination, node, epoch, ranges, index); - } - }); - } - }); + if (fetcher == null) fetcher = new MajorityDepsFetcher(node); + for (Range range : ranges) + fetcher.fetchMajorityDeps(this, range, epoch); } Supplier unbootstrap(long epoch, Ranges removedRanges) diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java index 1deebf830..8ea32a74f 100644 --- a/accord-core/src/main/java/accord/local/SafeCommandStore.java +++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java @@ -264,7 +264,7 @@ public void setRangesForEpoch(CommandStores.RangesForEpoch rangesForEpoch) commandStore().unsafeSetRangesForEpoch(rangesForEpoch); } - protected void registerHistoricalTransactions(Deps deps) + public void registerHistoricalTransactions(Deps deps) { commandStore().registerHistoricalTransactions(deps, this); } diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java index d98b44431..5c23fd187 100644 --- a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java @@ -477,7 +477,7 @@ public static Map run(Id[] nodes, MessageListener messageLis DurableBefore.NOOP_PERSISTER, localConfig); CoordinateDurabilityScheduling durability = new 
CoordinateDurabilityScheduling(node); // TODO (desired): randomise - durability.setFrequency(60, SECONDS); + durability.setShardCycleTime(30, SECONDS); durability.setGlobalCycleTime(180, SECONDS); durabilityScheduling.add(durability); nodeMap.put(id, node); @@ -487,15 +487,15 @@ public static Map run(Id[] nodes, MessageListener messageLis Runnable updateDurabilityRate; { - IntSupplier frequencySeconds = random.biasedUniformIntsSupplier( 1, 120, 10, 30, 10, 60).get(); + IntSupplier targetSplits = random.biasedUniformIntsSupplier(1, 16, 2, 4, 4, 16).get(); IntSupplier shardCycleTimeSeconds = random.biasedUniformIntsSupplier(5, 60, 10, 30, 1, 30).get(); - IntSupplier globalCycleTimeSeconds = random.biasedUniformIntsSupplier( 1, 90, 10, 30, 10, 60).get(); + IntSupplier globalCycleTimeSeconds = random.biasedUniformIntsSupplier(1, 90, 10, 30, 10, 60).get(); updateDurabilityRate = () -> { - int f = frequencySeconds.getAsInt(); + int c = targetSplits.getAsInt(); int s = shardCycleTimeSeconds.getAsInt(); int g = globalCycleTimeSeconds.getAsInt(); durabilityScheduling.forEach(d -> { - d.setFrequency(f, SECONDS); + d.setTargetShardSplits(c); d.setShardCycleTime(s, SECONDS); d.setGlobalCycleTime(g, SECONDS); });
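
For orientation, below is a minimal sketch of how the reworked scheduler could be wired up using the configuration methods this patch introduces or keeps (setTargetShardSplits, setDefaultRetryDelay, setMaxRetryDelay, setShardCycleTime, setGlobalCycleTime). The wrapper class and the chosen values are illustrative assumptions only, loosely mirroring the Cluster test defaults above; it is not part of the patched API.

import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;

import accord.impl.CoordinateDurabilityScheduling;
import accord.local.Node;

// Illustrative wrapper (hypothetical class name); values are examples, not recommendations
class DurabilityConfigSketch
{
    static CoordinateDurabilityScheduling configure(Node node)
    {
        CoordinateDurabilityScheduling durability = new CoordinateDurabilityScheduling(node);
        durability.setTargetShardSplits(64);         // pieces each shard cycle is split into
        durability.setDefaultRetryDelay(1, SECONDS); // base delay before retrying a failed sync point
        durability.setMaxRetryDelay(1, MINUTES);     // cap on the retry back-off
        durability.setShardCycleTime(30, SECONDS);   // target time to cover a whole shard's range
        durability.setGlobalCycleTime(180, SECONDS); // target period for CoordinateGloballyDurable rounds
        durability.start();                          // schedules the recurring run() once per minute
        return durability;
    }
}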