Skip to content

Commit

Permalink
Merge pull request #103 from hynky1999/fix/throttle_query_requests
Browse files Browse the repository at this point in the history
Throttling of query requests
  • Loading branch information
hynky1999 authored Feb 14, 2024
2 parents bc54438 + f85c8e2 commit 28f0a9e
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 32 deletions.
9 changes: 9 additions & 0 deletions cmoncrawl/aggregator/gateway_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def __aiter__(self):
max_retry=self.max_retry,
prefetch_size=self.prefetch_size,
sleep_base=self.sleep_base,
throttler=self.throttler,
)
self.iterators.append(iterator)
return iterator
Expand Down Expand Up @@ -146,6 +147,7 @@ async def get_number_of_pages(
match_type: MatchType | None,
max_retry: int,
sleep_base: float,
throttler: Throttler,
page_size: int | None = None,
) -> int:
params: Dict[str, str | int] = {
Expand All @@ -166,6 +168,7 @@ async def get_number_of_pages(
"text/x-ndjson",
max_retry=max_retry,
sleep_base=sleep_base,
throttler=throttler,
log_additional_info={
"type": "num_pages",
"domain": domain,
Expand All @@ -185,6 +188,7 @@ async def get_captured_responses(
max_retry: int,
sleep_base: float,
page: int,
throttler: Throttler,
since: datetime = datetime.min,
to: datetime = datetime.max,
) -> List[DomainRecord]:
Expand All @@ -205,6 +209,7 @@ async def get_captured_responses(
"text/x-ndjson",
max_retry=max_retry,
sleep_base=sleep_base,
throttler=throttler,
log_additional_info={
"type": "page",
"domain": domain,
Expand Down Expand Up @@ -248,6 +253,7 @@ def __init__(
max_retry: int,
prefetch_size: int,
sleep_base: float,
throttler: Throttler,
):
self.__client = client
self.__opt_prefetch_size = prefetch_size
Expand All @@ -260,6 +266,7 @@ def __init__(
self.__total = 0
self.__sleep_base = sleep_base
self.__match_type = match_type
self.__throttler = throttler

self.__crawls_remaining = self.init_crawls_queue(urls, CC_files)

Expand Down Expand Up @@ -294,6 +301,7 @@ async def __prefetch_next_crawl(self) -> int:
match_type=self.__match_type,
max_retry=self.__max_retry,
sleep_base=self.__sleep_base,
throttler=self.__throttler,
)
except Exception as e:
all_purpose_logger.error(
Expand All @@ -318,6 +326,7 @@ async def __prefetch_next_crawl(self) -> int:
to=self.__to,
max_retry=self.__max_retry,
sleep_base=self.__sleep_base,
throttler=self.__throttler,
),
)
)
Expand Down
73 changes: 41 additions & 32 deletions cmoncrawl/aggregator/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from cmoncrawl.aggregator.utils import ndjson
from cmoncrawl.common.loggers import all_purpose_logger
from cmoncrawl.common.throttling import Throttler

ALLOWED_ERR_FOR_RETRIES = [500, 502, 503, 504]

Expand Down Expand Up @@ -114,6 +115,7 @@ async def retrieve(
sleep_base: float,
allowed_status_errors: list[int] = ALLOWED_ERR_FOR_RETRIES,
log_additional_info: dict[str, Any] = {}, # type: ignore
throttler: Throttler | None = None,
):
@retry(
stop=stop_after_attempt(max_retry + 1),
Expand All @@ -122,7 +124,7 @@ async def retrieve(
reraise=True,
before_sleep=log_after_retry,
)
async def _retrieve(
async def _retrieve_with_throttling(
client: ClientSession,
cdx_server: str,
params: dict[str, Any],
Expand All @@ -133,39 +135,46 @@ async def _retrieve(
all_purpose_logger.debug(
f"Sending request to {cdx_server} with params: {params}"
)
try:
async with client.get(cdx_server, params=params) as response:
status = response.status
if not response.ok:
reason = str(response.reason) if response.reason else "Unknown" # type: ignore
if status in allowed_status_errors:
raise DownloadError(reason, status)
raise ValueError(
f"Failed to download {cdx_server} with status {status} and reason {reason}"
)
else:
if content_type == "text/x-ndjson":
content = await response.json(
content_type=content_type,
loads=ndjson.Decoder().decode,

async def _request():
try:
async with client.get(cdx_server, params=params) as response:
status = response.status
if not response.ok:
reason = str(response.reason) if response.reason else "Unknown" # type: ignore
if status in allowed_status_errors:
raise DownloadError(reason, status)
raise ValueError(
f"Failed to download {cdx_server} with status {status} and reason {reason}"
)
elif content_type == "application/json":
content = await response.json(content_type=content_type)
else:
raise ValueError(f"Unknown content type: {content_type}")
except (
ClientError,
TimeoutError,
ServerConnectionError,
ContentTypeError,
) as e:
reason = f"{type(e)} {str(e)}"
status = 500
raise DownloadError(reason, status)

return content

return await _retrieve(
if content_type == "text/x-ndjson":
content = await response.json(
content_type=content_type,
loads=ndjson.Decoder().decode,
)
elif content_type == "application/json":
content = await response.json(content_type=content_type)
else:
raise ValueError(f"Unknown content type: {content_type}")
except (
ClientError,
TimeoutError,
ServerConnectionError,
ContentTypeError,
) as e:
reason = f"{type(e)} {str(e)}"
status = 500
raise DownloadError(reason, status)

return content

if throttler is not None:
return await throttler.throttle(_request)
else:
return await _request()

return await _retrieve_with_throttling(
client=client,
cdx_server=cdx_server,
params=params,
Expand Down
4 changes: 4 additions & 0 deletions tests/gateway_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from cmoncrawl.aggregator.gateway_query import GatewayAggregator
from cmoncrawl.aggregator.utils.constants import CC_INDEXES_SERVER
from cmoncrawl.aggregator.utils.helpers import get_all_CC_indexes, unify_url_id
from cmoncrawl.common.throttling import Throttler


class TestIndexerAsync(unittest.IsolatedAsyncioTestCase):
Expand All @@ -23,6 +24,7 @@ async def asyncSetUp(self) -> None:
match_type=MatchType.DOMAIN,
).aopen()
self.client = self.di.client
self.throttler = Throttler(1)

async def asyncTearDown(self) -> None:
await self.di.aclose(None, None, None)
Expand All @@ -35,6 +37,7 @@ async def test_indexer_num_pages(self):
max_retry=20,
sleep_base=1.4,
match_type=MatchType.DOMAIN,
throttler=self.throttler,
)
self.assertEqual(num_pages, 14)

Expand Down Expand Up @@ -82,6 +85,7 @@ async def test_init_queue_since_to(self):
sleep_base=4,
prefetch_size=2,
match_type=MatchType.DOMAIN,
throttler=self.throttler,
)
self.assertIsNotNone(self.di.cc_servers)
# Generates only for 2020
Expand Down

0 comments on commit 28f0a9e

Please sign in to comment.