Skip to content

Commit

Permalink
Fixed some issues with blockchain download
Browse files Browse the repository at this point in the history
  • Loading branch information
MatthewLM committed Jan 27, 2015
1 parent 4d4ec60 commit 5d9d018
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,7 @@ class OrphanBlock {
final boolean filtered = filteredTxHashes != null && filteredTxn != null;
Preconditions.checkArgument((block.transactions == null && filtered)
|| (block.transactions != null && !filtered));
if (!shouldVerifyTransactions())
this.block = block.cloneAsHeader();
else
this.block = block;
this.block = block;
this.filteredTxHashes = filteredTxHashes;
this.filteredTxn = filteredTxn;
}
Expand Down Expand Up @@ -424,13 +421,13 @@ private boolean add(Block block, boolean tryConnecting,
// Determine if centrally trusted hash
// Wait a while for the server if the block is less than three hours old
try {
if (validHashStore != null && !validHashStore.isValidHash(block.getHash(), this, block.getTimeSeconds() > Utils.currentTimeSeconds() - 60*60*3)) {
throw new VerificationException("Invalid hash received");
}
} catch (IOException e) {
log.error("IO Error when determining valid hashes: ", e);
return false;
}
if (validHashStore != null && !validHashStore.isValidHash(block.getHash(), this, block.getTimeSeconds() > Utils.currentTimeSeconds() - 60*60*3)) {
throw new VerificationException("Invalid hash received");
}
} catch (IOException e) {
log.error("IO Error when determining valid hashes: ", e);
return false;
}

checkDifficultyTransitions(storedPrev, block);
connectBlock(block, storedPrev, shouldVerifyTransactions(), filteredTxHashList, filteredTxn);
Expand Down
97 changes: 83 additions & 14 deletions core/src/main/java/com/matthewmitchell/peercoinj/core/Peer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,32 @@

package com.matthewmitchell.peercoinj.core;

import com.matthewmitchell.peercoinj.store.BlockStore;
import com.matthewmitchell.peercoinj.store.BlockStoreException;
import com.matthewmitchell.peercoinj.utils.ListenerRegistration;
import com.matthewmitchell.peercoinj.utils.Threading;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import static com.google.common.base.Preconditions.checkNotNull;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import net.jcip.annotations.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import com.matthewmitchell.peercoinj.net.AbstractTimeoutHandler;
import com.matthewmitchell.peercoinj.store.BlockStore;
import com.matthewmitchell.peercoinj.store.BlockStoreException;
import com.matthewmitchell.peercoinj.utils.ListenerRegistration;
import com.matthewmitchell.peercoinj.utils.Threading;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import javax.annotation.Nullable;
import net.jcip.annotations.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* <p>A Peer handles the high level communication with a Peercoin node, extending a {@link PeerSocketHandler} which
Expand Down Expand Up @@ -341,6 +342,9 @@ protected void processMessage(Message m) throws Exception {
endFilteredBlock(currentFilteredBlock);
currentFilteredBlock = null;
}

// If we have a block and we expected one then we reset the timeout for the next block.
blockResponseTimeout.processMessage(m);

