diff --git a/.flake8 b/.flake8 deleted file mode 100644 index 82de7a13..00000000 --- a/.flake8 +++ /dev/null @@ -1,3 +0,0 @@ -[flake8] -max-line-length = 80 -max-complexity = 18 diff --git a/.github/workflows/test_and_push.yml b/.github/workflows/release.yml similarity index 61% rename from .github/workflows/test_and_push.yml rename to .github/workflows/release.yml index 491384a1..35675b81 100644 --- a/.github/workflows/test_and_push.yml +++ b/.github/workflows/release.yml @@ -11,6 +11,23 @@ env: jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Set up Python ${{ env.PYTHON_VERSION }} + uses: actions/setup-python@v2 + with: + python-version: ${{ env.PYTHON_VERSION }} + - name: Install dependencies + run: | + pip install -r requirements.txt + - name: Run tests + run: | + pytest + + + publish: runs-on: ubuntu-latest steps: diff --git a/.github/workflows/test_and_types.yml b/.github/workflows/test_and_types.yml new file mode 100644 index 00000000..ace597f5 --- /dev/null +++ b/.github/workflows/test_and_types.yml @@ -0,0 +1,50 @@ +name: Test on main branch +env: + PYTHON_VERSION: "3.10" + +on: + push: + branches: + - main + pull_request: + branches: + - main + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v2 + + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: ${{ env.PYTHON_VERSION }} + + - name: Install dependencies + run: pip install -r requirements.txt # Replace with your dependencies installation command + + - name: Run tests + run: python -m unittest discover -s tests -p "*_tests.py" # Replace with your test command + lint_and_types: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v2 + + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: ${{ env.PYTHON_VERSION }} + + - name: Install dependencies + run: pip install -r requirements.txt # Replace with your dependencies installation command + + - name: Lint with pyright + run: pip install pyright && pyright + + - name: Lint with black + run: pip install black && black --diff . diff --git a/cmoncrawl/aggregator/index_query.py b/cmoncrawl/aggregator/index_query.py index afc53b1d..559cc3d9 100644 --- a/cmoncrawl/aggregator/index_query.py +++ b/cmoncrawl/aggregator/index_query.py @@ -3,12 +3,15 @@ from datetime import datetime import re -from cmoncrawl.aggregator.utils.ndjson_decoder import Decoder +from cmoncrawl.aggregator.utils import ndjson +import json from types import TracebackType from typing import ( Any, AsyncIterable, AsyncIterator, + Awaitable, + Callable, Deque, List, Dict, @@ -24,7 +27,13 @@ MatchType, ) -from aiohttp import ClientError, ClientSession, ContentTypeError, ServerConnectionError +from aiohttp import ( + ClientError, + ClientResponse, + ClientSession, + ContentTypeError, + ServerConnectionError, +) import asyncio import random @@ -156,6 +165,16 @@ def should_retry(retry: int, reason: str, status: int, **args: Any): status = 0 content = None reason: str | None = None + decode: Callable[[ClientResponse], Awaitable[Any]] + + if content_type == "text/x-ndjson": + decode = lambda response: response.json( + content_type=content_type, loads=ndjson.Decoder().decode + ) + elif content_type == "application/json": + decode = lambda response: response.json(content_type=content_type) + else: + raise ValueError(f"Unknown content type: {content_type}") for retry in range(max_retry): try: @@ -165,13 +184,11 @@ def should_retry(retry: int, reason: str, status: int, **args: Any): async with client.get(cdx_server, params=params) as response: status = response.status if not response.ok: - reason = response.reason if response.reason else "Unknown" + reason = str(response.reason) if response.reason else "Unknown" # type: ignore if not should_retry(retry, reason, status, **args): break else: - content = await response.json( - content_type=content_type, loads=Decoder().decode - ) + content = await decode(response) all_purpose_logger.info( f"Successfully retrieved page of {domain} from {cdx_server} add_info: {args}" ) @@ -184,7 +201,8 @@ def should_retry(retry: int, reason: str, status: int, **args: Any): ContentTypeError, ) as e: reason = f"{type(e)} {str(e)}" - if not should_retry(retry, reason, 500, **args): + status = 500 + if not should_retry(retry, reason, status, **args): break await asyncio.sleep(random.randint(0, (retry + 1) * sleep_step)) @@ -280,15 +298,22 @@ async def get_all_CC_indexes(client: ClientSession, cdx_server: str) -> List[str """ Get all CC index servers from a given CDX server """ - for _ in range(3): - async with client.get(cdx_server) as response: - r_json = await response.json(content_type="application/json") - CC_servers = [js["cdx-api"] for js in r_json] - return CC_servers - all_purpose_logger.error( - f"Failed to get CC servers from {cdx_server} after 3 attempts" + response = await IndexAggregator.__retrieve( + client=client, + cdx_server=cdx_server, + domain=cdx_server, + params={}, + content_type="application/json", + max_retry=10, + sleep_step=1, ) - return [] + if response.content is None: + all_purpose_logger.error( + f"Failed to get CC servers from {cdx_server} after 3 attempts" + ) + return [] + CC_servers = [js["cdx-api"] for js in response.content] + return CC_servers class IndexAggregatorIterator(AsyncIterator[DomainRecord]): def __init__( @@ -309,7 +334,7 @@ def __init__( self.__opt_prefetch_size = prefetch_size self.__domain_records: Deque[DomainRecord] = deque() self.prefetch_queue: Set[ - asyncio.Task[Tuple[RetrieveResponse, DomainCrawl, int]] + asyncio.Task[Tuple[RetrieveResponse, DomainCrawl]] ] = set() self.__since = since self.__to = to @@ -359,7 +384,7 @@ async def __prefetch_next_crawl(self) -> int: for i in range(pages): dc = DomainCrawl(next_crawl.domain, next_crawl.cdx_server, i) self.prefetch_queue.add( - asyncio.create_task(self.__fetch_next_dc(dc, 0)) + asyncio.create_task(self.__fetch_next_dc(dc)) ) return pages return 0 @@ -384,30 +409,13 @@ async def __await_next_prefetch(self): self.prefetch_queue, return_when="FIRST_COMPLETED" ) for task in done: - response, dc, retry = task.result() + response, dc = task.result() if response.content is not None: self.__domain_records.extend(response.content) all_purpose_logger.info( f"Found {len(response.content)} records for {dc.domain} of {dc.cdx_server}" ) - else: - _max_retry = ( - self.__max_retry - if response.status in ALLOWED_ERR_FOR_RETRIES - else retry - ) - if ( - retry < _max_retry - and response.status in ALLOWED_ERR_FOR_RETRIES - ): - self.prefetch_queue.add( - asyncio.create_task(self.__fetch_next_dc(dc, retry + 1)) - ) - else: - all_purpose_logger.error( - f"Failed to fetch {dc.domain} of {dc.cdx_server} with status {response.status}" - ) # Nothing more to prefetch @@ -438,8 +446,7 @@ def clean(self): for task in self.prefetch_queue: task.cancel() - async def __fetch_next_dc(self, dc: DomainCrawl, retry: int): - await asyncio.sleep(random.randint(0, self.__sleep_step * (retry))) + async def __fetch_next_dc(self, dc: DomainCrawl): return ( await IndexAggregator.get_captured_responses( self.__client, @@ -449,11 +456,10 @@ async def __fetch_next_dc(self, dc: DomainCrawl, retry: int): page=dc.page, since=self.__since, to=self.__to, - max_retry=1, + max_retry=self.__max_retry, sleep_step=self.__sleep_step, ), dc, - retry, ) diff --git a/cmoncrawl/aggregator/utils/ndjson_decoder.py b/cmoncrawl/aggregator/utils/ndjson.py similarity index 100% rename from cmoncrawl/aggregator/utils/ndjson_decoder.py rename to cmoncrawl/aggregator/utils/ndjson.py diff --git a/cmoncrawl/integrations/download.py b/cmoncrawl/integrations/download.py index faeb3e1e..8db30629 100644 --- a/cmoncrawl/integrations/download.py +++ b/cmoncrawl/integrations/download.py @@ -34,9 +34,17 @@ def add_mode_args(subparser: Any): default=500_000, help="Max number of domain records per file output", ) - subparser.add_parser( + html_parser = subparser.add_parser( DownloadOutputFormat.HTML.value, help="Download HTML files from Common Crawl" ) + + html_parser.add_argument( + "--encoding", + type=str, + default=None, + help="Force usage of specified encoding is possible", + ) + return subparser @@ -105,7 +113,7 @@ def add_args(subparser: Any): def url_download_prepare_router( - output_format: DownloadOutputFormat, filter_non_200: bool + output_format: DownloadOutputFormat, filter_non_200: bool, encoding: str | None ): router = Router() match output_format: @@ -115,7 +123,8 @@ def url_download_prepare_router( ) case DownloadOutputFormat.HTML: router.load_extractor( - "dummy_extractor", HTMLExtractor(filter_non_ok=filter_non_200) + "dummy_extractor", + HTMLExtractor(filter_non_ok=filter_non_200, encoding=encoding), ) router.register_route("dummy_extractor", [r".*"]) return router @@ -140,7 +149,7 @@ def url_download_prepare_streamer( async def url_download( url: str, - match_type: str | None, + match_type: MatchType | None, output: Path, cc_server: List[str] | None, since: datetime, @@ -152,11 +161,12 @@ async def url_download( max_crawls_per_file: int, max_directory_size: int, filter_non_200: bool, + encoding: str | None, ): outstreamer = url_download_prepare_streamer( mode, output, max_directory_size, max_crawls_per_file ) - router = url_download_prepare_router(mode, filter_non_200) + router = url_download_prepare_router(mode, filter_non_200, encoding) pipeline = ProcessorPipeline( router, AsyncDownloader(max_retry=max_retry), outstreamer ) @@ -178,18 +188,21 @@ def run_download(args: argparse.Namespace): mode = DownloadOutputFormat(args.mode) return asyncio.run( url_download( - args.url, - args.match_type, - args.output, - args.cc_server, - args.since, - args.to, - args.limit, - args.max_retry, - args.sleep_step, - mode, - args.max_crawls_per_file if mode == DownloadOutputFormat.RECORD else 1, - args.max_directory_size, - args.filter_non_200, + url=args.url, + match_type=args.match_type, + output=args.output, + cc_server=args.cc_server, + since=args.since, + to=args.to, + limit=args.limit, + max_retry=args.max_retry, + sleep_step=args.sleep_step, + mode=mode, + max_crawls_per_file=args.max_crawls_per_file + if mode == DownloadOutputFormat.RECORD + else 1, + max_directory_size=args.max_directory_size, + filter_non_200=args.filter_non_200, + encoding=args.encoding if mode == DownloadOutputFormat.HTML else None, ) ) diff --git a/cmoncrawl/integrations/extract.py b/cmoncrawl/integrations/extract.py index 25bad429..eafab473 100644 --- a/cmoncrawl/integrations/extract.py +++ b/cmoncrawl/integrations/extract.py @@ -119,13 +119,13 @@ def get_domain_records_json( with open(file_path, "r") as f: for line in tqdm(f): js = json.loads(line) - domain_record: DomainRecord = DomainRecord.schema().load( + domain_record: DomainRecord = DomainRecord.schema().load( # type: ignore js["domain_record"] ) additional_info = js.get("additional_info", {}) if not isinstance(additional_info, dict): additional_info = {} - records.append((domain_record, additional_info)) + records.append((domain_record, additional_info)) # type: ignore return records @@ -139,7 +139,7 @@ def get_domain_records_html( def load_config(config_path: Path) -> ExtractConfig: with open(config_path, "r") as f: config = json.load(f) - return ExtractConfig.schema().load(config) + return ExtractConfig.schema().load(config) # type: ignore def create_router(config: ExtractConfig) -> Router: diff --git a/cmoncrawl/middleware/stompware.py b/cmoncrawl/middleware/stompware.py index 0f054fe2..2eaef342 100644 --- a/cmoncrawl/middleware/stompware.py +++ b/cmoncrawl/middleware/stompware.py @@ -71,7 +71,7 @@ def _init_connection(self): [(self.queue_host, self.queue_port)], heartbeats=(self.heartbeat, self.heartbeat), ) - conn.connect(login="producer", passcode="producer", wait=True) + conn.connect(login="producer", passcode="producer", wait=True) # type: ignore all_purpose_logger.info(f"Connected to queue") return conn @@ -98,10 +98,10 @@ async def aggregate(self, filter_duplicates: bool = True): json_str = json.dumps(domain_record.__dict__, default=str) headers = {} - id = unify_url_id(domain_record.url) + id = unify_url_id(domain_record.url or "") if filter_duplicates: headers[DUPL_ID_HEADER] = id - conn.send( + conn.send( # type: ignore f"queue.{self.url}", json_str, headers=headers, @@ -119,10 +119,10 @@ async def aggregate(self, filter_duplicates: bool = True): break all_purpose_logger.info(f"Sent {i} messages") - conn.send( + conn.send( # type: ignore f"topic.poisson_pill.{self.url}", "", headers={"type": "poisson_pill"} ) - conn.disconnect() + conn.disconnect() # type: ignore class ArtemisProcessor: @@ -175,16 +175,16 @@ def __init__( self.listener_stats = listener_stats def on_message(self, frame: Frame): - if frame.headers.get("type") == "poisson_pill": + if frame.headers.get("type") == "poisson_pill": # type: ignore self.pills += 1 else: - msg_json = json.loads(frame.body) + msg_json = json.loads(frame.body) # type: ignore try: msg_json["timestamp"] = datetime.fromisoformat( msg_json.get("timestamp") ) domain_record = DomainRecord(**msg_json) - self.messages.put_nowait(Message(domain_record, frame.headers)) + self.messages.put_nowait(Message(domain_record, frame.headers)) # type: ignore self.listener_stats.messages += 1 self.listener_stats.last_message_time = datetime.now() except ValueError: @@ -196,13 +196,13 @@ def _init_connection(self, addresses: List[str]): reconnect_attempts_max=-1, heartbeats=(self.heartbeat, self.heartbeat), ) - conn.connect(login="consumer", passcode="consumer", wait=True) + conn.connect(login="consumer", passcode="consumer", wait=True) # type: ignore for address in addresses: - conn.subscribe(address, id=address, ack="client-individual") - conn.subscribe("topic.poisson_pill.#", id="poisson_pill", ack="auto") + conn.subscribe(address, id=address, ack="client-individual") # type: ignore + conn.subscribe("topic.poisson_pill.#", id="poisson_pill", ack="auto") # type: ignore listener_stats = ListnerStats() listener = self.Listener(asyncio.Queue(0), listener_stats) - conn.set_listener("", listener) + conn.set_listener("", listener) # type: ignore all_purpose_logger.info("Connected to queue") return conn, listener @@ -244,7 +244,7 @@ async def process(self): extracted_num = 0 try: if hasattr(self.pipeline.downloader, "__aenter__"): - await self.pipeline.downloader.__aenter__() + await self.pipeline.downloader.__aenter__() # type: ignore while True: if ( @@ -318,9 +318,9 @@ async def process(self): finally: if hasattr(self.pipeline.downloader, "__aexit__"): - await self.pipeline.downloader.__aexit__(None, None, None) + await self.pipeline.downloader.__aexit__(None, None, None) # type: ignore all_purpose_logger.info( f"Extracted {extracted_num}/{listener.listener_stats.messages} articles" ) - conn.disconnect() + conn.disconnect() # type: ignore diff --git a/cmoncrawl/middleware/synchronized.py b/cmoncrawl/middleware/synchronized.py index 74db062f..26cbedcb 100644 --- a/cmoncrawl/middleware/synchronized.py +++ b/cmoncrawl/middleware/synchronized.py @@ -29,7 +29,7 @@ async def query_and_extract( total_extracted: int = 0 if hasattr(pipeline.downloader, "__aenter__"): - await pipeline.downloader.__aenter__() + await pipeline.downloader.__aenter__() # type: ignore try: async with index_agg: async for domain_record in index_agg: @@ -50,7 +50,7 @@ async def query_and_extract( finally: if hasattr(pipeline.downloader, "__aexit__"): - await pipeline.downloader.__aexit__(None, None, None) + await pipeline.downloader.__aexit__(None, None, None) # type: ignore all_purpose_logger.info(f"Extracted {total_extracted} urls") return processed_urls @@ -93,7 +93,7 @@ async def extract( domains_exausted = False total_extracted: int = 0 if hasattr(pipeline.downloader, "__aenter__"): - await pipeline.downloader.__aenter__() + await pipeline.downloader.__aenter__() # type: ignore try: queue: Set[asyncio.Task[List[Path]]] = set() while not domains_exausted or len(queue) > 0: @@ -127,5 +127,5 @@ async def extract( finally: if hasattr(pipeline.downloader, "__aexit__"): - await pipeline.downloader.__aexit__(None, None, None) + await pipeline.downloader.__aexit__(None, None, None) # type: ignore all_purpose_logger.info(f"Extracted {total_extracted} urls") diff --git a/cmoncrawl/processor/pipeline/downloader.py b/cmoncrawl/processor/pipeline/downloader.py index 3753e65b..83912f09 100644 --- a/cmoncrawl/processor/pipeline/downloader.py +++ b/cmoncrawl/processor/pipeline/downloader.py @@ -116,7 +116,7 @@ def unwrap( self, response: bytes, domain_record: DomainRecord ) -> List[Tuple[str, PipeMetadata]]: ariter = ArchiveIterator( - io.BytesIO(response), check_digests="raise", arc2warc=True + io.BytesIO(response), check_digests="raise", arc2warc=True # type: ignore wrong typing in package ) encoding = self.encoding warcs: List[Tuple[str, PipeMetadata]] = [ diff --git a/cmoncrawl/processor/pipeline/extractor.py b/cmoncrawl/processor/pipeline/extractor.py index 4405fb33..fa423b5b 100644 --- a/cmoncrawl/processor/pipeline/extractor.py +++ b/cmoncrawl/processor/pipeline/extractor.py @@ -31,11 +31,12 @@ class BaseExtractor(IExtractor, ABC): Args: encoding (str, optional): Default encoding to be used. Defaults to None. - + raise_on_encoding (bool, optional): If True, the extractor will raise ValueException if it fails to decode the response. Defaults to False. """ - def __init__(self, encoding: str | None = None): + def __init__(self, encoding: str | None = None, raise_on_encoding: bool = False): self.encoding = encoding + self.raise_on_encoding = raise_on_encoding def filter_raw(self, response: str, metadata: PipeMetadata) -> bool: # If raw fails bs4 will not be used -> speed @@ -70,22 +71,22 @@ def extract_soup( ) -> Dict[str, Any] | None: raise NotImplementedError() - def preprocess(self, response: str, metadata: PipeMetadata) -> str: - linux = response.replace("\r\n", "\n") - # Sorted set pythonic way + def encode(self, response: str, metadata: PipeMetadata): + # Sorted set pythonic way, we assume that dict is sortd encodings: Dict[str, int] = {} if self.encoding is not None: encodings[self.encoding] = 1 if metadata.domain_record.encoding is not None: encodings[metadata.domain_record.encoding] = 1 + # GET from http header http_split = metadata.http_header.get("Content-Type", "").split("charset=") if len(http_split) > 1 and http_split[1] != "": encodings[http_split[-1]] = 1 # Fallbacks encodings["utf-8"] = 1 - - encoded = linux.encode(metadata.encoding) + encoded = response.encode(metadata.encoding) + decoded = None for encoding in encodings: try: decoded = encoded.decode(encoding) @@ -96,11 +97,20 @@ def preprocess(self, response: str, metadata: PipeMetadata) -> str: f"Failed to decode with {encoding}", extra={"domain_record": metadata.domain_record}, ) - else: - raise ValueError("Failed to decode") + + if decoded is None: + if self.raise_on_encoding: + raise ValueError("Failed to decode") + else: + decoded = response return decoded + def preprocess(self, response: str, metadata: PipeMetadata) -> str: + response = response.replace("\r\n", "\n") + response = self.encode(response, metadata) + return response + class HTMLExtractor(BaseExtractor): """ @@ -108,10 +118,11 @@ class HTMLExtractor(BaseExtractor): Args: filter_non_ok (bool, optional): If True, only 200 status codes will be extracted. Defaults to True. + encoding (str, optional): Default encoding to be used. Defaults to None. If set, the extractor will raise ValueException if it fails to decode the response. """ - def __init__(self, filter_non_ok: bool = True): - super().__init__() + def __init__(self, filter_non_ok: bool = True, encoding: str | None = None): + super().__init__(encoding=encoding, raise_on_encoding=encoding is not None) self.filter_non_ok = filter_non_ok def extract_soup(self, soup: BeautifulSoup, metadata: PipeMetadata): @@ -146,6 +157,7 @@ class DomainRecordExtractor(BaseExtractor): """ def __init__(self, filter_non_ok: bool = True): + # We only extract records, don't try to encode super().__init__() self.filter_non_ok = filter_non_ok @@ -156,7 +168,7 @@ def extract_soup(self, soup: BeautifulSoup, metadata: PipeMetadata): else "unknown" ) result_dict: Dict[str, Any] = { - "domain_record": metadata.domain_record.to_dict() + "domain_record": metadata.domain_record.to_dict() # type: ignore Wrong type } return result_dict diff --git a/docs/source/conf.py b/docs/source/conf.py index ddff9613..3f11370b 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -70,5 +70,3 @@ # html_theme = "sphinx_book_theme" autosummary_generate_overwrite = False - - diff --git a/docs/source/misc/domain_record.rst b/docs/source/misc/domain_record.rst index 201c4d60..e7c3e196 100644 --- a/docs/source/misc/domain_record.rst +++ b/docs/source/misc/domain_record.rst @@ -50,7 +50,6 @@ The Athena SQL keys are: ```u.url, cc.warc_filename, cc.warc_record_offset, cc.warc_record_length, cc.content_digest, cc.fetch_time``` - The ```additional_info``` field is optional and can contain any additional information. It will be added to extracted fields as is. It's usefull when you for example want to add to which set the url belongs to. \ No newline at end of file diff --git a/examples/extractor_tutorial/Extractors/bbc_extractor.py b/examples/extractor_tutorial/Extractors/bbc_extractor.py index 6c15a2ab..8b040984 100644 --- a/examples/extractor_tutorial/Extractors/bbc_extractor.py +++ b/examples/extractor_tutorial/Extractors/bbc_extractor.py @@ -1,18 +1,20 @@ from datetime import datetime from Processor.App.ArticleUtils.article_extractor import ArticleExtractor -from Processor.App.ArticleUtils.article_utils import headline_transform, get_text_transform, text_unifications_transform - -REQUIRED_FIELDS = { - "title": False, - "content": True -} +from Processor.App.ArticleUtils.article_utils import ( + headline_transform, + get_text_transform, + text_unifications_transform, +) + +REQUIRED_FIELDS = {"title": False, "content": True} + def content_transform(soup): return [p.text for p in soup.find_all("p", recursive=True)] class BBCExtractor(ArticleExtractor): - SINCE = datetime(2021, 1 , 20) + SINCE = datetime(2021, 1, 20) TO = datetime(2021, 3, 20) def __init__(self): @@ -24,17 +26,20 @@ def __init__(self): "content": "main[role=main]", }, # Here we define how to transform the content of the tag into a string. - article_extract_dict= { + article_extract_dict={ "title": [get_text_transform, headline_transform], - "content": [content_transform, text_unifications_transform, lambda lines : "\n".join(lines)] + "content": [ + content_transform, + text_unifications_transform, + lambda lines: "\n".join(lines), + ], }, - - # Here we define how to bind a tag that containt all fields we will use in article_css_dict # If you don't know just use body article_css_selector="body", required_fields=REQUIRED_FIELDS, - non_empty = True + non_empty=True, ) -extractor = BBCExtractor() \ No newline at end of file + +extractor = BBCExtractor() diff --git a/pyrightconfig.json b/pyrightconfig.json new file mode 100644 index 00000000..2d761458 --- /dev/null +++ b/pyrightconfig.json @@ -0,0 +1,5 @@ +{ + "include": [ + "cmoncrawl/**/*.py" + ] +} diff --git a/tests/aggregator_tests.py b/tests/aggregator_tests.py index d34d9c18..506d72b7 100644 --- a/tests/aggregator_tests.py +++ b/tests/aggregator_tests.py @@ -23,7 +23,7 @@ async def asyncSetUp(self) -> None: self.di = await IndexAggregator( ["idnes.cz"], cc_servers=self.CC_SERVERS, - max_retry=50, + max_retry=100, sleep_step=10, prefetch_size=1, match_type=MatchType.DOMAIN, @@ -59,25 +59,18 @@ async def test_indexer_all_CC(self): async def test_since(self): # That is crawl date not published date - records: List[DomainRecord] = [] self.di.since = datetime(2022, 1, 21) + self.di.limit = 5 async for record in self.di: self.assertGreaterEqual(record.timestamp, self.di.since) - records.append(record) - - self.assertEqual(len(records), 131149) async def test_to(self): # That is crawl date not published date - records: List[DomainRecord] = [] self.di.to = datetime(2022, 1, 21) async for record in self.di: self.assertLessEqual(record.timestamp, self.di.to) - records.append(record) - - self.assertEqual(len(records), 63244) async def test_limit(self): records: List[DomainRecord] = [] @@ -98,6 +91,7 @@ async def test_init_queue_since_to(self): max_retry=10, sleep_step=4, prefetch_size=2, + match_type=MatchType.DOMAIN, ) # Generates only for 2020 q = iterator.init_crawls_queue( diff --git a/tests/processor_tests.py b/tests/processor_tests.py index 321ad5b7..1f09b73e 100644 --- a/tests/processor_tests.py +++ b/tests/processor_tests.py @@ -7,9 +7,11 @@ import re from datetime import datetime from cmoncrawl.processor.pipeline.downloader import AsyncDownloader +from cmoncrawl.processor.pipeline.extractor import BaseExtractor, HTMLExtractor from cmoncrawl.processor.pipeline.streamer import StreamerFileJSON, StreamerFileHTML from cmoncrawl.processor.pipeline.router import Router from cmoncrawl.common.types import DomainRecord, PipeMetadata +from bs4 import BeautifulSoup class DownloaderTests(unittest.IsolatedAsyncioTestCase): @@ -65,6 +67,53 @@ def test_router_route_by_name(self): self.assertEqual(c3, self.router.modules["BBB"]) +class ExtradctorTests(unittest.TestCase): + def test_encoding(self): + def create_html(): + return "
test
".encode("latin-1").decode( + "latin-1" + ) + + def create_non_utf8(): + return bytes([0x81, 0x81, 0x82, 0x83]).decode("latin-1") + + def create_metadata(): + return PipeMetadata( + domain_record=DomainRecord( + filename="", + offset=0, + length=0, + url="", + ), + ) + + extractor = HTMLExtractor() # type: ignore + metadata = create_metadata() + extractor.encode(create_html(), metadata) + # By default utf-8 is tried + self.assertEqual(metadata.encoding, "utf-8") + + # Can be overwriten by setting encoding + # Revert back to default + metadata = create_metadata() + # Set expected encoding + metadata.domain_record.encoding = "latin-1" + extractor.encode(create_html(), metadata) + self.assertEqual(metadata.encoding, "latin-1") + + # By default if everything fails, the default(latin-1) is used + metadata = create_metadata() + extractor.encode(create_non_utf8(), metadata) + self.assertEqual(metadata.encoding, "latin-1") + + # But we can overwrite it by setting raise_on_error + metadata = create_metadata() + extractor.raise_on_encoding = True + metadata.encoding = "latin-1" + with self.assertRaises(ValueError): + extractor.encode(create_non_utf8(), metadata) + + class OutStremaerTests(unittest.IsolatedAsyncioTestCase): def setUp(self) -> None: self.html_folder = Path(__file__).parent / "test_html" @@ -105,7 +154,7 @@ async def test_create_multi_file(self): size = len(os.listdir(self.json_folder)) self.assertEqual(size, 1) num_lines = sum( - 1 for _ in open(self.json_folder / "directory_0" / "0_file.json", "r") + 1 for _ in open(self.json_folder / "directory_0" / "0_file.jsonl", "r") ) self.assertEqual(num_lines, 5)