diff --git a/cmoncrawl/aggregator/index_query.py b/cmoncrawl/aggregator/index_query.py
index e6600d0e..009d76e7 100644
--- a/cmoncrawl/aggregator/index_query.py
+++ b/cmoncrawl/aggregator/index_query.py
@@ -17,7 +17,12 @@
     Type,
 )
 from cmoncrawl.common.loggers import all_purpose_logger
-from cmoncrawl.common.types import DomainRecord, RetrieveResponse, DomainCrawl
+from cmoncrawl.common.types import (
+    DomainRecord,
+    RetrieveResponse,
+    DomainCrawl,
+    MatchType,
+)
 
 from aiohttp import ClientError, ClientSession, ContentTypeError
 import asyncio
@@ -31,6 +36,7 @@ def __init__(
         self,
         domains: List[str],
         cc_indexes_server: str = "http://index.commoncrawl.org/collinfo.json",
+        match_type: MatchType | None = None,
         cc_servers: List[str] = [],
         since: datetime = datetime.min,
         to: datetime = datetime.max,
@@ -48,6 +54,7 @@ def __init__(
         self.max_retry = max_retry
         self.prefetch_size = prefetch_size
         self.sleep_step = sleep_step
+        self.match_type = match_type
         self.iterators: List[IndexAggregator.IndexAggregatorIterator] = []
 
     async def aopen(self) -> IndexAggregator:
@@ -68,6 +75,7 @@ def __aiter__(self):
             self.client,
             self.domains,
             self.cc_servers,
+            match_type=self.match_type,
             since=self.since,
             to=self.to,
             limit=self.limit,
@@ -110,7 +118,7 @@ async def __retrieve(
         **args: Any,
     ):
         def should_retry(retry: int, reason: str, status: int, **args: Any):
-            all_purpose_logger.error(
+            all_purpose_logger.warn(
                 f"Failed to retrieve page of {domain} from {cdx_server} with reason {status}: {reason} retry: {retry + 1}/{max_retry} add_info: {args}"
             )
             if status not in allowed_status_errors:
@@ -124,6 +132,9 @@ def should_retry(retry: int, reason: str, status: int, **args: Any):
 
         for retry in range(max_retry):
             try:
+                all_purpose_logger.debug(
+                    f"Sending request to {cdx_server} with params: {params}, retry: {retry + 1}/{max_retry}"
+                )
                 async with client.get(cdx_server, params=params) as response:
                     status = response.status
                     if not response.ok:
@@ -139,7 +150,6 @@ def should_retry(retry: int, reason: str, status: int, **args: Any):
                         all_purpose_logger.error(str(e), exc_info=True)
                         all_purpose_logger.error(e.message, exc_info=True)
                         all_purpose_logger.error(response.content)
-                        break
                 all_purpose_logger.info(
                     f"Successfully retrieved page of {domain} from {cdx_server} add_info: {args}"
                 )
@@ -159,6 +169,7 @@ async def get_number_of_pages(
         client: ClientSession,
         cdx_server: str,
         domain: str,
+        match_type: MatchType | None,
         max_retry: int,
         sleep_step: int,
         page_size: int | None = None,
     ):
         params: Dict[str, str | int] = {
             "showNumPages": "true",
             "output": "json",
-            "matchType": "domain",
             "url": domain,
         }
+        if match_type is not None:
+            params["matchType"] = match_type.value
+
         if page_size is not None:
             params["page_size"] = page_size
         response = await IndexAggregator.__retrieve(
@@ -195,6 +208,7 @@ async def get_captured_responses(
         client: ClientSession,
         cdx_server: str,
         domain: str,
+        match_type: MatchType | None,
         max_retry: int,
         sleep_step: int,
         page: int,
@@ -203,12 +217,13 @@
     ):
         params: Dict[str, str | int] = {
             "output": "json",
-            "matchType": "domain",
             "page": page,
             "url": domain,
             "from": to_timestamp_format(since),
             "to": to_timestamp_format(to),
         }
+        if match_type is not None:
+            params["matchType"] = match_type.value
         reponse = await IndexAggregator.__retrieve(
             client,
             domain,
@@ -241,6 +256,10 @@ async def get_all_CC_indexes(client: ClientSession, cdx_server: str) -> List[str]
                 r_json = await response.json(content_type="application/json")
                 CC_servers = [js["cdx-api"] for js in r_json]
                 return CC_servers
+        all_purpose_logger.error(
+            f"Failed to get CC servers from {cdx_server} after 3 attempts"
+        )
+        return []
 
     class IndexAggregatorIterator(AsyncIterator[DomainRecord]):
         def __init__(
@@ -248,6 +267,7 @@
             client: ClientSession,
             domains: List[str],
             CC_files: List[str],
+            match_type: MatchType | None,
             since: datetime,
             to: datetime,
             limit: int | None,
@@ -268,6 +288,7 @@ def __init__(
             self.__max_retry = max_retry
             self.__total = 0
             self.__sleep_step = sleep_step
+            self.__match_type = match_type
 
             self.__crawls_remaining = self.init_crawls_queue(domains, CC_files)
 
@@ -294,6 +315,7 @@ async def __prefetch_next_crawl(self) -> int:
                 self.__client,
                 next_crawl.cdx_server,
                 next_crawl.domain,
+                match_type=self.__match_type,
                 max_retry=self.__max_retry,
                 sleep_step=self.__sleep_step,
             )
@@ -344,12 +366,13 @@ async def __await_next_prefetch(self):
                     retry < _max_retry
                     and response.status in ALLOWED_ERR_FOR_RETRIES
                 ):
-                    all_purpose_logger.info(
-                        f"Retrying {dc.domain} of {dc.cdx_server} retry {retry + 1}/{_max_retry}"
-                    )
                     self.prefetch_queue.add(
                         asyncio.create_task(self.__fetch_next_dc(dc, retry + 1))
                     )
+                else:
+                    all_purpose_logger.error(
+                        f"Failed to fetch {dc.domain} of {dc.cdx_server} with status {response.status}"
+                    )
 
 
             # Nothing more to prefetch
@@ -387,6 +410,7 @@ async def __fetch_next_dc(self, dc: DomainCrawl, retry: int):
                 self.__client,
                 dc.cdx_server,
                 dc.domain,
+                match_type=self.__match_type,
                 page=dc.page,
                 since=self.__since,
                 to=self.__to,
diff --git a/cmoncrawl/common/types.py b/cmoncrawl/common/types.py
index 8de54e98..65772fa0 100644
--- a/cmoncrawl/common/types.py
+++ b/cmoncrawl/common/types.py
@@ -1,4 +1,5 @@
 from datetime import datetime
+from enum import Enum
 from pathlib import Path
 from typing import Any, Dict, List
 from urllib.parse import urlparse
@@ -93,3 +94,14 @@
 class ExtractConfig:
     extractors_path: Path
     routes: List[RoutesConfig]
+
+
+class MatchType(Enum):
+    """
+    Match type for cdx server.
+    """
+
+    EXACT = "exact"
+    PREFIX = "prefix"
+    HOST = "host"
+    DOMAIN = "domain"
diff --git a/cmoncrawl/integrations/download.py b/cmoncrawl/integrations/download.py
index ab6b45f9..f1e2dfe2 100644
--- a/cmoncrawl/integrations/download.py
+++ b/cmoncrawl/integrations/download.py
@@ -3,6 +3,7 @@
 from pathlib import Path
 from typing import Any, List
 from cmoncrawl.aggregator.index_query import IndexAggregator
+from cmoncrawl.common.types import MatchType
 from cmoncrawl.processor.pipeline.downloader import AsyncDownloader
 from cmoncrawl.processor.pipeline.pipeline import ProcessorPipeline
 from cmoncrawl.processor.pipeline.streamer import StreamerFileHTML
@@ -32,9 +33,9 @@ def add_mode_args(subparser: Any):
 def add_args(subparser: Any):
     parser = subparser.add_parser("download")
     parser.add_argument("url")
+    parser.add_argument("output", type=Path)
     mode_subparser = parser.add_subparsers(dest="mode", required=True)
     mode_subparser = add_mode_args(mode_subparser)
-    parser.add_argument("output", type=Path)
     parser.add_argument("--limit", type=int, default=5)
     parser.add_argument(
         "--since", type=datetime.fromisoformat, default=str(datetime.min)
     )
@@ -44,6 +45,12 @@
     parser.add_argument("--max_retry", type=int, default=30)
     parser.add_argument("--sleep_step", type=int, default=4)
     # Add option to output to either json or html
+    parser.add_argument(
+        "--match_type",
+        type=MatchType,
+        choices=list(MatchType.__members__.values()),
+        default=MatchType.PREFIX,
+    )
     parser.add_argument("--max_directory_size", type=int, default=1000)
     parser.add_argument("--filter_non_200", action="store_true", default=True)
     parser.set_defaults(func=run_download)
@@ -85,6 +92,7 @@ def url_download_prepare_streamer(
 
 async def url_download(
     url: str,
+    match_type: MatchType | None,
     output: Path,
     cc_server: List[str] | None,
     since: datetime,
@@ -108,6 +116,7 @@ async def url_download(
     index_agg = IndexAggregator(
         cc_servers=cc_server or [],
         domains=[url],
+        match_type=match_type,
         since=since,
         to=to,
         limit=limit,
@@ -122,6 +131,7 @@ def run_download(args: argparse.Namespace):
     return asyncio.run(
         url_download(
             args.url,
+            args.match_type,
             args.output,
             args.cc_server,
             args.since,
diff --git a/cmoncrawl/integrations/extract.py b/cmoncrawl/integrations/extract.py
index ba1ce83f..efadcce5 100644
--- a/cmoncrawl/integrations/extract.py
+++ b/cmoncrawl/integrations/extract.py
@@ -3,13 +3,15 @@
 import json
 import multiprocessing
 from pathlib import Path
+
+from tqdm import tqdm
 from cmoncrawl.common.types import ExtractConfig
 from cmoncrawl.processor.pipeline.downloader import DownloaderDummy, AsyncDownloader
 from cmoncrawl.processor.pipeline.pipeline import ProcessorPipeline
 from cmoncrawl.middleware.synchronized import extract
 
 import argparse
-from typing import Any, List
+from typing import Any, Dict, List, Tuple
 import asyncio
 from cmoncrawl.processor.pipeline.streamer import (
     StreamerFileJSON,
@@ -68,15 +70,28 @@ def get_extract_downloader(
     return AsyncDownloader(max_retry=max_retry, sleep_step=sleep_step)
 
 
-def get_domain_records_json(file_path: Path) -> List[DomainRecord]:
+def get_domain_records_json(
+    file_path: Path,
+) -> List[Tuple[DomainRecord, Dict[str, Any]]]:
+    records: List[Tuple[DomainRecord, Dict[str, Any]]] = []
     with open(file_path, "r") as f:
-        js = [json.loads(line) for line in f.readlines()]
-        return [DomainRecord.schema().load(record["domain_record"]) for record in js]
+        for line in tqdm(f):
+            js = json.loads(line)
+            domain_record: DomainRecord = DomainRecord.schema().load(
+                js["domain_record"]
+            )
+            additional_info = js.get("additional_info", {})
+            if not isinstance(additional_info, dict):
+                additional_info = {}
+            records.append((domain_record, additional_info))
+    return records
 
 
-def get_domain_records_html(url: str | None, date: datetime | None):
+def get_domain_records_html(
+    url: str | None, date: datetime | None
+) -> List[Tuple[DomainRecord, Dict[str, Any]]]:
     # Just return dummy as correct crawl will be loaded from dummy downloader
-    return [DomainRecord("", url=url, offset=0, length=0, timestamp=date)]
+    return [(DomainRecord("", url=url, offset=0, length=0, timestamp=date), {})]
 
 
 def load_config(config_path: Path) -> ExtractConfig:
@@ -111,10 +126,10 @@ async def extract_from_files(
     for path in files:
         match mode:
             case ExtractMode.RECORD:
-                domain_records = get_domain_records_json(path)
+                records = get_domain_records_json(path)
             case ExtractMode.HTML:
-                domain_records = get_domain_records_html(url, date)
-        await extract(domain_records, pipeline)
+                records = get_domain_records_html(url, date)
+        await extract(records, pipeline)
 
 
 def _extract_task(
diff --git a/cmoncrawl/middleware/synchronized.py b/cmoncrawl/middleware/synchronized.py
index bff86b44..bb300ea1 100644
--- a/cmoncrawl/middleware/synchronized.py
+++ b/cmoncrawl/middleware/synchronized.py
@@ -15,6 +15,7 @@ async def index_and_extract(
     filter_non_unique_url: bool = False,
 ):
     processed_urls: Set[str] = set()
+    total_extracted: int = 0
 
     if hasattr(pipeline.downloader, "__aenter__"):
         await pipeline.downloader.__aenter__()
@@ -26,6 +27,7 @@ async def index_and_extract(
                 continue
             try:
                 await pipeline.process_domain_record(domain_record, {})
+                total_extracted += 1
             except KeyboardInterrupt as e:
                 break
 
@@ -39,6 +41,7 @@ async def index_and_extract(
     finally:
         if hasattr(pipeline.downloader, "__aexit__"):
             await pipeline.downloader.__aexit__(None, None, None)
+    all_purpose_logger.info(f"Extracted {total_extracted} urls")
 
 
 async def _extract_task(
@@ -67,6 +70,7 @@ async def extract(
 ):
     domain_records_iterator = iter(tqdm(records))
     domains_exausted = False
+    total_extracted: int = 0
     if hasattr(pipeline.downloader, "__aenter__"):
         await pipeline.downloader.__aenter__()
     try:
@@ -92,6 +96,7 @@ async def extract(
             for task in done:
                 try:
                     await task
+                    total_extracted += 1
                 except KeyboardInterrupt as e:
                     break
 
@@ -104,3 +109,4 @@ async def extract(
     finally:
         if hasattr(pipeline.downloader, "__aexit__"):
             await pipeline.downloader.__aexit__(None, None, None)
+    all_purpose_logger.info(f"Extracted {total_extracted} urls")
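
Usage sketch for the new option (illustrative, not part of the patch): it builds an IndexAggregator with MatchType.DOMAIN and iterates the resulting domain records. The aopen() call mirrors the method shown in index_query.py above; the aclose() teardown and the domain/date values are assumptions for the example only.

import asyncio
from datetime import datetime

from cmoncrawl.aggregator.index_query import IndexAggregator
from cmoncrawl.common.types import MatchType


async def main() -> None:
    # Placeholder domain and date range; substitute your own values.
    aggregator = IndexAggregator(
        domains=["example.com"],
        match_type=MatchType.DOMAIN,  # match every capture under the domain, not just the exact URL
        since=datetime(2022, 1, 1),
        to=datetime(2023, 1, 1),
        limit=100,
    )
    await aggregator.aopen()
    try:
        async for domain_record in aggregator:
            print(domain_record.url)
    finally:
        await aggregator.aclose()  # assumed counterpart of aopen(); adjust to the real teardown API


asyncio.run(main())

On the command line, the same behaviour is exposed through the new --match_type flag registered in download.py, which defaults to MatchType.PREFIX.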