diff --git a/.env.sample b/.env.sample
index 684e4cb1..6ec09d4a 100644
--- a/.env.sample
+++ b/.env.sample
@@ -1,6 +1,6 @@
-MYSQL_HOST=localhost:3306
+MYSQL_HOST=127.0.0.1
 MYSQL_PORT=3306
-MYSQL_USER=
-MYSQL_PASSWORD=
+MYSQL_USER=root
+MYSQL_PASSWORD=root
 MYSQL_DB_NAME=test_cc
 MYSQL_TABLE_NAME=ccindex
diff --git a/.github/workflows/test_and_types.yml b/.github/workflows/test_and_types.yml
index b119a918..e9186d34 100644
--- a/.github/workflows/test_and_types.yml
+++ b/.github/workflows/test_and_types.yml
@@ -38,7 +38,7 @@ jobs:
         run: cp .env.sample .env

       - name: Run tests
-        run: python -m unittest discover -s tests -p "athena_tests.py" # Replace with your test command
+        run: python -m unittest discover -s tests -p "*_tests.py" # Replace with your test command

   lint_and_types:
     runs-on: ubuntu-latest
diff --git a/cmoncrawl/aggregator/athena_query.py b/cmoncrawl/aggregator/athena_query.py
index c7bf1f63..8d984dba 100644
--- a/cmoncrawl/aggregator/athena_query.py
+++ b/cmoncrawl/aggregator/athena_query.py
@@ -4,7 +4,7 @@
 from datetime import datetime
 from pathlib import Path
 import tempfile
-from typing import Any, AsyncIterable, AsyncIterator, Deque, List, Optional, Set, Tuple
+from typing import Any, AsyncIterable, AsyncIterator, Deque, List, Optional, Set
 import uuid
 import hashlib
@@ -122,7 +122,7 @@ def __init__(
         max_retry: int = 5,
         extra_sql_where_clause: str | None = None,
         batch_size: int = 1,
-        aws_profile: str = "default",
+        aws_profile: Optional[str] = None,
         bucket_name: Optional[str] = None,
         catalog_name: str = "AwsDataCatalog",
         database_name: str = "commoncrawl",
diff --git a/cmoncrawl/integrations/commands.py b/cmoncrawl/integrations/commands.py
index 1bda91e4..18528dc1 100644
--- a/cmoncrawl/integrations/commands.py
+++ b/cmoncrawl/integrations/commands.py
@@ -1,5 +1,4 @@
 import argparse
-import logging
 from typing import Any, Dict
 from cmoncrawl.integrations.download import add_args as add_download_args
 from cmoncrawl.integrations.extract import add_args as add_extract_args
diff --git a/cmoncrawl/middleware/stompware.py b/cmoncrawl/middleware/stompware.py
index 725eca83..f6b87363 100644
--- a/cmoncrawl/middleware/stompware.py
+++ b/cmoncrawl/middleware/stompware.py
@@ -312,7 +312,7 @@ async def process(self):
                         extracted_num += 1
                     else:
                         all_purpose_logger.info(f"Failed to extract {message.dr.url}")
-            except Exception as e:
+            except Exception:
                 pass

             finally:
diff --git a/cmoncrawl/middleware/synchronized.py b/cmoncrawl/middleware/synchronized.py
index 320cc0b9..ea9dcddc 100644
--- a/cmoncrawl/middleware/synchronized.py
+++ b/cmoncrawl/middleware/synchronized.py
@@ -40,7 +40,7 @@ async def query_and_extract(
             await pipeline.process_domain_record(domain_record, {})
             total_extracted += 1
             processed_urls.add(unify_url_id(url))
-        except KeyboardInterrupt as e:
+        except KeyboardInterrupt:
             break

         except Exception as e:
@@ -116,12 +116,11 @@ async def extract(
         for task in done:
             try:
                 total_extracted += len(await task)
-            except KeyboardInterrupt as e:
+            except KeyboardInterrupt:
                 break

-            except Exception as _:
+            except Exception:
                 all_purpose_logger.error(f"Error in task {task}", exc_info=True)
-                pass

     except Exception as e:
         all_purpose_logger.error(e, exc_info=True)
diff --git a/extractors/my_extractor.py b/extractors/my_extractor.py
index 2fa9b8b3..0fa27b43 100644
--- a/extractors/my_extractor.py
+++ b/extractors/my_extractor.py
@@ -1,28 +1,30 @@
 from bs4 import BeautifulSoup
 from cmoncrawl.common.types import PipeMetadata
 from cmoncrawl.processor.pipeline.extractor import BaseExtractor
+
+
 class MyExtractor(BaseExtractor):
-	def __init__(self):
-		# you can force a specific encoding if you know it
-		super().__init__(encoding=None)
+    def __init__(self):
+        # you can force a specific encoding if you know it
+        super().__init__(encoding=None)
+
+    def extract_soup(self, soup: BeautifulSoup, metadata: PipeMetadata):
+        # here you can extract the data you want from the soup
+        # and return a dict with the data you want to save
+        body = soup.select_one("body")
+        if body is None:
+            return None
+        return {"body": body.get_text()}
+
+    # You can also override the following methods to drop the files you don't want to extract
+    # Return True to keep the file, False to drop it
+    def filter_raw(self, response: str, metadata: PipeMetadata) -> bool:
+        return True

-	def extract_soup(self, soup: BeautifulSoup, metadata: PipeMetadata):
-		# here you can extract the data you want from the soup
-		# and return a dict with the data you want to save
-		body = soup.select_one("body")
-		if body is None:
-			return None
-		return {
-			"body": body.get_text()
-		}
+    def filter_soup(self, soup: BeautifulSoup, metadata: PipeMetadata) -> bool:
+        return True

-	# You can also override the following methods to drop the files you don't want to extracti
-	# Return True to keep the file, False to drop it
-	def filter_raw(self, response: str, metadata: PipeMetadata) -> bool:
-		return True
-	def filter_soup(self, soup: BeautifulSoup, metadata: PipeMetadata) -> bool:
-		return True
+

 # Make sure to instantiate your extractor into extractor variable
 # The name must match so that the framework can find it
-extractor = MyExtractor()
\ No newline at end of file
+extractor = MyExtractor()
diff --git a/tests/athena_tests.py b/tests/athena_tests.py
index 50332017..6eac6f6b 100644
--- a/tests/athena_tests.py
+++ b/tests/athena_tests.py
@@ -6,7 +6,7 @@
 from unittest.mock import patch

 import boto3

-from tests.utils import MySQLRecordsDB
+from tests.utils import MySQLRecordsDB, set_up_aws_credentials_testing
 import aioboto3
 from cmoncrawl.aggregator.athena_query import (
@@ -120,6 +120,7 @@ async def asyncSetUp(self) -> None:
             "https://index.commoncrawl.org/CC-MAIN-2021-09-index",
             "https://index.commoncrawl.org/CC-MAIN-2020-50-index",
         ]
+        set_up_aws_credentials_testing()

     def test_prepare_athena_sql_query_multiple_urls(self):
         query = prepare_athena_sql_query(
@@ -242,6 +243,7 @@ def setUp(self) -> None:
         self.mock_s3.start()
         self.mock_athena = mock_athena()
         self.mock_athena.start()
+        set_up_aws_credentials_testing()

     def tearDown(self) -> None:
         self.mock_s3.stop()
@@ -546,7 +548,7 @@ async def test_batch_size_zero(self):

     async def test_extra_sql_where(self):
         self.domains = ["seznam.cz"]
-        where_clause = "cc.fetch_status != 200"
+        where_clause = 'cc.warc_filename = "filename1"'
         self.iterator = AthenaAggregator.AthenaAggregatorIterator(
             aws_client=self.aws_client,
             domains=self.domains,
diff --git a/tests/utils.py b/tests/utils.py
index 6b4f2d50..e98ae567 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -27,7 +27,7 @@ def seed_db(self):
                 100,
                 200,
                 "CC-MAIN-2021-05",
-                100,
+                200,
                 "warc",
             ],
             [
@@ -130,3 +130,13 @@ def setUp(self):

     def tearDown(self):
         self.remove_db()
         self.db.close()
+
+
+def set_up_aws_credentials_testing():
+    import os
+
+    os.environ["AWS_ACCESS_KEY_ID"] = "testing"
+    os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
+    os.environ["AWS_SECURITY_TOKEN"] = "testing"
+    os.environ["AWS_SESSION_TOKEN"] = "testing"
+    os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
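
A note on the aws_profile change in cmoncrawl/aggregator/athena_query.py: when a profile name is passed to a boto3/aioboto3 Session explicitly, the environment-variable credential provider is bypassed, so the old default of "default" would typically fail with ProfileNotFound on hosts that have no ~/.aws configuration (such as the CI runner), whereas None falls back to the standard credential chain. A minimal sketch of the distinction, assuming aioboto3 is installed; the Athena client, region, and print are illustrative only:

# Illustrative sketch, not part of the diff.
import asyncio

import aioboto3


async def main() -> None:
    # profile_name=None: the normal credential chain applies, so the
    # AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables
    # (e.g. the ones exported in CI) are honoured.
    session = aioboto3.Session(profile_name=None)

    # With profile_name="default", boto3 would instead insist on a [default]
    # profile in ~/.aws and typically raise ProfileNotFound when it is absent.
    async with session.client("athena", region_name="us-east-1") as athena:
        print(athena.meta.region_name)  # -> us-east-1


asyncio.run(main())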
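
Likewise, a sketch of how the new set_up_aws_credentials_testing helper in tests/utils.py is meant to be used, mirroring the pattern the diff adds to tests/athena_tests.py: export dummy credentials before any boto3 client is created, so that moto intercepts every AWS call and a misconfigured test can never reach a real account. The test class and bucket name below are illustrative, not part of the diff:

# Illustrative test, not part of the diff.
import unittest

import boto3
from moto import mock_s3

from tests.utils import set_up_aws_credentials_testing


class ExampleMotoTest(unittest.TestCase):
    def setUp(self) -> None:
        # Dummy credentials must be in place before any boto3 client exists,
        # otherwise real credentials from the environment could leak through.
        set_up_aws_credentials_testing()
        self.mock_s3 = mock_s3()
        self.mock_s3.start()

    def tearDown(self) -> None:
        self.mock_s3.stop()

    def test_bucket_roundtrip(self) -> None:
        s3 = boto3.client("s3")  # region comes from AWS_DEFAULT_REGION
        s3.create_bucket(Bucket="example-bucket")
        names = [b["Name"] for b in s3.list_buckets()["Buckets"]]
        self.assertIn("example-bucket", names)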