diff --git a/eth/downloader/fetchers_concurrent.go b/eth/downloader/fetchers_concurrent.go index 87f34dcca1..c120c77794 100644 --- a/eth/downloader/fetchers_concurrent.go +++ b/eth/downloader/fetchers_concurrent.go @@ -36,6 +36,8 @@ var timeoutGracePeriod = 2 * time.Minute // peersRetryInterval is the retry interval when all peers cannot get the request data. var peersRetryInterval = 100 * time.Millisecond +var maxRetries = 10 + // typedQueue is an interface defining the adaptor needed to translate the type // specific downloader/queue schedulers into the type-agnostic general concurrent // fetcher algorithm calls. @@ -128,6 +130,8 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error { // Prepare the queue and fetch block parts until the block header fetcher's done finished := false + + requestRetried := 0 for { // Short circuit if we lost all our peers if d.peers.Len() == 0 && !beaconMode { @@ -139,6 +143,9 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error { return nil } } else { + if requestRetried > 0 { + log.Debug("Krish: step into retry logic else...") + } // Send a download request to all idle peers, until throttled var ( idles []*peerConnection @@ -161,6 +168,10 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error { } sort.Sort(&peerCapacitySort{idles, caps}) + if requestRetried > 0 { + log.Info("Krish: idles num", "idles count = ", len(idles)) + } + var ( progressed bool throttled bool @@ -170,9 +181,11 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error { // Short circuit if throttling activated or there are no more // queued tasks to be retrieved if throttled { + log.Info("Krish: throttled") break } if queued = queue.pending(); queued == 0 { + log.Info("Krish: queued == 0") break } // Reserve a chunk of fetches for a peer. A nil can mean either that @@ -189,9 +202,16 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error { if request == nil { continue } + + if requestRetried > 0 { + log.Info("Krish: queue reserve success", "request header", request.Headers[0].Number) + } + // Fetch the chunk and make sure any errors return the hashes to the queue req, err := queue.request(peer, request, responses) if err != nil { + log.Info("Krish: queue.request error", "request header", request.Headers[0].Number, "err is", err) + // Sending the request failed, which generally means the peer // was disconnected in between assignment and network send. // Although all peer removal operations return allocated tasks @@ -199,9 +219,9 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error { // immediately pushing the unfulfilled requests. queue.unreserve(peer.id) // TODO(karalabe): This needs a non-expiration method //reset progressed - if len(pending) == 0 { - progressed = false - } + //if len(pending) == 0 { + // progressed = false + //} continue } pending[peer.id] = req @@ -222,6 +242,11 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error { // Retry the unreserved task in next loop if len(pending) == 0 && queued > 0 && beaconMode { log.Warn("All idle peers are not valid for current task, will retry ...") + requestRetried++ + if requestRetried > maxRetries { + log.Info("max retry exceeded, cancel request") + return errCanceled + } time.Sleep(peersRetryInterval) continue }