if (m instanceof Ping) {
if (((Ping) m).hasNonce())
Expand Down Expand Up @@ -853,8 +857,14 @@ private void processBlock(Block m) {
try {
// Otherwise it's a block sent to us because the peer thought we needed it, so add it to the block chain.
if (blockChain.add(m)) {

// The block was successfully linked into the chain. Notify the user of our progress.
invokeOnBlocksDownloaded(m);

// Expect more responses if there are more blocks remaining
if (vPeerVersionMessage.bestHeight > checkNotNull(blockChain).getBestChainHeight())
blockResponseTimeout.setTimeoutEnabled(true);

} else {
// This block is an orphan - we don't know how to get from it back to the genesis block yet. That
// must mean that there are blocks we are missing, so do another getblocks with a new block locator
Expand Down Expand Up @@ -1021,6 +1031,7 @@ private void invokeOnBlocksDownloaded(final Block m) {
// since the time we first connected to the peer. However, it's weird and unexpected to receive a callback
// with negative "blocks left" in this case, so we clamp to zero so the API user doesn't have to think about it.
final int blocksLeft = Math.max(0, (int) vPeerVersionMessage.bestHeight - checkNotNull(blockChain).getBestChainHeight());

for (final ListenerRegistration<PeerEventListener> registration : eventListeners) {
registration.executor.execute(new Runnable() {
@Override
Expand Down Expand Up @@ -1246,12 +1257,68 @@ public void addWallet(Wallet wallet) {
public void removeWallet(Wallet wallet) {
wallets.remove(wallet);
}

// Timeout for receiving headers or block response

static int blockTimeout = 10000;

class BlockTimeoutHandler extends AbstractTimeoutHandler {

boolean expectFull;
boolean hadSuccess = false;

public void setSocketTimeout(boolean expectFull) {

setSocketTimeout(blockTimeout);
setTimeoutEnabled(true);
this.expectFull = expectFull;

}

public void processMessage(Message m) {

if ((expectFull && (m instanceof Block || m instanceof FilteredBlock))
|| (!expectFull && (m instanceof HeadersMessage))) {

setTimeoutEnabled(false); // No timeout until we are ready for the next message

if (!hadSuccess) {

// As this is a success we should decrease the timeout so we progressively expect faster block times.
blockTimeout *= 0.9;
if (blockTimeout < 1000)
blockTimeout = 1000;

hadSuccess = true;

}

}

}

@Override
protected void timeoutOccurred() {

// Need to download from a new peer! Close this one
close();

// Increase timeout gradually in case this is our fault
blockTimeout *= 1.2;
if (blockTimeout > 60000)
blockTimeout = 60000;

}

};

BlockTimeoutHandler blockResponseTimeout = new BlockTimeoutHandler();

// Keep track of the last request we made to the peer in blockChainDownloadLocked so we can avoid redundant and harmful
// getblocks requests.
@GuardedBy("lock")
private Sha256Hash lastGetBlocksBegin, lastGetBlocksEnd;

@GuardedBy("lock")
private void blockChainDownloadLocked(Sha256Hash toHash) {
checkState(lock.isHeldByCurrentThread());
Expand Down Expand Up @@ -1329,10 +1396,12 @@ private void blockChainDownloadLocked(Sha256Hash toHash) {

if (downloadBlockBodies) {
GetBlocksMessage message = new GetBlocksMessage(params, blockLocator, toHash);
blockResponseTimeout.setSocketTimeout(true);
sendMessage(message);
} else {
// Downloading headers for a while instead of full blocks.
GetHeadersMessage message = new GetHeadersMessage(params, blockLocator, toHash);
blockResponseTimeout.setSocketTimeout(false);
sendMessage(message);
}
}
Expand All @@ -1346,7 +1415,7 @@ public void startBlockChainDownload() {
// TODO: peer might still have blocks that we don't have, and even have a heavier
// chain even if the chain block count is lower.
final int blocksLeft = getPeerBlockHeightDifference();
if (blocksLeft >= 0) {
if (blocksLeft > 0) {
for (final ListenerRegistration<PeerEventListener> registration : eventListeners) {
registration.executor.execute(new Runnable() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public PeerSocketHandler(NetworkParameters params, PeerAddress peerAddress) {
* TODO: Maybe use something other than the unchecked NotYetConnectedException here
*/
public void sendMessage(Message message) throws NotYetConnectedException {
log.info("SENT -> {}", message.toString());
lock.lock();
try {
if (writeTarget == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ public boolean isValidHash(Sha256Hash hash, AbstractBlockChain blockChain, boole
HttpURLConnection connection = (HttpURLConnection) VALID_HASHES_URL.openConnection();
connection.setUseCaches(false);
connection.setInstanceFollowRedirects(false);
connection.setConnectTimeout(8000);
connection.setReadTimeout(8000);
connection.setConnectTimeout(30000);
connection.setReadTimeout(30000);
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Type", "application/octet-stream");
connection.setRequestProperty( "Accept-Encoding", "" );
Expand All @@ -202,7 +202,7 @@ public boolean isValidHash(Sha256Hash hash, AbstractBlockChain blockChain, boole
final int responseCode = connection.getResponseCode();
if (responseCode == HttpURLConnection.HTTP_OK) {

InputStream is = new BufferedInputStream(connection.getInputStream(), 1024);
InputStream is = new BufferedInputStream(connection.getInputStream());

// We are going to replace the valid hashes with the new ones

Expand Down

0 comments on commit 5d9d018

Please sign in to comment.