Merge pull request #61 from hynky1999/pipeline
bugs + additional info
hynky1999 authored May 11, 2023
2 parents 34544f3 + cd8a328 commit b16f52f
Showing 4 changed files with 22 additions and 14 deletions.
2 changes: 1 addition & 1 deletion cmoncrawl/middleware/stompware.py
@@ -176,7 +176,7 @@ async def _call_pipeline_with_ack(
     # So that we can nack it if needed
     paths = []
     try:
-        paths = await pipeline.process_domain_record(msg.dr)
+        paths = await pipeline.process_domain_record(msg.dr, {})
         # Ack at any result
         client.ack(msg.headers.get("message-id"), msg.headers.get("subscription"))
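Since queue messages carry no per-record metadata, the consumer now passes an empty dict as additional_info. For context, a rough sketch of the ack/nack pattern this hunk sits in; the except branch and the nack call are inferred from the "nack it if needed" comment, not shown in the diff:

    async def _call_pipeline_with_ack(pipeline, msg, client):
        # Keep paths outside the try block so the message can still be
        # nacked when processing fails before the ack.
        paths = []
        try:
            # Queue messages have no extra metadata, hence the empty dict.
            paths = await pipeline.process_domain_record(msg.dr, {})
            # Ack at any result
            client.ack(msg.headers.get("message-id"), msg.headers.get("subscription"))
        except Exception:
            client.nack(msg.headers.get("message-id"), msg.headers.get("subscription"))
        return paths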
25 changes: 16 additions & 9 deletions cmoncrawl/middleware/synchronized.py
@@ -1,5 +1,5 @@
 from pathlib import Path
-from typing import List, Set
+from typing import Any, Dict, List, Set, Tuple
 from cmoncrawl.aggregator.index_query import IndexAggregator
 from cmoncrawl.processor.pipeline.pipeline import ProcessorPipeline
 from cmoncrawl.common.types import DomainRecord
@@ -25,7 +25,7 @@ async def index_and_extract(
         if filter_non_unique_url and unify_url_id(url) in processed_urls:
             continue
         try:
-            await pipeline.process_domain_record(domain_record)
+            await pipeline.process_domain_record(domain_record, {})
         except KeyboardInterrupt as e:
             break
@@ -41,10 +41,14 @@ async def index_and_extract(
         await pipeline.downloader.__aexit__(None, None, None)


-async def _extract_task(domain_record: DomainRecord, pipeline: ProcessorPipeline):
+async def _extract_task(
+    domain_record: DomainRecord,
+    additional_info: Dict[str, Any],
+    pipeline: ProcessorPipeline,
+):
     result = []
     try:
-        result = await pipeline.process_domain_record(domain_record)
+        result = await pipeline.process_domain_record(domain_record, additional_info)
     except KeyboardInterrupt as e:
         raise e
     except Exception as e:
@@ -56,12 +60,12 @@ async def _extract_task(domain_record: DomainRecord, pipeline: ProcessorPipeline


 async def extract(
-    domain_records: List[DomainRecord],
+    records: List[Tuple[DomainRecord, Dict[str, Any]]],
     pipeline: ProcessorPipeline,
     concurrent_length: int = 20,
     timeout: int = 5,
 ):
-    domain_records_iterator = iter(tqdm(domain_records))
+    domain_records_iterator = iter(tqdm(records))
     domains_exausted = False
     if hasattr(pipeline.downloader, "__aenter__"):
         await pipeline.downloader.__aenter__()
@@ -70,13 +74,16 @@ async def extract(
     while not domains_exausted or len(queue) > 0:
         # Put into queue till possible
         while len(queue) < concurrent_length and not domains_exausted:
-            next_domain_record = next(domain_records_iterator, None)
-            if next_domain_record is None:
+            next_record = next(domain_records_iterator, None)
+            if next_record is None:
                 domains_exausted = True
                 break
+            next_domain_record, additional_info = next_record

             queue.add(
-                asyncio.create_task(_extract_task(next_domain_record, pipeline))
+                asyncio.create_task(
+                    _extract_task(next_domain_record, additional_info, pipeline)
+                )
             )

         done, queue = await asyncio.wait(
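Callers of extract now pair each DomainRecord with its additional_info dict. A minimal usage sketch under the new signature (the run wrapper and the empty dicts are illustrative, not part of this change):

    from typing import Any, Dict, List, Tuple

    from cmoncrawl.common.types import DomainRecord
    from cmoncrawl.middleware.synchronized import extract
    from cmoncrawl.processor.pipeline.pipeline import ProcessorPipeline

    async def run(pipeline: ProcessorPipeline, domain_records: List[DomainRecord]):
        # Pair every DomainRecord with its additional_info dict; pass {}
        # when there is nothing extra to forward to process_domain_record.
        records: List[Tuple[DomainRecord, Dict[str, Any]]] = [
            (dr, {}) for dr in domain_records
        ]
        await extract(records, pipeline, concurrent_length=20, timeout=5)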
5 changes: 3 additions & 2 deletions cmoncrawl/processor/pipeline/pipeline.py
@@ -17,7 +17,7 @@ def __init__(
         self.oustreamer = outstreamer

     async def process_domain_record(
-        self, domain_record: DomainRecord, additional_info: Dict[str, Any] = {}
+        self, domain_record: DomainRecord, additional_info: Dict[str, Any]
     ):
         paths: List[Path] = []
         downloaded_articles = []
@@ -35,7 +35,8 @@ async def process_domain_record(
             output = extractor.extract(downloaded_article, metadata)
             if output is None:
                 metadata_logger.warn(
-                    f"Extractor {extractor.__class__.__name__} returned None for {metadata.domain_record.url}"
+                    f"Extractor {extractor.__class__.__name__} returned None for {metadata.domain_record.url}",
+                    extra={"domain_record": metadata.domain_record},
                 )
                 continue
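Making additional_info required (instead of defaulting to = {}) also sidesteps Python's mutable-default pitfall: a dict default is evaluated once at definition time and shared by every subsequent call. A self-contained illustration, unrelated to this codebase:

    def remember_bad(item, seen={}):  # one dict shared across all calls
        seen[item] = True
        return seen

    def remember_good(item, seen=None):  # fresh dict per call
        if seen is None:
            seen = {}
        seen[item] = True
        return seen

    print(remember_bad("a"))   # {'a': True}
    print(remember_bad("b"))   # {'a': True, 'b': True}  <- leaked state
    print(remember_good("a"))  # {'a': True}
    print(remember_good("b"))  # {'b': True}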
4 changes: 2 additions & 2 deletions cmoncrawl/processor/pipeline/router.py
@@ -74,8 +74,8 @@ def load_modules(self, folder: Path):

             if file == "__init__.py":
                 self.load_module(Path(root) / file)
-
-            extractors.append(self.load_module_as_extractor(Path(root) / file))
+            else:
+                extractors.append(self.load_module_as_extractor(Path(root) / file))
         all_purpose_logger.info(f"Loaded {len(extractors)} extractors")

     def load_extractor(self, name: str, extractor: IExtractor):
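The new else makes the two load paths mutually exclusive: previously every __init__.py was imported via load_module and then also handed to load_module_as_extractor, which expects the module to define an extractor. A sketch of the corrected walk; the os.walk loop and the .py filter around this hunk are assumed, not shown in the diff:

    import os
    from pathlib import Path

    def load_modules(self, folder: Path):
        extractors = []
        for root, _, files in os.walk(folder):
            for file in files:
                if not file.endswith(".py"):
                    continue
                if file == "__init__.py":
                    # Package init files are imported for side effects only.
                    self.load_module(Path(root) / file)
                else:
                    # Every other module must provide an extractor.
                    extractors.append(self.load_module_as_extractor(Path(root) / file))
        all_purpose_logger.info(f"Loaded {len(extractors)} extractors")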
