Skip to content

Commit

Permalink
Merge pull request #43 from sibiryakov/persistent-meta-cb
Browse files · Browse the repository at this point in the history
Persistent .meta and callbacks in Frontera backend
  • Loading branch information
sibiryakov committed May 25, 2015
2 parents 32c83ce + e157540 commit 48de71d
Show file tree
Hide file tree
Showing 18 changed files with 282 additions and 77 deletions.
24 changes: 13 additions & 11 deletions frontera/contrib/backends/sqlalchemy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.types import TypeDecorator
from sqlalchemy import Column, String, Integer
from sqlalchemy import Column, String, Integer, PickleType
from sqlalchemy import UniqueConstraint

from frontera import Backend
Expand Down Expand Up @@ -61,6 +61,7 @@ class State:
status_code = Column(String(20))
state = Column(String(12))
error = Column(String(20))
meta = Column(PickleType())

@classmethod
def query(cls, session):
Expand Down Expand Up @@ -122,7 +123,7 @@ def frontier_stop(self):

def add_seeds(self, seeds):
for seed in seeds:
db_page, _ = self._get_or_create_db_page(url=seed.url, fingerprint=seed.meta['fingerprint'])
db_page, _ = self._get_or_create_db_page(seed)
self.session.commit()

def get_next_requests(self, max_next_requests, **kwargs):
Expand All @@ -134,40 +135,41 @@ def get_next_requests(self, max_next_requests, **kwargs):
next_pages = []
for db_page in query:
db_page.state = Page.State.QUEUED
request = self.manager.request_model(url=db_page.url)
request = self.manager.request_model(url=db_page.url, meta=db_page.meta)
next_pages.append(request)
self.session.commit()
return next_pages

def page_crawled(self, response, links):
db_page, _ = self._get_or_create_db_page(url=response.url, fingerprint=response.meta['fingerprint'])
db_page, _ = self._get_or_create_db_page(response)
db_page.state = Page.State.CRAWLED
db_page.status_code = response.status_code
for link in links:
db_page_from_link, created = self._get_or_create_db_page(url=link.url, fingerprint=link.meta['fingerprint'])
db_page_from_link, created = self._get_or_create_db_page(link)
if created:
db_page_from_link.depth = db_page.depth+1
self.session.commit()

def request_error(self, request, error):
db_page, _ = self._get_or_create_db_page(url=request.url, fingerprint=request.meta['fingerprint'])
db_page, _ = self._get_or_create_db_page(request)
db_page.state = Page.State.ERROR
db_page.error = error
self.session.commit()

def _get_or_create_db_page(self, url, fingerprint):
if not self._request_exists(fingerprint):
def _get_or_create_db_page(self, obj):
if not self._request_exists(obj.meta['fingerprint']):
db_request = self.page_model()
db_request.fingerprint = fingerprint
db_request.fingerprint = obj.meta['fingerprint']
db_request.state = Page.State.NOT_CRAWLED
db_request.url = url
db_request.url = obj.url
db_request.depth = 0
db_request.created_at = datetime.datetime.utcnow()
db_request.meta = obj.meta
self.session.add(db_request)
self.manager.logger.backend.debug('Creating request %s' % db_request)
return db_request, True
else:
db_request = self.page_model.query(self.session).filter_by(fingerprint=fingerprint).first()
db_request = self.page_model.query(self.session).filter_by(fingerprint=obj.meta['fingerprint']).first()
self.manager.logger.backend.debug('Request exists %s' % db_request)
return db_request, False

Expand Down
17 changes: 8 additions & 9 deletions frontera/contrib/requests/converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,14 @@

class RequestConverter(BaseRequestConverter):
"""Converts between frontera and Requests request objects"""
@classmethod
def to_frontier(cls, request):
def to_frontier(self, request):
"""request: Requests > Frontier"""
return FrontierRequest(url=request.url,
method=request.method,
headers=request.headers,
cookies=request.cookies if hasattr(request, 'cookies') else {})

@classmethod
def from_frontier(cls, request):
def from_frontier(self, request):
"""request: Frontier > Scrapy"""
return RequestsRequest(url=request.url,
method=request.method,
Expand All @@ -27,16 +25,17 @@ def from_frontier(cls, request):

class ResponseConverter(BaseResponseConverter):
"""Converts between frontera and Scrapy response objects"""
@classmethod
def to_frontier(cls, response):
def __init__(self, request_converter):
self._request_converter = request_converter

def to_frontier(self, response):
"""response: Scrapy > Frontier"""
return FrontierResponse(url=response.url,
status_code=response.status_code,
headers=response.headers,
body=response.text,
request=RequestConverter.to_frontier(response.request))
request=self._request_converter.to_frontier(response.request))

@classmethod
def from_frontier(cls, response):
def from_frontier(self, response):
"""response: Frontier > Scrapy"""
raise NotImplementedError
7 changes: 5 additions & 2 deletions frontera/contrib/requests/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,8 @@


class RequestsFrontierManager(FrontierManagerWrapper):
request_converter_class = RequestConverter
response_converter_class = ResponseConverter

