Commit 299afa5

Merge pull request #68 from hynky1999/optional_decode
encoding + typing + better index retry
hynky1999 authored May 22, 2023
2 parents 006af0d + a86aa51 commit 299afa5
Showing 17 changed files with 266 additions and 121 deletions.
3 changes: 0 additions & 3 deletions .flake8

This file was deleted.

@@ -11,6 +11,23 @@ env:


jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Set up Python ${{ env.PYTHON_VERSION }}
        uses: actions/setup-python@v2
        with:
          python-version: ${{ env.PYTHON_VERSION }}
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
      - name: Run tests
        run: |
          pytest
  publish:
    runs-on: ubuntu-latest
    steps:
50 changes: 50 additions & 0 deletions .github/workflows/test_and_types.yml
@@ -0,0 +1,50 @@
name: Test on main branch
env:
  PYTHON_VERSION: "3.10"

on:
  push:
    branches:
      - main
  pull_request:
    branches:
      - main

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v2

      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: ${{ env.PYTHON_VERSION }}

      - name: Install dependencies
        run: pip install -r requirements.txt # Replace with your dependencies installation command

      - name: Run tests
        run: python -m unittest discover -s tests -p "*_tests.py" # Replace with your test command
  lint_and_types:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v2

      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: ${{ env.PYTHON_VERSION }}

      - name: Install dependencies
        run: pip install -r requirements.txt # Replace with your dependencies installation command

      - name: Lint with pyright
        run: pip install pyright && pyright

      - name: Lint with black
        run: pip install black && black --diff .
84 changes: 45 additions & 39 deletions cmoncrawl/aggregator/index_query.py
@@ -3,12 +3,15 @@
from datetime import datetime
import re

from cmoncrawl.aggregator.utils.ndjson_decoder import Decoder
from cmoncrawl.aggregator.utils import ndjson
import json
from types import TracebackType
from typing import (
Any,
AsyncIterable,
AsyncIterator,
Awaitable,
Callable,
Deque,
List,
Dict,
@@ -24,7 +27,13 @@
MatchType,
)

from aiohttp import ClientError, ClientSession, ContentTypeError, ServerConnectionError
from aiohttp import (
ClientError,
ClientResponse,
ClientSession,
ContentTypeError,
ServerConnectionError,
)
import asyncio
import random

@@ -156,6 +165,16 @@ def should_retry(retry: int, reason: str, status: int, **args: Any):
status = 0
content = None
reason: str | None = None
decode: Callable[[ClientResponse], Awaitable[Any]]

if content_type == "text/x-ndjson":
decode = lambda response: response.json(
content_type=content_type, loads=ndjson.Decoder().decode
)
elif content_type == "application/json":
decode = lambda response: response.json(content_type=content_type)
else:
raise ValueError(f"Unknown content type: {content_type}")

for retry in range(max_retry):
try:
@@ -165,13 +184,11 @@ def should_retry(retry: int, reason: str, status: int, **args: Any):
async with client.get(cdx_server, params=params) as response:
status = response.status
if not response.ok:
reason = response.reason if response.reason else "Unknown"
reason = str(response.reason) if response.reason else "Unknown" # type: ignore
if not should_retry(retry, reason, status, **args):
break
else:
content = await response.json(
content_type=content_type, loads=Decoder().decode
)
content = await decode(response)
all_purpose_logger.info(
f"Successfully retrieved page of {domain} from {cdx_server} add_info: {args}"
)
@@ -184,7 +201,8 @@ def should_retry(retry: int, reason: str, status: int, **args: Any):
ContentTypeError,
) as e:
reason = f"{type(e)} {str(e)}"
if not should_retry(retry, reason, 500, **args):
status = 500
if not should_retry(retry, reason, status, **args):
break

await asyncio.sleep(random.randint(0, (retry + 1) * sleep_step))
@@ -280,15 +298,22 @@ async def get_all_CC_indexes(client: ClientSession, cdx_server: str) -> List[str
"""
Get all CC index servers from a given CDX server
"""
for _ in range(3):
async with client.get(cdx_server) as response:
r_json = await response.json(content_type="application/json")
CC_servers = [js["cdx-api"] for js in r_json]
return CC_servers
all_purpose_logger.error(
f"Failed to get CC servers from {cdx_server} after 3 attempts"
response = await IndexAggregator.__retrieve(
client=client,
cdx_server=cdx_server,
domain=cdx_server,
params={},
content_type="application/json",
max_retry=10,
sleep_step=1,
)
return []
if response.content is None:
all_purpose_logger.error(
f"Failed to get CC servers from {cdx_server} after 3 attempts"
)
return []
CC_servers = [js["cdx-api"] for js in response.content]
return CC_servers

class IndexAggregatorIterator(AsyncIterator[DomainRecord]):
def __init__(
@@ -309,7 +334,7 @@ def __init__(
self.__opt_prefetch_size = prefetch_size
self.__domain_records: Deque[DomainRecord] = deque()
self.prefetch_queue: Set[
asyncio.Task[Tuple[RetrieveResponse, DomainCrawl, int]]
asyncio.Task[Tuple[RetrieveResponse, DomainCrawl]]
] = set()
self.__since = since
self.__to = to
@@ -359,7 +384,7 @@ async def __prefetch_next_crawl(self) -> int:
for i in range(pages):
dc = DomainCrawl(next_crawl.domain, next_crawl.cdx_server, i)
self.prefetch_queue.add(
asyncio.create_task(self.__fetch_next_dc(dc, 0))
asyncio.create_task(self.__fetch_next_dc(dc))
)
return pages
return 0
@@ -384,30 +409,13 @@ async def __await_next_prefetch(self):
self.prefetch_queue, return_when="FIRST_COMPLETED"
)
for task in done:
response, dc, retry = task.result()
response, dc = task.result()

if response.content is not None:
self.__domain_records.extend(response.content)
all_purpose_logger.info(
f"Found {len(response.content)} records for {dc.domain} of {dc.cdx_server}"
)
else:
_max_retry = (
self.__max_retry
if response.status in ALLOWED_ERR_FOR_RETRIES
else retry
)
if (
retry < _max_retry
and response.status in ALLOWED_ERR_FOR_RETRIES
):
self.prefetch_queue.add(
asyncio.create_task(self.__fetch_next_dc(dc, retry + 1))
)
else:
all_purpose_logger.error(
f"Failed to fetch {dc.domain} of {dc.cdx_server} with status {response.status}"
)

# Nothing more to prefetch

@@ -438,8 +446,7 @@ def clean(self):
for task in self.prefetch_queue:
task.cancel()

async def __fetch_next_dc(self, dc: DomainCrawl, retry: int):
await asyncio.sleep(random.randint(0, self.__sleep_step * (retry)))
async def __fetch_next_dc(self, dc: DomainCrawl):
return (
await IndexAggregator.get_captured_responses(
self.__client,
@@ -449,11 +456,10 @@ async def __fetch_next_dc(self, dc: DomainCrawl, retry: int):
page=dc.page,
since=self.__since,
to=self.__to,
max_retry=1,
max_retry=self.__max_retry,
sleep_step=self.__sleep_step,
),
dc,
retry,
)


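The central change in index_query.py is that the response decoder is now chosen up front from the requested content type (ndjson for the CDX page requests, plain JSON for the index listing in get_all_CC_indexes) instead of always decoding with the ndjson decoder, and get_all_CC_indexes now reuses the same retrying __retrieve helper. Below is a minimal sketch of that dispatch, assuming aiohttp; ndjson_loads is a hypothetical stand-in for the project's ndjson.Decoder().decode.

import json
from typing import Any, Awaitable, Callable, List

from aiohttp import ClientResponse


def ndjson_loads(text: str) -> List[Any]:
    # Hypothetical stand-in for cmoncrawl.aggregator.utils.ndjson.Decoder:
    # newline-delimited JSON, one document per non-empty line.
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def pick_decoder(content_type: str) -> Callable[[ClientResponse], Awaitable[Any]]:
    # Mirrors the branch added to __retrieve in the diff above.
    if content_type == "text/x-ndjson":
        return lambda response: response.json(
            content_type=content_type, loads=ndjson_loads
        )
    elif content_type == "application/json":
        return lambda response: response.json(content_type=content_type)
    raise ValueError(f"Unknown content type: {content_type}")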
File renamed without changes.
49 changes: 31 additions & 18 deletions cmoncrawl/integrations/download.py
@@ -34,9 +34,17 @@ def add_mode_args(subparser: Any):
default=500_000,
help="Max number of domain records per file output",
)
subparser.add_parser(
html_parser = subparser.add_parser(
DownloadOutputFormat.HTML.value, help="Download HTML files from Common Crawl"
)

html_parser.add_argument(
"--encoding",
type=str,
default=None,
help="Force usage of specified encoding is possible",
)

return subparser


@@ -105,7 +113,7 @@


def url_download_prepare_router(
output_format: DownloadOutputFormat, filter_non_200: bool
output_format: DownloadOutputFormat, filter_non_200: bool, encoding: str | None
):
router = Router()
match output_format:
@@ -115,7 +123,8 @@ def url_download_prepare_router(
)
case DownloadOutputFormat.HTML:
router.load_extractor(
"dummy_extractor", HTMLExtractor(filter_non_ok=filter_non_200)
"dummy_extractor",
HTMLExtractor(filter_non_ok=filter_non_200, encoding=encoding),
)
router.register_route("dummy_extractor", [r".*"])
return router
@@ -140,7 +149,7 @@ def url_download_prepare_streamer(

async def url_download(
url: str,
match_type: str | None,
match_type: MatchType | None,
output: Path,
cc_server: List[str] | None,
since: datetime,
@@ -152,11 +161,12 @@ async def url_download(
max_crawls_per_file: int,
max_directory_size: int,
filter_non_200: bool,
encoding: str | None,
):
outstreamer = url_download_prepare_streamer(
mode, output, max_directory_size, max_crawls_per_file
)
router = url_download_prepare_router(mode, filter_non_200)
router = url_download_prepare_router(mode, filter_non_200, encoding)
pipeline = ProcessorPipeline(
router, AsyncDownloader(max_retry=max_retry), outstreamer
)
@@ -178,18 +188,21 @@ def run_download(args: argparse.Namespace):
mode = DownloadOutputFormat(args.mode)
return asyncio.run(
url_download(
args.url,
args.match_type,
args.output,
args.cc_server,
args.since,
args.to,
args.limit,
args.max_retry,
args.sleep_step,
mode,
args.max_crawls_per_file if mode == DownloadOutputFormat.RECORD else 1,
args.max_directory_size,
args.filter_non_200,
url=args.url,
match_type=args.match_type,
output=args.output,
cc_server=args.cc_server,
since=args.since,
to=args.to,
limit=args.limit,
max_retry=args.max_retry,
sleep_step=args.sleep_step,
mode=mode,
max_crawls_per_file=args.max_crawls_per_file
if mode == DownloadOutputFormat.RECORD
else 1,
max_directory_size=args.max_directory_size,
filter_non_200=args.filter_non_200,
encoding=args.encoding if mode == DownloadOutputFormat.HTML else None,
)
)
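download.py now threads an optional encoding from the new --encoding flag of the html mode through url_download into HTMLExtractor. A hedged usage sketch follows, assuming both names are importable from cmoncrawl.integrations.download and using "utf-8" purely as an example value.

from cmoncrawl.integrations.download import (
    DownloadOutputFormat,
    url_download_prepare_router,
)

# Build the dummy-extractor router the way run_download does for html mode;
# the encoding is forced onto HTMLExtractor, pass None to keep the previous
# behaviour.
router = url_download_prepare_router(
    DownloadOutputFormat.HTML,
    filter_non_200=True,
    encoding="utf-8",
)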
6 changes: 3 additions & 3 deletions cmoncrawl/integrations/extract.py
@@ -119,13 +119,13 @@ def get_domain_records_json(
with open(file_path, "r") as f:
for line in tqdm(f):
js = json.loads(line)
domain_record: DomainRecord = DomainRecord.schema().load(
domain_record: DomainRecord = DomainRecord.schema().load( # type: ignore
js["domain_record"]
)
additional_info = js.get("additional_info", {})
if not isinstance(additional_info, dict):
additional_info = {}
records.append((domain_record, additional_info))
records.append((domain_record, additional_info)) # type: ignore
return records


@@ -139,7 +139,7 @@ def get_domain_records_html(
def load_config(config_path: Path) -> ExtractConfig:
with open(config_path, "r") as f:
config = json.load(f)
return ExtractConfig.schema().load(config)
return ExtractConfig.schema().load(config) # type: ignore


def create_router(config: ExtractConfig) -> Router:
(Remaining changed files not shown.)
