Skip to content

Commit

Permalink
feat(snap): add retry mechanism; add extra configurability; fix snap …
Browse files Browse the repository at this point in the history
…state chunk msg encoding; other minor fixes
  • Loading branch information
Vovchyk committed Dec 20, 2024
1 parent a4acb49 commit 4931a20
Show file tree
Hide file tree
Showing 35 changed files with 794 additions and 239 deletions.
1 change: 1 addition & 0 deletions rskj-core/src/main/java/co/rsk/RskContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -2084,6 +2084,7 @@ private SnapshotProcessor getSnapshotProcessor() {
new ValidGasUsedRule()
),
getRskSystemProperties().getSnapshotChunkSize(),
getRskSystemProperties().checkHistoricalHeaders(),
getRskSystemProperties().isSnapshotParallelEnabled()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class RskSystemProperties extends SystemProperties {
public static final String USE_PEERS_FROM_LAST_SESSION = "peer.discovery.usePeersFromLastSession";

public static final String PROPERTY_SNAP_CLIENT_ENABLED = "sync.snapshot.client.enabled";
public static final String PROPERTY_SNAP_CLIENT_CHECK_HISTORICAL_HEADERS = "sync.snapshot.client.checkHistoricalHeaders";
public static final String PROPERTY_SNAP_NODES = "sync.snapshot.client.snapBootNodes";

//TODO: REMOVE THIS WHEN THE LocalBLockTests starts working with REMASC
Expand Down Expand Up @@ -429,6 +430,8 @@ public int getLongSyncLimit() {
/**
 * Whether this node serves snapshot sync requests to peers.
 */
public boolean isServerSnapshotSyncEnabled() {
    return configFromFiles.getBoolean("sync.snapshot.server.enabled");
}
/**
 * Whether this node syncs its state via snapshot (snap) sync as a client.
 */
public boolean isClientSnapshotSyncEnabled() {
    return configFromFiles.getBoolean(PROPERTY_SNAP_CLIENT_ENABLED);
}

/**
 * Whether the snap sync client verifies historical block headers after
 * fetching the state (see SnapshotProcessor usage of this flag).
 */
public boolean checkHistoricalHeaders() {
    return configFromFiles.getBoolean(PROPERTY_SNAP_CLIENT_CHECK_HISTORICAL_HEADERS);
}

/**
 * Whether the snap sync client issues chunk requests in parallel.
 */
public boolean isSnapshotParallelEnabled() {
    return configFromFiles.getBoolean("sync.snapshot.client.parallel");
}

/**
 * Size of the state chunks requested by the snap sync client.
 */
public int getSnapshotChunkSize() {
    return configFromFiles.getInt("sync.snapshot.client.chunkSize");
}
Expand Down Expand Up @@ -512,10 +515,14 @@ public boolean fastBlockPropagation() {
return configFromFiles.getBoolean("peer.fastBlockPropagation");
}

public Integer getMessageQueueMaxSize() {
/**
 * Maximum number of queued messages allowed per peer.
 */
public int getMessageQueueMaxSize() {
    return configFromFiles.getInt("peer.messageQueue.maxSizePerPeer");
}

/**
 * Per-minute message threshold applied to each peer's queue.
 */
public int getMessageQueuePerMinuteThreshold() {
    return configFromFiles.getInt("peer.messageQueue.thresholdPerMinutePerPeer");
}

/**
 * Whether RPC responses report a zero signature for REMASC transactions.
 */
public boolean rpcZeroSignatureIfRemasc() {
    return configFromFiles.getBoolean("rpc.zeroSignatureIfRemasc");
}
Expand Down
11 changes: 8 additions & 3 deletions rskj-core/src/main/java/co/rsk/net/NodeMessageHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ private void tryAddMessage(Peer sender, Message message, NodeMsgTraceInfo nodeMs
*/
private boolean controlMessageIngress(Peer sender, Message message, double score) {
return
allowByScore(score) &&
allowByScore(sender, message, score) &&
allowByMessageCount(sender) &&
allowByMinerNotBanned(sender, message) &&
allowByMessageUniqueness(sender, message); // prevent repeated is the most expensive and MUST be the last
Expand All @@ -221,8 +221,13 @@ private boolean controlMessageIngress(Peer sender, Message message, double score
/**
* assert score is acceptable
*/
private boolean allowByScore(double score) {
return score >= 0;
/**
 * Asserts the sender's reputation score is acceptable (non-negative).
 * Rejected messages are reported at debug level with their type, sender and score.
 *
 * @return {@code true} when the message may be processed, {@code false} otherwise
 */
private boolean allowByScore(Peer sender, Message message, double score) {
    if (score >= 0) {
        return true;
    }
    logger.debug("Message: [{}] from: [{}] with score: [{}] was not allowed", message.getMessageType(), sender, score);
    return false;
}

/**
Expand Down
80 changes: 53 additions & 27 deletions rskj-core/src/main/java/co/rsk/net/SnapshotProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static co.rsk.net.sync.SnapSyncRequestManager.PeerSelector;

/**
* Snapshot Synchronization consist in 3 steps:
* 1. Status: exchange message with the server, to know which block we are going to sync and what the size of the Unitrie of that block is.
Expand Down Expand Up @@ -86,8 +87,7 @@ public class SnapshotProcessor implements InternalService {
private final BlockHeaderParentDependantValidationRule blockHeaderParentValidator;
private final BlockHeaderValidationRule blockHeaderValidator;

private final AtomicLong messageId = new AtomicLong(0);

private final boolean checkHistoricalHeaders;
// flag for parallel requests
private final boolean parallel;

Expand All @@ -106,10 +106,11 @@ public SnapshotProcessor(Blockchain blockchain,
BlockHeaderParentDependantValidationRule blockHeaderParentValidator,
BlockHeaderValidationRule blockHeaderValidator,
int chunkSize,
boolean checkHistoricalHeaders,
boolean isParallelEnabled) {
this(blockchain, trieStore, peersInformation, blockStore, transactionPool,
blockParentValidator, blockValidator, blockHeaderParentValidator, blockHeaderValidator,
chunkSize, isParallelEnabled, null);
chunkSize, checkHistoricalHeaders, isParallelEnabled, null);
}

@VisibleForTesting
Expand All @@ -123,6 +124,7 @@ public SnapshotProcessor(Blockchain blockchain,
BlockHeaderParentDependantValidationRule blockHeaderParentValidator,
BlockHeaderValidationRule blockHeaderValidator,
int chunkSize,
boolean checkHistoricalHeaders,
boolean isParallelEnabled,
@Nullable SyncMessageHandler.Listener listener) {
this.blockchain = blockchain;
Expand All @@ -138,6 +140,7 @@ public SnapshotProcessor(Blockchain blockchain,
this.blockHeaderParentValidator = blockHeaderParentValidator;
this.blockHeaderValidator = blockHeaderValidator;

this.checkHistoricalHeaders = checkHistoricalHeaders;
this.parallel = isParallelEnabled;
this.thread = new Thread(new SyncMessageHandler("SNAP/server", requestQueue, listener) {

Expand All @@ -157,7 +160,7 @@ public void startSyncing(SnapSyncState state) {
}

logger.info("Starting Snap sync");
requestSnapStatus(bestPeerOpt.get());
requestSnapStatus(state, bestPeerOpt.get());
}

private void completeSyncing(SnapSyncState state) {
Expand All @@ -177,9 +180,22 @@ private void failSyncing(SnapSyncState state, Peer peer, EventType eventType, St
/**
* STATUS
*/
private void requestSnapStatus(Peer peer) {
SnapStatusRequestMessage message = new SnapStatusRequestMessage();
peer.sendMessage(message);
/**
 * Submits a snap status request through the state's request manager,
 * preferring the given peer and falling back to the best snap peer.
 */
private void requestSnapStatus(SnapSyncState state, Peer peer) {
    state.submitRequest(snapPeerSelector(peer), messageId -> new SnapStatusRequestMessage(messageId));
}

/**
 * Builds a selector that prefers the given peer and falls back to the
 * best available peer known to {@code peersInformation}.
 */
private PeerSelector peerSelector(@Nullable Peer peer) {
    return PeerSelector.builder()
            .withDefaultPeer(() -> peer)
            .withAltPeer(() -> peersInformation.getBestPeer())
            .build();
}

/**
 * Builds a selector that prefers the given snap peer and falls back to the
 * best snap-capable peer known to {@code peersInformation}.
 */
private PeerSelector snapPeerSelector(@Nullable Peer snapPeer) {
    return PeerSelector.builder()
            .withDefaultPeer(() -> snapPeer)
            .withAltPeer(() -> peersInformation.getBestSnapPeer())
            .build();
}

public void processSnapStatusRequest(Peer sender, SnapStatusRequestMessage requestMessage) {
Expand All @@ -201,7 +217,7 @@ public void run() {
}
}

void processSnapStatusRequestInternal(Peer sender, SnapStatusRequestMessage ignoredRequestMessage) {
void processSnapStatusRequestInternal(Peer sender, SnapStatusRequestMessage requestMessage) {
long bestBlockNumber = blockchain.getBestBlock().getNumber();
long checkpointBlockNumber = bestBlockNumber - (bestBlockNumber % BLOCK_NUMBER_CHECKPOINT);
logger.debug("Processing snapshot status request, checkpointBlockNumber: {}, bestBlockNumber: {}", checkpointBlockNumber, bestBlockNumber);
Expand All @@ -226,7 +242,7 @@ void processSnapStatusRequestInternal(Peer sender, SnapStatusRequestMessage igno

long trieSize = opt.get().getTotalSize();
logger.debug("Processing snapshot status request - rootHash: {} trieSize: {}", rootHash, trieSize);
SnapStatusResponseMessage responseMessage = new SnapStatusResponseMessage(blocks, difficulties, trieSize);
SnapStatusResponseMessage responseMessage = new SnapStatusResponseMessage(requestMessage.getId(), blocks, difficulties, trieSize);
sender.sendMessage(responseMessage);
}

Expand Down Expand Up @@ -261,7 +277,7 @@ public void processSnapStatusResponse(SnapSyncState state, Peer sender, SnapStat
generateChunkRequestTasks(state);
startRequestingChunks(state);
} else {
requestBlocksChunk(sender, blocksFromResponse.get(0).getNumber());
requestBlocksChunk(state, blocksFromResponse.get(0).getNumber());
}
}

Expand Down Expand Up @@ -318,9 +334,11 @@ private boolean areBlockPairsValid(Pair<Block, BlockDifficulty> blockPair, @Null
/**
* BLOCK CHUNK
*/
private void requestBlocksChunk(Peer sender, long blockNumber) {
logger.debug("Requesting block chunk to node {} - block {}", sender.getPeerNodeID(), blockNumber);
sender.sendMessage(new SnapBlocksRequestMessage(blockNumber));
/**
 * Submits a request for a chunk of blocks starting at {@code blockNumber},
 * letting the request manager pick the best available peer.
 */
private void requestBlocksChunk(SnapSyncState state, long blockNumber) {
    state.submitRequest(peerSelector(null), messageId -> new SnapBlocksRequestMessage(messageId, blockNumber));
}

public void processBlockHeaderChunk(SnapSyncState state, Peer sender, List<BlockHeader> chunk) {
Expand Down Expand Up @@ -405,7 +423,10 @@ private void requestNextBlockHeadersChunk(SnapSyncState state, Peer sender) {

logger.debug("Requesting block header chunk to node {} - block [{}/{}]", peer.getPeerNodeID(), lastVerifiedBlockHeader.getNumber() - 1, parentHash);

state.getSyncEventsHandler().sendBlockHeadersRequest(peer, new ChunkDescriptor(parentHash.getBytes(), (int) count));
state.submitRequest(
peerSelector(sender),
messageId -> new BlockHeadersRequestMessage(messageId, parentHash.getBytes(), (int) count)
);
}

public void processSnapBlocksRequest(Peer sender, SnapBlocksRequestMessage requestMessage) {
Expand Down Expand Up @@ -447,7 +468,7 @@ void processSnapBlocksRequestInternal(Peer sender, SnapBlocksRequestMessage requ
difficulties.add(blockStore.getTotalDifficultyForHash(block.getHash().getBytes()));
}
logger.debug("Sending snap blocks response. From block {} to block {} - chunksize {}", blocks.get(0).getNumber(), blocks.get(blocks.size() - 1).getNumber(), BLOCK_CHUNK_SIZE);
SnapBlocksResponseMessage responseMessage = new SnapBlocksResponseMessage(blocks, difficulties);
SnapBlocksResponseMessage responseMessage = new SnapBlocksResponseMessage(requestMessage.getId(), blocks, difficulties);
sender.sendMessage(responseMessage);
}

Expand All @@ -474,7 +495,12 @@ public void processSnapBlocksResponse(SnapSyncState state, Peer sender, SnapBloc
generateChunkRequestTasks(state);
startRequestingChunks(state);
} else if (nextChunk > lastRequiredBlock) {
requestBlocksChunk(sender, nextChunk);
requestBlocksChunk(state, nextChunk);
} else if (!this.checkHistoricalHeaders) {
logger.info("Finished Snap blocks request sending. Start requesting state chunks without historical headers check");

generateChunkRequestTasks(state);
startRequestingChunks(state);
} else {
logger.info("Finished Snap blocks request sending. Start requesting state chunks and block headers");

Expand All @@ -488,10 +514,11 @@ public void processSnapBlocksResponse(SnapSyncState state, Peer sender, SnapBloc
/**
* STATE CHUNK
*/
private void requestStateChunk(Peer peer, long from, long blockNumber, int chunkSize) {
logger.debug("Requesting state chunk to node {} - block {} - chunkNumber {}", peer.getPeerNodeID(), blockNumber, from / chunkSize);
SnapStateChunkRequestMessage message = new SnapStateChunkRequestMessage(messageId.getAndIncrement(), blockNumber, from, chunkSize);
peer.sendMessage(message);
/**
 * Submits a state chunk request for the given block, starting at trie
 * position {@code from}; prefers the given peer, falling back to the best snap peer.
 */
private void requestStateChunk(SnapSyncState state, Peer peer, long from, long blockNumber, int chunkSize) {
    state.submitRequest(snapPeerSelector(peer), messageId -> new SnapStateChunkRequestMessage(messageId, blockNumber, from, chunkSize));
}

public void processStateChunkRequest(Peer sender, SnapStateChunkRequestMessage requestMessage) {
Expand Down Expand Up @@ -573,7 +600,7 @@ public void processStateChunkResponse(SnapSyncState state, Peer peer, SnapStateC
state.setNextExpectedFrom(nextExpectedFrom + chunkSize * CHUNK_ITEM_SIZE);
} catch (Exception e) {
logger.error("Error while processing chunk response. {}", e.getMessage(), e);
onStateChunkResponseError(peer, nextMessage);
onStateChunkResponseError(state, peer, nextMessage);
}
} else {
break;
Expand All @@ -587,19 +614,18 @@ public void processStateChunkResponse(SnapSyncState state, Peer peer, SnapStateC
}

@VisibleForTesting
void onStateChunkResponseError(Peer peer, SnapStateChunkResponseMessage responseMessage) {
/**
 * Handles a failed state chunk response: re-requests the same chunk,
 * preferring a snap peer other than the one that produced the error
 * (falls back to the same peer when no alternative is available).
 *
 * @param state           current snap sync state used to submit the retry
 * @param peer            peer whose response failed
 * @param responseMessage the failed response, providing chunk offset and block number
 */
void onStateChunkResponseError(SnapSyncState state, Peer peer, SnapStateChunkResponseMessage responseMessage) {
    logger.error("Error while processing chunk response from {} of peer {}. Asking for chunk again.", responseMessage.getFrom(), peer.getPeerNodeID());
    Peer alternativePeer = peersInformation.getBestSnapPeerCandidates().stream()
            .filter(listedPeer -> !listedPeer.getPeerNodeID().equals(peer.getPeerNodeID()))
            .findFirst()
            .orElse(peer);
    // Log the peer actually receiving the retry (was logging the failing peer's id)
    logger.debug("Requesting state chunk \"from\" {} to peer {}", responseMessage.getFrom(), alternativePeer.getPeerNodeID());
    requestStateChunk(state, alternativePeer, responseMessage.getFrom(), responseMessage.getBlockNumber(), chunkSize);
}

private void processOrderedStateChunkResponse(SnapSyncState state, Peer peer, SnapStateChunkResponseMessage message) throws Exception {
logger.debug("Processing State chunk received from {} to {}", message.getFrom(), message.getTo());
peersInformation.getOrRegisterPeer(peer);

RLPList nodeLists = RLP.decodeList(message.getChunkOfTrieKeyValue());
final RLPList preRootElements = RLP.decodeList(nodeLists.get(0).getRLPData());
Expand Down Expand Up @@ -651,7 +677,7 @@ private void processOrderedStateChunkResponse(SnapSyncState state, Peer peer, Sn
if (!message.isComplete()) {
executeNextChunkRequestTask(state, peer);
} else {
if (blocksVerified(state)) {
if (!this.checkHistoricalHeaders || blocksVerified(state)) {
completeSyncing(state);
} else {
state.setStateFetched();
Expand Down Expand Up @@ -716,7 +742,7 @@ private void executeNextChunkRequestTask(SnapSyncState state, Peer peer) {
if (!taskQueue.isEmpty()) {
ChunkTask task = taskQueue.poll();

requestStateChunk(peer, task.getFrom(), task.getBlockNumber(), chunkSize);
requestStateChunk(state, peer, task.getFrom(), task.getBlockNumber(), chunkSize);
} else {
logger.warn("No more chunk request tasks.");
}
Expand Down
Loading

0 comments on commit 4931a20

Please sign in to comment.