Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topology rebuild on bounce #3739

Draft
wants to merge 10 commits into
base: cep-15-accord
Choose a base branch
from
4 changes: 2 additions & 2 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[submodule "modules/accord"]
path = modules/accord
url = https://github.com/apache/cassandra-accord.git
branch = trunk
url = https://github.com/ifesdjeen/cassandra-accord.git
branch = topology-rebuild-on-bounce
2 changes: 1 addition & 1 deletion modules/accord
Submodule accord updated 36 files
+1 −2 accord-core/src/main/java/accord/api/ConfigurationService.java
+47 −6 accord-core/src/main/java/accord/api/Journal.java
+3 −3 accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
+122 −4 accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
+618 −0 accord-core/src/main/java/accord/impl/CommandChange.java
+33 −54 accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+9 −8 accord-core/src/main/java/accord/impl/InMemoryCommandStores.java
+5 −5 accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
+3 −3 accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
+47 −13 accord-core/src/main/java/accord/local/Command.java
+9 −10 accord-core/src/main/java/accord/local/CommandStore.java
+103 −46 accord-core/src/main/java/accord/local/CommandStores.java
+3 −3 accord-core/src/main/java/accord/local/Commands.java
+5 −4 accord-core/src/main/java/accord/local/Node.java
+7 −7 accord-core/src/main/java/accord/local/SafeCommandStore.java
+15 −15 accord-core/src/main/java/accord/local/StoreParticipants.java
+4 −2 accord-core/src/main/java/accord/messages/PreAccept.java
+3 −3 accord-core/src/main/java/accord/messages/Propagate.java
+1 −1 accord-core/src/main/java/accord/messages/ReadEphemeralTxnData.java
+1 −1 accord-core/src/main/java/accord/primitives/Txn.java
+3 −5 accord-core/src/main/java/accord/topology/TopologyManager.java
+3 −1 accord-core/src/test/java/accord/Utils.java
+3 −1 accord-core/src/test/java/accord/burn/BurnTestBase.java
+4 −4 accord-core/src/test/java/accord/coordinate/CoordinateTransactionTest.java
+1 −1 accord-core/src/test/java/accord/impl/RemoteListenersTest.java
+43 −38 accord-core/src/test/java/accord/impl/basic/Cluster.java
+88 −7 accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+361 −375 accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+37 −1 accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
+0 −97 accord-core/src/test/java/accord/impl/basic/VerifyingJournal.java
+1 −1 accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
+8 −2 accord-core/src/test/java/accord/impl/mock/MockCluster.java
+6 −4 accord-core/src/test/java/accord/local/ImmutableCommandTest.java
+1 −1 accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
+3 −1 accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+3 −1 accord-maelstrom/src/main/java/accord/maelstrom/Main.java
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/config/AccordSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ public enum TransactionalRangeMigration
public boolean ephemeralReadEnabled = true;
public boolean state_cache_listener_jfr_enabled = true;
public final JournalSpec journal = new JournalSpec();
// TODO: do we need more retries; rename
public final MinEpochRetrySpec minEpochSyncRetry = new MinEpochRetrySpec();

public static class MinEpochRetrySpec extends RetrySpec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.accord.AccordJournal;
import org.apache.cassandra.service.accord.AccordJournalValueSerializers;
import org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightSerializer;
import org.apache.cassandra.service.accord.AccordKeyspace;
Expand All @@ -103,7 +104,6 @@
import org.apache.cassandra.service.accord.AccordService;
import org.apache.cassandra.service.accord.IAccordService;
import org.apache.cassandra.service.accord.JournalKey;
import org.apache.cassandra.service.accord.SavedCommand;
import org.apache.cassandra.service.accord.api.AccordAgent;
import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
import org.apache.cassandra.service.paxos.PaxosRepairHistory;
Expand Down Expand Up @@ -1099,7 +1099,7 @@ protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition
return newVersion.build().unfilteredIterator();
}

