diff --git a/cmoncrawl/aggregator/gateway_query.py b/cmoncrawl/aggregator/gateway_query.py index a846c098..16d58b2b 100644 --- a/cmoncrawl/aggregator/gateway_query.py +++ b/cmoncrawl/aggregator/gateway_query.py @@ -116,6 +116,7 @@ def __aiter__(self): max_retry=self.max_retry, prefetch_size=self.prefetch_size, sleep_base=self.sleep_base, + throttler=self.throttler, ) self.iterators.append(iterator) return iterator @@ -146,6 +147,7 @@ async def get_number_of_pages( match_type: MatchType | None, max_retry: int, sleep_base: float, + throttler: Throttler, page_size: int | None = None, ) -> int: params: Dict[str, str | int] = { @@ -166,6 +168,7 @@ async def get_number_of_pages( "text/x-ndjson", max_retry=max_retry, sleep_base=sleep_base, + throttler=throttler, log_additional_info={ "type": "num_pages", "domain": domain, @@ -185,6 +188,7 @@ async def get_captured_responses( max_retry: int, sleep_base: float, page: int, + throttler: Throttler, since: datetime = datetime.min, to: datetime = datetime.max, ) -> List[DomainRecord]: @@ -205,6 +209,7 @@ async def get_captured_responses( "text/x-ndjson", max_retry=max_retry, sleep_base=sleep_base, + throttler=throttler, log_additional_info={ "type": "page", "domain": domain, @@ -248,6 +253,7 @@ def __init__( max_retry: int, prefetch_size: int, sleep_base: float, + throttler: Throttler, ): self.__client = client self.__opt_prefetch_size = prefetch_size @@ -260,6 +266,7 @@ def __init__( self.__total = 0 self.__sleep_base = sleep_base self.__match_type = match_type + self.__throttler = throttler self.__crawls_remaining = self.init_crawls_queue(urls, CC_files) @@ -294,6 +301,7 @@ async def __prefetch_next_crawl(self) -> int: match_type=self.__match_type, max_retry=self.__max_retry, sleep_base=self.__sleep_base, + throttler=self.__throttler, ) except Exception as e: all_purpose_logger.error( @@ -318,6 +326,7 @@ async def __prefetch_next_crawl(self) -> int: to=self.__to, max_retry=self.__max_retry, sleep_base=self.__sleep_base, + throttler=self.__throttler, ), ) ) diff --git a/cmoncrawl/aggregator/utils/helpers.py b/cmoncrawl/aggregator/utils/helpers.py index d8529454..2f2891b8 100644 --- a/cmoncrawl/aggregator/utils/helpers.py +++ b/cmoncrawl/aggregator/utils/helpers.py @@ -22,6 +22,7 @@ from cmoncrawl.aggregator.utils import ndjson from cmoncrawl.common.loggers import all_purpose_logger +from cmoncrawl.common.throttling import Throttler ALLOWED_ERR_FOR_RETRIES = [500, 502, 503, 504] @@ -114,6 +115,7 @@ async def retrieve( sleep_base: float, allowed_status_errors: list[int] = ALLOWED_ERR_FOR_RETRIES, log_additional_info: dict[str, Any] = {}, # type: ignore + throttler: Throttler | None = None, ): @retry( stop=stop_after_attempt(max_retry + 1), @@ -122,7 +124,7 @@ async def retrieve( reraise=True, before_sleep=log_after_retry, ) - async def _retrieve( + async def _retrieve_with_throttling( client: ClientSession, cdx_server: str, params: dict[str, Any], @@ -133,39 +135,46 @@ async def _retrieve( all_purpose_logger.debug( f"Sending request to {cdx_server} with params: {params}" ) - try: - async with client.get(cdx_server, params=params) as response: - status = response.status - if not response.ok: - reason = str(response.reason) if response.reason else "Unknown" # type: ignore - if status in allowed_status_errors: - raise DownloadError(reason, status) - raise ValueError( - f"Failed to download {cdx_server} with status {status} and reason {reason}" - ) - else: - if content_type == "text/x-ndjson": - content = await response.json( - content_type=content_type, - loads=ndjson.Decoder().decode, + + async def _request(): + try: + async with client.get(cdx_server, params=params) as response: + status = response.status + if not response.ok: + reason = str(response.reason) if response.reason else "Unknown" # type: ignore + if status in allowed_status_errors: + raise DownloadError(reason, status) + raise ValueError( + f"Failed to download {cdx_server} with status {status} and reason {reason}" ) - elif content_type == "application/json": - content = await response.json(content_type=content_type) else: - raise ValueError(f"Unknown content type: {content_type}") - except ( - ClientError, - TimeoutError, - ServerConnectionError, - ContentTypeError, - ) as e: - reason = f"{type(e)} {str(e)}" - status = 500 - raise DownloadError(reason, status) - - return content - - return await _retrieve( + if content_type == "text/x-ndjson": + content = await response.json( + content_type=content_type, + loads=ndjson.Decoder().decode, + ) + elif content_type == "application/json": + content = await response.json(content_type=content_type) + else: + raise ValueError(f"Unknown content type: {content_type}") + except ( + ClientError, + TimeoutError, + ServerConnectionError, + ContentTypeError, + ) as e: + reason = f"{type(e)} {str(e)}" + status = 500 + raise DownloadError(reason, status) + + return content + + if throttler is not None: + return await throttler.throttle(_request) + else: + return await _request() + + return await _retrieve_with_throttling( client=client, cdx_server=cdx_server, params=params,