diff --git a/cmoncrawl/middleware/stompware.py b/cmoncrawl/middleware/stompware.py
index 3a0f23fa..59140885 100644
--- a/cmoncrawl/middleware/stompware.py
+++ b/cmoncrawl/middleware/stompware.py
@@ -176,7 +176,7 @@ async def _call_pipeline_with_ack(
     # So that we can nack it if needed
     paths = []
     try:
-        paths = await pipeline.process_domain_record(msg.dr)
+        paths = await pipeline.process_domain_record(msg.dr, {})
         # Ack at any result
         client.ack(msg.headers.get("message-id"), msg.headers.get("subscription"))
diff --git a/cmoncrawl/middleware/synchronized.py b/cmoncrawl/middleware/synchronized.py
index 2850048d..bff86b44 100644
--- a/cmoncrawl/middleware/synchronized.py
+++ b/cmoncrawl/middleware/synchronized.py
@@ -1,5 +1,5 @@
 from pathlib import Path
-from typing import List, Set
+from typing import Any, Dict, List, Set, Tuple
 from cmoncrawl.aggregator.index_query import IndexAggregator
 from cmoncrawl.processor.pipeline.pipeline import ProcessorPipeline
 from cmoncrawl.common.types import DomainRecord
@@ -25,7 +25,7 @@ async def index_and_extract(
         if filter_non_unique_url and unify_url_id(url) in processed_urls:
             continue
         try:
-            await pipeline.process_domain_record(domain_record)
+            await pipeline.process_domain_record(domain_record, {})
         except KeyboardInterrupt as e:
             break
@@ -41,10 +41,14 @@ async def index_and_extract(
         await pipeline.downloader.__aexit__(None, None, None)


-async def _extract_task(domain_record: DomainRecord, pipeline: ProcessorPipeline):
+async def _extract_task(
+    domain_record: DomainRecord,
+    additional_info: Dict[str, Any],
+    pipeline: ProcessorPipeline,
+):
     result = []
     try:
-        result = await pipeline.process_domain_record(domain_record)
+        result = await pipeline.process_domain_record(domain_record, additional_info)
     except KeyboardInterrupt as e:
         raise e
     except Exception as e:
@@ -56,12 +60,12 @@ async def _extract_task(domain_record: DomainRecord, pipeline: ProcessorPipeline


 async def extract(
-    domain_records: List[DomainRecord],
+    records: List[Tuple[DomainRecord, Dict[str, Any]]],
     pipeline: ProcessorPipeline,
     concurrent_length: int = 20,
     timeout: int = 5,
 ):
-    domain_records_iterator = iter(tqdm(domain_records))
+    domain_records_iterator = iter(tqdm(records))
     domains_exausted = False
     if hasattr(pipeline.downloader, "__aenter__"):
         await pipeline.downloader.__aenter__()
@@ -70,13 +74,16 @@ async def extract(
     while not domains_exausted or len(queue) > 0:
         # Put into queue till possible
         while len(queue) < concurrent_length and not domains_exausted:
-            next_domain_record = next(domain_records_iterator, None)
-            if next_domain_record is None:
+            next_record = next(domain_records_iterator, None)
+            if next_record is None:
                 domains_exausted = True
                 break
+            next_domain_record, additional_info = next_record
             queue.add(
-                asyncio.create_task(_extract_task(next_domain_record, pipeline))
+                asyncio.create_task(
+                    _extract_task(next_domain_record, additional_info, pipeline)
+                )
             )

         done, queue = await asyncio.wait(
diff --git a/cmoncrawl/processor/pipeline/pipeline.py b/cmoncrawl/processor/pipeline/pipeline.py
index bbb1dfaf..4436c212 100644
--- a/cmoncrawl/processor/pipeline/pipeline.py
+++ b/cmoncrawl/processor/pipeline/pipeline.py
@@ -17,7 +17,7 @@ def __init__(
         self.oustreamer = outstreamer

     async def process_domain_record(
-        self, domain_record: DomainRecord, additional_info: Dict[str, Any] = {}
+        self, domain_record: DomainRecord, additional_info: Dict[str, Any]
     ):
         paths: List[Path] = []
         downloaded_articles = []
@@ -35,7 +35,8 @@ async def process_domain_record(
                 output = extractor.extract(downloaded_article, metadata)
                 if output is None:
                     metadata_logger.warn(
-                        f"Extractor {extractor.__class__.__name__} returned None for {metadata.domain_record.url}"
+                        f"Extractor {extractor.__class__.__name__} returned None for {metadata.domain_record.url}",
+                        extra={"domain_record": metadata.domain_record},
                     )
                     continue
diff --git a/cmoncrawl/processor/pipeline/router.py b/cmoncrawl/processor/pipeline/router.py
index 7229c9f6..69445317 100644
--- a/cmoncrawl/processor/pipeline/router.py
+++ b/cmoncrawl/processor/pipeline/router.py
@@ -74,8 +74,8 @@ def load_modules(self, folder: Path):
                 if file == "__init__.py":
                     self.load_module(Path(root) / file)
-
-                extractors.append(self.load_module_as_extractor(Path(root) / file))
+                else:
+                    extractors.append(self.load_module_as_extractor(Path(root) / file))
         all_purpose_logger.info(f"Loaded {len(extractors)} extractors")

     def load_extractor(self, name: str, extractor: IExtractor):
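
The breaking change for downstream callers is the `extract` signature: it now consumes `(DomainRecord, Dict[str, Any])` tuples rather than bare `DomainRecord`s, and each dict is forwarded to `ProcessorPipeline.process_domain_record` as `additional_info` (which no longer defaults to `{}`). A minimal adaptation sketch follows; the `run_extraction` wrapper is hypothetical, and only `extract`, `ProcessorPipeline`, and the types come from this diff:

import asyncio
from typing import Any, Dict, List, Tuple

from cmoncrawl.common.types import DomainRecord
from cmoncrawl.middleware.synchronized import extract
from cmoncrawl.processor.pipeline.pipeline import ProcessorPipeline


async def run_extraction(
    pipeline: ProcessorPipeline, domain_records: List[DomainRecord]
):
    # Pair every DomainRecord with an additional_info dict; records that
    # carry no extra metadata get an empty dict, matching the `{}` the
    # middleware itself now passes at its own call sites.
    records: List[Tuple[DomainRecord, Dict[str, Any]]] = [
        (dr, {}) for dr in domain_records
    ]
    await extract(records, pipeline)

Passing the dict explicitly (instead of the old `additional_info: Dict[str, Any] = {}` default) also avoids the mutable-default-argument pitfall the previous signature carried.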