diff --git a/docs/source/topics/frontera-settings.rst b/docs/source/topics/frontera-settings.rst
index afb5519d5..34bd06e1f 100644
--- a/docs/source/topics/frontera-settings.rst
+++ b/docs/source/topics/frontera-settings.rst
@@ -87,6 +87,37 @@ Default: ``'frontera.contrib.backends.memory.FIFO'``
 The :class:`Backend <frontera.core.components.Backend>` to be used by the frontier.
 
 For more info see :ref:`Activating a backend <frontier-activating-backend>`.
+
+.. setting:: BC_MIN_REQUESTS
+
+BC_MIN_REQUESTS
+---------------
+
+Default: ``64``
+
+The broad crawling queue get operation will keep retrying until the specified number of requests is collected. The
+maximum number of retries is hard-coded to 3.
+
+.. setting:: BC_MIN_HOSTS
+
+BC_MIN_HOSTS
+------------
+
+Default: ``24``
+
+Keep retrying when getting requests from the queue until requests for at least the specified number of hosts have
+been collected. The maximum number of retries is hard-coded to 3.
+
+.. setting:: BC_MAX_REQUESTS_PER_HOST
+
+BC_MAX_REQUESTS_PER_HOST
+------------------------
+
+Default: ``128``
+
+Don't include (if possible) requests for a specific host in the batch if that host already has more than the
+specified maximum number of requests. This is a hint for the broad crawling queue get algorithm.
+
 .. setting:: CANONICAL_SOLVER
 
 CANONICAL_SOLVER
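The three ``BC_*`` settings above control how a broad crawling batch is assembled. A minimal sketch of the documented
retry/limit semantics, assuming a hypothetical ``fetch_candidates()`` callable that yields ``(host, request)`` pairs
(this is an illustration only, not the HBase queue implementation)::

    from collections import defaultdict

    MAX_RETRIES = 3  # hard-coded, as documented above

    def get_next_requests_sketch(fetch_candidates, min_requests=64, min_hosts=24,
                                 max_requests_per_host=128):
        requests, per_host = [], defaultdict(int)
        for _ in range(MAX_RETRIES):
            for host, request in fetch_candidates():
                if per_host[host] >= max_requests_per_host:
                    continue  # the per-host cap is a suggestion: skip overloaded hosts
                per_host[host] += 1
                requests.append(request)
            if len(requests) >= min_requests and len(per_host) >= min_hosts:
                break  # both minimums satisfied, stop retrying
        return requests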
diff --git a/docs/source/topics/frontier-backends.rst b/docs/source/topics/frontier-backends.rst
index 90fa37874..706da8377 100644
--- a/docs/source/topics/frontier-backends.rst
+++ b/docs/source/topics/frontier-backends.rst
@@ -283,6 +283,12 @@ tunning a block cache to fit states within one block for average size website.
 to achieve documents closeness within the same host. This function can be selected with
 :setting:`URL_FINGERPRINT_FUNCTION` setting.
 
+.. TODO: document details of block cache tuning,
+   BC* settings and queue get operation concept,
+   hbase tables schema and data flow
+   Queue exploration
+   shuffling with MR jobs
+
 .. _FIFO: http://en.wikipedia.org/wiki/FIFO
 .. _LIFO: http://en.wikipedia.org/wiki/LIFO_(computing)
 .. _DFS: http://en.wikipedia.org/wiki/Depth-first_search
diff --git a/frontera/contrib/backends/hbase.py b/frontera/contrib/backends/hbase.py
index e804fcc96..b9b88b44f 100644
--- a/frontera/contrib/backends/hbase.py
+++ b/frontera/contrib/backends/hbase.py
@@ -376,6 +376,10 @@ def __init__(self, manager):
         port = settings.get('HBASE_THRIFT_PORT')
         hosts = settings.get('HBASE_THRIFT_HOST')
         namespace = settings.get('HBASE_NAMESPACE')
+        self._min_requests = settings.get('BC_MIN_REQUESTS')
+        self._min_hosts = settings.get('BC_MIN_HOSTS')
+        self._max_requests_per_host = settings.get('BC_MAX_REQUESTS_PER_HOST')
+
         self.queue_partitions = settings.get('SPIDER_FEED_PARTITIONS')
         host = choice(hosts) if type(hosts) in [list, tuple] else hosts
         kwargs = {
@@ -456,8 +460,10 @@ def get_next_requests(self, max_next_requests, **kwargs):
         for partition_id in range(0, self.queue_partitions):
             if partition_id not in partitions:
                 continue
-            results = self.queue.get_next_requests(max_next_requests, partition_id, min_requests=64,
-                                                   min_hosts=24, max_requests_per_host=128)
+            results = self.queue.get_next_requests(max_next_requests, partition_id,
+                                                   min_requests=self._min_requests,
+                                                   min_hosts=self._min_hosts,
+                                                   max_requests_per_host=self._max_requests_per_host)
             next_pages.extend(results)
             self.logger.debug("Got %d requests for partition id %d", len(results), partition_id)
         return next_pages
diff --git a/frontera/contrib/messagebus/kafkabus.py b/frontera/contrib/messagebus/kafkabus.py
index 6d91b40cf..930bf6ccc 100644
--- a/frontera/contrib/messagebus/kafkabus.py
+++ b/frontera/contrib/messagebus/kafkabus.py
@@ -223,7 +223,7 @@ def __init__(self, settings):
         if codec == 'gzip':
             from kafka.protocol import CODEC_GZIP
             self.codec = CODEC_GZIP
-        if not self.codec:
+        if self.codec is None:
             raise NameError("Non-existent Kafka compression codec.")
         self.conn = KafkaClient(server)
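The ``kafkabus.py`` change is about falsy codec constants: in kafka-python's legacy ``kafka.protocol`` module the
codecs are small integers and ``CODEC_NONE`` is ``0``. Assuming the surrounding code can also assign ``CODEC_NONE``,
the old truthiness check rejected a valid "no compression" choice, while the explicit ``None`` check only fires when
no codec was recognised at all. A standalone illustration (the constants mirror ``kafka.protocol``)::

    CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY = 0, 1, 2  # same values as kafka.protocol

    codec = CODEC_NONE      # a valid, recognised codec
    print(not codec)        # True  -> the old check would raise NameError here
    print(codec is None)    # False -> the new check accepts CODEC_NONE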
disabled!", mw_name) return mws @@ -89,15 +91,14 @@ def _process_components(self, method_name, obj=None, return_classes=None, **kwar if check_response: return_obj = result if check_response and obj and not return_obj: - self._logger.warning("Object '%s' filtered in '%s' by '%s'", - obj.__class__.__name__, method_name, component.__class__.__name__ - ) + self._logger_components.warning("Object '%s' filtered in '%s' by '%s'", + obj.__class__.__name__, method_name, component.__class__.__name__) return return return_obj def _process_component(self, component, method_name, component_category, obj, return_classes, **kwargs): - self._logger.debug("processing '%s' '%s.%s' %s", - method_name, component_category, component.__class__.__name__, obj) + self._logger_components.debug("processing '%s' '%s.%s' %s", + method_name, component_category, component.__class__.__name__, obj) return_obj = getattr(component, method_name)(*([obj] if obj else []), **kwargs) assert return_obj is None or isinstance(return_obj, return_classes), \ "%s '%s.%s' must return None or %s, Got '%s'" % \ diff --git a/frontera/settings/default_settings.py b/frontera/settings/default_settings.py index 7511e3eae..d5f2d8c1f 100644 --- a/frontera/settings/default_settings.py +++ b/frontera/settings/default_settings.py @@ -3,6 +3,9 @@ AUTO_START = True BACKEND = 'frontera.contrib.backends.memory.FIFO' +BC_MIN_REQUESTS = 64 +BC_MIN_HOSTS = 24 +BC_MAX_REQUESTS_PER_HOST = 128 CANONICAL_SOLVER = 'frontera.contrib.canonicalsolvers.Basic' DELAY_ON_EMPTY = 5.0 DOMAIN_FINGERPRINT_FUNCTION = 'frontera.utils.fingerprint.sha1' diff --git a/frontera/worker/db.py b/frontera/worker/db.py index c48aa4063..72a5f0922 100644 --- a/frontera/worker/db.py +++ b/frontera/worker/db.py @@ -144,12 +144,12 @@ def consume_incoming(self, *args, **kwargs): if type == 'page_crawled': _, response, links = msg logger.debug("Page crawled %s", response.url) - if response.meta['jid'] != self.job_id: + if 'jid' not in response.meta or response.meta['jid'] != self.job_id: continue self._backend.page_crawled(response, links) if type == 'request_error': _, request, error = msg - if request.meta['jid'] != self.job_id: + if 'jid' not in request.meta or request.meta['jid'] != self.job_id: continue logger.debug("Request error %s", request.url) self._backend.request_error(request, error) diff --git a/frontera/worker/strategy.py b/frontera/worker/strategy.py index 9eccaa60f..bb2f18527 100644 --- a/frontera/worker/strategy.py +++ b/frontera/worker/strategy.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- from time import asctime import logging -from traceback import format_stack +from traceback import format_stack, format_tb from signal import signal, SIGUSR1 from logging.config import fileConfig from argparse import ArgumentParser @@ -170,14 +170,14 @@ def work(self): if type == 'page_crawled': _, response, links = msg - if response.meta['jid'] != self.job_id: + if 'jid' not in response.meta or response.meta['jid'] != self.job_id: continue self.on_page_crawled(response, links) continue if type == 'request_error': _, request, error = msg - if request.meta['jid'] != self.job_id: + if 'jid' not in request.meta or request.meta['jid'] != self.job_id: continue self.on_request_error(request, error) continue @@ -203,6 +203,8 @@ def work(self): def run(self): def errback(failure): logger.exception(failure.value) + if failure.frames: + logger.critical(str("").join(format_tb(failure.getTracebackObject()))) self.task.start(interval=0).addErrback(errback) def debug(sig, frame):