Merge pull request #62 from hynky1999/download_update
Download update
hynky1999 authored May 11, 2023
2 parents b16f52f + eeaac08 commit 36531a8
Showing 5 changed files with 85 additions and 18 deletions.
40 changes: 32 additions & 8 deletions cmoncrawl/aggregator/index_query.py
@@ -17,7 +17,12 @@
Type,
)
from cmoncrawl.common.loggers import all_purpose_logger
from cmoncrawl.common.types import DomainRecord, RetrieveResponse, DomainCrawl
from cmoncrawl.common.types import (
DomainRecord,
RetrieveResponse,
DomainCrawl,
MatchType,
)

from aiohttp import ClientError, ClientSession, ContentTypeError
import asyncio
@@ -31,6 +36,7 @@ def __init__(
self,
domains: List[str],
cc_indexes_server: str = "http://index.commoncrawl.org/collinfo.json",
match_type: MatchType | None = None,
cc_servers: List[str] = [],
since: datetime = datetime.min,
to: datetime = datetime.max,
@@ -48,6 +54,7 @@ def __init__(
self.max_retry = max_retry
self.prefetch_size = prefetch_size
self.sleep_step = sleep_step
self.match_type = match_type
self.iterators: List[IndexAggregator.IndexAggregatorIterator] = []

async def aopen(self) -> IndexAggregator:
@@ -68,6 +75,7 @@ def __aiter__(self):
self.client,
self.domains,
self.cc_servers,
match_type=self.match_type,
since=self.since,
to=self.to,
limit=self.limit,
@@ -110,7 +118,7 @@ async def __retrieve(
**args: Any,
):
def should_retry(retry: int, reason: str, status: int, **args: Any):
all_purpose_logger.error(
all_purpose_logger.warn(
f"Failed to retrieve page of {domain} from {cdx_server} with reason {status}: {reason} retry: {retry + 1}/{max_retry} add_info: {args}"
)
if status not in allowed_status_errors:
@@ -124,6 +132,9 @@ def should_retry(retry: int, reason: str, status: int, **args: Any):

for retry in range(max_retry):
try:
all_purpose_logger.debug(
f"Sending request to {cdx_server} with params: {params}, retry: {retry + 1}/{max_retry}"
)
async with client.get(cdx_server, params=params) as response:
status = response.status
if not response.ok:
@@ -139,7 +150,6 @@ def should_retry(retry: int, reason: str, status: int, **args: Any):
all_purpose_logger.error(str(e), exc_info=True)
all_purpose_logger.error(e.message, exc_info=True)
all_purpose_logger.error(response.content)

break
all_purpose_logger.info(
f"Successfully retrieved page of {domain} from {cdx_server} add_info: {args}"
@@ -159,17 +169,20 @@ async def get_number_of_pages(
client: ClientSession,
cdx_server: str,
domain: str,
match_type: MatchType | None,
max_retry: int,
sleep_step: int,
page_size: int | None = None,
):
params: Dict[str, str | int] = {
"showNumPages": "true",
"output": "json",
"matchType": "domain",
"url": domain,
}

if match_type is not None:
params["matchType"] = match_type.value

if page_size is not None:
params["page_size"] = page_size
response = await IndexAggregator.__retrieve(
@@ -195,6 +208,7 @@ async def get_captured_responses(
client: ClientSession,
cdx_server: str,
domain: str,
match_type: MatchType | None,
max_retry: int,
sleep_step: int,
page: int,
@@ -203,12 +217,13 @@
):
params: Dict[str, str | int] = {
"output": "json",
"matchType": "domain",
"page": page,
"url": domain,
"from": to_timestamp_format(since),
"to": to_timestamp_format(to),
}
if match_type is not None:
params["matchType"] = match_type.value
reponse = await IndexAggregator.__retrieve(
client,
domain,
@@ -241,13 +256,18 @@ async def get_all_CC_indexes(client: ClientSession, cdx_server: str) -> List[str
r_json = await response.json(content_type="application/json")
CC_servers = [js["cdx-api"] for js in r_json]
return CC_servers
all_purpose_logger.error(
f"Failed to get CC servers from {cdx_server} after 3 attempts"
)
return []
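
As a side note, get_all_CC_indexes only relies on each entry of collinfo.json exposing a "cdx-api" field. A small illustrative sketch of that parsing (the sample entries below are made up, not a live response):

r_json = [
    {"id": "CC-MAIN-2023-14", "cdx-api": "https://index.commoncrawl.org/CC-MAIN-2023-14-index"},
    {"id": "CC-MAIN-2023-06", "cdx-api": "https://index.commoncrawl.org/CC-MAIN-2023-06-index"},
]
CC_servers = [js["cdx-api"] for js in r_json]
# CC_servers == ["https://index.commoncrawl.org/CC-MAIN-2023-14-index",
#                "https://index.commoncrawl.org/CC-MAIN-2023-06-index"]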

class IndexAggregatorIterator(AsyncIterator[DomainRecord]):
def __init__(
self,
client: ClientSession,
domains: List[str],
CC_files: List[str],
match_type: MatchType | None,
since: datetime,
to: datetime,
limit: int | None,
@@ -268,6 +288,7 @@ def __init__(
self.__max_retry = max_retry
self.__total = 0
self.__sleep_step = sleep_step
self.__match_type = match_type

self.__crawls_remaining = self.init_crawls_queue(domains, CC_files)

@@ -294,6 +315,7 @@ async def __prefetch_next_crawl(self) -> int:
self.__client,
next_crawl.cdx_server,
next_crawl.domain,
match_type=self.__match_type,
max_retry=self.__max_retry,
sleep_step=self.__sleep_step,
)
@@ -344,12 +366,13 @@ async def __await_next_prefetch(self):
retry < _max_retry
and response.status in ALLOWED_ERR_FOR_RETRIES
):
all_purpose_logger.info(
f"Retrying {dc.domain} of {dc.cdx_server} retry {retry + 1}/{_max_retry}"
)
self.prefetch_queue.add(
asyncio.create_task(self.__fetch_next_dc(dc, retry + 1))
)
else:
all_purpose_logger.error(
f"Failed to fetch {dc.domain} of {dc.cdx_server} with status {response.status}"
)

# Nothing more to prefetch

@@ -387,6 +410,7 @@ async def __fetch_next_dc(self, dc: DomainCrawl, retry: int):
self.__client,
dc.cdx_server,
dc.domain,
match_type=self.__match_type,
page=dc.page,
since=self.__since,
to=self.__to,
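For orientation, a minimal sketch of how the new parameter is passed when constructing the aggregator, mirroring the call added in cmoncrawl/integrations/download.py further down; the domain, date range, and limit are placeholders:

from datetime import datetime

from cmoncrawl.aggregator.index_query import IndexAggregator
from cmoncrawl.common.types import MatchType

index_agg = IndexAggregator(
    domains=["example.com"],
    match_type=MatchType.DOMAIN,  # None keeps the CDX server's default matching
    since=datetime(2023, 1, 1),
    to=datetime(2023, 5, 1),
    limit=100,
)
# The aggregator is then iterated asynchronously to yield DomainRecord objects,
# exactly as before this change; match_type only affects the CDX queries it issues.
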
12 changes: 12 additions & 0 deletions cmoncrawl/common/types.py
@@ -1,4 +1,5 @@
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List
from urllib.parse import urlparse
@@ -93,3 +94,14 @@ class ExtractConfig:

extractors_path: Path
routes: List[RoutesConfig]


class MatchType(Enum):
"""
Match type for cdx server.
"""

EXACT = "exact"
PREFIX = "prefix"
HOST = "host"
DOMAIN = "domain"
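
For illustration, a minimal sketch of how a MatchType ends up in a CDX query, mirroring the params handling in index_query.py above; the helper name build_cdx_params is made up for this example:

from typing import Dict

from cmoncrawl.common.types import MatchType


def build_cdx_params(domain: str, match_type: MatchType | None = None) -> Dict[str, str | int]:
    # Base query, as in get_number_of_pages
    params: Dict[str, str | int] = {
        "showNumPages": "true",
        "output": "json",
        "url": domain,
    }
    # matchType is only sent when explicitly requested; otherwise the CDX
    # server falls back to its default matching behaviour.
    if match_type is not None:
        params["matchType"] = match_type.value
    return params


build_cdx_params("example.com", MatchType.DOMAIN)
# {'showNumPages': 'true', 'output': 'json', 'url': 'example.com', 'matchType': 'domain'}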
12 changes: 11 additions & 1 deletion cmoncrawl/integrations/download.py
@@ -3,6 +3,7 @@
from pathlib import Path
from typing import Any, List
from cmoncrawl.aggregator.index_query import IndexAggregator
from cmoncrawl.common.types import MatchType
from cmoncrawl.processor.pipeline.downloader import AsyncDownloader
from cmoncrawl.processor.pipeline.pipeline import ProcessorPipeline
from cmoncrawl.processor.pipeline.streamer import StreamerFileHTML
@@ -32,9 +33,9 @@ def add_mode_args(subparser: Any):
def add_args(subparser: Any):
parser = subparser.add_parser("download")
parser.add_argument("url")
parser.add_argument("output", type=Path)
mode_subparser = parser.add_subparsers(dest="mode", required=True)
mode_subparser = add_mode_args(mode_subparser)
parser.add_argument("output", type=Path)
parser.add_argument("--limit", type=int, default=5)
parser.add_argument(
"--since", type=datetime.fromisoformat, default=str(datetime.min)
@@ -44,6 +45,12 @@ def add_args(subparser: Any):
parser.add_argument("--max_retry", type=int, default=30)
parser.add_argument("--sleep_step", type=int, default=4)
# Add option to output to either json or html
parser.add_argument(
"--match_type",
type=MatchType,
choices=list(MatchType.__members__.values()),
default=MatchType.PREFIX,
)
parser.add_argument("--max_directory_size", type=int, default=1000)
parser.add_argument("--filter_non_200", action="store_true", default=True)
parser.set_defaults(func=run_download)
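
A short standalone sketch (not code from this diff) of how argparse treats type=MatchType: the raw string from the command line is passed to the enum constructor, so users type the value strings "exact", "prefix", "host" or "domain":

import argparse

from cmoncrawl.common.types import MatchType

parser = argparse.ArgumentParser()
parser.add_argument(
    "--match_type",
    type=MatchType,  # argparse calls MatchType("<raw string>"), matching by value
    choices=list(MatchType.__members__.values()),
    default=MatchType.PREFIX,
)
args = parser.parse_args(["--match_type", "domain"])
assert args.match_type is MatchType.DOMAIN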
@@ -85,6 +92,7 @@ def url_download_prepare_streamer(

async def url_download(
url: str,
match_type: str | None,
output: Path,
cc_server: List[str] | None,
since: datetime,
@@ -108,6 +116,7 @@ async def url_download(
index_agg = IndexAggregator(
cc_servers=cc_server or [],
domains=[url],
match_type=match_type,
since=since,
to=to,
limit=limit,
@@ -122,6 +131,7 @@ def run_download(args: argparse.Namespace):
return asyncio.run(
url_download(
args.url,
args.match_type,
args.output,
args.cc_server,
args.since,
33 changes: 24 additions & 9 deletions cmoncrawl/integrations/extract.py
@@ -3,13 +3,15 @@
import json
import multiprocessing
from pathlib import Path

from tqdm import tqdm
from cmoncrawl.common.types import ExtractConfig

from cmoncrawl.processor.pipeline.downloader import DownloaderDummy, AsyncDownloader
from cmoncrawl.processor.pipeline.pipeline import ProcessorPipeline
from cmoncrawl.middleware.synchronized import extract
import argparse
from typing import Any, List
from typing import Any, Dict, List, Tuple
import asyncio
from cmoncrawl.processor.pipeline.streamer import (
StreamerFileJSON,
@@ -68,15 +70,28 @@ def get_extract_downloader(
return AsyncDownloader(max_retry=max_retry, sleep_step=sleep_step)


def get_domain_records_json(file_path: Path) -> List[DomainRecord]:
def get_domain_records_json(
file_path: Path,
) -> List[Tuple[DomainRecord, Dict[str, Any]]]:
records: List[Tuple[DomainRecord, Dict[str, Any]]] = []
with open(file_path, "r") as f:
js = [json.loads(line) for line in f.readlines()]
return [DomainRecord.schema().load(record["domain_record"]) for record in js]
for line in tqdm(f):
js = json.loads(line)
domain_record: DomainRecord = DomainRecord.schema().load(
js["domain_record"]
)
additional_info = js.get("additional_info", {})
if not isinstance(additional_info, dict):
additional_info = {}
records.append((domain_record, additional_info))
return records
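
To make the expected input concrete, a hedged example of a single JSONL line as get_domain_records_json parses it; only the "domain_record" and optional "additional_info" keys come from the code above, while the fields inside "domain_record" are assumptions about what DomainRecord.schema() accepts:

import json

line = (
    '{"domain_record": {"filename": "crawl-data/CC-MAIN-2023-14/example.warc.gz", '
    '"url": "https://example.com/", "offset": 12345, "length": 6789, '
    '"timestamp": "2023-05-11T00:00:00"}, '
    '"additional_info": {"source": "manual"}}'
)
js = json.loads(line)
js["domain_record"]["url"]     # "https://example.com/"
js.get("additional_info", {})  # {"source": "manual"}; replaced by {} when missing or not a dict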


def get_domain_records_html(url: str | None, date: datetime | None):
def get_domain_records_html(
url: str | None, date: datetime | None
) -> List[Tuple[DomainRecord, Dict[str, Any]]]:
# Just return dummy as correct crawl will be loaded from dummy downloader
return [DomainRecord("", url=url, offset=0, length=0, timestamp=date)]
return [(DomainRecord("", url=url, offset=0, length=0, timestamp=date), {})]


def load_config(config_path: Path) -> ExtractConfig:
@@ -111,10 +126,10 @@ async def extract_from_files(
for path in files:
match mode:
case ExtractMode.RECORD:
domain_records = get_domain_records_json(path)
records = get_domain_records_json(path)
case ExtractMode.HTML:
domain_records = get_domain_records_html(url, date)
await extract(domain_records, pipeline)
records = get_domain_records_html(url, date)
await extract(records, pipeline)


def _extract_task(
6 changes: 6 additions & 0 deletions cmoncrawl/middleware/synchronized.py
@@ -15,6 +15,7 @@ async def index_and_extract(
filter_non_unique_url: bool = False,
):
processed_urls: Set[str] = set()
total_extracted: int = 0

if hasattr(pipeline.downloader, "__aenter__"):
await pipeline.downloader.__aenter__()
@@ -26,6 +27,7 @@
continue
try:
await pipeline.process_domain_record(domain_record, {})
total_extracted += 1
except KeyboardInterrupt as e:
break

@@ -39,6 +41,7 @@
finally:
if hasattr(pipeline.downloader, "__aexit__"):
await pipeline.downloader.__aexit__(None, None, None)
all_purpose_logger.info(f"Extracted {total_extracted} urls")


async def _extract_task(
@@ -67,6 +70,7 @@ async def extract(
):
domain_records_iterator = iter(tqdm(records))
domains_exausted = False
total_extracted: int = 0
if hasattr(pipeline.downloader, "__aenter__"):
await pipeline.downloader.__aenter__()
try:
@@ -92,6 +96,7 @@
for task in done:
try:
await task
total_extracted += 1
except KeyboardInterrupt as e:
break

Expand All @@ -104,3 +109,4 @@ async def extract(
finally:
if hasattr(pipeline.downloader, "__aexit__"):
await pipeline.downloader.__aexit__(None, None, None)
all_purpose_logger.info(f"Extracted {total_extracted} urls")
