Skip to content

Commit

Permalink
Add vertex store size limit; cleanup vertex store events
Browse files Browse the repository at this point in the history
  • Loading branch information
LukasGasior1 committed Feb 27, 2024
1 parent feaec60 commit 501a3bf
Show file tree
Hide file tree
Showing 51 changed files with 1,004 additions and 733 deletions.
7 changes: 6 additions & 1 deletion common/src/main/java/com/radixdlt/monitoring/Metrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,12 @@ public record Sync(
Counter invalidEpochInitialQcSyncStates) {}

public record VertexStore(
Gauge size, Counter forks, Counter rebuilds, Counter indirectParents) {}
Gauge size,
Gauge byteSize,
Counter forks,
Counter rebuilds,
Counter indirectParents,
Counter errorsDueToSizeLimit) {}

public record DivergentVertexExecution(int numDistinctExecutionResults) {}
}
Expand Down
4 changes: 4 additions & 0 deletions common/src/main/java/com/radixdlt/utils/WrappedByteArray.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ public String toHexString() {
return Hex.toHexString(value);
}

public int size() {
return value.length;
}

@Override
public byte[] hashableBytes() {
return value;
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void test_pacemaker_round_update_race_condition() {
private EventProcessor<BFTInsertUpdate> bftInsertUpdateProcessor() {
final Map<HashCode, ExecutedVertex> insertedVertices = new HashMap<>();
return bftInsertUpdate -> {
final ExecutedVertex inserted = bftInsertUpdate.getInserted();
final ExecutedVertex inserted = bftInsertUpdate.insertedVertex();
insertedVertices.putIfAbsent(inserted.getVertexHash(), inserted);
final Optional<ExecutedVertex> maybeParent =
Optional.ofNullable(insertedVertices.get(inserted.getParentId()));
Expand Down Expand Up @@ -204,7 +204,7 @@ private static MessageMutator messUpMessagesForNodeUnderTest() {
queue.add(message.withAdditionalDelay(additionalMessageDelay));
return true;
} else if (msg instanceof BFTInsertUpdate
&& ((BFTInsertUpdate) msg).getInserted().getRound().equals(Round.of(1))) {
&& ((BFTInsertUpdate) msg).insertedVertex().getRound().equals(Round.of(1))) {
queue.add(message.withAdditionalDelay(additionalMessageDelay));
return true;
} else {
Expand Down
14 changes: 14 additions & 0 deletions core/src/main/java/com/radixdlt/RadixNodeModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import com.radixdlt.consensus.bft.*;
import com.radixdlt.consensus.epoch.EpochsConsensusModule;
import com.radixdlt.consensus.sync.BFTSyncPatienceMillis;
import com.radixdlt.consensus.vertexstore.VertexStoreConfig;
import com.radixdlt.environment.*;
import com.radixdlt.environment.rx.RxEnvironmentModule;
import com.radixdlt.genesis.GenesisProvider;
Expand Down Expand Up @@ -156,6 +157,19 @@ protected void configure() {
bindConstant().annotatedWith(AdditionalRoundTimeIfProposalReceivedMs.class).to(30_000L);
bindConstant().annotatedWith(TimeoutQuorumResolutionDelayMs.class).to(0L);

final var vertexStoreConfig =
new VertexStoreConfig(
properties.get(
"bft.vertex_store.max_serialized_size_bytes",
VertexStoreConfig.DEFAULT_MAX_SERIALIZED_SIZE_BYTES));
bind(VertexStoreConfig.class).toInstance(vertexStoreConfig);

Preconditions.checkArgument(
vertexStoreConfig.maxSerializedSizeBytes()
>= VertexStoreConfig.MIN_MAX_SERIALIZED_SIZE_BYTES,
"Invalid configuration: bft.vertex_store.max_serialized_size_byte must be at least "
+ VertexStoreConfig.MIN_MAX_SERIALIZED_SIZE_BYTES);

// System (e.g. time, random)
install(new SystemModule());

Expand Down
55 changes: 12 additions & 43 deletions core/src/main/java/com/radixdlt/consensus/bft/BFTHighQCUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,48 +64,17 @@

package com.radixdlt.consensus.bft;

import com.google.common.collect.ImmutableList;
import com.radixdlt.consensus.HighQC;
import com.radixdlt.consensus.vertexstore.VertexStoreState;
import java.util.Objects;
import com.radixdlt.consensus.vertexstore.ExecutedVertex;
import com.radixdlt.lang.Option;
import com.radixdlt.utils.WrappedByteArray;

/** An event emitted when the high qc has been updated */
public final class BFTHighQCUpdate {
private final VertexStoreState vertexStoreState;

private BFTHighQCUpdate(VertexStoreState vertexStoreState) {
this.vertexStoreState = vertexStoreState;
}

public static BFTHighQCUpdate create(VertexStoreState vertexStoreState) {
return new BFTHighQCUpdate(vertexStoreState);
}

public HighQC getHighQC() {
return vertexStoreState.getHighQC();
}

public VertexStoreState getVertexStoreState() {
return vertexStoreState;
}

@Override
public String toString() {
return String.format(
"%s{highQC=%s}", this.getClass().getSimpleName(), vertexStoreState.getHighQC());
}

@Override
public int hashCode() {
return Objects.hash(vertexStoreState);
}

@Override
public boolean equals(Object o) {
if (!(o instanceof BFTHighQCUpdate)) {
return false;
}

BFTHighQCUpdate other = (BFTHighQCUpdate) o;
return Objects.equals(other.vertexStoreState, this.vertexStoreState);
}
}
/**
* An event emitted when vertex store updates its highQC, which possibly results in some vertices
* being committed.
*/
public record BFTHighQCUpdate(
HighQC newHighQc,
Option<ImmutableList<ExecutedVertex>> committedVertices,
WrappedByteArray serializedVertexStoreState) {}
70 changes: 4 additions & 66 deletions core/src/main/java/com/radixdlt/consensus/bft/BFTInsertUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,71 +64,9 @@

package com.radixdlt.consensus.bft;

import com.radixdlt.consensus.BFTHeader;
import com.radixdlt.consensus.vertexstore.ExecutedVertex;
import com.radixdlt.consensus.vertexstore.VertexStoreState;
import java.util.Objects;
import com.radixdlt.utils.WrappedByteArray;

/** An update emitted when the BFT has inserted a new vertex */
public final class BFTInsertUpdate {
private final VertexStoreState vertexStoreState;
private final ExecutedVertex insertedVertex;
private final int siblingsCount;

private BFTInsertUpdate(
ExecutedVertex insertedVertex, int siblingsCount, VertexStoreState vertexStoreState) {
this.insertedVertex = Objects.requireNonNull(insertedVertex);
this.siblingsCount = siblingsCount;
this.vertexStoreState = Objects.requireNonNull(vertexStoreState);
}

public static BFTInsertUpdate insertedVertex(
ExecutedVertex insertedVertex, int siblingsCount, VertexStoreState vertexStoreState) {
return new BFTInsertUpdate(insertedVertex, siblingsCount, vertexStoreState);
}

public VertexStoreState getVertexStoreState() {
return vertexStoreState;
}

public int getSiblingsCount() {
return siblingsCount;
}

public int getVertexStoreSize() {
return vertexStoreState.getVertices().size();
}

public BFTHeader getHeader() {
return new BFTHeader(
insertedVertex.getRound(),
insertedVertex.getVertexHash(),
insertedVertex.getLedgerHeader());
}

public ExecutedVertex getInserted() {
return insertedVertex;
}

@Override
public int hashCode() {
return Objects.hash(vertexStoreState, insertedVertex, siblingsCount);
}

@Override
public boolean equals(Object o) {
if (!(o instanceof BFTInsertUpdate)) {
return false;
}

BFTInsertUpdate other = (BFTInsertUpdate) o;
return Objects.equals(this.vertexStoreState, other.vertexStoreState)
&& Objects.equals(this.insertedVertex, other.insertedVertex)
&& this.siblingsCount == other.siblingsCount;
}

@Override
public String toString() {
return String.format("%s{inserted=%s}", getClass().getSimpleName(), insertedVertex);
}
}
/** An event emitted after a vertex has been inserted into the vertex store. */
public record BFTInsertUpdate(
ExecutedVertex insertedVertex, WrappedByteArray serializedVertexStoreState) {}
Original file line number Diff line number Diff line change
Expand Up @@ -65,42 +65,8 @@
package com.radixdlt.consensus.bft;

import com.radixdlt.consensus.vertexstore.VertexStoreState;
import java.util.Objects;
import com.radixdlt.utils.WrappedByteArray;

/** An update emitted when the BFT has been rebuilt */
public final class BFTRebuildUpdate {
private final VertexStoreState vertexStoreState;

private BFTRebuildUpdate(VertexStoreState vertexStoreState) {
this.vertexStoreState = vertexStoreState;
}

public static BFTRebuildUpdate create(VertexStoreState vertexStoreState) {
return new BFTRebuildUpdate(vertexStoreState);
}

public VertexStoreState getVertexStoreState() {
return vertexStoreState;
}

@Override
public String toString() {
return String.format(
"%s{root=%s}", this.getClass().getSimpleName(), vertexStoreState.getRoot());
}

@Override
public int hashCode() {
return Objects.hash(vertexStoreState);
}

@Override
public boolean equals(Object o) {
if (!(o instanceof BFTRebuildUpdate)) {
return false;
}

BFTRebuildUpdate other = (BFTRebuildUpdate) o;
return Objects.equals(other.vertexStoreState, this.vertexStoreState);
}
}
/** An even emitted when the vertex store has been rebuilt. */
public record BFTRebuildUpdate(
VertexStoreState vertexStoreState, WrappedByteArray serializedVertexStoreState) {}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ private void processRoundCachedEvent(QueuedConsensusEvent queuedEvent) {

@Override
public void processBFTUpdate(BFTInsertUpdate update) {
final var vertexId = update.getInserted().getVertexHash();
final var vertexId = update.insertedVertex().getVertexHash();
log.trace("LOCAL_SYNC: {}", vertexId);

syncingEvents.stream()
Expand All @@ -167,7 +167,7 @@ public void processBFTUpdate(BFTInsertUpdate update) {
@Override
public void processBFTRebuildUpdate(BFTRebuildUpdate rebuildUpdate) {
rebuildUpdate
.getVertexStoreState()
.vertexStoreState()
.getVertices()
.forEach(
v -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ public void processBFTUpdate(BFTInsertUpdate update) {
public EventProcessor<BFTRebuildUpdate> bftRebuildUpdateEventProcessor() {
return update -> {
if (update
.getVertexStoreState()
.vertexStoreState()
.getRoot()
.vertex()
.getParentHeader()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import com.radixdlt.consensus.liveness.*;
import com.radixdlt.consensus.sync.*;
import com.radixdlt.consensus.vertexstore.VertexStoreAdapter;
import com.radixdlt.consensus.vertexstore.VertexStoreConfig;
import com.radixdlt.consensus.vertexstore.VertexStoreJavaImpl;
import com.radixdlt.crypto.Hasher;
import com.radixdlt.environment.*;
Expand All @@ -84,6 +85,7 @@
import com.radixdlt.messaging.core.GetVerticesRequestRateLimit;
import com.radixdlt.monitoring.Metrics;
import com.radixdlt.p2p.NodeId;
import com.radixdlt.serialization.Serialization;
import com.radixdlt.sync.messages.local.LocalSyncRequest;
import com.radixdlt.utils.TimeSupplier;
import java.util.Comparator;
Expand All @@ -107,7 +109,6 @@ protected void configure() {
eventBinder.addBinding().toInstance(BFTRebuildUpdate.class);
eventBinder.addBinding().toInstance(BFTInsertUpdate.class);
eventBinder.addBinding().toInstance(BFTHighQCUpdate.class);
eventBinder.addBinding().toInstance(BFTCommittedUpdate.class);
eventBinder.addBinding().toInstance(Proposal.class);
eventBinder.addBinding().toInstance(Vote.class);
eventBinder.addBinding().toInstance(LedgerUpdate.class);
Expand Down Expand Up @@ -458,15 +459,17 @@ private VertexStoreFactory vertexStoreFactory(
EventDispatcher<BFTInsertUpdate> updateSender,
EventDispatcher<BFTRebuildUpdate> rebuildUpdateDispatcher,
EventDispatcher<BFTHighQCUpdate> highQCUpdateEventDispatcher,
EventDispatcher<BFTCommittedUpdate> committedDispatcher,
Ledger ledger,
Hasher hasher) {
Hasher hasher,
Serialization serialization,
Metrics metrics,
VertexStoreConfig vertexStoreConfig) {
return vertexStoreState ->
new VertexStoreAdapter(
VertexStoreJavaImpl.create(vertexStoreState, ledger, hasher),
new VertexStoreJavaImpl(
ledger, hasher, serialization, metrics, vertexStoreConfig, vertexStoreState),
highQCUpdateEventDispatcher,
updateSender,
rebuildUpdateDispatcher,
committedDispatcher);
rebuildUpdateDispatcher);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ public void processProposal(Proposal proposal) {
public void processBFTUpdate(BFTInsertUpdate update) {
log.trace("BFTUpdate: Processing {}", update);

final var round = update.getHeader().getRound();
final var vertex = update.getInserted();
final var round = update.insertedVertex().getRound();
final var vertex = update.insertedVertex();

if (round.equals(currentRound())) {
// A vertex for the current round has been inserted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,5 @@
* RadixEngine ((RPNV1-718)
*/
public interface PersistentVertexStore {
void save(VertexStoreState vertexStoreState);
void save(byte[] serializedVertexStoreState);
}
Loading

0 comments on commit 501a3bf

Please sign in to comment.