diff --git a/rskj-core/src/integrationTest/java/co/rsk/snap/SnapshotSyncIntegrationTest.java b/rskj-core/src/integrationTest/java/co/rsk/snap/SnapshotSyncIntegrationTest.java index a28d3655da2..81079f4b3ce 100644 --- a/rskj-core/src/integrationTest/java/co/rsk/snap/SnapshotSyncIntegrationTest.java +++ b/rskj-core/src/integrationTest/java/co/rsk/snap/SnapshotSyncIntegrationTest.java @@ -105,7 +105,7 @@ public void whenStartTheServerAndClientNodes_thenTheClientWillSynchWithServer() boolean isClientSynced = false; while (System.currentTimeMillis() < endTime) { - if (clientNode.getOutput().contains("CLIENT - Starting Snapshot sync.") && clientNode.getOutput().contains("CLIENT - Snapshot sync finished successfully!")) { + if (clientNode.getOutput().contains("Starting Snap sync") && clientNode.getOutput().contains("Snap sync finished successfully")) { try { JsonNode jsonResponse = OkHttpClientTestFixture.getJsonResponseForGetBestBlockMessage(portClientRpc, serverBestBlockNumber); JsonNode jsonResult = jsonResponse.get(0).get("result"); diff --git a/rskj-core/src/main/java/co/rsk/RskContext.java b/rskj-core/src/main/java/co/rsk/RskContext.java index db130d7069a..d176812a7af 100644 --- a/rskj-core/src/main/java/co/rsk/RskContext.java +++ b/rskj-core/src/main/java/co/rsk/RskContext.java @@ -180,7 +180,9 @@ public class RskContext implements NodeContext, NodeBootstrapper { private ProofOfWorkRule proofOfWorkRule; private ForkDetectionDataRule forkDetectionDataRule; private BlockParentDependantValidationRule blockParentDependantValidationRule; + private BlockParentDependantValidationRule snapBlockParentDependantValidationRule; private BlockValidationRule blockValidationRule; + private BlockValidationRule snapBlockValidationRule; private BlockValidationRule minerServerBlockValidationRule; private BlockValidator blockValidator; private BlockValidator blockHeaderValidator; @@ -1133,6 +1135,33 @@ public synchronized BlockValidationRule getBlockValidationRule() { return blockValidationRule; } + public synchronized BlockValidationRule getSnapBlockValidationRule() { + checkIfNotClosed(); + + if (snapBlockValidationRule == null) { + final RskSystemProperties rskSystemProperties = getRskSystemProperties(); + final Constants commonConstants = rskSystemProperties.getNetworkConstants(); + final BlockTimeStampValidationRule blockTimeStampValidationRule = new BlockTimeStampValidationRule( + commonConstants.getNewBlockMaxSecondsInTheFuture(), + rskSystemProperties.getActivationConfig(), + rskSystemProperties.getNetworkConstants() + ); + snapBlockValidationRule = new BlockCompositeRule( + new TxsMinGasPriceRule(), + new BlockTxsMaxGasPriceRule(rskSystemProperties.getActivationConfig()), + new BlockRootValidationRule(rskSystemProperties.getActivationConfig()), + getProofOfWorkRule(), + new RemascValidationRule(), + blockTimeStampValidationRule, + new GasLimitRule(commonConstants.getMinGasLimit()), + new ExtraDataRule(commonConstants.getMaximumExtraDataSize()), + new ValidTxExecutionSublistsEdgesRule(getRskSystemProperties().getActivationConfig()) + ); + } + + return snapBlockValidationRule; + } + public synchronized BlockParentDependantValidationRule getBlockParentDependantValidationRule() { checkIfNotClosed(); @@ -1151,6 +1180,23 @@ public synchronized BlockParentDependantValidationRule getBlockParentDependantVa return blockParentDependantValidationRule; } + public synchronized BlockParentDependantValidationRule getSnapBlockParentDependantValidationRule() { + checkIfNotClosed(); + + if (snapBlockParentDependantValidationRule == null) { + Constants commonConstants = getRskSystemProperties().getNetworkConstants(); + snapBlockParentDependantValidationRule = new BlockParentCompositeRule( + new BlockTxsFieldsValidationRule(getBlockTxSignatureCache()), + new PrevMinGasPriceRule(), + new BlockParentNumberRule(), + new BlockDifficultyRule(getDifficultyCalculator()), + new BlockParentGasLimitRule(commonConstants.getGasLimitBoundDivisor()) + ); + } + + return snapBlockParentDependantValidationRule; + } + public synchronized org.ethereum.db.BlockStore buildBlockStore(String databaseDir) { checkIfNotClosed(); @@ -1475,7 +1521,6 @@ protected synchronized SyncConfiguration buildSyncConfiguration() { rskSystemProperties.getTopBest(), rskSystemProperties.isServerSnapshotSyncEnabled(), rskSystemProperties.isClientSnapshotSyncEnabled(), - rskSystemProperties.getSnapshotChunkTimeout(), rskSystemProperties.getSnapshotSyncLimit(), rskSystemProperties.getSnapBootNodes()); } @@ -2010,12 +2055,34 @@ private SyncPool getSyncPool() { private SnapshotProcessor getSnapshotProcessor() { if (snapshotProcessor == null) { + final RskSystemProperties rskSystemProperties = getRskSystemProperties(); + final Constants commonConstants = rskSystemProperties.getNetworkConstants(); + final BlockTimeStampValidationRule blockTimeStampValidationRule = new BlockTimeStampValidationRule( + commonConstants.getNewBlockMaxSecondsInTheFuture(), + rskSystemProperties.getActivationConfig(), + rskSystemProperties.getNetworkConstants() + ); + snapshotProcessor = new SnapshotProcessor( getBlockchain(), getTrieStore(), getPeersInformation(), getBlockStore(), getTransactionPool(), + getSnapBlockParentDependantValidationRule(), + getSnapBlockValidationRule(), + new BlockHeaderParentCompositeRule( + new PrevMinGasPriceRule(), + new BlockParentNumberRule(), + blockTimeStampValidationRule, + new BlockDifficultyRule(getDifficultyCalculator()), + new BlockParentGasLimitRule(commonConstants.getGasLimitBoundDivisor()) + ), + new BlockHeaderCompositeRule( + getProofOfWorkRule(), + blockTimeStampValidationRule, + new ValidGasUsedRule() + ), getRskSystemProperties().getSnapshotChunkSize(), getRskSystemProperties().isSnapshotParallelEnabled() ); diff --git a/rskj-core/src/main/java/co/rsk/config/RskSystemProperties.java b/rskj-core/src/main/java/co/rsk/config/RskSystemProperties.java index 1638f7f155c..9e2da036ee8 100644 --- a/rskj-core/src/main/java/co/rsk/config/RskSystemProperties.java +++ b/rskj-core/src/main/java/co/rsk/config/RskSystemProperties.java @@ -32,7 +32,6 @@ import org.ethereum.crypto.ECKey; import org.ethereum.crypto.HashUtil; import org.ethereum.listener.GasPriceCalculator; -import org.ethereum.net.client.Capability; import org.ethereum.vm.PrecompiledContracts; import javax.annotation.Nonnull; @@ -430,21 +429,6 @@ public int getLongSyncLimit() { public boolean isServerSnapshotSyncEnabled() { return configFromFiles.getBoolean("sync.snapshot.server.enabled");} public boolean isClientSnapshotSyncEnabled() { return configFromFiles.getBoolean(PROPERTY_SNAP_CLIENT_ENABLED);} - @Override - public List peerCapabilities() { - List capabilities = super.peerCapabilities(); - - if (isSnapshotSyncEnabled()) { - capabilities.add(Capability.SNAP); - } - - return capabilities; - } - - public int getSnapshotChunkTimeout() { - return configFromFiles.getInt("sync.snapshot.client.chunkRequestTimeout"); - } - public boolean isSnapshotParallelEnabled() { return configFromFiles.getBoolean("sync.snapshot.client.parallel");} public int getSnapshotChunkSize() { return configFromFiles.getInt("sync.snapshot.client.chunkSize");} @@ -570,10 +554,6 @@ public GasPriceCalculator.GasCalculatorType getGasCalculatorType() { return gasCalculatorType; } - public boolean isSnapshotSyncEnabled(){ - return isServerSnapshotSyncEnabled() || isClientSnapshotSyncEnabled(); - } - private void fetchMethodTimeout(Config configElement, Map methodTimeoutMap) { configElement.getObject("methods.timeout") .unwrapped() diff --git a/rskj-core/src/main/java/co/rsk/core/bc/BlockValidatorImpl.java b/rskj-core/src/main/java/co/rsk/core/bc/BlockValidatorImpl.java index 3b80a2f6d54..be5c529307b 100644 --- a/rskj-core/src/main/java/co/rsk/core/bc/BlockValidatorImpl.java +++ b/rskj-core/src/main/java/co/rsk/core/bc/BlockValidatorImpl.java @@ -35,11 +35,11 @@ public class BlockValidatorImpl implements BlockValidator { private static final Logger logger = LoggerFactory.getLogger("blocksyncservice"); - private BlockStore blockStore; + private final BlockStore blockStore; - private BlockParentDependantValidationRule blockParentValidator; + private final BlockParentDependantValidationRule blockParentValidator; - private BlockValidationRule blockValidator; + private final BlockValidationRule blockValidator; public BlockValidatorImpl(BlockStore blockStore, BlockParentDependantValidationRule blockParentValidator, BlockValidationRule blockValidator) { this.blockStore = blockStore; diff --git a/rskj-core/src/main/java/co/rsk/net/SnapshotProcessor.java b/rskj-core/src/main/java/co/rsk/net/SnapshotProcessor.java index 862ddc6fed0..dddb79f1fe1 100644 --- a/rskj-core/src/main/java/co/rsk/net/SnapshotProcessor.java +++ b/rskj-core/src/main/java/co/rsk/net/SnapshotProcessor.java @@ -20,16 +20,25 @@ import co.rsk.config.InternalService; import co.rsk.core.BlockDifficulty; +import co.rsk.core.types.bytes.Bytes; +import co.rsk.crypto.Keccak256; import co.rsk.net.messages.*; import co.rsk.net.sync.*; +import co.rsk.scoring.EventType; import co.rsk.trie.TrieDTO; import co.rsk.trie.TrieDTOInOrderIterator; import co.rsk.trie.TrieDTOInOrderRecoverer; import co.rsk.trie.TrieStore; +import co.rsk.validators.BlockHeaderParentDependantValidationRule; +import co.rsk.validators.BlockHeaderValidationRule; +import co.rsk.validators.BlockParentDependantValidationRule; +import co.rsk.validators.BlockValidationRule; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.ethereum.core.Block; +import org.ethereum.core.BlockHeader; import org.ethereum.core.Blockchain; import org.ethereum.core.TransactionPool; import org.ethereum.db.BlockStore; @@ -44,6 +53,7 @@ import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; /** @@ -69,7 +79,14 @@ public class SnapshotProcessor implements InternalService { private final int chunkSize; private final SnapshotPeersInformation peersInformation; private final TransactionPool transactionPool; - private long messageId = 0; + + private final BlockParentDependantValidationRule blockParentValidator; + private final BlockValidationRule blockValidator; + + private final BlockHeaderParentDependantValidationRule blockHeaderParentValidator; + private final BlockHeaderValidationRule blockHeaderValidator; + + private final AtomicLong messageId = new AtomicLong(0); // flag for parallel requests private final boolean parallel; @@ -78,14 +95,21 @@ public class SnapshotProcessor implements InternalService { private volatile Boolean isRunning; private final Thread thread; + public SnapshotProcessor(Blockchain blockchain, TrieStore trieStore, SnapshotPeersInformation peersInformation, BlockStore blockStore, TransactionPool transactionPool, + BlockParentDependantValidationRule blockParentValidator, + BlockValidationRule blockValidator, + BlockHeaderParentDependantValidationRule blockHeaderParentValidator, + BlockHeaderValidationRule blockHeaderValidator, int chunkSize, boolean isParallelEnabled) { - this(blockchain, trieStore, peersInformation, blockStore, transactionPool, chunkSize, isParallelEnabled, null); + this(blockchain, trieStore, peersInformation, blockStore, transactionPool, + blockParentValidator, blockValidator, blockHeaderParentValidator, blockHeaderValidator, + chunkSize, isParallelEnabled, null); } @VisibleForTesting @@ -94,6 +118,10 @@ public SnapshotProcessor(Blockchain blockchain, SnapshotPeersInformation peersInformation, BlockStore blockStore, TransactionPool transactionPool, + BlockParentDependantValidationRule blockParentValidator, + BlockValidationRule blockValidator, + BlockHeaderParentDependantValidationRule blockHeaderParentValidator, + BlockHeaderValidationRule blockHeaderValidator, int chunkSize, boolean isParallelEnabled, @Nullable SyncMessageHandler.Listener listener) { @@ -103,29 +131,49 @@ public SnapshotProcessor(Blockchain blockchain, this.chunkSize = chunkSize; this.blockStore = blockStore; this.transactionPool = transactionPool; + + this.blockParentValidator = blockParentValidator; + this.blockValidator = blockValidator; + + this.blockHeaderParentValidator = blockHeaderParentValidator; + this.blockHeaderValidator = blockHeaderValidator; + this.parallel = isParallelEnabled; - this.thread = new Thread(new SyncMessageHandler("SNAP requests", requestQueue, listener) { + this.thread = new Thread(new SyncMessageHandler("SNAP/server", requestQueue, listener) { @Override public boolean isRunning() { return isRunning; } - }, "snap sync request handler"); + }, "snap sync server handler"); } - public void startSyncing() { - // get more than one peer, use the peer queue - // TODO(snap-poc) deal with multiple peers algorithm here - Peer peer = peersInformation.getBestSnapPeerCandidates().get(0); - logger.info("CLIENT - Starting Snapshot sync."); - requestSnapStatus(peer); + public void startSyncing(SnapSyncState state) { + Optional bestPeerOpt = peersInformation.getBestSnapPeer(); + if (bestPeerOpt.isEmpty()) { + logger.warn("No snap-capable peer to start sync against"); + stopSyncing(state); + return; + } + + logger.info("Starting Snap sync"); + requestSnapStatus(bestPeerOpt.get()); + } + + private void completeSyncing(SnapSyncState state) { + boolean result = rebuildStateAndSave(state); + logger.info("Snap sync finished {}", result ? "successfully" : "with errors"); + stopSyncing(state); } - // TODO(snap-poc) should be called on errors too private void stopSyncing(SnapSyncState state) { state.finish(); } + private void failSyncing(SnapSyncState state, Peer peer, EventType eventType, String message, Object... arguments) { + state.fail(peer, eventType, message, arguments); + } + /** * STATUS */ @@ -154,10 +202,9 @@ public void run() { } void processSnapStatusRequestInternal(Peer sender, SnapStatusRequestMessage ignoredRequestMessage) { - logger.debug("SERVER - Processing snapshot status request."); long bestBlockNumber = blockchain.getBestBlock().getNumber(); long checkpointBlockNumber = bestBlockNumber - (bestBlockNumber % BLOCK_NUMBER_CHECKPOINT); - logger.debug("SERVER - checkpointBlockNumber: {}, bestBlockNumber: {}", checkpointBlockNumber, bestBlockNumber); + logger.debug("Processing snapshot status request, checkpointBlockNumber: {}, bestBlockNumber: {}", checkpointBlockNumber, bestBlockNumber); List blocks = Lists.newArrayList(); List difficulties = Lists.newArrayList(); for (long i = checkpointBlockNumber - BLOCK_CHUNK_SIZE; i < checkpointBlockNumber; i++) { @@ -166,53 +213,201 @@ void processSnapStatusRequestInternal(Peer sender, SnapStatusRequestMessage igno difficulties.add(blockStore.getTotalDifficultyForHash(block.getHash().getBytes())); } - logger.trace("SERVER - Sending snapshot status response. From block {} to block {} - chunksize {}", blocks.get(0).getNumber(), blocks.get(blocks.size() - 1).getNumber(), BLOCK_CHUNK_SIZE); Block checkpointBlock = blockchain.getBlockByNumber(checkpointBlockNumber); blocks.add(checkpointBlock); - logger.trace("SERVER - adding checkpoint block: {}", checkpointBlock.getNumber()); difficulties.add(blockStore.getTotalDifficultyForHash(checkpointBlock.getHash().getBytes())); byte[] rootHash = checkpointBlock.getStateRoot(); Optional opt = trieStore.retrieveDTO(rootHash); - long trieSize = 0; - if (opt.isPresent()) { - trieSize = opt.get().getTotalSize(); - } else { - logger.debug("SERVER - trie is notPresent"); + if (opt.isEmpty()) { + logger.warn("trie is not present for rootHash: {}", Bytes.of(rootHash)); + return; } - logger.debug("SERVER - processing snapshot status request - rootHash: {} trieSize: {}", rootHash, trieSize); + + long trieSize = opt.get().getTotalSize(); + logger.debug("Processing snapshot status request - rootHash: {} trieSize: {}", rootHash, trieSize); SnapStatusResponseMessage responseMessage = new SnapStatusResponseMessage(blocks, difficulties, trieSize); sender.sendMessage(responseMessage); } public void processSnapStatusResponse(SnapSyncState state, Peer sender, SnapStatusResponseMessage responseMessage) { + if (!state.isRunning()) { + return; + } + List blocksFromResponse = responseMessage.getBlocks(); List difficultiesFromResponse = responseMessage.getDifficulties(); + if (blocksFromResponse.size() != difficultiesFromResponse.size()) { + failSyncing(state, sender, EventType.INVALID_BLOCK, "Blocks and difficulties size mismatch. Blocks: [{}], Difficulties: [{}]", blocksFromResponse.size(), difficultiesFromResponse.size()); + return; + } + Block lastBlock = blocksFromResponse.get(blocksFromResponse.size() - 1); + BlockDifficulty lastBlockDifficulty = difficultiesFromResponse.get(difficultiesFromResponse.size() - 1); - state.setLastBlock(lastBlock); - state.setLastBlockDifficulty(lastBlock.getCumulativeDifficulty()); + state.setLastBlock(lastBlock, lastBlockDifficulty, sender); state.setRemoteRootHash(lastBlock.getStateRoot()); state.setRemoteTrieSize(responseMessage.getTrieSize()); - for (int i = 0; i < blocksFromResponse.size(); i++) { - state.addBlock(new ImmutablePair<>(blocksFromResponse.get(i), difficultiesFromResponse.get(i))); + if (!validateAndSaveBlocks(state, sender, blocksFromResponse, difficultiesFromResponse)) { + return; + } + + logger.debug("Processing snapshot status response: {} from {} to {} - last blockNumber: {} trieSize: {}", blocksFromResponse.size(), blocksFromResponse.get(0).getNumber(), blocksFromResponse.get(blocksFromResponse.size() - 1).getNumber(), lastBlock.getNumber(), state.getRemoteTrieSize()); + + if (blocksVerified(state)) { + logger.info("Finished Snap blocks request sending"); + + generateChunkRequestTasks(state); + startRequestingChunks(state); + } else { + requestBlocksChunk(sender, blocksFromResponse.get(0).getNumber()); + } + } + + private boolean validateAndSaveBlocks(SnapSyncState state, Peer sender, List blocks, List difficulties) { + Pair childBlockPair = state.getLastBlockPair(); + for (int i = blocks.size() - 1; i >= 0; i--) { + Block block = blocks.get(i); + BlockDifficulty totalDifficulty = difficulties.get(i); + + Pair blockPair = new ImmutablePair<>(block, totalDifficulty); + if (!areBlockPairsValid(blockPair, childBlockPair, true)) { + failSyncing(state, sender, EventType.INVALID_BLOCK, "Block [{}]/[{}] at height: [{}] is not valid", block.getHash(), totalDifficulty, block.getNumber()); + return false; + } + + Block parentBlock = blockStore.getBlockByHash(block.getParentHash().getBytes()); + if (parentBlock != null) { + if (areBlockPairsValid(new ImmutablePair<>(parentBlock, blockStore.getTotalDifficultyForHash(parentBlock.getHash().getBytes())), blockPair, false)) { + state.addBlock(blockPair); + state.setLastVerifiedBlockHeader(blockPair.getLeft().getHeader()); + return true; + } else { + failSyncing(state, sender, EventType.INVALID_BLOCK, "Block [{}]/[{}] at height: [{}] is not valid", block.getHash(), totalDifficulty, block.getNumber()); + return false; + } + } + + state.addBlock(blockPair); + state.setLastVerifiedBlockHeader(blockPair.getLeft().getHeader()); + + childBlockPair = blockPair; + } + + return true; + } + + private boolean areBlockPairsValid(Pair blockPair, @Nullable Pair childBlockPair, boolean validateParent) { + if (validateParent && !blockValidator.isValid(blockPair.getLeft())) { + return false; + } + + if (childBlockPair == null) { + return true; } - logger.debug("CLIENT - Processing snapshot status response - last blockNumber: {} triesize: {}", lastBlock.getNumber(), state.getRemoteTrieSize()); - logger.debug("Blocks included in the response: {} from {} to {}", blocksFromResponse.size(), blocksFromResponse.get(0).getNumber(), blocksFromResponse.get(blocksFromResponse.size() - 1).getNumber()); - requestBlocksChunk(sender, blocksFromResponse.get(0).getNumber()); - generateChunkRequestTasks(state); - startRequestingChunks(state); + + if (!blockPair.getLeft().isParentOf(childBlockPair.getLeft()) + || !blockPair.getRight().equals(childBlockPair.getRight().subtract(childBlockPair.getLeft().getCumulativeDifficulty()))) { + return false; + } + + return blockParentValidator.isValid(childBlockPair.getLeft(), blockPair.getLeft()); } /** * BLOCK CHUNK */ private void requestBlocksChunk(Peer sender, long blockNumber) { - logger.debug("CLIENT - Requesting block chunk to node {} - block {}", sender.getPeerNodeID(), blockNumber); + logger.debug("Requesting block chunk to node {} - block {}", sender.getPeerNodeID(), blockNumber); sender.sendMessage(new SnapBlocksRequestMessage(blockNumber)); } + public void processBlockHeaderChunk(SnapSyncState state, Peer sender, List chunk) { + if (!state.isRunning()) { + return; + } + + logger.debug("Processing block headers response - chunk: [{}; {}]", chunk.get(0).getNumber(), chunk.get(chunk.size() - 1).getNumber()); + + if (!validateBlockHeaders(state, sender, chunk)) { + state.fail(sender, EventType.INVALID_HEADER, "Invalid block headers received"); + return; + } + + if (blocksVerified(state)) { + if (state.isStateFetched()) { + completeSyncing(state); + } + } else { + requestNextBlockHeadersChunk(state, sender); + } + } + + private boolean validateBlockHeaders(SnapSyncState state, Peer sender, List blockHeaders) { + for (int i = 0; i < blockHeaders.size(); i++) { + BlockHeader blockHeader = blockHeaders.get(i); + BlockHeader lastVerifiedBlockHeader = state.getLastVerifiedBlockHeader(); + + if (!areBlockHeadersValid(blockHeader, lastVerifiedBlockHeader, true)) { + failSyncing(state, state.getLastBlockSender(), EventType.INVALID_HEADER, "Block header [{}] at height: [{}] is not valid", blockHeader.getHash(), blockHeader.getNumber()); + return false; + } + + Block parentBlock = blockStore.getBlockByHash(blockHeader.getParentHash().getBytes()); + if (parentBlock != null && !areBlockHeadersValid(parentBlock.getHeader(), blockHeader, false)) { + failSyncing(state, state.getLastBlockSender(), EventType.INVALID_HEADER, "Block header [{}] at height: [{}] is not valid", blockHeader.getHash(), blockHeader.getNumber()); + return false; + } + + state.setLastVerifiedBlockHeader(blockHeader); + + if (blocksVerified(state)) { + return true; + } + } + + return true; + } + + private boolean areBlockHeadersValid(BlockHeader blockHeader, BlockHeader childBlockHeader, boolean validateParent) { + if (validateParent && !blockHeaderValidator.isValid(blockHeader)) { + return false; + } + + if (!blockHeader.isParentOf(childBlockHeader)) { + return false; + } + + return blockHeaderParentValidator.isValid(childBlockHeader, blockHeader); + } + + /** + * BLOCK HEADER CHUNK + */ + private void requestNextBlockHeadersChunk(SnapSyncState state, Peer sender) { + Peer peer = peersInformation.getBestPeer(Collections.singleton(state.getLastBlockSender().getPeerNodeID())) + .orElse(peersInformation.getBestSnapPeer().orElse(sender)); + + BlockHeader lastVerifiedBlockHeader = state.getLastVerifiedBlockHeader(); + Keccak256 parentHash = lastVerifiedBlockHeader.getParentHash(); + if (blockStore.isBlockExist(parentHash.getBytes())) { + logger.error("No more block headers to request"); + stopSyncing(state); + return; + } + long count = Math.min(state.getBlockHeaderChunkSize(), lastVerifiedBlockHeader.getNumber() - 1); + if (count < 1) { + logger.info("No more block headers to request but no genesis found"); + state.fail(state.getLastBlockSender(), EventType.INVALID_HEADER, "Invalid block headers genesis block"); + return; + } + + logger.debug("Requesting block header chunk to node {} - block [{}/{}]", peer.getPeerNodeID(), lastVerifiedBlockHeader.getNumber() - 1, parentHash); + + state.getSyncEventsHandler().sendBlockHeadersRequest(peer, new ChunkDescriptor(parentHash.getBytes(), (int) count)); + } + public void processSnapBlocksRequest(Peer sender, SnapBlocksRequestMessage requestMessage) { if (isRunning != Boolean.TRUE) { logger.warn("processSnapBlocksRequest: invalid state, isRunning: [{}]", isRunning); @@ -233,35 +428,60 @@ public void run() { } void processSnapBlocksRequestInternal(Peer sender, SnapBlocksRequestMessage requestMessage) { - logger.debug("SERVER - Processing snap blocks request"); + logger.debug("Processing snap blocks request"); List blocks = Lists.newArrayList(); List difficulties = Lists.newArrayList(); - long startingBlockNumber = requestMessage.getBlockNumber() - BLOCK_CHUNK_SIZE; + + if (requestMessage.getBlockNumber() < 2) { + logger.debug("Snap blocks request from {} failed because of invalid block number {}", sender.getPeerNodeID(), requestMessage.getBlockNumber()); + return; + } + + long startingBlockNumber = Math.max(1, requestMessage.getBlockNumber() - BLOCK_CHUNK_SIZE); for (long i = startingBlockNumber; i < requestMessage.getBlockNumber(); i++) { Block block = blockchain.getBlockByNumber(i); + if (block == null) { + break; + } blocks.add(block); difficulties.add(blockStore.getTotalDifficultyForHash(block.getHash().getBytes())); } - logger.debug("SERVER - Sending snap blocks response. From block {} to block {} - chunksize {}", blocks.get(0).getNumber(), blocks.get(blocks.size() - 1).getNumber(), BLOCK_CHUNK_SIZE); + logger.debug("Sending snap blocks response. From block {} to block {} - chunksize {}", blocks.get(0).getNumber(), blocks.get(blocks.size() - 1).getNumber(), BLOCK_CHUNK_SIZE); SnapBlocksResponseMessage responseMessage = new SnapBlocksResponseMessage(blocks, difficulties); sender.sendMessage(responseMessage); } public void processSnapBlocksResponse(SnapSyncState state, Peer sender, SnapBlocksResponseMessage responseMessage) { + if (!state.isRunning()) { + return; + } + long lastRequiredBlock = state.getLastBlock().getNumber() - BLOCKS_REQUIRED; List blocksFromResponse = responseMessage.getBlocks(); - logger.debug("CLIENT - Processing snap blocks response. Receiving from block {} to block {} Objective: {}.", blocksFromResponse.get(0).getNumber(), blocksFromResponse.get(blocksFromResponse.size() - 1).getNumber(), lastRequiredBlock); + logger.debug("Processing snap blocks response. Receiving from block {} to block {} Objective: {}.", blocksFromResponse.get(0).getNumber(), blocksFromResponse.get(blocksFromResponse.size() - 1).getNumber(), lastRequiredBlock); List difficultiesFromResponse = responseMessage.getDifficulties(); - for (int i = 0; i < blocksFromResponse.size(); i++) { - state.addBlock(new ImmutablePair<>(blocksFromResponse.get(i), difficultiesFromResponse.get(i))); + if (!validateAndSaveBlocks(state, sender, blocksFromResponse, difficultiesFromResponse)) { + return; } + long nextChunk = blocksFromResponse.get(0).getNumber(); - logger.debug("CLIENT - SnapBlock - nexChunk : {} - lastRequired {}, missing {}", nextChunk, lastRequiredBlock, nextChunk - lastRequiredBlock); - if (nextChunk > lastRequiredBlock) { + logger.debug("SnapBlock - nexChunk : {} - lastRequired {}, missing {}", nextChunk, lastRequiredBlock, nextChunk - lastRequiredBlock); + + if (blocksVerified(state)) { + logger.info("Finished Snap blocks request sending. Start requesting state chunks"); + + generateChunkRequestTasks(state); + startRequestingChunks(state); + } else if (nextChunk > lastRequiredBlock) { requestBlocksChunk(sender, nextChunk); } else { - logger.info("CLIENT - Finished Snap blocks request sending."); + logger.info("Finished Snap blocks request sending. Start requesting state chunks and block headers"); + + generateChunkRequestTasks(state); + startRequestingChunks(state); + + requestNextBlockHeadersChunk(state, sender); } } @@ -269,8 +489,8 @@ public void processSnapBlocksResponse(SnapSyncState state, Peer sender, SnapBloc * STATE CHUNK */ private void requestStateChunk(Peer peer, long from, long blockNumber, int chunkSize) { - logger.debug("CLIENT - Requesting state chunk to node {} - block {} - chunkNumber {}", peer.getPeerNodeID(), blockNumber, from / chunkSize); - SnapStateChunkRequestMessage message = new SnapStateChunkRequestMessage(messageId++, blockNumber, from, chunkSize); + logger.debug("Requesting state chunk to node {} - block {} - chunkNumber {}", peer.getPeerNodeID(), blockNumber, from / chunkSize); + SnapStateChunkRequestMessage message = new SnapStateChunkRequestMessage(messageId.getAndIncrement(), blockNumber, from, chunkSize); peer.sendMessage(message); } @@ -299,8 +519,8 @@ void processStateChunkRequestInternal(Peer sender, SnapStateChunkRequestMessage List trieEncoded = new ArrayList<>(); Block block = blockchain.getBlockByNumber(request.getBlockNumber()); final long to = request.getFrom() + (request.getChunkSize() * CHUNK_ITEM_SIZE); - logger.debug("SERVER - Processing state chunk request from node {}. From {} to calculated {} being chunksize {}", sender.getPeerNodeID(), request.getFrom(), to, request.getChunkSize()); - logger.debug("SERVER - Sending state chunk from {} to {}", request.getFrom(), to); + logger.debug("Processing state chunk request from node {}. From {} to calculated {} being chunksize {}", sender.getPeerNodeID(), request.getFrom(), to, request.getChunkSize()); + logger.debug("Sending state chunk from {} to {}", request.getFrom(), to); TrieDTOInOrderIterator it = new TrieDTOInOrderIterator(trieStore, block.getStateRoot(), request.getFrom(), to); // First we add the root nodes on the left of the current node. They are used to validate the chunk. @@ -329,12 +549,16 @@ void processStateChunkRequestInternal(Peer sender, SnapStateChunkRequestMessage long totalChunkTime = System.currentTimeMillis() - startChunk; - logger.debug("SERVER - Sending state chunk from {} of {} bytes to node {}, totalTime {}ms", request.getFrom(), chunkBytes.length, sender.getPeerNodeID(), totalChunkTime); + logger.debug("Sending state chunk from {} of {} bytes to node {}, totalTime {}ms", request.getFrom(), chunkBytes.length, sender.getPeerNodeID(), totalChunkTime); sender.sendMessage(responseMessage); } public void processStateChunkResponse(SnapSyncState state, Peer peer, SnapStateChunkResponseMessage responseMessage) { - logger.debug("CLIENT - State chunk received chunkNumber {}. From {} to {} of total size {}", responseMessage.getFrom() / CHUNK_ITEM_SIZE, responseMessage.getFrom(), responseMessage.getTo(), state.getRemoteTrieSize()); + if (!state.isRunning()) { + return; + } + + logger.debug("State chunk received chunkNumber {}. From {} to {} of total size {}", responseMessage.getFrom() / CHUNK_ITEM_SIZE, responseMessage.getFrom(), responseMessage.getTo(), state.getRemoteTrieSize()); PriorityQueue queue = state.getSnapStateChunkQueue(); queue.add(responseMessage); @@ -342,7 +566,7 @@ public void processStateChunkResponse(SnapSyncState state, Peer peer, SnapStateC while (!queue.isEmpty()) { SnapStateChunkResponseMessage nextMessage = queue.peek(); long nextExpectedFrom = state.getNextExpectedFrom(); - logger.debug("CLIENT - State chunk dequeued from: {} - expected: {}", nextMessage.getFrom(), nextExpectedFrom); + logger.debug("State chunk dequeued from: {} - expected: {}", nextMessage.getFrom(), nextExpectedFrom); if (nextMessage.getFrom() == nextExpectedFrom) { try { processOrderedStateChunkResponse(state, peer, queue.poll()); @@ -357,7 +581,7 @@ public void processStateChunkResponse(SnapSyncState state, Peer peer, SnapStateC } if (!responseMessage.isComplete()) { - logger.debug("CLIENT - State chunk response not complete. Requesting next chunk."); + logger.debug("State chunk response not complete. Requesting next chunk."); executeNextChunkRequestTask(state, peer); } } @@ -373,11 +597,9 @@ void onStateChunkResponseError(Peer peer, SnapStateChunkResponseMessage response requestStateChunk(alternativePeer, responseMessage.getFrom(), responseMessage.getBlockNumber(), chunkSize); } - private void processOrderedStateChunkResponse(SnapSyncState state, Peer peer, SnapStateChunkResponseMessage message) throws Exception { - logger.debug("CLIENT - Processing State chunk received from {} to {}", message.getFrom(), message.getTo()); + logger.debug("Processing State chunk received from {} to {}", message.getFrom(), message.getTo()); peersInformation.getOrRegisterPeer(peer); - state.onNewChunk(); RLPList nodeLists = RLP.decodeList(message.getChunkOfTrieKeyValue()); final RLPList preRootElements = RLP.decodeList(nodeLists.get(0).getRLPData()); @@ -389,7 +611,6 @@ private void processOrderedStateChunkResponse(SnapSyncState state, Peer peer, Sn List nodes = new ArrayList<>(); List postRootNodes = new ArrayList<>(); - for (int i = 0; i < preRootElements.size(); i++) { final RLPList trieElement = (RLPList) preRootElements.get(i); final byte[] value = trieElement.get(0).getRLPData(); @@ -430,9 +651,11 @@ private void processOrderedStateChunkResponse(SnapSyncState state, Peer peer, Sn if (!message.isComplete()) { executeNextChunkRequestTask(state, peer); } else { - boolean result = rebuildStateAndSave(state); - logger.info("CLIENT - Snapshot sync finished {}! ", result ? "successfully" : "with errors"); - stopSyncing(state); + if (blocksVerified(state)) { + completeSyncing(state); + } else { + state.setStateFetched(); + } } } else { logger.error("Error while verifying chunk response: {}", message); @@ -440,28 +663,33 @@ private void processOrderedStateChunkResponse(SnapSyncState state, Peer peer, Sn } } + private boolean blocksVerified(SnapSyncState state) { + BlockHeader lastVerifiedBlockHeader = state.getLastVerifiedBlockHeader(); + return lastVerifiedBlockHeader != null && blockStore.isBlockExist(lastVerifiedBlockHeader.getParentHash().getBytes()); + } + /** * Once state share is received, rebuild the trie, save it in db and save all the blocks. */ private boolean rebuildStateAndSave(SnapSyncState state) { - logger.info("CLIENT - Recovering trie..."); + logger.info("Recovering trie..."); final TrieDTO[] nodeArray = state.getAllNodes().toArray(new TrieDTO[0]); Optional result = TrieDTOInOrderRecoverer.recoverTrie(nodeArray, this.trieStore::saveDTO); if (result.isPresent() && Arrays.equals(state.getRemoteRootHash(), result.get().calculateHash())) { - logger.info("CLIENT - State final validation OK!"); + logger.info("State final validation OK!"); this.blockchain.removeBlocksByNumber(0); //genesis is removed so backwards sync will always start. BlockConnectorHelper blockConnector = new BlockConnectorHelper(this.blockStore); state.connectBlocks(blockConnector); - logger.info("CLIENT - Setting last block as best block..."); + logger.info("Setting last block as best block..."); this.blockchain.setStatus(state.getLastBlock(), state.getLastBlockDifficulty()); this.transactionPool.setBestBlock(state.getLastBlock()); return true; } - logger.error("CLIENT - State final validation FAILED"); + logger.error("State final validation FAILED"); return false; } diff --git a/rskj-core/src/main/java/co/rsk/net/SyncProcessor.java b/rskj-core/src/main/java/co/rsk/net/SyncProcessor.java index 7c282118e40..0c5c2477eae 100644 --- a/rskj-core/src/main/java/co/rsk/net/SyncProcessor.java +++ b/rskj-core/src/main/java/co/rsk/net/SyncProcessor.java @@ -178,7 +178,7 @@ public void processBlockHeadersResponse(Peer peer, BlockHeadersResponseMessage m MessageType messageType = message.getMessageType(); if (isPending(messageId, messageType)) { removePendingMessage(messageId, messageType); - syncState.newBlockHeaders(message.getBlockHeaders()); + syncState.newBlockHeaders(peer, message.getBlockHeaders()); } else { notifyUnexpectedMessageToPeerScoring(peer, "block headers"); } @@ -289,8 +289,8 @@ public void startBlockForwardSyncing(Peer peer) { } @Override - public void startSnapSync() { - logger.info("Start Snap syncing"); + public void startSnapSync(Peer peer) { + logger.info("Start Snap syncing with {}", peer.getPeerNodeID()); setSyncState(new SnapSyncState(this, snapshotProcessor, syncConfiguration)); } diff --git a/rskj-core/src/main/java/co/rsk/net/sync/BaseSyncState.java b/rskj-core/src/main/java/co/rsk/net/sync/BaseSyncState.java index a2b0fa2cf9b..ea4fdb85a4b 100644 --- a/rskj-core/src/main/java/co/rsk/net/sync/BaseSyncState.java +++ b/rskj-core/src/main/java/co/rsk/net/sync/BaseSyncState.java @@ -30,8 +30,8 @@ import java.util.List; public abstract class BaseSyncState implements SyncState { - protected SyncConfiguration syncConfiguration; - protected SyncEventsHandler syncEventsHandler; + protected final SyncConfiguration syncConfiguration; + protected final SyncEventsHandler syncEventsHandler; protected Duration timeElapsed; @@ -57,7 +57,7 @@ public void tick(Duration duration) { protected void onMessageTimeOut() { /* empty */ } @Override - public void newBlockHeaders(List chunk) { /* empty */ } + public void newBlockHeaders(Peer peer, List chunk) { /* empty */ } @Override public void newBody(BodyResponseMessage message, Peer peer) { /* empty */ } diff --git a/rskj-core/src/main/java/co/rsk/net/sync/BlockConnectorException.java b/rskj-core/src/main/java/co/rsk/net/sync/BlockConnectorException.java deleted file mode 100644 index 67c297a81ca..00000000000 --- a/rskj-core/src/main/java/co/rsk/net/sync/BlockConnectorException.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * This file is part of RskJ - * Copyright (C) 2023 RSK Labs Ltd. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Lesser General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program. If not, see . - */ - -package co.rsk.net.sync; - -public class BlockConnectorException extends RuntimeException { - private final long blockNumber; - private final long childBlockNumber; - - public BlockConnectorException(final long blockNumber, final long childBlockNumber) { - super(String.format("Block with number %s is not child's (%s) parent.", blockNumber, childBlockNumber)); - this.blockNumber = blockNumber; - this.childBlockNumber = childBlockNumber; - } - - public long getBlockNumber() { - return blockNumber; - } - - public long getChildBlockNumber() { - return childBlockNumber; - } -} diff --git a/rskj-core/src/main/java/co/rsk/net/sync/BlockConnectorHelper.java b/rskj-core/src/main/java/co/rsk/net/sync/BlockConnectorHelper.java index 58191074de5..c406eedba85 100644 --- a/rskj-core/src/main/java/co/rsk/net/sync/BlockConnectorHelper.java +++ b/rskj-core/src/main/java/co/rsk/net/sync/BlockConnectorHelper.java @@ -29,6 +29,7 @@ public class BlockConnectorHelper { private static final Logger logger = LoggerFactory.getLogger("SnapBlockConnector"); + private final BlockStore blockStore; public BlockConnectorHelper(BlockStore blockStore) { @@ -37,49 +38,28 @@ public BlockConnectorHelper(BlockStore blockStore) { public void startConnecting(List> blockAndDifficultiesList) { if (blockAndDifficultiesList.isEmpty()) { - logger.debug("Block list is empty, nothing to connect"); + logger.warn("Block list is empty, nothing to connect"); return; } - blockAndDifficultiesList.sort(new BlockAndDiffComparator()); - Block child = null; - logger.info("Start connecting blocks ranging from {} to {} - Total: {}", + logger.info("Start connecting blocks ranging from [{}] to [{}] - Total: [{}]", blockAndDifficultiesList.get(0).getKey().getNumber(), blockAndDifficultiesList.get(blockAndDifficultiesList.size() - 1).getKey().getNumber(), blockAndDifficultiesList.size()); - int blockIndex = blockAndDifficultiesList.size() - 1; - if (blockStore.isEmpty()) { - Pair blockAndDifficulty = blockAndDifficultiesList.get(blockIndex); - child = blockAndDifficulty.getLeft(); - logger.debug("BlockStore is empty, setting child block number the last block from the list: {}", child.getNumber()); - blockStore.saveBlock(child, blockAndDifficulty.getRight(), true); - logger.debug("Block number: {} saved", child.getNumber()); - blockIndex--; - } else { - logger.debug("BlockStore is not empty, getting best block"); - child = blockStore.getBestBlock(); - logger.debug("Best block number: {}", child.getNumber()); - } - while (blockIndex >= 0) { - Pair currentBlockAndDifficulty = blockAndDifficultiesList.get(blockIndex); - Block currentBlock = currentBlockAndDifficulty.getLeft(); + int totalSaved = 0; + for (Pair pair : blockAndDifficultiesList) { + Block currentBlock = pair.getLeft(); logger.trace("Connecting block number: {}", currentBlock.getNumber()); - if (!currentBlock.isParentOf(child)) { - throw new BlockConnectorException(currentBlock.getNumber(), child.getNumber()); + if (!blockStore.isBlockExist(currentBlock.getHash().getBytes())) { + blockStore.saveBlock(currentBlock, pair.getRight(), true); + totalSaved++; + } else { + logger.warn("Block: [{}/{}] already exists. Skipping", currentBlock.getNumber(), currentBlock.getHash()); } - blockStore.saveBlock(currentBlock, currentBlockAndDifficulty.getRight(), true); - child = currentBlock; - blockIndex--; } - logger.info("Finished connecting blocks. Last saved block: {}",child.getNumber()); - } - static class BlockAndDiffComparator implements java.util.Comparator> { - @Override - public int compare(Pair o1, Pair o2) { - return Long.compare(o1.getLeft().getNumber(), o2.getLeft().getNumber()); - } + logger.info("Finished connecting blocks. Total saved: [{}]. ", totalSaved); } } diff --git a/rskj-core/src/main/java/co/rsk/net/sync/CheckingBestHeaderSyncState.java b/rskj-core/src/main/java/co/rsk/net/sync/CheckingBestHeaderSyncState.java index 37fd4870bea..44c9fabf22f 100644 --- a/rskj-core/src/main/java/co/rsk/net/sync/CheckingBestHeaderSyncState.java +++ b/rskj-core/src/main/java/co/rsk/net/sync/CheckingBestHeaderSyncState.java @@ -46,7 +46,7 @@ public void onEnter(){ } @Override - public void newBlockHeaders(List chunk){ + public void newBlockHeaders(Peer peer, List chunk){ BlockHeader header = chunk.get(0); boolean unexpectedHeader = !ByteUtil.fastEquals(header.getHash().getBytes(), miniChunk.getHash()); if (unexpectedHeader) { diff --git a/rskj-core/src/main/java/co/rsk/net/sync/DownloadingBackwardsHeadersSyncState.java b/rskj-core/src/main/java/co/rsk/net/sync/DownloadingBackwardsHeadersSyncState.java index 915825a4711..ca52f440933 100644 --- a/rskj-core/src/main/java/co/rsk/net/sync/DownloadingBackwardsHeadersSyncState.java +++ b/rskj-core/src/main/java/co/rsk/net/sync/DownloadingBackwardsHeadersSyncState.java @@ -43,7 +43,7 @@ public DownloadingBackwardsHeadersSyncState( } @Override - public void newBlockHeaders(List toRequest) { + public void newBlockHeaders(Peer peer, List toRequest) { syncEventsHandler.backwardDownloadBodies( child, toRequest.stream().skip(1).collect(Collectors.toList()), selectedPeer ); diff --git a/rskj-core/src/main/java/co/rsk/net/sync/DownloadingHeadersSyncState.java b/rskj-core/src/main/java/co/rsk/net/sync/DownloadingHeadersSyncState.java index 26d64046097..e0658eb338c 100644 --- a/rskj-core/src/main/java/co/rsk/net/sync/DownloadingHeadersSyncState.java +++ b/rskj-core/src/main/java/co/rsk/net/sync/DownloadingHeadersSyncState.java @@ -73,7 +73,7 @@ public DownloadingHeadersSyncState( } @Override - public void newBlockHeaders(List chunk) { + public void newBlockHeaders(Peer peer, List chunk) { Optional currentChunkOpt = chunksDownloadHelper.getCurrentChunk(); if (!currentChunkOpt.isPresent()) { syncEventsHandler.onSyncIssue(selectedPeer, "Current chunk not present on {}", this.getClass()); diff --git a/rskj-core/src/main/java/co/rsk/net/sync/PeerAndModeDecidingSyncState.java b/rskj-core/src/main/java/co/rsk/net/sync/PeerAndModeDecidingSyncState.java index 1389d0e9ad1..777d6ced94b 100644 --- a/rskj-core/src/main/java/co/rsk/net/sync/PeerAndModeDecidingSyncState.java +++ b/rskj-core/src/main/java/co/rsk/net/sync/PeerAndModeDecidingSyncState.java @@ -88,35 +88,30 @@ private void tryStartSyncing() { private boolean tryStartSnapshotSync() { if (!syncConfiguration.isClientSnapSyncEnabled()) { - logger.trace("Snap syncing disabled"); + logger.debug("Snap syncing disabled"); return false; } - // TODO(snap-poc) deal with multiple peers logic here - // TODO: To be handled when we implement the multiple peers - //List bestPeers = peersInformation.getBestPeerCandidates(); - - // TODO: for now, use pre-configured snap boot nodes instead (until snap nodes discovery is implemented) - SnapshotPeersInformation snapPeersInformation = peersInformation; - Optional bestPeerOpt = snapPeersInformation.getBestSnapPeer(); + Optional bestPeerOpt = peersInformation.getBestSnapPeer(); Optional peerBestBlockNumOpt = bestPeerOpt.flatMap(this::getPeerBestBlockNumber); - if (!bestPeerOpt.isPresent() || !peerBestBlockNumOpt.isPresent()) { - logger.trace("Snap syncing not possible, no valid peer"); + if (bestPeerOpt.isEmpty() || peerBestBlockNumOpt.isEmpty()) { + logger.info("Snap syncing not possible, no snap-capable peer available"); return false; } // we consider Snap as part of the Long Sync if (!isValidSnapDistance(peerBestBlockNumOpt.get())) { - logger.debug("Snap syncing not required (long sync not required)"); + logger.info("Snap syncing not required"); return false; } // we consider Snap as part of the Long Sync syncEventsHandler.onLongSyncUpdate(true, peerBestBlockNumOpt.get()); - // send the LIST - syncEventsHandler.startSnapSync(); + // start snap syncing +// syncEventsHandler.startFindingSnapConnectionPoint(bestPeerOpt.get()); + syncEventsHandler.startSnapSync(bestPeerOpt.get()); return true; } diff --git a/rskj-core/src/main/java/co/rsk/net/sync/PeersInformation.java b/rskj-core/src/main/java/co/rsk/net/sync/PeersInformation.java index 593e814beb2..99ebc821496 100644 --- a/rskj-core/src/main/java/co/rsk/net/sync/PeersInformation.java +++ b/rskj-core/src/main/java/co/rsk/net/sync/PeersInformation.java @@ -143,7 +143,7 @@ private Optional getBestPeer(Stream> bestC return Optional.of(entriesToConsider.get(randomIndex).getKey()); } - return getBestCandidatesStream() + return bestCandidatesStream .max(this.peerComparator) .map(Map.Entry::getKey); } @@ -204,6 +204,11 @@ public List getBestSnapPeerCandidates() { .collect(Collectors.toList()); } + @Override + public Optional getBestPeer(Set exclude) { + return getBestPeer(getBestCandidatesStream().filter(p -> !exclude.contains(p.getKey().getPeerNodeID()))); + } + public Set knownNodeIds() { return peerStatuses.keySet().stream() .map(Peer::getPeerNodeID) diff --git a/rskj-core/src/main/java/co/rsk/net/sync/SnapSyncState.java b/rskj-core/src/main/java/co/rsk/net/sync/SnapSyncState.java index 65494b86129..694ec166526 100644 --- a/rskj-core/src/main/java/co/rsk/net/sync/SnapSyncState.java +++ b/rskj-core/src/main/java/co/rsk/net/sync/SnapSyncState.java @@ -21,20 +21,22 @@ import co.rsk.core.BlockDifficulty; import co.rsk.net.Peer; import co.rsk.net.SnapshotProcessor; +import co.rsk.net.messages.MessageType; import co.rsk.net.messages.SnapBlocksResponseMessage; import co.rsk.net.messages.SnapStateChunkResponseMessage; import co.rsk.net.messages.SnapStatusResponseMessage; +import co.rsk.scoring.EventType; import co.rsk.trie.TrieDTO; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.apache.commons.lang3.tuple.Pair; import org.ethereum.core.Block; +import org.ethereum.core.BlockHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.math.BigInteger; -import java.time.Duration; import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -57,13 +59,18 @@ public class SnapSyncState extends BaseSyncState { private BigInteger stateSize = BigInteger.ZERO; private BigInteger stateChunkSize = BigInteger.ZERO; + private boolean stateFetched; private final List allNodes; private long remoteTrieSize; private byte[] remoteRootHash; private final List> blocks; + private Block lastBlock; private BlockDifficulty lastBlockDifficulty; + private Peer lastBlockSender; + + private BlockHeader lastVerifiedBlockHeader; private long nextExpectedFrom = 0L; @@ -81,13 +88,13 @@ public SnapSyncState(SyncEventsHandler syncEventsHandler, SnapshotProcessor snap this.snapshotProcessor = snapshotProcessor; // TODO(snap-poc) code in SnapshotProcessor should be moved here probably this.allNodes = Lists.newArrayList(); this.blocks = Lists.newArrayList(); - this.thread = new Thread(new SyncMessageHandler("SNAP responses", responseQueue, listener) { + this.thread = new Thread(new SyncMessageHandler("SNAP/client", responseQueue, listener) { @Override public boolean isRunning() { return isRunning; } - }, "snap sync response handler"); + }, "snap sync client handler"); } @Override @@ -98,12 +105,13 @@ public void onEnter() { } isRunning = Boolean.TRUE; thread.start(); - snapshotProcessor.startSyncing(); + snapshotProcessor.startSyncing(this); } @Override public void onSnapStatus(Peer sender, SnapStatusResponseMessage responseMessage) { try { + resetTimeElapsed(); responseQueue.put(new SyncMessageHandler.Job(sender, responseMessage) { @Override public void run() { @@ -111,7 +119,7 @@ public void run() { } }); } catch (InterruptedException e) { - logger.warn("SnapStatusResponseMessage processing was interrupted", e); + logger.warn("{} processing was interrupted", MessageType.SNAP_STATUS_RESPONSE_MESSAGE, e); Thread.currentThread().interrupt(); } } @@ -119,6 +127,7 @@ public void run() { @Override public void onSnapBlocks(Peer sender, SnapBlocksResponseMessage responseMessage) { try { + resetTimeElapsed(); responseQueue.put(new SyncMessageHandler.Job(sender, responseMessage) { @Override public void run() { @@ -126,7 +135,7 @@ public void run() { } }); } catch (InterruptedException e) { - logger.warn("SnapBlocksResponseMessage processing was interrupted", e); + logger.warn("{} processing was interrupted", MessageType.SNAP_BLOCKS_RESPONSE_MESSAGE, e); Thread.currentThread().interrupt(); } } @@ -134,6 +143,7 @@ public void run() { @Override public void onSnapStateChunk(Peer sender, SnapStateChunkResponseMessage responseMessage) { try { + resetTimeElapsed(); responseQueue.put(new SyncMessageHandler.Job(sender, responseMessage) { @Override public void run() { @@ -141,38 +151,52 @@ public void run() { } }); } catch (InterruptedException e) { - logger.warn("SnapStateChunkResponseMessage processing was interrupted", e); + logger.warn("{} processing was interrupted", MessageType.SNAP_STATE_CHUNK_RESPONSE_MESSAGE, e); Thread.currentThread().interrupt(); } } - public void onNewChunk() { - resetTimeElapsed(); - } - @Override - public void tick(Duration duration) { - // TODO(snap-poc) handle multiple peers casuistry, similarly to co.rsk.net.sync.DownloadingBodiesSyncState.tick - - timeElapsed = timeElapsed.plus(duration); - if (timeElapsed.compareTo(syncConfiguration.getTimeoutWaitingSnapChunk()) >= 0) { - onMessageTimeOut(); + public void newBlockHeaders(Peer peer, List chunk) { + try { + resetTimeElapsed(); + responseQueue.put(new SyncMessageHandler.Job(peer, MessageType.BLOCK_HEADERS_RESPONSE_MESSAGE) { + @Override + public void run() { + snapshotProcessor.processBlockHeaderChunk(SnapSyncState.this, peer, chunk); + } + }); + } catch (InterruptedException e) { + logger.warn("{} processing was interrupted", MessageType.BLOCK_HEADERS_RESPONSE_MESSAGE, e); + Thread.currentThread().interrupt(); } } + public SyncEventsHandler getSyncEventsHandler() { + return this.syncEventsHandler; + } + @Override protected void onMessageTimeOut() { - // TODO: call syncEventsHandler.onErrorSyncing() and punish peers after SNAP feature discovery is implemented - - finish(); + fail(getLastBlockSender(), EventType.TIMEOUT_MESSAGE, "Snap sync timed out"); } public Block getLastBlock() { return lastBlock; } - public void setLastBlock(Block lastBlock) { + public void setLastBlock(Block lastBlock, BlockDifficulty lastBlockDifficulty, Peer lastBlockSender) { this.lastBlock = lastBlock; + this.lastBlockDifficulty = lastBlockDifficulty; + this.lastBlockSender = lastBlockSender; + } + + public BlockHeader getLastVerifiedBlockHeader() { + return lastVerifiedBlockHeader; + } + + public void setLastVerifiedBlockHeader(BlockHeader lastVerifiedBlockHeader) { + this.lastVerifiedBlockHeader = lastVerifiedBlockHeader; } public long getNextExpectedFrom() { @@ -187,8 +211,8 @@ public BlockDifficulty getLastBlockDifficulty() { return lastBlockDifficulty; } - public void setLastBlockDifficulty(BlockDifficulty lastBlockDifficulty) { - this.lastBlockDifficulty = lastBlockDifficulty; + public Peer getLastBlockSender() { + return lastBlockSender; } public byte[] getRemoteRootHash() { @@ -211,8 +235,8 @@ public void addBlock(Pair blockPair) { blocks.add(blockPair); } - public void addAllBlocks(List> blocks) { - this.blocks.addAll(blocks); + public Pair getLastBlockPair() { + return blocks.isEmpty() ? null : blocks.get(blocks.size() - 1); } public void connectBlocks(BlockConnectorHelper blockConnectorHelper) { @@ -231,6 +255,14 @@ public void setStateSize(BigInteger stateSize) { this.stateSize = stateSize; } + public boolean isStateFetched() { + return stateFetched; + } + + public void setStateFetched() { + this.stateFetched = true; + } + public BigInteger getStateChunkSize() { return stateChunkSize; } @@ -247,6 +279,14 @@ public Queue getChunkTaskQueue() { return chunkTaskQueue; } + public int getBlockHeaderChunkSize() { + return syncConfiguration.getChunkSize(); + } + + public boolean isRunning() { + return isRunning == Boolean.TRUE; + } + public void finish() { if (isRunning != Boolean.TRUE) { logger.warn("Invalid state, isRunning: [{}]", isRunning); @@ -256,13 +296,27 @@ public void finish() { isRunning = Boolean.FALSE; thread.interrupt(); + logger.debug("Stopping Snap Sync"); + syncEventsHandler.stopSyncing(); } + public void fail(Peer peer, EventType eventType, String message, Object... arguments) { + if (isRunning != Boolean.TRUE) { + logger.warn("Invalid state, isRunning: [{}]", isRunning); + return; + } + + logger.debug("Snap Sync failed due to: {}", eventType); + + isRunning = Boolean.FALSE; + thread.interrupt(); + + syncEventsHandler.onErrorSyncing(peer, eventType, message, arguments); + } + @VisibleForTesting public void setRunning() { isRunning = true; } - - } diff --git a/rskj-core/src/main/java/co/rsk/net/sync/SnapshotPeersInformation.java b/rskj-core/src/main/java/co/rsk/net/sync/SnapshotPeersInformation.java index 52bed4900e4..1a9fad26a8a 100644 --- a/rskj-core/src/main/java/co/rsk/net/sync/SnapshotPeersInformation.java +++ b/rskj-core/src/main/java/co/rsk/net/sync/SnapshotPeersInformation.java @@ -17,10 +17,12 @@ */ package co.rsk.net.sync; +import co.rsk.net.NodeID; import co.rsk.net.Peer; import java.util.List; import java.util.Optional; +import java.util.Set; /** * This is mostly a workaround because SyncProcessor needs to access Peer instances. @@ -30,5 +32,6 @@ public interface SnapshotPeersInformation { Optional getBestSnapPeer(); List getBestSnapPeerCandidates(); + Optional getBestPeer(Set exclude); SyncPeerStatus getOrRegisterPeer(Peer peer); } diff --git a/rskj-core/src/main/java/co/rsk/net/sync/SyncConfiguration.java b/rskj-core/src/main/java/co/rsk/net/sync/SyncConfiguration.java index beeb1ac70e7..722f1329a30 100644 --- a/rskj-core/src/main/java/co/rsk/net/sync/SyncConfiguration.java +++ b/rskj-core/src/main/java/co/rsk/net/sync/SyncConfiguration.java @@ -47,8 +47,6 @@ public final class SyncConfiguration { private final boolean isServerSnapSyncEnabled; private final boolean isClientSnapSyncEnabled; - private final Duration timeoutWaitingSnapChunk; - private final int snapshotSyncLimit; private final Map nodeIdToSnapshotTrustedPeerMap; @@ -92,7 +90,6 @@ public SyncConfiguration( topBest, isServerSnapSyncEnabled, isClientSnapSyncEnabled, - timeoutWaitingSnapChunk, snapshotSyncLimit, Collections.emptyList()); } @@ -109,7 +106,6 @@ public SyncConfiguration( double topBest, boolean isServerSnapSyncEnabled, boolean isClientSnapSyncEnabled, - int timeoutWaitingSnapChunk, int snapshotSyncLimit, List snapBootNodes) { this.expectedPeers = expectedPeers; @@ -123,12 +119,8 @@ public SyncConfiguration( this.topBest = topBest; this.isServerSnapSyncEnabled = isServerSnapSyncEnabled; this.isClientSnapSyncEnabled = isClientSnapSyncEnabled; - // TODO(snap-poc) re-visit the need of this specific timeout as the algorithm evolves - this.timeoutWaitingSnapChunk = Duration.ofSeconds(timeoutWaitingSnapChunk); this.snapshotSyncLimit = snapshotSyncLimit; - - List snapBootNodesList = snapBootNodes != null ? snapBootNodes : Collections.emptyList(); nodeIdToSnapshotTrustedPeerMap = Collections.unmodifiableMap(snapBootNodesList.stream() @@ -179,10 +171,6 @@ public boolean isClientSnapSyncEnabled() { return isClientSnapSyncEnabled; } - public Duration getTimeoutWaitingSnapChunk() { - return timeoutWaitingSnapChunk; - } - public int getSnapshotSyncLimit() { return snapshotSyncLimit; } diff --git a/rskj-core/src/main/java/co/rsk/net/sync/SyncEventsHandler.java b/rskj-core/src/main/java/co/rsk/net/sync/SyncEventsHandler.java index e3cb4d93c83..e0bd7be4e9a 100644 --- a/rskj-core/src/main/java/co/rsk/net/sync/SyncEventsHandler.java +++ b/rskj-core/src/main/java/co/rsk/net/sync/SyncEventsHandler.java @@ -59,5 +59,5 @@ public interface SyncEventsHandler { void backwardSyncing(Peer peer); - void startSnapSync(); + void startSnapSync(Peer peer); } diff --git a/rskj-core/src/main/java/co/rsk/net/sync/SyncMessageHandler.java b/rskj-core/src/main/java/co/rsk/net/sync/SyncMessageHandler.java index b2f282cc468..37796dfb700 100644 --- a/rskj-core/src/main/java/co/rsk/net/sync/SyncMessageHandler.java +++ b/rskj-core/src/main/java/co/rsk/net/sync/SyncMessageHandler.java @@ -20,9 +20,11 @@ import co.rsk.net.Peer; import co.rsk.net.messages.Message; +import co.rsk.net.messages.MessageType; import co.rsk.util.FormatUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import java.time.Duration; import java.time.Instant; @@ -30,7 +32,9 @@ public abstract class SyncMessageHandler implements Runnable { - private static final Logger logger = LoggerFactory.getLogger("syncprocessor"); + private static final Logger logger = LoggerFactory.getLogger("syncmessagehandler"); + + public static final String QUEUE_NAME = "queue"; private final String name; @@ -64,8 +68,10 @@ public void run() { try { job = jobQueue.take(); + MDC.put(QUEUE_NAME, name); + if (logger.isDebugEnabled()) { - logger.debug("Processing msg: [{}] from: [{}] for: [{}]", job.getMsg().getMessageType(), job.getSender(), name); + logger.debug("Processing msg: [{}] from: [{}] for: [{}]. Pending count: [{}]", job.getMsgType(), job.getSender(), name, jobQueue.size()); jobStart = Instant.now(); } @@ -73,7 +79,7 @@ public void run() { if (logger.isDebugEnabled()) { logger.debug("Finished processing of msg: [{}] from: [{}] for: [{}] after [{}] seconds.", - job.getMsg().getMessageType(), job.getSender(), name, + job.getMsgType(), job.getSender(), name, FormatUtils.formatNanosecondsToSeconds(Duration.between(jobStart, Instant.now()).toNanos())); } @@ -95,6 +101,8 @@ public void run() { if (listener != null) { listener.onException(e); } + } finally { + MDC.remove(QUEUE_NAME); } } @@ -116,27 +124,31 @@ public interface Listener { public static abstract class Job implements Runnable { private final Peer sender; - - private final Message msg; + private final MessageType msgType; public Job(Peer sender, Message msg) { this.sender = sender; - this.msg = msg; + this.msgType = msg.getMessageType(); + } + + public Job(Peer sender, MessageType msgType) { + this.sender = sender; + this.msgType = msgType; } public Peer getSender() { return sender; } - public Message getMsg() { - return msg; + public MessageType getMsgType() { + return msgType; } @Override public String toString() { return "SyncMessageHandler{" + "sender=" + sender + - ", msgType=" + msg.getMessageType() + + ", msgType=" + msgType + '}'; } } diff --git a/rskj-core/src/main/java/co/rsk/net/sync/SyncState.java b/rskj-core/src/main/java/co/rsk/net/sync/SyncState.java index e3b9b48fa47..4d0436d45b3 100644 --- a/rskj-core/src/main/java/co/rsk/net/sync/SyncState.java +++ b/rskj-core/src/main/java/co/rsk/net/sync/SyncState.java @@ -29,7 +29,7 @@ import java.util.List; public interface SyncState { - void newBlockHeaders(List chunk); + void newBlockHeaders(Peer peer, List chunk); // TODO(mc) don't receive a full message void newBody(BodyResponseMessage message, Peer peer); diff --git a/rskj-core/src/main/java/co/rsk/validators/BlockDifficultyRule.java b/rskj-core/src/main/java/co/rsk/validators/BlockDifficultyRule.java index 2afb970bb60..2f95ab041ff 100644 --- a/rskj-core/src/main/java/co/rsk/validators/BlockDifficultyRule.java +++ b/rskj-core/src/main/java/co/rsk/validators/BlockDifficultyRule.java @@ -43,12 +43,12 @@ public BlockDifficultyRule(DifficultyCalculator difficultyCalculator) { } @Override - public boolean isValid(BlockHeader header, Block parent) { + public boolean isValid(BlockHeader header, BlockHeader parent) { if (header == null || parent == null) { logger.warn("BlockDifficultyRule - block or parent are null"); return false; } - BlockDifficulty calcDifficulty = difficultyCalculator.calcDifficulty(header, parent.getHeader()); + BlockDifficulty calcDifficulty = difficultyCalculator.calcDifficulty(header, parent); BlockDifficulty difficulty = header.getDifficulty(); if (!difficulty.equals(calcDifficulty)) { diff --git a/rskj-core/src/main/java/co/rsk/validators/BlockHeaderParentCompositeRule.java b/rskj-core/src/main/java/co/rsk/validators/BlockHeaderParentCompositeRule.java index 5726b28138b..d7d24093afd 100644 --- a/rskj-core/src/main/java/co/rsk/validators/BlockHeaderParentCompositeRule.java +++ b/rskj-core/src/main/java/co/rsk/validators/BlockHeaderParentCompositeRule.java @@ -34,7 +34,7 @@ public BlockHeaderParentCompositeRule(BlockHeaderParentDependantValidationRule.. } @Override - public boolean isValid(BlockHeader header, Block parent) { + public boolean isValid(BlockHeader header, BlockHeader parent) { String shortHash = header.getPrintableHash(); long number = header.getNumber(); logger.debug("Validating parent header {} {}", shortHash, number); diff --git a/rskj-core/src/main/java/co/rsk/validators/BlockHeaderParentDependantValidationRule.java b/rskj-core/src/main/java/co/rsk/validators/BlockHeaderParentDependantValidationRule.java index c89c070bedb..8878b977d94 100644 --- a/rskj-core/src/main/java/co/rsk/validators/BlockHeaderParentDependantValidationRule.java +++ b/rskj-core/src/main/java/co/rsk/validators/BlockHeaderParentDependantValidationRule.java @@ -22,5 +22,9 @@ import org.ethereum.core.BlockHeader; public interface BlockHeaderParentDependantValidationRule { - boolean isValid(BlockHeader header, Block parent); + boolean isValid(BlockHeader header, BlockHeader parent); + + default boolean isValid(BlockHeader header, Block parentBlock) { + return isValid(header, parentBlock == null ? null : parentBlock.getHeader()); + } } diff --git a/rskj-core/src/main/java/co/rsk/validators/BlockParentGasLimitRule.java b/rskj-core/src/main/java/co/rsk/validators/BlockParentGasLimitRule.java index a41d101e1ce..5510436ed4b 100644 --- a/rskj-core/src/main/java/co/rsk/validators/BlockParentGasLimitRule.java +++ b/rskj-core/src/main/java/co/rsk/validators/BlockParentGasLimitRule.java @@ -50,15 +50,14 @@ public BlockParentGasLimitRule(int gasLimitBoundDivisor) { @Override - public boolean isValid(BlockHeader header, Block parent) { + public boolean isValid(BlockHeader header, BlockHeader parent) { if (header == null || parent == null) { logger.warn("BlockParentGasLimitRule - block or parent are null"); return false; } - BlockHeader parentHeader = parent.getHeader(); BigInteger headerGasLimit = new BigInteger(1, header.getGasLimit()); - BigInteger parentGasLimit = new BigInteger(1, parentHeader.getGasLimit()); + BigInteger parentGasLimit = new BigInteger(1, parent.getGasLimit()); if (headerGasLimit.compareTo(parentGasLimit.multiply(BigInteger.valueOf(gasLimitBoundDivisor - 1L)).divide(BigInteger.valueOf(gasLimitBoundDivisor))) < 0 || headerGasLimit.compareTo(parentGasLimit.multiply(BigInteger.valueOf(gasLimitBoundDivisor + 1L)).divide(BigInteger.valueOf(gasLimitBoundDivisor))) > 0) { diff --git a/rskj-core/src/main/java/co/rsk/validators/BlockParentNumberRule.java b/rskj-core/src/main/java/co/rsk/validators/BlockParentNumberRule.java index 318ce146b5d..a1484e27de0 100644 --- a/rskj-core/src/main/java/co/rsk/validators/BlockParentNumberRule.java +++ b/rskj-core/src/main/java/co/rsk/validators/BlockParentNumberRule.java @@ -35,13 +35,13 @@ public class BlockParentNumberRule implements BlockParentDependantValidationRule private static final Logger logger = LoggerFactory.getLogger("blockvalidator"); @Override - public boolean isValid(BlockHeader header, Block parent) { + public boolean isValid(BlockHeader header, BlockHeader parent) { if (header == null || parent == null) { logger.warn("BlockParentNumberRule - block or parent are null"); return false; } - BlockHeader parentHeader = parent.getHeader(); - if (header.getNumber() != (parentHeader.getNumber() + 1)) { + + if (header.getNumber() != (parent.getNumber() + 1)) { logger.warn("#{}: block number is not parentBlock number + 1", header.getNumber()); return false; } diff --git a/rskj-core/src/main/java/co/rsk/validators/BlockTimeStampValidationRule.java b/rskj-core/src/main/java/co/rsk/validators/BlockTimeStampValidationRule.java index c9432812af1..2e5cf4aae1f 100644 --- a/rskj-core/src/main/java/co/rsk/validators/BlockTimeStampValidationRule.java +++ b/rskj-core/src/main/java/co/rsk/validators/BlockTimeStampValidationRule.java @@ -88,7 +88,7 @@ public boolean isValid(BlockHeader header) { } @Override - public boolean isValid(BlockHeader header, Block parent) { + public boolean isValid(BlockHeader header, BlockHeader parent) { if (this.validPeriodLength == 0) { return true; } diff --git a/rskj-core/src/main/java/co/rsk/validators/PrevMinGasPriceRule.java b/rskj-core/src/main/java/co/rsk/validators/PrevMinGasPriceRule.java index 95bfa4641bc..1530abdcd36 100644 --- a/rskj-core/src/main/java/co/rsk/validators/PrevMinGasPriceRule.java +++ b/rskj-core/src/main/java/co/rsk/validators/PrevMinGasPriceRule.java @@ -39,7 +39,7 @@ public boolean isValid(Block block, Block parent) { } @Override - public boolean isValid(BlockHeader header, Block parent) { + public boolean isValid(BlockHeader header, BlockHeader parent) { if (header.isGenesis()) { return true; } diff --git a/rskj-core/src/main/java/org/ethereum/net/client/Capability.java b/rskj-core/src/main/java/org/ethereum/net/client/Capability.java index c8eb98b1669..50884f01497 100644 --- a/rskj-core/src/main/java/org/ethereum/net/client/Capability.java +++ b/rskj-core/src/main/java/org/ethereum/net/client/Capability.java @@ -51,6 +51,10 @@ public boolean isRSK() { return RSK.equals(name); } + public boolean isSNAP() { + return SNAP.equals(name); + } + @Override public boolean equals(Object obj) { if (this == obj) { diff --git a/rskj-core/src/main/java/org/ethereum/net/client/ConfigCapabilitiesImpl.java b/rskj-core/src/main/java/org/ethereum/net/client/ConfigCapabilitiesImpl.java index eb20c0e2b64..8cfac9b9e29 100644 --- a/rskj-core/src/main/java/org/ethereum/net/client/ConfigCapabilitiesImpl.java +++ b/rskj-core/src/main/java/org/ethereum/net/client/ConfigCapabilitiesImpl.java @@ -40,7 +40,7 @@ public class ConfigCapabilitiesImpl implements ConfigCapabilities{ private final RskSystemProperties config; - private SortedSet allCapabilities = new TreeSet<>(); + private final SortedSet allCapabilities = new TreeSet<>(); public ConfigCapabilitiesImpl(RskSystemProperties config) { if (config.syncVersion() != null) { @@ -54,14 +54,13 @@ public ConfigCapabilitiesImpl(RskSystemProperties config) { } } - if (config.isSnapshotSyncEnabled() && allCapabilities.stream().anyMatch(Capability::isRSK)) { + if (allCapabilities.stream().anyMatch(Capability::isRSK)) { allCapabilities.add(new Capability(SNAP, SNAP_VERSION)); } this.config = config; } - /** * Gets the capabilities listed in 'peer.capabilities' config property * sorted by their names. @@ -83,6 +82,10 @@ public List getConfigCapabilities() { @Override public List getSupportedCapabilities(HelloMessage hello) { List configCaps = getConfigCapabilities(); + if (config.isClientSnapshotSyncEnabled()) { + configCaps.add(new Capability(Capability.SNAP, Capability.SNAP_VERSION)); + } + List supported = new ArrayList<>(); List eths = new ArrayList<>(); diff --git a/rskj-core/src/main/java/org/ethereum/net/message/StaticMessages.java b/rskj-core/src/main/java/org/ethereum/net/message/StaticMessages.java index 10030069022..4c08cd4866c 100644 --- a/rskj-core/src/main/java/org/ethereum/net/message/StaticMessages.java +++ b/rskj-core/src/main/java/org/ethereum/net/message/StaticMessages.java @@ -19,7 +19,7 @@ package org.ethereum.net.message; -import org.ethereum.config.SystemProperties; +import co.rsk.config.RskSystemProperties; import org.ethereum.net.client.Capability; import org.ethereum.net.client.ConfigCapabilities; import org.ethereum.net.p2p.*; @@ -37,7 +37,7 @@ */ public class StaticMessages { - private final SystemProperties config; + private final RskSystemProperties config; private final ConfigCapabilities configCapabilities; public static final PingMessage PING_MESSAGE = new PingMessage(); @@ -45,7 +45,7 @@ public class StaticMessages { public static final GetPeersMessage GET_PEERS_MESSAGE = new GetPeersMessage(); public static final DisconnectMessage DISCONNECT_MESSAGE = new DisconnectMessage(ReasonCode.REQUESTED); - public StaticMessages(SystemProperties config, ConfigCapabilities configCapabilities) { + public StaticMessages(RskSystemProperties config, ConfigCapabilities configCapabilities) { this.config = config; this.configCapabilities = configCapabilities; } @@ -53,11 +53,14 @@ public StaticMessages(SystemProperties config, ConfigCapabilities configCapabili public HelloMessage createHelloMessage(String peerId) { return createHelloMessage(peerId, config.getPeerPort()); } - public HelloMessage createHelloMessage(String peerId, int listenPort) { + public HelloMessage createHelloMessage(String peerId, int listenPort) { String helloAnnouncement = buildHelloAnnouncement(); byte p2pVersion = (byte) config.defaultP2PVersion(); List capabilities = configCapabilities.getConfigCapabilities(); + if (config.isServerSnapshotSyncEnabled()) { + capabilities.add(new Capability(Capability.SNAP, Capability.SNAP_VERSION)); + } return new HelloMessage(p2pVersion, helloAnnouncement, capabilities, listenPort, peerId); diff --git a/rskj-core/src/main/java/org/ethereum/net/server/Channel.java b/rskj-core/src/main/java/org/ethereum/net/server/Channel.java index a79390d7dcd..c390be3318e 100644 --- a/rskj-core/src/main/java/org/ethereum/net/server/Channel.java +++ b/rskj-core/src/main/java/org/ethereum/net/server/Channel.java @@ -19,13 +19,12 @@ package org.ethereum.net.server; -import co.rsk.net.Peer; import co.rsk.net.NodeID; +import co.rsk.net.Peer; import co.rsk.net.eth.RskMessage; import co.rsk.net.eth.RskWireProtocol; import co.rsk.net.messages.Message; import co.rsk.net.messages.MessageType; -import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import org.ethereum.net.MessageQueue; @@ -51,7 +50,6 @@ import java.math.BigInteger; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -77,7 +75,7 @@ public class Channel implements Peer { private final PeerStatistics peerStats = new PeerStatistics(); private Stats stats; - private final boolean isSnapCapable; + private boolean isSnapCapable; public Channel(MessageQueue msgQueue, MessageCodec messageCodec, @@ -85,8 +83,7 @@ public Channel(MessageQueue msgQueue, RskWireProtocol.Factory rskWireProtocolFactory, Eth62MessageFactory eth62MessageFactory, StaticMessages staticMessages, - String remoteId, - List capabilities) { + String remoteId) { this.msgQueue = msgQueue; this.messageCodec = messageCodec; this.nodeManager = nodeManager; @@ -95,19 +92,6 @@ public Channel(MessageQueue msgQueue, this.staticMessages = staticMessages; this.isActive = remoteId != null && !remoteId.isEmpty(); this.stats = new Stats(); - this.isSnapCapable = capabilities.stream() - .anyMatch(capability -> Capability.SNAP.equals(capability.getName())); - } - - @VisibleForTesting - public Channel(MessageQueue msgQueue, - MessageCodec messageCodec, - NodeManager nodeManager, - RskWireProtocol.Factory rskWireProtocolFactory, - Eth62MessageFactory eth62MessageFactory, - StaticMessages staticMessages, - String remoteId) { - this(msgQueue, messageCodec, nodeManager, rskWireProtocolFactory, eth62MessageFactory, staticMessages, remoteId, new ArrayList<>()); } public void sendHelloMessage(ChannelHandlerContext ctx, FrameCodec frameCodec, String nodeId, @@ -170,6 +154,7 @@ public Node getNode() { } public void initMessageCodes(List caps) { + isSnapCapable = caps.stream().anyMatch(Capability::isSNAP); messageCodec.initMessageCodes(caps); } diff --git a/rskj-core/src/main/java/org/ethereum/net/server/EthereumChannelInitializer.java b/rskj-core/src/main/java/org/ethereum/net/server/EthereumChannelInitializer.java index f38bd7eacf8..7d0f478f7ff 100644 --- a/rskj-core/src/main/java/org/ethereum/net/server/EthereumChannelInitializer.java +++ b/rskj-core/src/main/java/org/ethereum/net/server/EthereumChannelInitializer.java @@ -111,7 +111,7 @@ public void initChannel(NioSocketChannel ch) { P2pHandler p2pHandler = new P2pHandler(ethereumListener, messageQueue, config.getPeerP2PPingInterval()); MessageCodec messageCodec = new MessageCodec(ethereumListener, config); HandshakeHandler handshakeHandler = new HandshakeHandler(config, peerScoringManager, p2pHandler, messageCodec, configCapabilities); - Channel channel = new Channel(messageQueue, messageCodec, nodeManager, rskWireProtocolFactory, eth62MessageFactory, staticMessages, remoteId, configCapabilities.getConfigCapabilities()); + Channel channel = new Channel(messageQueue, messageCodec, nodeManager, rskWireProtocolFactory, eth62MessageFactory, staticMessages, remoteId); ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(config.peerChannelReadTimeout(), TimeUnit.SECONDS)); ch.pipeline().addLast("handshakeHandler", handshakeHandler); diff --git a/rskj-core/src/main/resources/expected.conf b/rskj-core/src/main/resources/expected.conf index 236d60c62e1..c4680601de2 100644 --- a/rskj-core/src/main/resources/expected.conf +++ b/rskj-core/src/main/resources/expected.conf @@ -281,7 +281,6 @@ sync = { enabled = parallel = chunkSize = - chunkRequestTimeout = limit = snapBootNodes = [ { diff --git a/rskj-core/src/main/resources/reference.conf b/rskj-core/src/main/resources/reference.conf index 86fa6b74016..20a17d2ba5c 100644 --- a/rskj-core/src/main/resources/reference.conf +++ b/rskj-core/src/main/resources/reference.conf @@ -380,12 +380,10 @@ sync { enabled = false # Server / chunk size chunkSize = 50 - # Request timeout (in seconds) - chunkRequestTimeout = 120 # Distance to the tip of the blockchain to start snapshot sync - limit = 1000000 + limit = 10000 # Parallel requests (true, false) - parallel = true + parallel = false # list of SNAP-capable peers to connect to snapBootNodes = [] } diff --git a/rskj-core/src/test/java/co/rsk/core/bc/BlockRelayValidatorTest.java b/rskj-core/src/test/java/co/rsk/core/bc/BlockRelayValidatorTest.java index 03665cfd62c..95c386be84f 100644 --- a/rskj-core/src/test/java/co/rsk/core/bc/BlockRelayValidatorTest.java +++ b/rskj-core/src/test/java/co/rsk/core/bc/BlockRelayValidatorTest.java @@ -47,7 +47,7 @@ void genesisCheck() { verify(block).isGenesis(); verify(blockValidator, never()).isValid(any()); - verify(blockParentValidator, never()).isValid(any(), any()); + verify(blockParentValidator, never()).isValid(any(), any(Block.class)); } @Test @@ -62,7 +62,7 @@ void blockValidatorCheck() { verify(block).isGenesis(); verify(blockValidator).isValid(any()); - verify(blockParentValidator, never()).isValid(any(), any()); + verify(blockParentValidator, never()).isValid(any(), any(Block.class)); } @Test @@ -74,7 +74,7 @@ void blockParentValidatorCheck() { when(block.getParentHash()).thenReturn(parentHash); when(blockStore.getBlockByHash(any())).thenReturn(parentBlock); when(blockValidator.isValid(any())).thenReturn(true); - when(blockParentValidator.isValid(any(), any())).thenReturn(false); + when(blockParentValidator.isValid(any(), any(Block.class))).thenReturn(false); boolean actualResult = blockRelayValidator.isValid(block); @@ -82,7 +82,7 @@ void blockParentValidatorCheck() { verify(block).isGenesis(); verify(blockValidator).isValid(any()); - verify(blockParentValidator).isValid(any(), any()); + verify(blockParentValidator).isValid(any(), any(Block.class)); } @Test @@ -94,7 +94,7 @@ void allValidatorsCheck() { when(block.getParentHash()).thenReturn(parentHash); when(blockStore.getBlockByHash(any())).thenReturn(parentBlock); when(blockValidator.isValid(any())).thenReturn(true); - when(blockParentValidator.isValid(any(), any())).thenReturn(true); + when(blockParentValidator.isValid(any(), any(Block.class))).thenReturn(true); boolean actualResult = blockRelayValidator.isValid(block); @@ -102,6 +102,6 @@ void allValidatorsCheck() { verify(block).isGenesis(); verify(blockValidator).isValid(any()); - verify(blockParentValidator).isValid(any(), any()); + verify(blockParentValidator).isValid(any(), any(Block.class)); } } diff --git a/rskj-core/src/test/java/co/rsk/core/bc/BlockValidatorBuilder.java b/rskj-core/src/test/java/co/rsk/core/bc/BlockValidatorBuilder.java index fd0b881a61c..5fb5e4d3e14 100644 --- a/rskj-core/src/test/java/co/rsk/core/bc/BlockValidatorBuilder.java +++ b/rskj-core/src/test/java/co/rsk/core/bc/BlockValidatorBuilder.java @@ -26,12 +26,15 @@ import co.rsk.trie.TrieStore; import co.rsk.util.TimeProvider; import co.rsk.validators.*; +import org.ethereum.core.Block; import org.ethereum.core.BlockTxSignatureCache; import org.ethereum.core.ReceivedTxSignatureCache; -import org.ethereum.core.SignatureCache; import org.ethereum.datasource.HashMapDB; import org.ethereum.db.BlockStore; -import org.mockito.Mockito; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Created by mario on 19/01/17. @@ -95,11 +98,11 @@ public BlockValidatorBuilder addTxsMinGasPriceRule() { } public BlockValidatorBuilder addBlockUnclesValidationRule(BlockStore blockStore) { - BlockHeaderValidationRule validationRule = Mockito.mock(BlockHeaderValidationRule.class); - Mockito.when(validationRule.isValid(Mockito.any())).thenReturn(true); + BlockHeaderValidationRule validationRule = mock(BlockHeaderValidationRule.class); + when(validationRule.isValid(any())).thenReturn(true); - BlockHeaderParentDependantValidationRule parentValidationRule = Mockito.mock(BlockHeaderParentDependantValidationRule.class); - Mockito.when(parentValidationRule.isValid(Mockito.any(), Mockito.any())).thenReturn(true); + BlockHeaderParentDependantValidationRule parentValidationRule = mock(BlockHeaderParentDependantValidationRule.class); + when(parentValidationRule.isValid(any(), any(Block.class))).thenReturn(true); this.addBlockUnclesValidationRule(blockStore, validationRule, parentValidationRule); return this; diff --git a/rskj-core/src/test/java/co/rsk/core/bc/BlockValidatorTest.java b/rskj-core/src/test/java/co/rsk/core/bc/BlockValidatorTest.java index 77095d84965..215b1542848 100644 --- a/rskj-core/src/test/java/co/rsk/core/bc/BlockValidatorTest.java +++ b/rskj-core/src/test/java/co/rsk/core/bc/BlockValidatorTest.java @@ -43,12 +43,16 @@ import org.ethereum.db.IndexedBlockStore; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import java.math.BigInteger; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; -import static org.mockito.Mockito.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Created by ajlopez on 04/08/2016. @@ -288,7 +292,7 @@ void invalidPOWUncles() { store.saveBlock(uncle1a, TEST_DIFFICULTY, false); BlockHeaderParentDependantValidationRule parentValidationRule = mock(BlockHeaderParentDependantValidationRule.class); - when(parentValidationRule.isValid(Mockito.any(), Mockito.any())).thenReturn(true); + when(parentValidationRule.isValid(any(), any(Block.class))).thenReturn(true); BlockValidatorImpl validator = new BlockValidatorBuilder() .addBlockUnclesValidationRule(store, new ProofOfWorkRule(config).setFallbackMiningEnabled(false), parentValidationRule) @@ -497,7 +501,7 @@ void processBlockWithInvalidMGPTxs() { BlockStore blockStore = mock(org.ethereum.db.BlockStore.class); Repository repository = mock(Repository.class); - when(repository.getNonce(Mockito.any())).thenReturn(BigInteger.ZERO); + when(repository.getNonce(any())).thenReturn(BigInteger.ZERO); Block parent = new BlockBuilder(null, null, null).minGasPrice(BigInteger.ZERO) .parent(new BlockGenerator().getGenesisBlock()).build(); @@ -533,7 +537,7 @@ void processBlockWithInvalidPrevMGP() { BlockStore blockStore = mock(org.ethereum.db.BlockStore.class); Repository repository = mock(Repository.class); - when(repository.getNonce(Mockito.any())).thenReturn(BigInteger.ZERO); + when(repository.getNonce(any())).thenReturn(BigInteger.ZERO); Block parent = new BlockBuilder(null, null, null).minGasPrice(BigInteger.ZERO) .parent(new BlockGenerator().getGenesisBlock()).build(); @@ -556,7 +560,7 @@ void processValidMGPBlock() { BlockStore blockStore = mock(org.ethereum.db.BlockStore.class); Repository repository = mock(Repository.class); - when(repository.getNonce(Mockito.any())).thenReturn(BigInteger.ZERO); + when(repository.getNonce(any())).thenReturn(BigInteger.ZERO); Block parent = new BlockBuilder(null, null, null).minGasPrice(BigInteger.TEN) .parent(new BlockGenerator().getGenesisBlock()).build(); diff --git a/rskj-core/src/test/java/co/rsk/net/SnapshotProcessorTest.java b/rskj-core/src/test/java/co/rsk/net/SnapshotProcessorTest.java index 0a6b126bb2d..58068feafd0 100644 --- a/rskj-core/src/test/java/co/rsk/net/SnapshotProcessorTest.java +++ b/rskj-core/src/test/java/co/rsk/net/SnapshotProcessorTest.java @@ -25,6 +25,10 @@ import co.rsk.net.sync.SyncMessageHandler; import co.rsk.test.builders.BlockChainBuilder; import co.rsk.trie.TrieStore; +import co.rsk.validators.BlockHeaderParentDependantValidationRule; +import co.rsk.validators.BlockHeaderValidationRule; +import co.rsk.validators.BlockParentDependantValidationRule; +import co.rsk.validators.BlockValidationRule; import org.ethereum.core.Block; import org.ethereum.core.Blockchain; import org.ethereum.core.TransactionPool; @@ -57,6 +61,10 @@ public class SnapshotProcessorTest { private final SnapshotPeersInformation peersInformation = mock(SnapshotPeersInformation.class); private final SnapSyncState snapSyncState = mock(SnapSyncState.class); private final SyncMessageHandler.Listener listener = mock(SyncMessageHandler.Listener.class); + private final BlockParentDependantValidationRule blockParentValidator = mock(BlockParentDependantValidationRule.class); + private final BlockValidationRule blockValidator = mock(BlockValidationRule.class); + private final BlockHeaderParentDependantValidationRule blockHeaderParentValidator = mock(BlockHeaderParentDependantValidationRule.class); + private final BlockHeaderValidationRule blockHeaderValidator = mock(BlockHeaderValidationRule.class); private SnapshotProcessor underTest; @BeforeEach @@ -82,10 +90,15 @@ void givenStartSyncingIsCalled_thenSnapStatusStartToBeRequestedFromPeer() { peersInformation, blockStore, transactionPool, + blockParentValidator, + blockValidator, + blockHeaderParentValidator, + blockHeaderValidator, TEST_CHUNK_SIZE, false); + doReturn(Optional.of(peer)).when(peersInformation).getBestSnapPeer(); //when - underTest.startSyncing(); + underTest.startSyncing(snapSyncState); //then verify(peer).sendMessage(any(SnapStatusRequestMessage.class)); } @@ -102,6 +115,10 @@ void givenSnapStatusResponseCalled_thenSnapChunkRequestsAreMade() { peersInformation, blockStore, transactionPool, + blockParentValidator, + blockValidator, + blockHeaderParentValidator, + blockHeaderValidator, TEST_CHUNK_SIZE, false); @@ -116,15 +133,19 @@ void givenSnapStatusResponseCalled_thenSnapChunkRequestsAreMade() { doReturn(blocks.get(blocks.size() - 1)).when(snapSyncState).getLastBlock(); doReturn(snapStatusResponseMessage.getTrieSize()).when(snapSyncState).getRemoteTrieSize(); doReturn(new LinkedList<>()).when(snapSyncState).getChunkTaskQueue(); + doReturn(Optional.of(peer)).when(peersInformation).getBestSnapPeer(); + doReturn(true).when(snapSyncState).isRunning(); + doReturn(true).when(blockValidator).isValid(any()); + doReturn(true).when(blockParentValidator).isValid(any(), any()); - underTest.startSyncing(); + underTest.startSyncing(snapSyncState); //when underTest.processSnapStatusResponse(snapSyncState, peer, snapStatusResponseMessage); //then - verify(peer, atLeast(3)).sendMessage(any()); // 1 for SnapStatusRequestMessage, 1 for SnapBlocksRequestMessage and 1 for SnapStateChunkRequestMessage - verify(peersInformation, times(2)).getBestSnapPeerCandidates(); + verify(peer, times(2)).sendMessage(any()); // 1 for SnapStatusRequestMessage, 1 for SnapBlocksRequestMessage and 1 for SnapStateChunkRequestMessage + verify(peersInformation, times(1)).getBestSnapPeer(); } @Test @@ -137,6 +158,10 @@ void givenSnapStatusRequestReceived_thenSnapStatusResponseIsSent() { peersInformation, blockStore, transactionPool, + blockParentValidator, + blockValidator, + blockHeaderParentValidator, + blockHeaderValidator, TEST_CHUNK_SIZE, false); //when @@ -156,6 +181,10 @@ void givenSnapBlockRequestReceived_thenSnapBlocksResponseMessageIsSent() { peersInformation, blockStore, transactionPool, + blockParentValidator, + blockValidator, + blockHeaderParentValidator, + blockHeaderValidator, TEST_CHUNK_SIZE, false); @@ -179,6 +208,10 @@ void givenSnapBlocksResponseReceived_thenSnapBlocksRequestMessageIsSent() { peersInformation, blockStore, transactionPool, + blockParentValidator, + blockValidator, + blockHeaderParentValidator, + blockHeaderValidator, 200, false); @@ -189,9 +222,12 @@ void givenSnapBlocksResponseReceived_thenSnapBlocksRequestMessageIsSent() { } SnapStatusResponseMessage snapStatusResponseMessage = new SnapStatusResponseMessage(blocks, difficulties, 100000L); + doReturn(true).when(snapSyncState).isRunning(); + doReturn(true).when(blockValidator).isValid(any()); + doReturn(true).when(blockParentValidator).isValid(any(), any()); doReturn(new LinkedList<>()).when(snapSyncState).getChunkTaskQueue(); - underTest.startSyncing(); + underTest.startSyncing(snapSyncState); underTest.processSnapStatusResponse(snapSyncState, peer, snapStatusResponseMessage); SnapBlocksResponseMessage snapBlocksResponseMessage = new SnapBlocksResponseMessage(blocks, difficulties); @@ -214,6 +250,10 @@ void givenSnapStateChunkRequest_thenSnapStateChunkResponseMessageIsSent() { peersInformation, blockStore, transactionPool, + blockParentValidator, + blockValidator, + blockHeaderParentValidator, + blockHeaderValidator, TEST_CHUNK_SIZE, false); @@ -239,6 +279,10 @@ void givenProcessSnapStatusRequestIsCalled_thenInternalOneIsCalledLater() throws peersInformation, blockStore, transactionPool, + blockParentValidator, + blockValidator, + blockHeaderParentValidator, + blockHeaderValidator, TEST_CHUNK_SIZE, false, listener) { @@ -259,7 +303,7 @@ void processSnapStatusRequestInternal(Peer sender, SnapStatusRequestMessage requ verify(listener, times(1)).onJobRun(jobArg.capture()); assertEquals(peer, jobArg.getValue().getSender()); - assertEquals(msg, jobArg.getValue().getMsg()); + assertEquals(msg.getMessageType(), jobArg.getValue().getMsgType()); } @Test @@ -275,6 +319,10 @@ void givenProcessSnapBlocksRequestIsCalled_thenInternalOneIsCalledLater() throws peersInformation, blockStore, transactionPool, + blockParentValidator, + blockValidator, + blockHeaderParentValidator, + blockHeaderValidator, TEST_CHUNK_SIZE, false, listener) { @@ -295,7 +343,7 @@ void processSnapBlocksRequestInternal(Peer sender, SnapBlocksRequestMessage requ verify(listener, times(1)).onJobRun(jobArg.capture()); assertEquals(peer, jobArg.getValue().getSender()); - assertEquals(msg, jobArg.getValue().getMsg()); + assertEquals(msg.getMessageType(), jobArg.getValue().getMsgType()); } @Test @@ -311,6 +359,10 @@ void givenProcessStateChunkRequestIsCalled_thenInternalOneIsCalledLater() throws peersInformation, blockStore, transactionPool, + blockParentValidator, + blockValidator, + blockHeaderParentValidator, + blockHeaderValidator, TEST_CHUNK_SIZE, false, listener) { @@ -331,7 +383,7 @@ void processStateChunkRequestInternal(Peer sender, SnapStateChunkRequestMessage verify(listener, times(1)).onJobRun(jobArg.capture()); assertEquals(peer, jobArg.getValue().getSender()); - assertEquals(msg, jobArg.getValue().getMsg()); + assertEquals(msg.getMessageType(), jobArg.getValue().getMsgType()); } @Test @@ -342,6 +394,10 @@ void givenErrorRLPData_thenOnStateChunkErrorIsCalled() { peersInformation, blockStore, transactionPool, + blockParentValidator, + blockValidator, + blockHeaderParentValidator, + blockHeaderValidator, TEST_CHUNK_SIZE, false); @@ -353,14 +409,15 @@ void givenErrorRLPData_thenOnStateChunkErrorIsCalled() { when(snapSyncState.getNextExpectedFrom()).thenReturn(1L); when(responseMessage.getFrom()).thenReturn(1L); when(responseMessage.getChunkOfTrieKeyValue()).thenReturn(RLP.encodedEmptyList()); + doReturn(true).when(snapSyncState).isRunning(); + doReturn(true).when(blockValidator).isValid(any()); + doReturn(true).when(blockParentValidator).isValid(any(), any()); underTest = spy(underTest); underTest.processStateChunkResponse(snapSyncState, peer, responseMessage); - verify(snapSyncState, times(1)).onNewChunk(); verify(underTest, times(1)).onStateChunkResponseError(peer, responseMessage); verify(peer, times(1)).sendMessage(any(SnapStateChunkRequestMessage.class)); - } private void initializeBlockchainWithAmountOfBlocks(int numberOfBlocks) { diff --git a/rskj-core/src/test/java/co/rsk/net/sync/BlockConnectorHelperTest.java b/rskj-core/src/test/java/co/rsk/net/sync/BlockConnectorHelperTest.java index 4f80a509a12..ebf4ce91835 100644 --- a/rskj-core/src/test/java/co/rsk/net/sync/BlockConnectorHelperTest.java +++ b/rskj-core/src/test/java/co/rsk/net/sync/BlockConnectorHelperTest.java @@ -1,6 +1,7 @@ package co.rsk.net.sync; import co.rsk.core.BlockDifficulty; +import co.rsk.crypto.Keccak256; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.ethereum.core.Block; @@ -13,14 +14,11 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import java.math.BigInteger; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.*; @@ -50,128 +48,26 @@ void testStartConnectingWhenBlockListIsEmpty() { } @Test - void testStartConnectingWhenBlockStoreIsEmpty() { - when(blockStore.isEmpty()).thenReturn(true); - - Block block1 = mock(Block.class); - Block block2 = mock(Block.class); - Block block3 = mock(Block.class); - when(block1.getNumber()).thenReturn(1L); - when(block2.getNumber()).thenReturn(2L); - when(block3.getNumber()).thenReturn(3L); - when(block1.isParentOf(block2)).thenReturn(true); - when(block2.isParentOf(block3)).thenReturn(true); - - - BlockDifficulty diff1 = new BlockDifficulty(BigInteger.valueOf(1)); - BlockDifficulty diff2 = new BlockDifficulty(BigInteger.valueOf(2)); - BlockDifficulty diff3 = new BlockDifficulty(BigInteger.valueOf(3)); - blockAndDifficultiesList = buildBlockDifficulties(Arrays.asList(block1, block2,block3), - Arrays.asList(diff1, diff2,diff3)); - - blockConnectorHelper = new BlockConnectorHelper(blockStore); - - blockConnectorHelper.startConnecting(blockAndDifficultiesList); - - verify(blockStore, times(3)).saveBlock(blockCaptor.capture(), difficultyCaptor.capture(), anyBoolean()); - verify(blockStore, times(0)).getBestBlock(); - List savedBlocks = blockCaptor.getAllValues(); - List savedDifficulties = difficultyCaptor.getAllValues(); - assertEquals(block3, savedBlocks.get(0)); - assertEquals(diff3, savedDifficulties.get(0)); - assertEquals(block2, savedBlocks.get(1)); - assertEquals(diff2, savedDifficulties.get(1)); - assertEquals(block1, savedBlocks.get(2)); - assertEquals(diff1, savedDifficulties.get(2)); - - } - - @Test - void testStartConnectingWhenBlockStoreIsEmptyAndNotOrderedList() { - when(blockStore.isEmpty()).thenReturn(true); - - Block block1 = mock(Block.class); - Block block2 = mock(Block.class); - when(block1.getNumber()).thenReturn(1L); - when(block2.getNumber()).thenReturn(2L); - when(block1.isParentOf(block2)).thenReturn(true); - - BlockDifficulty diff1 = new BlockDifficulty(BigInteger.valueOf(1)); - BlockDifficulty diff2 = new BlockDifficulty(BigInteger.valueOf(2)); - blockAndDifficultiesList = buildBlockDifficulties(Arrays.asList(block2, block1), - Arrays.asList(diff2, diff1)); - - blockConnectorHelper = new BlockConnectorHelper(blockStore); - - blockConnectorHelper.startConnecting(blockAndDifficultiesList); - - verify(blockStore, times(2)).saveBlock(blockCaptor.capture(), difficultyCaptor.capture(), anyBoolean()); - verify(blockStore, times(0)).getBestBlock(); - List savedBlocks = blockCaptor.getAllValues(); - List savedDifficulties = difficultyCaptor.getAllValues(); - assertEquals(block2, savedBlocks.get(0)); - assertEquals(diff2, savedDifficulties.get(0)); - assertEquals(block1, savedBlocks.get(1)); - assertEquals(diff1, savedDifficulties.get(1)); - } - - @Test - void testStartConnectingWhenBlockStoreIsNotEmpty() { - Block block1 = mock(Block.class); - Block block2 = mock(Block.class); - Block block3 = mock(Block.class); - - when(block1.getNumber()).thenReturn(1L); - when(block2.getNumber()).thenReturn(2L); - when(block3.getNumber()).thenReturn(3L); - when(block1.isParentOf(block2)).thenReturn(true); - when(block2.isParentOf(block3)).thenReturn(true); - - when(blockStore.isEmpty()).thenReturn(false); - when(blockStore.getBestBlock()).thenReturn(block3); + void testStartConnectingWhenBlockListIsNotEmpty() { + Block block1 = mockBlock( new byte[] { 1 }); + Block block2 = mockBlock(new byte[] { 2 }); blockAndDifficultiesList = buildBlockDifficulties(Arrays.asList(block1, block2), Arrays.asList(mock(BlockDifficulty.class), mock(BlockDifficulty.class))); blockConnectorHelper = new BlockConnectorHelper(blockStore); blockConnectorHelper.startConnecting(blockAndDifficultiesList); - verify(blockStore, times(1)).getBestBlock(); verify(blockStore, times(2)).saveBlock(any(), any(), anyBoolean()); } - @Test - void whenBlockIsNotParentOfExistingBestBlock() { - Block block2 = mock(Block.class); - Block block3 = mock(Block.class); - when(block2.getNumber()).thenReturn(2L); - when(block3.getNumber()).thenReturn(3L); - when(block2.isParentOf(block3)).thenReturn(false); - blockAndDifficultiesList = buildBlockDifficulties(Collections.singletonList(block2), - Collections.singletonList(mock(BlockDifficulty.class))); - - blockConnectorHelper = new BlockConnectorHelper(blockStore); - - when(blockStore.isEmpty()).thenReturn(false); - when(blockStore.getBestBlock()).thenReturn(block3); - - assertThrows(BlockConnectorException.class, () -> blockConnectorHelper.startConnecting(blockAndDifficultiesList)); - } - - @Test - void testStartConnectingWhenBlockIsNotParentOfChild() { - Block block1 = mock(Block.class); - Block block2 = mock(Block.class); - when(block1.getNumber()).thenReturn(1L); - when(block2.getNumber()).thenReturn(2L); - when(block1.isParentOf(block2)).thenReturn(false); - when(blockStore.isEmpty()).thenReturn(true); - blockAndDifficultiesList = buildBlockDifficulties(Arrays.asList(block1, block2), - Arrays.asList(mock(BlockDifficulty.class), mock(BlockDifficulty.class))); - blockConnectorHelper = new BlockConnectorHelper(blockStore); - - assertThrows(BlockConnectorException.class, () -> blockConnectorHelper.startConnecting(blockAndDifficultiesList)); + private Block mockBlock(byte[] hashBytes) { + Block block = mock(Block.class); + Keccak256 hash = mock(Keccak256.class); + when(hash.getBytes()).thenReturn(hashBytes); + when(block.getHash()).thenReturn(hash); + return block; } - List> buildBlockDifficulties(List blocks, List difficulties) { + private List> buildBlockDifficulties(List blocks, List difficulties) { int i = 0; List> list = new ArrayList<>(); for (Block block : blocks) { @@ -180,5 +76,4 @@ List> buildBlockDifficulties(List blocks, Lis } return list; } - -} \ No newline at end of file +} diff --git a/rskj-core/src/test/java/co/rsk/net/sync/CheckingBestHeaderSyncStateTest.java b/rskj-core/src/test/java/co/rsk/net/sync/CheckingBestHeaderSyncStateTest.java index bc440807e11..f4b47e0fb78 100644 --- a/rskj-core/src/test/java/co/rsk/net/sync/CheckingBestHeaderSyncStateTest.java +++ b/rskj-core/src/test/java/co/rsk/net/sync/CheckingBestHeaderSyncStateTest.java @@ -68,7 +68,7 @@ void newBlockHeadersWhenValidHeaderContinue() { when(header.getHash().getBytes()).thenReturn(HASH_1); when(blockHeaderValidationRule.isValid(header)).thenReturn(true); - state.newBlockHeaders(Collections.singletonList(header)); + state.newBlockHeaders(null, Collections.singletonList(header)); verify(syncEventsHandler, times(1)).startFindingConnectionPoint(peer); } @@ -79,7 +79,7 @@ void newBlockHeadersWhenInValidHeaderOnErrorSyncing() { when(header.getHash().getBytes()).thenReturn(HASH_1); when(blockHeaderValidationRule.isValid(header)).thenReturn(false); - state.newBlockHeaders(Collections.singletonList(header)); + state.newBlockHeaders(null, Collections.singletonList(header)); verify(syncEventsHandler, times(1)) .onErrorSyncing(peer, EventType.INVALID_HEADER, @@ -92,7 +92,7 @@ void newBlockHeadersWhenDifferentHeaderOnErrorSyncing() { when(header.getHash().getBytes()).thenReturn(HashUtil.sha256(new byte[]{5})); when(blockHeaderValidationRule.isValid(header)).thenReturn(true); - state.newBlockHeaders(Collections.singletonList(header)); + state.newBlockHeaders(null, Collections.singletonList(header)); verify(syncEventsHandler, times(1)) .onErrorSyncing(peer, EventType.INVALID_HEADER, diff --git a/rskj-core/src/test/java/co/rsk/net/sync/DownloadingBackwardsHeadersSyncStateTest.java b/rskj-core/src/test/java/co/rsk/net/sync/DownloadingBackwardsHeadersSyncStateTest.java index 4d1885521a6..d78a5d989f6 100644 --- a/rskj-core/src/test/java/co/rsk/net/sync/DownloadingBackwardsHeadersSyncStateTest.java +++ b/rskj-core/src/test/java/co/rsk/net/sync/DownloadingBackwardsHeadersSyncStateTest.java @@ -96,7 +96,7 @@ void newHeaders() { List receivedHeaders = new LinkedList<>(); - target.newBlockHeaders(receivedHeaders); + target.newBlockHeaders(selectedPeer, receivedHeaders); verify(syncEventsHandler).backwardDownloadBodies(child, receivedHeaders, selectedPeer); diff --git a/rskj-core/src/test/java/co/rsk/net/sync/DownloadingHeadersSyncStateTest.java b/rskj-core/src/test/java/co/rsk/net/sync/DownloadingHeadersSyncStateTest.java index 6f27aa9234c..9c76e9d9afb 100644 --- a/rskj-core/src/test/java/co/rsk/net/sync/DownloadingHeadersSyncStateTest.java +++ b/rskj-core/src/test/java/co/rsk/net/sync/DownloadingHeadersSyncStateTest.java @@ -126,7 +126,7 @@ void newBlockHeadersWhenNoCurrentChunkThenSyncIssue() { when(chunksDownloadHelper.getCurrentChunk()).thenReturn(Optional.empty()); - syncState.newBlockHeaders(new ArrayList<>()); + syncState.newBlockHeaders(selectedPeer, new ArrayList<>()); verify(syncEventsHandler, times(1)).onSyncIssue(selectedPeer, "Current chunk not present on {}", DownloadingHeadersSyncState.class); @@ -157,7 +157,7 @@ void newBlockHeadersWhenUnexpectedChunkSizeThenInvalidMessage() { List chunk = new ArrayList<>(); chunk.add(mock(BlockHeader.class)); - syncState.newBlockHeaders(chunk); + syncState.newBlockHeaders(selectedPeer, chunk); verify(syncEventsHandler, times(1)).onErrorSyncing(selectedPeer, EventType.INVALID_MESSAGE, "Unexpected chunk size received on {}: hash: {}", DownloadingHeadersSyncState.class, HashUtil.toPrintableHash(currentChunk.getHash())); @@ -191,7 +191,7 @@ void newBlockHeadersWhenUnexpectedHeaderThenInvalidMessage() { byte[] headerHash = TestUtils.generateBytes(DownloadingHeadersSyncStateTest.class,"headerHash",32); when(header.getHash().getBytes()).thenReturn(headerHash);; // different from chunkHash chunk.add(header); - syncState.newBlockHeaders(chunk); + syncState.newBlockHeaders(selectedPeer, chunk); verify(syncEventsHandler, times(1)).onErrorSyncing(selectedPeer, EventType.INVALID_MESSAGE, "Unexpected chunk header hash received on {}: hash: {}", DownloadingHeadersSyncState.class, HashUtil.toPrintableHash(currentChunk.getHash())); diff --git a/rskj-core/src/test/java/co/rsk/net/sync/SimpleSyncEventsHandler.java b/rskj-core/src/test/java/co/rsk/net/sync/SimpleSyncEventsHandler.java index 246ee4f56fc..d6694ecebe4 100644 --- a/rskj-core/src/test/java/co/rsk/net/sync/SimpleSyncEventsHandler.java +++ b/rskj-core/src/test/java/co/rsk/net/sync/SimpleSyncEventsHandler.java @@ -105,5 +105,5 @@ public boolean stopSyncingWasCalled() { } @Override - public void startSnapSync() { } + public void startSnapSync(Peer peer) { } } diff --git a/rskj-core/src/test/java/co/rsk/net/sync/SnapSyncStateTest.java b/rskj-core/src/test/java/co/rsk/net/sync/SnapSyncStateTest.java index 8ce31717683..f12e5080483 100644 --- a/rskj-core/src/test/java/co/rsk/net/sync/SnapSyncStateTest.java +++ b/rskj-core/src/test/java/co/rsk/net/sync/SnapSyncStateTest.java @@ -19,25 +19,24 @@ package co.rsk.net.sync; import co.rsk.core.BlockDifficulty; -import co.rsk.net.NodeID; import co.rsk.net.Peer; import co.rsk.net.SnapshotProcessor; +import co.rsk.net.messages.MessageType; import co.rsk.net.messages.SnapBlocksResponseMessage; import co.rsk.net.messages.SnapStateChunkResponseMessage; import co.rsk.net.messages.SnapStatusResponseMessage; +import co.rsk.scoring.EventType; import org.apache.commons.lang3.tuple.Pair; import org.ethereum.core.Block; +import org.ethereum.core.BlockHeader; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import java.math.BigInteger; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.time.Duration; import java.util.List; -import java.util.Optional; import java.util.PriorityQueue; import java.util.Queue; import java.util.concurrent.CountDownLatch; @@ -45,7 +44,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; @@ -76,7 +74,7 @@ void givenOnEnterWasCalledAndNotRunningYet_thenSyncingStartsWithTestObjectAsPara //given-when underTest.onEnter(); //then - verify(snapshotProcessor, times(1)).startSyncing(); + verify(snapshotProcessor, times(1)).startSyncing(underTest); } @Test @@ -95,28 +93,7 @@ void givenOnEnterWasCalledTwice_thenSyncingStartsOnlyOnce() { underTest.onEnter(); underTest.onEnter(); //then - verify(snapshotProcessor, times(1)).startSyncing(); - } - - @Test - void givenOnMessageTimeOutCalled_thenSyncingStops() { - //given-when - underTest.setRunning(); - underTest.onMessageTimeOut(); - //then - verify(syncEventsHandler, times(1)).stopSyncing(); - } - - @Test - void givenNewChunk_thenTimerIsReset() { - //given - underTest.timeElapsed = Duration.ofMinutes(1); - assertThat(underTest.timeElapsed, greaterThan(Duration.ZERO)); - - // when - underTest.onNewChunk(); - //then - assertThat(underTest.timeElapsed, equalTo(Duration.ZERO)); + verify(snapshotProcessor, times(1)).startSyncing(underTest); } @Test @@ -132,24 +109,6 @@ void givenTickIsCalledBeforeTimeout_thenTimerIsUpdated_andNoTimeoutHappens() { verify(syncEventsHandler, never()).onErrorSyncing(any(), any(), any(), any()); } - @Test - void givenTickIsCalledAfterTimeout_thenTimerIsUpdated_andTimeoutHappens() throws UnknownHostException { - //given - Duration elapsedTime = Duration.ofMinutes(1); - underTest.timeElapsed = Duration.ZERO; - Peer mockedPeer = mock(Peer.class); - NodeID nodeID = mock(NodeID.class); - when(mockedPeer.getPeerNodeID()).thenReturn(nodeID); - when(mockedPeer.getAddress()).thenReturn(InetAddress.getByName("127.0.0.1")); - when(peersInformation.getBestSnapPeer()).thenReturn(Optional.of(mockedPeer)); - underTest.setRunning(); - // when - underTest.tick(elapsedTime); - //then - assertThat(underTest.timeElapsed, equalTo(elapsedTime)); - verify(syncEventsHandler, times(1)).stopSyncing(); - } - @Test void givenFinishIsCalled_thenSyncEventHandlerStopsSync() { //given-when @@ -178,7 +137,7 @@ void givenOnSnapStatusIsCalled_thenJobIsAddedAndRun() throws InterruptedExceptio verify(listener, times(1)).onJobRun(jobArg.capture()); assertEquals(peer, jobArg.getValue().getSender()); - assertEquals(msg, jobArg.getValue().getMsg()); + assertEquals(msg.getMessageType(), jobArg.getValue().getMsgType()); } @Test @@ -200,7 +159,30 @@ void givenOnSnapBlocksIsCalled_thenJobIsAddedAndRun() throws InterruptedExceptio verify(listener, times(1)).onJobRun(jobArg.capture()); assertEquals(peer, jobArg.getValue().getSender()); - assertEquals(msg, jobArg.getValue().getMsg()); + assertEquals(msg.getMessageType(), jobArg.getValue().getMsgType()); + } + + @Test + void givenNewBlockHeadersIsCalled_thenJobIsAddedAndRun() throws InterruptedException { + //given + Peer peer = mock(Peer.class); + //noinspection unchecked + List msg = mock(List.class); + CountDownLatch latch = new CountDownLatch(1); + doCountDownOnQueueEmpty(listener, latch); + underTest.onEnter(); + + //when + underTest.newBlockHeaders(peer, msg); + + //then + assertTrue(latch.await(THREAD_JOIN_TIMEOUT, TimeUnit.MILLISECONDS)); + + ArgumentCaptor jobArg = ArgumentCaptor.forClass(SyncMessageHandler.Job.class); + verify(listener, times(1)).onJobRun(jobArg.capture()); + + assertEquals(peer, jobArg.getValue().getSender()); + assertEquals(MessageType.BLOCK_HEADERS_RESPONSE_MESSAGE, jobArg.getValue().getMsgType()); } @Test @@ -222,14 +204,34 @@ void givenOnSnapStateChunkIsCalled_thenJobIsAddedAndRun() throws InterruptedExce verify(listener, times(1)).onJobRun(jobArg.capture()); assertEquals(peer, jobArg.getValue().getSender()); - assertEquals(msg, jobArg.getValue().getMsg()); + assertEquals(msg.getMessageType(), jobArg.getValue().getMsgType()); + } + + @Test + void givenOnMessageTimeOut_thenShouldFail() throws InterruptedException { + //given + Peer peer = mock(Peer.class); + underTest.setLastBlock(mock(Block.class), mock(BlockDifficulty.class), peer); + underTest.setRunning(); + + //when + underTest.onMessageTimeOut(); + + //then + verify(syncEventsHandler, times(1)).onErrorSyncing(eq(peer), eq(EventType.TIMEOUT_MESSAGE), any()); } @Test void testSetAndGetLastBlock() { Block mockBlock = mock(Block.class); - underTest.setLastBlock(mockBlock); + BlockDifficulty mockBlockDifficulty = mock(BlockDifficulty.class); + Peer mockPeer = mock(Peer.class); + + underTest.setLastBlock(mockBlock, mockBlockDifficulty, mockPeer); + assertEquals(mockBlock, underTest.getLastBlock()); + assertEquals(mockBlockDifficulty, underTest.getLastBlockDifficulty()); + assertEquals(mockPeer, underTest.getLastBlockSender()); } @Test @@ -272,13 +274,6 @@ void testGetSnapStateChunkQueue() { assertNotNull(queue); } - @Test - void testSetAndGetLastBlockDifficulty() { - BlockDifficulty mockBlockDifficulty = mock(BlockDifficulty.class); - underTest.setLastBlockDifficulty(mockBlockDifficulty); - assertEquals(mockBlockDifficulty, underTest.getLastBlockDifficulty()); - } - @Test void testSetAndGetRemoteRootHash() { byte[] mockRootHash = new byte[]{1, 2, 3}; diff --git a/rskj-core/src/test/java/co/rsk/validators/BlockTimeStampValidationRuleTest.java b/rskj-core/src/test/java/co/rsk/validators/BlockTimeStampValidationRuleTest.java index 700f6d1b2dc..6c4e792d36b 100644 --- a/rskj-core/src/test/java/co/rsk/validators/BlockTimeStampValidationRuleTest.java +++ b/rskj-core/src/test/java/co/rsk/validators/BlockTimeStampValidationRuleTest.java @@ -129,48 +129,96 @@ void blockInTheFuture() { } @Test - void blockTimeLowerThanParentTime() { + void blockTimeLowerThanParentBlockTime() { int validPeriod = 540; BlockTimeStampValidationRule validationRule = new BlockTimeStampValidationRule(validPeriod, preRskip179Config, Constants.regtest(), timeProvider); when(timeProvider.currentTimeMillis()).thenReturn(10_000_000L); BlockHeader header = mock(BlockHeader.class); Block parent = mock(Block.class); + BlockHeader parentHeader = mock(BlockHeader.class); when(header.getTimestamp()).thenReturn(10_000L); + when(parent.getHeader()).thenReturn(parentHeader); + when(parentHeader.getTimestamp()).thenReturn(10_000L + 1000); + assertFalse(validationRule.isValid(header, parent)); + } + + @Test + void blockTimeLowerThanParentBlockHeaderTime() { + int validPeriod = 540; + BlockTimeStampValidationRule validationRule = new BlockTimeStampValidationRule(validPeriod, preRskip179Config, Constants.regtest(), timeProvider); + + when(timeProvider.currentTimeMillis()).thenReturn(10_000_000L); + BlockHeader header = mock(BlockHeader.class); + BlockHeader parent = mock(BlockHeader.class); + + when(header.getTimestamp()).thenReturn(10_000L); when(parent.getTimestamp()).thenReturn(10_000L + 1000); assertFalse(validationRule.isValid(header, parent)); } @Test - void blockTimeGreaterThanParentTime() { + void blockTimeGreaterThanParentBlockTime() { int validPeriod = 540; BlockTimeStampValidationRule validationRule = new BlockTimeStampValidationRule(validPeriod, preRskip179Config, Constants.regtest(), timeProvider); when(timeProvider.currentTimeMillis()).thenReturn(10_000_000L); BlockHeader header = mock(BlockHeader.class); Block parent = mock(Block.class); + BlockHeader parentHeader = mock(BlockHeader.class); when(header.getTimestamp()).thenReturn(10_000L); + when(parent.getHeader()).thenReturn(parentHeader); + when(parentHeader.getTimestamp()).thenReturn(10_000L - 1000); + + assertTrue(validationRule.isValid(header, parent)); + } + @Test + void blockTimeGreaterThanParentBlockHeaderTime() { + int validPeriod = 540; + BlockTimeStampValidationRule validationRule = new BlockTimeStampValidationRule(validPeriod, preRskip179Config, Constants.regtest(), timeProvider); + + when(timeProvider.currentTimeMillis()).thenReturn(10_000_000L); + BlockHeader header = mock(BlockHeader.class); + BlockHeader parent = mock(BlockHeader.class); + + when(header.getTimestamp()).thenReturn(10_000L); when(parent.getTimestamp()).thenReturn(10_000L - 1000); assertTrue(validationRule.isValid(header, parent)); } @Test - void blockTimeEqualsParentTime() { + void blockTimeEqualsParentBlockTime() { int validPeriod = 540; BlockTimeStampValidationRule validationRule = new BlockTimeStampValidationRule(validPeriod, preRskip179Config, Constants.regtest(), timeProvider); when(timeProvider.currentTimeMillis()).thenReturn(10_000_000L); BlockHeader header = mock(BlockHeader.class); Block parent = mock(Block.class); + BlockHeader parentHeader = mock(BlockHeader.class); when(header.getTimestamp()).thenReturn(10_000L); + when(parent.getHeader()).thenReturn(parentHeader); + when(parentHeader.getTimestamp()).thenReturn(10_000L); + + assertFalse(validationRule.isValid(header, parent)); + } + @Test + void blockTimeEqualsParentBlockHeaderTime() { + int validPeriod = 540; + BlockTimeStampValidationRule validationRule = new BlockTimeStampValidationRule(validPeriod, preRskip179Config, Constants.regtest(), timeProvider); + + when(timeProvider.currentTimeMillis()).thenReturn(10_000_000L); + BlockHeader header = mock(BlockHeader.class); + BlockHeader parent = mock(BlockHeader.class); + + when(header.getTimestamp()).thenReturn(10_000L); when(parent.getTimestamp()).thenReturn(10_000L); assertFalse(validationRule.isValid(header, parent)); diff --git a/rskj-core/src/test/java/co/rsk/validators/PrevMinGasPriceValidatorTest.java b/rskj-core/src/test/java/co/rsk/validators/PrevMinGasPriceValidatorTest.java index bb38e5d54a1..ef846888151 100644 --- a/rskj-core/src/test/java/co/rsk/validators/PrevMinGasPriceValidatorTest.java +++ b/rskj-core/src/test/java/co/rsk/validators/PrevMinGasPriceValidatorTest.java @@ -47,7 +47,7 @@ void noParentBlock() { PrevMinGasPriceRule pmgpv = new PrevMinGasPriceRule(); - Assertions.assertFalse(pmgpv.isValid(header, null)); + Assertions.assertFalse(pmgpv.isValid(header, (Block) null)); } @Test @@ -61,7 +61,7 @@ void genesisBlock() { PrevMinGasPriceRule pmgpv = new PrevMinGasPriceRule(); - Assertions.assertTrue(pmgpv.isValid(header, null)); + Assertions.assertTrue(pmgpv.isValid(header, (Block) null)); } @Test diff --git a/rskj-core/src/test/java/org/ethereum/core/ImportLightTest.java b/rskj-core/src/test/java/org/ethereum/core/ImportLightTest.java index 3d1489a4680..0ede5ed08a7 100644 --- a/rskj-core/src/test/java/org/ethereum/core/ImportLightTest.java +++ b/rskj-core/src/test/java/org/ethereum/core/ImportLightTest.java @@ -43,7 +43,6 @@ import org.mockito.Mockito; import java.util.Map; -import java.util.function.Supplier; /** * Created by Anton Nashatyrev on 29.12.2015. diff --git a/rskj-core/src/test/java/org/ethereum/jsontestsuite/runners/StateTestRunner.java b/rskj-core/src/test/java/org/ethereum/jsontestsuite/runners/StateTestRunner.java index ea1d8339986..3bf9dd6f2b9 100644 --- a/rskj-core/src/test/java/org/ethereum/jsontestsuite/runners/StateTestRunner.java +++ b/rskj-core/src/test/java/org/ethereum/jsontestsuite/runners/StateTestRunner.java @@ -18,7 +18,6 @@ */ package org.ethereum.jsontestsuite.runners; -import co.rsk.peg.constants.BridgeRegTestConstants; import co.rsk.config.TestSystemProperties; import co.rsk.core.Coin; import co.rsk.core.RskAddress; @@ -31,6 +30,7 @@ import co.rsk.db.StateRootsStoreImpl; import co.rsk.peg.BridgeSupportFactory; import co.rsk.peg.RepositoryBtcBlockStoreWithCache; +import co.rsk.peg.constants.BridgeRegTestConstants; import co.rsk.trie.TrieStoreImpl; import org.ethereum.core.*; import org.ethereum.datasource.HashMapDB;