diff --git a/CHANGELOG.md b/CHANGELOG.md index e16c697bd..840fbf5e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ - Upgrade `commons-validator` library to version 1.6 - Upgrade `okhttp3` library to version 3.14.0 - Fix issue #177: Links from recent TLDs are considered invalid +- Added wait time after frontier ran out of links to avoid race conditions (issue #147) ## Version 0.11.0 diff --git a/src/main/java/focusedCrawler/crawler/async/AsyncCrawler.java b/src/main/java/focusedCrawler/crawler/async/AsyncCrawler.java index ed4db422b..8e8b11fa4 100644 --- a/src/main/java/focusedCrawler/crawler/async/AsyncCrawler.java +++ b/src/main/java/focusedCrawler/crawler/async/AsyncCrawler.java @@ -21,6 +21,10 @@ public class AsyncCrawler extends AbstractExecutionThreadService { + private static final int RUN_OUT_OF_LINKS_DEFAULT = -1; + private static final int MAX_RUN_OUT_OF_LINKS_TIME_MS = 5000; + private static final int RUN_OUT_OF_LINKS_WAIT_TIME = 1000; + private static final Logger logger = LoggerFactory.getLogger(AsyncCrawler.class); private final TargetStorage targetStorage; @@ -29,6 +33,7 @@ public class AsyncCrawler extends AbstractExecutionThreadService { private final Map handlers = new HashMap<>(); private MetricsManager metricsManager; private Configuration config; + private long runOutOfLinksTime = RUN_OUT_OF_LINKS_DEFAULT; public AsyncCrawler(String crawlerId, TargetStorage targetStorage, LinkStorage linkStorage, Configuration config, String dataPath, MetricsManager metricsManager) { @@ -60,6 +65,7 @@ protected void run() { try { LinkRelevance link = (LinkRelevance) linkStorage.select(null); if (link != null) { + this.runOutOfLinksTime = RUN_OUT_OF_LINKS_DEFAULT; Callback handler = handlers.get(link.getType()); if (handler == null) { logger.error("No registered handler for link type: " + link.getType()); @@ -68,29 +74,57 @@ protected void run() { downloader.dipatchDownload(link, handler); } } catch (DataNotFoundException e) { - // There are no 
more links available in the frontier right now - if (downloader.hasPendingDownloads() || !e.ranOutOfLinks()) { - // If there are still pending downloads, new links - // may be found in these pages, so we should wait some - // time until more links are available and try again - try { - logger.info("Waiting for links from pages being downloaded..."); - Thread.sleep(1000); - } catch (InterruptedException ie) { - } + // There are no more links available in the frontier right now. We need to check + // whether it is a temporary state to decide if the crawler should stop running. + + boolean hasPendingLinks = downloader.hasPendingDownloads() || !e.ranOutOfLinks(); + if (hasPendingLinks) { + // If there are still pending downloads, new links may be found in these pages, + // so we should wait some time until more links are available and try again. + waitMilliseconds(RUN_OUT_OF_LINKS_WAIT_TIME); continue; } - // There are no more pending downloads and there are no - // more links available in the frontier, so stop crawler - logger.info("LinkStorage ran out of links, stopping crawler."); - stopAsync(); - break; + + // Even when the frontier runs out of links and there are no pending downloads, + // there may be still some pages being processed, in which case the crawler may + // find some new links. Therefore, we still keep trying to select from the frontier + // for a fixed amount of time (MAX_RUN_OUT_OF_LINKS_TIME_MS) to avoid race conditions. 
+ if (!hasPendingLinks && this.runOutOfLinksTime == RUN_OUT_OF_LINKS_DEFAULT) { + this.runOutOfLinksTime = System.currentTimeMillis(); + } + + // The crawler should stop only after having run out of links for a few seconds. + // This time is necessary to avoid stopping prematurely due to race conditions with pages still being processed. + long timeSinceRunOutOfLinks = System.currentTimeMillis() - this.runOutOfLinksTime; + if (this.runOutOfLinksTime != RUN_OUT_OF_LINKS_DEFAULT && + timeSinceRunOutOfLinks > MAX_RUN_OUT_OF_LINKS_TIME_MS) { + // There are no more pending downloads, no more links available in the frontier, + // and we already waited some time for new links. Now we can stop the crawler. + logger.info("LinkStorage ran out of links for {} ms, stopping crawler.", + timeSinceRunOutOfLinks); + stopAsync(); + break; + } else { + logger.info("LinkStorage ran out of links for {} ms...", + timeSinceRunOutOfLinks); + } + + logger.info("Waiting for links from pages being processed..."); + waitMilliseconds(RUN_OUT_OF_LINKS_WAIT_TIME); + } catch (Exception e) { logger.error("An unexpected error happened.", e); } } } + private void waitMilliseconds(long ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + } + } + @Override public void shutDown() { logger.info("Starting crawler shutdown...");