def __init__(self, settings):
super(RequestsFrontierManager, self).__init__(settings)
self.request_converter = RequestConverter()
self.response_converter = ResponseConverter(self.request_converter)
98 changes: 64 additions & 34 deletions frontera/contrib/scrapy/converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,58 +8,88 @@

class RequestConverter(BaseRequestConverter):
"""Converts between frontera and Scrapy request objects"""
@classmethod
def to_frontier(cls, request):
def __init__(self, spider):
self.spider = spider

def to_frontier(self, scrapy_request):
"""request: Scrapy > Frontier"""
if isinstance(request.cookies, dict):
cookies = request.cookies
if isinstance(scrapy_request.cookies, dict):
cookies = scrapy_request.cookies
else:
cookies = dict(sum([d.items() for d in request.cookies], []))
cookies = dict(sum([d.items() for d in scrapy_request.cookies], []))
cb = scrapy_request.callback
if callable(cb):
cb = _find_method(self.spider, cb)
eb = scrapy_request.errback
if callable(eb):
eb = _find_method(self.spider, eb)
meta = {
'scrapy_callback': request.callback,
'scrapy_callback': cb,
'scrapy_errback': eb,
'scrapy_meta': scrapy_request.meta,
'origin_is_frontier': True,
}
meta.update(request.meta or {})
return FrontierRequest(url=request.url,
method=request.method,
headers=request.headers,
return FrontierRequest(url=scrapy_request.url,
method=scrapy_request.method,
headers=scrapy_request.headers,
cookies=cookies,
meta=meta)

@classmethod
def from_frontier(cls, request):
def from_frontier(self, frontier_request):
"""request: Frontier > Scrapy"""
meta = {
'frontier_request': request
}
meta.update(request.meta or {})
return ScrapyRequest(url=request.url,
callback=meta.get('scrapy_callback', None),
method=request.method,
headers=request.headers,
cookies=request.cookies,
cb = frontier_request.meta.get('scrapy_callback', None)
if cb and self.spider:
cb = _get_method(self.spider, cb)
eb = frontier_request.meta.get('scrapy_errback', None)
if eb and self.spider:
eb = _get_method(self.spider, eb)
meta = frontier_request.meta['scrapy_meta']
meta['frontier_request'] = frontier_request
return ScrapyRequest(url=frontier_request.url,
callback=cb,
errback=eb,
method=frontier_request.method,
headers=frontier_request.headers,
cookies=frontier_request.cookies,
meta=meta,
dont_filter=True)


class ResponseConverter(BaseResponseConverter):
"""Converts between frontera and Scrapy response objects"""
@classmethod
def to_frontier(cls, response):
def __init__(self, spider, request_converter):
self.spider = spider
self._request_converter = request_converter

def to_frontier(self, scrapy_response):
"""response: Scrapy > Frontier"""
frontier_response = FrontierResponse(url=response.url,
status_code=response.status,
headers=response.headers,
body=response.body,
request=response.meta['frontier_request'])
frontier_response.meta.update(response.meta)
return frontier_response
frontier_request = scrapy_response.meta['frontier_request']
frontier_request.meta['scrapy_meta'] = scrapy_response.meta
return FrontierResponse(url=scrapy_response.url,
status_code=scrapy_response.status,
headers=scrapy_response.headers,
body=scrapy_response.body,
request=frontier_request)

@classmethod
def from_frontier(cls, response):
def from_frontier(self, response):
"""response: Frontier > Scrapy"""
return ScrapyResponse(url=response.url,
status=response.status,
status=response.status_code,
headers=response.headers,
body=response.body,
request=RequestConverter.from_frontier(response.request))
request=self._request_converter.from_frontier(response.request))


def _find_method(obj, func):
if obj and hasattr(func, 'im_self') and func.im_self is obj:
return func.im_func.__name__
else:
raise ValueError("Function %s is not a method of: %s" % (func, obj))


def _get_method(obj, name):
name = str(name)
try:
return getattr(obj, name)
except AttributeError:
raise ValueError("Method %r not found in: %s" % (name, obj))
10 changes: 8 additions & 2 deletions frontera/contrib/scrapy/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,11 @@


class ScrapyFrontierManager(FrontierManagerWrapper):
request_converter_class = RequestConverter
response_converter_class = ResponseConverter

spider = None

def set_spider(self, spider):
assert self.spider is None, 'Spider is already set. Only one spider is supported per process.'
self.spider = spider
self.request_converter = RequestConverter(self.spider)
self.response_converter = ResponseConverter(self.spider, self.request_converter)
1 change: 1 addition & 0 deletions frontera/contrib/scrapy/schedulers/frontier.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def process_exception(self, request, exception, spider):
self.stats_manager.add_request_error(error_code)

def open(self, spider):
self.frontier.set_spider(spider)
log.msg('Starting frontier', log.INFO)
if not self.frontier.manager.auto_start:
self.frontier.start()
Expand Down
Empty file.
Empty file.
14 changes: 14 additions & 0 deletions frontera/tests/scrapy_spider/frontera/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#--------------------------------------------------------
# Frontier
#--------------------------------------------------------
# Frontera settings module for the test spider (referenced via
# FRONTERA_SETTINGS in the Scrapy settings).
# Simple in-memory FIFO backend — nothing is persisted between runs.
BACKEND = 'frontera.contrib.backends.memory.FIFO'
# Small limits keep the test crawl short; presumably MAX_REQUESTS caps the
# total requests issued and MAX_NEXT_REQUESTS the batch size per
# get_next_requests call — confirm against Frontera's settings reference.
MAX_REQUESTS = 5
MAX_NEXT_REQUESTS = 1

#--------------------------------------------------------
# Logging
#--------------------------------------------------------
# Silence all frontier logging channels to keep test output clean.
LOGGING_EVENTS_ENABLED = False
LOGGING_MANAGER_ENABLED = False
LOGGING_BACKEND_ENABLED = False
LOGGING_DEBUGGING_ENABLED = False
39 changes: 39 additions & 0 deletions frontera/tests/scrapy_spider/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#--------------------------------------------------------------------------
# Scrapy Settings
#--------------------------------------------------------------------------
# Scrapy project settings for the Frontera integration test spider.
BOT_NAME = 'scrapy_spider'

SPIDER_MODULES = ['frontera.tests.scrapy_spider.spiders']
NEWSPIDER_MODULE = 'frontera.tests.scrapy_spider.spiders'

# Deterministic, lightweight crawl: no cache, no cookies, no retries.
HTTPCACHE_ENABLED = False
REDIRECT_ENABLED = True
COOKIES_ENABLED = False
DOWNLOAD_TIMEOUT = 20
RETRY_ENABLED = False

CONCURRENT_REQUESTS = 10
CONCURRENT_REQUESTS_PER_DOMAIN = 2

LOGSTATS_INTERVAL = 10

# Start empty so the frontier middlewares below are the only ones added.
SPIDER_MIDDLEWARES = {}
DOWNLOADER_MIDDLEWARES = {}

#--------------------------------------------------------------------------
# Frontier Settings
#--------------------------------------------------------------------------
# Wire Frontera into Scrapy: scheduler middlewares on both sides plus the
# Frontera scheduler itself; frontier behavior comes from FRONTERA_SETTINGS.
SPIDER_MIDDLEWARES.update(
    {'frontera.contrib.scrapy.middlewares.schedulers.SchedulerSpiderMiddleware': 999},
)
DOWNLOADER_MIDDLEWARES.update(
    {'frontera.contrib.scrapy.middlewares.schedulers.SchedulerDownloaderMiddleware': 999}
)
SCHEDULER = 'frontera.contrib.scrapy.schedulers.frontier.FronteraScheduler'
FRONTERA_SETTINGS = 'frontera.tests.scrapy_spider.frontera.settings'


#--------------------------------------------------------------------------
# Testing
#--------------------------------------------------------------------------
# Uncomment to stop the spider after a single page during manual testing.
#CLOSESPIDER_PAGECOUNT = 1
Empty file.
35 changes: 35 additions & 0 deletions frontera/tests/scrapy_spider/spiders/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from scrapy.contrib.linkextractors.lxmlhtml import LxmlLinkExtractor
from scrapy.contrib.linkextractors.sgml import SgmlLinkExtractor
from scrapy.contrib.linkextractors.regex import RegexLinkExtractor
from scrapy.contrib.spiders import CrawlSpider, Rule


class FallbackLinkExtractor(object):
    """Link extractor that tries several extractors in priority order.

    Returns the links from the first extractor that yields any; only when
    an extractor finds nothing does it fall back to the next one.
    """

    def __init__(self, extractors):
        # Extractors are consulted in the given order.
        self.extractors = extractors

    def extract_links(self, response):
        """Return links from the first extractor with a non-empty result.

        Returns an empty list when no extractor finds links (the original
        returned the first extractor's result unconditionally, so the
        fallback never triggered, and returned None for an empty extractor
        list).
        """
        for lx in self.extractors:
            links = lx.extract_links(response)
            if links:  # fall through to the next extractor only when empty
                return links
        return []


class MySpider(CrawlSpider):
    """Minimal crawl spider for the Frontera integration tests.

    Counts callback invocations in ``callback_calls`` so a test can verify
    that callbacks persisted through the frontier are actually invoked.
    """
    name = 'example'
    start_urls = ['http://scrapinghub.com']
    # Class-level counter; ``+=`` in parse_page creates a per-instance
    # attribute on first use.
    callback_calls = 0

    # Single rule: follow links found by the chained extractors and route
    # each response to parse_page.
    rules = [Rule(FallbackLinkExtractor([
        LxmlLinkExtractor(),
        SgmlLinkExtractor(),
        RegexLinkExtractor(),
    ]), callback='parse_page', follow=True)]

    def parse_page(self, response):
        # Only records that the callback fired; yields no items or requests.
        self.callback_calls += 1
        pass

    def parse_nothing(self, response):
        # Deliberate no-op callback.
        pass

    # NOTE(review): presumably aliased so responses for start URLs are not
    # handled by CrawlSpider's default parse_start_url — confirm intent.
    parse_start_url = parse_nothing
Loading

0 comments on commit 48de71d

Please sign in to comment.