SavedCommand.Builder commandBuilder = (SavedCommand.Builder) builder;
AccordJournal.Builder commandBuilder = (AccordJournal.Builder) builder;
if (commandBuilder.isEmpty())
{
Invariants.checkState(rows.isEmpty());
Expand Down
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/exceptions/RequestFailure.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class RequestFailure
public static final RequestFailure COORDINATOR_BEHIND = new RequestFailure(RequestFailureReason.COORDINATOR_BEHIND);
public static final RequestFailure READ_TOO_MANY_INDEXES = new RequestFailure(RequestFailureReason.READ_TOO_MANY_INDEXES);
public static final RequestFailure RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM = new RequestFailure(RequestFailureReason.RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM);
public static final RequestFailure BOOTING = new RequestFailure(RequestFailureReason.RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM);

static
{
Expand Down Expand Up @@ -131,6 +132,7 @@ public static RequestFailure forReason(RequestFailureReason reason)
{
switch (reason)
{
// TODO: this really screws things up
default: throw new IllegalStateException("Unhandled request failure reason " + reason);
case UNKNOWN: return UNKNOWN;
case READ_TOO_MANY_TOMBSTONES: return READ_TOO_MANY_TOMBSTONES;
Expand All @@ -144,6 +146,7 @@ public static RequestFailure forReason(RequestFailureReason reason)
case COORDINATOR_BEHIND: return COORDINATOR_BEHIND;
case READ_TOO_MANY_INDEXES: return READ_TOO_MANY_INDEXES;
case RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM: return RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM;
case BOOTING: return BOOTING;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ public enum RequestFailureReason
NOT_CMS (7),
INVALID_ROUTING (8),
COORDINATOR_BEHIND (9),
READ_TOO_MANY_INDEXES (10),
READ_TOO_MANY_INDEXES (10),
RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM (11),
BOOTING (12),
;

static
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public Set<NodeId> reconfigure(ClusterMetadata metadata, Map<String, Replication
{
if (!filter.apply(metadata, peerId))
{
tmpDirectory = tmpDirectory.without(peerId);
tmpDirectory = tmpDirectory.without(metadata.nextEpoch(), peerId);
tokenMap = tokenMap.unassignTokens(peerId);
}
}
Expand Down
13 changes: 12 additions & 1 deletion src/java/org/apache/cassandra/net/MessageDelivery.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;

import org.slf4j.Logger;
Expand Down Expand Up @@ -100,6 +99,14 @@ public default <REQ, RSP> Future<Message<RSP>> sendWithRetries(Backoff backoff,
return promise;
}

public default <REQ, RSP> Future<Message<RSP>> sendWithRetries(Verb verb, REQ request,
Iterator<InetAddressAndPort> candidates,
RetryPredicate shouldRetry,
RetryErrorMessage errorMessage)
{
return sendWithRetries(Backoff.None.INSTANCE, ImmediateRetryScheduler.instance, verb, request, candidates, shouldRetry, errorMessage);
}

public default <REQ, RSP> void sendWithRetries(Backoff backoff, RetryScheduler retryThreads,
Verb verb, REQ request,
Iterator<InetAddressAndPort> candidates,
Expand Down Expand Up @@ -127,11 +134,15 @@ interface OnResult<T>

interface RetryPredicate
{
public static RetryPredicate times(int n) { return (attempt, from, failure) -> attempt < n; }
RetryPredicate ALWAYS_RETRY = (i1, i2, i3) -> true;
RetryPredicate ALWAYS_REJECT = (i1, i2, i3) -> false;
boolean test(int attempt, InetAddressAndPort from, RequestFailure failure);
}

interface RetryErrorMessage
{
RetryErrorMessage EMPTY = (i1, i2, i3, i4) -> null;
String apply(int attempt, ResponseFailureReason retryFailure, @Nullable InetAddressAndPort from, @Nullable RequestFailure reason);
}

Expand Down
67 changes: 67 additions & 0 deletions src/java/org/apache/cassandra/net/MessagingUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.net;

import java.util.Iterator;

import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.SharedContext;

public class MessagingUtils
{
/**
* Candidate iterator that would try all endpoints known to be alive first, and then try all endpoints
* in a round-robin manner.
*/
public static Iterator<InetAddressAndPort> tryAliveFirst(SharedContext context, Iterable<InetAddressAndPort> peers)
{
return new Iterator<>()
{
boolean firstRun = true;
Iterator<InetAddressAndPort> iter = peers.iterator();
boolean isEmpty = !iter.hasNext();

public boolean hasNext()
{
return !isEmpty;
}

public InetAddressAndPort next()
{
// At first, try all alive nodes
if (firstRun)
{
while (iter.hasNext())
{
InetAddressAndPort candidate = iter.next();
if (context.failureDetector().isAlive(candidate))
return candidate;
}
firstRun = false;
}

// After that, cycle through all nodes
if (!iter.hasNext())
iter = peers.iterator();

return iter.next();
}
};
}
}
4 changes: 4 additions & 0 deletions src/java/org/apache/cassandra/net/Verb.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.cassandra.service.accord.AccordService;
import org.apache.cassandra.service.accord.AccordSyncPropagator;
import org.apache.cassandra.service.accord.AccordSyncPropagator.Notification;
import org.apache.cassandra.service.accord.FetchTopology;
import org.apache.cassandra.service.accord.FetchMinEpoch;
import org.apache.cassandra.service.accord.interop.AccordInteropApply;
import org.apache.cassandra.service.accord.interop.AccordInteropCommit;
Expand Down Expand Up @@ -356,8 +357,11 @@ public enum Verb
ACCORD_INTEROP_READ_REPAIR_RSP (158, P2, writeTimeout, IMMEDIATE, () -> AccordInteropReadRepair.replySerializer, AccordService::responseHandlerOrNoop),
ACCORD_INTEROP_READ_REPAIR_REQ (159, P2, writeTimeout, IMMEDIATE, () -> AccordInteropReadRepair.requestSerializer, AccordService::requestHandlerOrNoop, ACCORD_INTEROP_READ_REPAIR_RSP),
ACCORD_INTEROP_APPLY_REQ (160, P2, writeTimeout, IMMEDIATE, () -> AccordInteropApply.serializer, AccordService::requestHandlerOrNoop, ACCORD_APPLY_RSP),
// TODO: swap IDS?
ACCORD_FETCH_MIN_EPOCH_RSP (166, P2, writeTimeout, IMMEDIATE, () -> FetchMinEpoch.Response.serializer, RESPONSE_HANDLER),
ACCORD_FETCH_MIN_EPOCH_REQ (165, P2, writeTimeout, IMMEDIATE, () -> FetchMinEpoch.serializer, () -> FetchMinEpoch.handler, ACCORD_FETCH_MIN_EPOCH_RSP),
ACCORD_FETCH_TOPOLOGY_RSP (167, P2, writeTimeout, IMMEDIATE, () -> FetchTopology.Response.serializer, RESPONSE_HANDLER),
ACCORD_FETCH_TOPOLOGY_REQ (168, P2, writeTimeout, IMMEDIATE, () -> FetchTopology.serializer, () -> FetchTopology.handler, ACCORD_FETCH_TOPOLOGY_RSP),

// generic failure response
FAILURE_RSP (99, P0, noTimeout, REQUEST_RESPONSE, () -> RequestFailure.serializer, RESPONSE_HANDLER ),
Expand Down
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/repair/SharedContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
*
* See {@link Global#instance} for the main production path
*/
// TODO: move to util or something?
public interface SharedContext
{
InetAddressAndPort broadcastAddressAndPort();
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/service/accord/AccordCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -1211,7 +1211,7 @@ public Object fullShrink(TxnId txnId, Command value)

try
{
return SavedCommand.asSerializedDiff(null, value, current_version);
return AccordJournal.asSerializedChange(null, value, current_version);
}
catch (IOException e)
{
Expand All @@ -1223,7 +1223,7 @@ public Object fullShrink(TxnId txnId, Command value)
@Override
public @Nullable Command inflate(TxnId key, Object serialized)
{
SavedCommand.Builder builder = new SavedCommand.Builder(key);
AccordJournal.Builder builder = new AccordJournal.Builder(key);
ByteBuffer buffer = (ByteBuffer) serialized;
buffer.mark();
try (DataInputBuffer buf = new DataInputBuffer(buffer, false))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import accord.api.Agent;
import accord.api.DataStore;
import accord.api.Journal;
import accord.api.LocalListeners;
import accord.api.ProgressLog;
import accord.api.RoutingKey;
Expand Down Expand Up @@ -64,20 +65,19 @@
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.service.accord.SavedCommand.MinimalCommand;
import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
import org.apache.cassandra.service.accord.txn.TxnRead;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;

import static accord.api.Journal.CommandUpdate;
import static accord.api.Journal.FieldUpdates;
import static accord.api.Journal.Load.MINIMAL;
import static accord.api.Journal.Loader;
import static accord.api.Journal.OnDone;
import static accord.local.KeyHistory.SYNC;
import static accord.primitives.Status.Committed;
import static accord.utils.Invariants.checkState;
import static org.apache.cassandra.service.accord.SavedCommand.Load.MINIMAL;

public class AccordCommandStore extends CommandStore
{
Expand Down Expand Up @@ -155,7 +155,7 @@ public void close()
}

public final String loggingId;
private final IJournal journal;
private final Journal journal;
private final AccordExecutor executor;
private final Executor taskExecutor;
private final ExclusiveCaches caches;
Expand All @@ -174,7 +174,7 @@ public AccordCommandStore(int id,
ProgressLog.Factory progressLogFactory,
LocalListeners.Factory listenerFactory,
EpochUpdateHolder epochUpdateHolder,
IJournal journal,
Journal journal,
AccordExecutor executor)
{
super(id, node, agent, dataStore, progressLogFactory, listenerFactory, epochUpdateHolder);
Expand Down Expand Up @@ -203,7 +203,7 @@ public AccordCommandStore(int id,
loadRangesForEpoch(journal.loadRangesForEpoch(id()));
}

static Factory factory(AccordJournal journal, IntFunction<AccordExecutor> executorFactory)
static Factory factory(Journal journal, IntFunction<AccordExecutor> executorFactory)
{
return (id, node, agent, dataStore, progressLogFactory, listenerFactory, rangesForEpoch) ->
new AccordCommandStore(id, node, agent, dataStore, progressLogFactory, listenerFactory, rangesForEpoch, journal, executorFactory.apply(id));
Expand Down Expand Up @@ -461,10 +461,10 @@ public void registerTransitive(SafeCommandStore safeStore, RangeDeps rangeDeps)
return;

RedundantBefore redundantBefore = unsafeGetRedundantBefore();
CommandStores.RangesForEpoch ranges = safeStore.ranges();
CommandStores.RangesForEpoch ranges = safeStore.rangesForEpoch();
// used in places such as accord.local.CommandStore.fetchMajorityDeps
// We find a set of dependencies for a range then update CommandsFor to know about them
Ranges allRanges = safeStore.ranges().all();
Ranges allRanges = safeStore.rangesForEpoch().all();
Ranges coordinateRanges = Ranges.EMPTY;
long coordinateEpoch = -1;
try (ExclusiveCaches caches = lockCaches())
Expand Down Expand Up @@ -523,7 +523,7 @@ public static Command prepareToCache(Command command)
return command;
}

public MinimalCommand loadMinimal(TxnId txnId)
public Command.Minimal loadMinimal(TxnId txnId)
{
return journal.loadMinimal(id, txnId, MINIMAL, unsafeGetRedundantBefore(), durableBefore());
}
Expand Down Expand Up @@ -611,9 +611,9 @@ void loadSafeToRead(NavigableMap<Timestamp, Ranges> safeToRead)
unsafeSetSafeToRead(safeToRead);
}

void loadRangesForEpoch(CommandStores.RangesForEpoch.Snapshot rangesForEpoch)
void loadRangesForEpoch(CommandStores.RangesForEpoch rangesForEpoch)
{
if (rangesForEpoch != null)
unsafeSetRangesForEpoch(new CommandStores.RangesForEpoch(rangesForEpoch.epochs, rangesForEpoch.ranges, this));
unsafeSetRangesForEpoch(rangesForEpoch);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import accord.api.Agent;
import accord.api.DataStore;
import accord.api.Journal;
import accord.api.LocalListeners;
import accord.api.ProgressLog;
import accord.local.CommandStores;
Expand All @@ -47,8 +48,8 @@
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;

import static org.apache.cassandra.config.AccordSpec.QueueShardModel.THREAD_PER_SHARD;
import static org.apache.cassandra.config.DatabaseDescriptor.getAccordQueueSubmissionModel;
import static org.apache.cassandra.config.DatabaseDescriptor.getAccordQueueShardCount;
import static org.apache.cassandra.config.DatabaseDescriptor.getAccordQueueSubmissionModel;
import static org.apache.cassandra.service.accord.AccordExecutor.Mode.RUN_WITHOUT_LOCK;
import static org.apache.cassandra.service.accord.AccordExecutor.Mode.RUN_WITH_LOCK;
import static org.apache.cassandra.service.accord.AccordExecutor.constant;
Expand All @@ -60,15 +61,16 @@ public class AccordCommandStores extends CommandStores implements CacheSize

private final CacheSizeMetrics cacheSizeMetrics;
private final AccordExecutor[] executors;

private long cacheSize, workingSetSize;
private int maxQueuedLoads, maxQueuedRangeLoads;
private boolean shrinkingOn;

AccordCommandStores(NodeCommandStoreService node, Agent agent, DataStore store, RandomSource random,
ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenerFactory,
AccordJournal journal, AccordExecutor[] executors)
Journal journal, AccordExecutor[] executors)
{
super(node, agent, store, random, shardDistributor, progressLogFactory, listenerFactory,
super(node, agent, store, random, journal, shardDistributor, progressLogFactory, listenerFactory,
AccordCommandStore.factory(journal, id -> executors[id % executors.length]));
this.executors = executors;
this.cacheSizeMetrics = new CacheSizeMetrics(ACCORD_STATE_CACHE, this);
Expand All @@ -80,9 +82,9 @@ public class AccordCommandStores extends CommandStores implements CacheSize
refreshCapacities();
}

static Factory factory(AccordJournal journal)
static Factory factory()
{
return (time, agent, store, random, shardDistributor, progressLogFactory, listenerFactory) -> {
return (NodeCommandStoreService time, Agent agent, DataStore store, RandomSource random, Journal journal, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory) -> {
AccordExecutor[] executors = new AccordExecutor[getAccordQueueShardCount()];
AccordExecutorFactory factory;
int maxThreads = Integer.MAX_VALUE;
Expand Down Expand Up @@ -119,7 +121,7 @@ static Factory factory(AccordJournal journal)
}
}

return new AccordCommandStores(time, agent, store, random, shardDistributor, progressLogFactory, listenerFactory, journal, executors);
return new AccordCommandStores(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, journal, executors);
};
}

Expand Down
Loading