Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring, test coverage increased to 60% #1943

Merged
merged 9 commits into from
Dec 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ jobs:
python-version: ["3.10", "3.11", "3.12"]

steps:
- uses: actions/checkout@v2
- name: Checkout
uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
Expand All @@ -26,6 +27,13 @@ jobs:
python -m pip install --upgrade pip
python -m pip install poetry
python -m poetry install --with dev
- name: Test with pytest
- name: Test with Coverage and Pytest (Fail if coverage is low)
run: |
poetry run pytest --reruns 3 --reruns-delay 5
poetry run coverage run --source=./maigret -m pytest --reruns 3 --reruns-delay 5 tests
poetry run coverage report --fail-under=60
poetry run coverage html
- name: Upload coverage report
uses: actions/upload-artifact@v3
with:
name: htmlcov
path: htmlcov
111 changes: 62 additions & 49 deletions maigret/checking.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
AsyncioSimpleExecutor,
AsyncioProgressbarQueueExecutor,
)
from .result import QueryResult, QueryStatus
from .result import MaigretCheckResult, MaigretCheckStatus
from .sites import MaigretDatabase, MaigretSite
from .types import QueryOptions, QueryResultWrapper
from .utils import ascii_data_display, get_random_user_agent
Expand Down Expand Up @@ -322,7 +322,7 @@ def process_site_result(
break

def build_result(status, **kwargs):
return QueryResult(
return MaigretCheckResult(
username,
site_name,
url,
Expand All @@ -334,11 +334,11 @@ def build_result(status, **kwargs):

if check_error:
logger.warning(check_error)
result = QueryResult(
result = MaigretCheckResult(
username,
site_name,
url,
QueryStatus.UNKNOWN,
MaigretCheckStatus.UNKNOWN,
query_time=response_time,
error=check_error,
context=str(CheckError),
Expand All @@ -350,25 +350,25 @@ def build_result(status, **kwargs):
[(absence_flag in html_text) for absence_flag in site.absence_strs]
)
if not is_absence_detected and is_presense_detected:
result = build_result(QueryStatus.CLAIMED)
result = build_result(MaigretCheckStatus.CLAIMED)
else:
result = build_result(QueryStatus.AVAILABLE)
result = build_result(MaigretCheckStatus.AVAILABLE)
elif check_type in "status_code":
# Checks if the status code of the response is 2XX
if 200 <= status_code < 300:
result = build_result(QueryStatus.CLAIMED)
result = build_result(MaigretCheckStatus.CLAIMED)
else:
result = build_result(QueryStatus.AVAILABLE)
result = build_result(MaigretCheckStatus.AVAILABLE)
elif check_type == "response_url":
# For this detection method, we have turned off the redirect.
# So, there is no need to check the response URL: it will always
# match the request. Instead, we will ensure that the response
# code indicates that the request was successful (i.e. no 404, or
# forward to some odd redirect).
if 200 <= status_code < 300 and is_presense_detected:
result = build_result(QueryStatus.CLAIMED)
result = build_result(MaigretCheckStatus.CLAIMED)
else:
result = build_result(QueryStatus.AVAILABLE)
result = build_result(MaigretCheckStatus.AVAILABLE)
else:
# It should be impossible to ever get here...
raise ValueError(
Expand All @@ -377,33 +377,11 @@ def build_result(status, **kwargs):

extracted_ids_data = {}

if is_parsing_enabled and result.status == QueryStatus.CLAIMED:
try:
extracted_ids_data = extract(html_text)
except Exception as e:
logger.warning(f"Error while parsing {site.name}: {e}", exc_info=True)

if is_parsing_enabled and result.status == MaigretCheckStatus.CLAIMED:
extracted_ids_data = extract_ids_data(html_text, logger, site)
if extracted_ids_data:
new_usernames = {}
for k, v in extracted_ids_data.items():
if "username" in k and not "usernames" in k:
new_usernames[v] = "username"
elif "usernames" in k:
try:
tree = ast.literal_eval(v)
if type(tree) == list:
for n in tree:
new_usernames[n] = "username"
except Exception as e:
logger.warning(e)
if k in SUPPORTED_IDS:
new_usernames[v] = k

results_info["ids_usernames"] = new_usernames
links = ascii_data_display(extracted_ids_data.get("links", "[]"))
if "website" in extracted_ids_data:
links.append(extracted_ids_data["website"])
results_info["ids_links"] = links
new_usernames = parse_usernames(extracted_ids_data, logger)
results_info = update_results_info(results_info, extracted_ids_data, new_usernames)
result.ids_data = extracted_ids_data

# Save status of request
Expand Down Expand Up @@ -462,29 +440,29 @@ def make_site_result(
# site check is disabled
if site.disabled and not options['forced']:
logger.debug(f"Site {site.name} is disabled, skipping...")
results_site["status"] = QueryResult(
results_site["status"] = MaigretCheckResult(
username,
site.name,
url,
QueryStatus.ILLEGAL,
MaigretCheckStatus.ILLEGAL,
error=CheckError("Check is disabled"),
)
# current username type could not be applied
elif site.type != options["id_type"]:
results_site["status"] = QueryResult(
results_site["status"] = MaigretCheckResult(
username,
site.name,
url,
QueryStatus.ILLEGAL,
MaigretCheckStatus.ILLEGAL,
error=CheckError('Unsupported identifier type', f'Want "{site.type}"'),
)
# username is not allowed.
elif site.regex_check and re.search(site.regex_check, username) is None:
results_site["status"] = QueryResult(
results_site["status"] = MaigretCheckResult(
username,
site.name,
url,
QueryStatus.ILLEGAL,
MaigretCheckStatus.ILLEGAL,
error=CheckError(
'Unsupported username format', f'Want "{site.regex_check}"'
),
Expand Down Expand Up @@ -731,11 +709,11 @@ async def maigret(
continue
default_result: QueryResultWrapper = {
'site': site,
'status': QueryResult(
'status': MaigretCheckResult(
username,
sitename,
'',
QueryStatus.UNKNOWN,
MaigretCheckStatus.UNKNOWN,
error=CheckError('Request failed'),
),
}
Expand Down Expand Up @@ -819,8 +797,8 @@ async def site_self_check(
}

check_data = [
(site.username_claimed, QueryStatus.CLAIMED),
(site.username_unclaimed, QueryStatus.AVAILABLE),
(site.username_claimed, MaigretCheckStatus.CLAIMED),
(site.username_unclaimed, MaigretCheckStatus.AVAILABLE),
]

logger.info(f"Checking {site.name}...")
Expand Down Expand Up @@ -859,7 +837,7 @@ async def site_self_check(
site_status = result.status

if site_status != status:
if site_status == QueryStatus.UNKNOWN:
if site_status == MaigretCheckStatus.UNKNOWN:
msgs = site.absence_strs
etype = site.check_type
logger.warning(
Expand All @@ -871,9 +849,9 @@ async def site_self_check(
if skip_errors:
pass
# don't disable in case of available username
elif status == QueryStatus.CLAIMED:
elif status == MaigretCheckStatus.CLAIMED:
changes["disabled"] = True
elif status == QueryStatus.CLAIMED:
elif status == MaigretCheckStatus.CLAIMED:
logger.warning(
f"Not found `{username}` in {site.name}, must be claimed"
)
Expand Down Expand Up @@ -960,3 +938,38 @@ def disabled_count(lst):
print(f"Unchecked sites verified: {unchecked_old_count - unchecked_new_count}")

return total_disabled != 0 or unchecked_new_count != unchecked_old_count


def extract_ids_data(html_text, logger, site) -> Dict:
    """Extract identifier data from a page without ever raising.

    Args:
        html_text: page content handed to ``extract``.
        logger: logger that receives a warning (with traceback) on failure.
        site: object whose ``name`` attribute is used in the warning text.

    Returns:
        Whatever ``extract`` returns, or an empty dict if it raised.
    """
    ids_data = {}
    try:
        ids_data = extract(html_text)
    except Exception as exc:
        # Parsing is best-effort: a broken page must not abort the site check.
        logger.warning(f"Error while parsing {site.name}: {exc}", exc_info=True)
    return ids_data


def parse_usernames(extracted_ids_data, logger) -> Dict:
    """Collect additional identifiers found in extracted profile data.

    Args:
        extracted_ids_data: mapping of field name to extracted value.
        logger: logger that receives a warning when a plural ``usernames``
            value cannot be evaluated or processed.

    Returns:
        Dict mapping each discovered identifier to its type: ``"username"``
        for username-like fields, or the field name itself when that name
        is listed in ``SUPPORTED_IDS``.
    """
    new_usernames = {}
    for k, v in extracted_ids_data.items():
        if "usernames" in k:
            # Plural fields are evaluated as a Python literal; only a list
            # result contributes usernames, anything else is ignored.
            try:
                tree = ast.literal_eval(v)
                if isinstance(tree, list):
                    for n in tree:
                        new_usernames[n] = "username"
            except Exception as e:
                # Best-effort: a malformed value is reported, not fatal.
                logger.warning(e)
        elif "username" in k:
            new_usernames[v] = "username"
        # Checked independently of the branches above, so a supported id
        # type overrides a generic "username" label for the same value.
        if k in SUPPORTED_IDS:
            new_usernames[v] = k
    return new_usernames


def update_results_info(results_info, extracted_ids_data, new_usernames):
    """Store discovered usernames and links in ``results_info``.

    The mapping is modified in place and also returned.

    Args:
        results_info: mapping that receives the ``ids_usernames`` and
            ``ids_links`` entries.
        extracted_ids_data: extracted profile data; its ``links`` value
            (default ``"[]"``) is passed through ``ascii_data_display`` and
            its ``website`` value, when present, is appended to the result.
        new_usernames: value stored under ``ids_usernames``.

    Returns:
        The same ``results_info`` object.
    """
    results_info["ids_usernames"] = new_usernames

    raw_links = extracted_ids_data.get("links", "[]")
    ids_links = ascii_data_display(raw_links)
    if "website" in extracted_ids_data:
        website = extracted_ids_data["website"]
        ids_links.append(website)

    results_info["ids_links"] = ids_links
    return results_info
48 changes: 45 additions & 3 deletions maigret/errors.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Dict, List, Any
from typing import Dict, List, Any, Tuple

from .result import QueryResult
from .result import MaigretCheckResult
from .types import QueryResultWrapper


Expand Down Expand Up @@ -114,7 +114,7 @@ def extract_and_group(search_res: QueryResultWrapper) -> List[Dict[str, Any]]:
errors_counts: Dict[str, int] = {}
for r in search_res.values():
if r and isinstance(r, dict) and r.get('status'):
if not isinstance(r['status'], QueryResult):
if not isinstance(r['status'], MaigretCheckResult):
continue

err = r['status'].error
Expand All @@ -133,3 +133,45 @@ def extract_and_group(search_res: QueryResultWrapper) -> List[Dict[str, Any]]:
)

return counts


def notify_about_errors(
    search_results: QueryResultWrapper, query_notify, show_statistics=False
) -> List[Tuple]:
    """
    Build error notifications for search results as (text, symbol) pairs,
    to be displayed by a notify object.

    Args:
        search_results: results passed to ``extract_and_group``.
        query_notify: accepted but not used in the function body.
        show_statistics: when true, append one line per error group.

    Returns:
        List of (text, symbol) tuples, for example:
        [
            ('Too many errors of type "timeout" (50.0%)', '!'),
            ('Verbose error statistics:', '-'),
        ]
    """
    grouped = extract_and_group(search_results)
    notifications: List[Tuple] = []

    for entry in grouped:
        if is_important(entry):
            message = (
                f'Too many errors of type "{entry["err"]}" '
                f'({round(entry["perc"],2)}%)'
            )
            hint = solution_of(entry['err'])
            if hint:
                message = f'{message}. {hint.capitalize()}'
            notifications.append((message, '!'))

    # Only important errors have been collected so far, so a non-empty list
    # means at least one of them was reported.
    important_reported = bool(notifications)

    if show_statistics:
        notifications.append(('Verbose error statistics:', '-'))
        notifications.extend(
            (f'{entry["err"]}: {round(entry["perc"],2)}%', '!')
            for entry in grouped
        )

    if important_reported:
        notifications.append(
            ('You can see detailed site check errors with a flag `--print-errors`', '-')
        )

    return notifications
32 changes: 3 additions & 29 deletions maigret/maigret.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,34 +45,6 @@
from .permutator import Permute


def notify_about_errors(
    search_results: QueryResultWrapper, query_notify, show_statistics=False
):
    """Emit warnings about errors found in search results via ``query_notify``.

    Args:
        search_results: results passed to ``errors.extract_and_group``.
        query_notify: object whose ``warning`` method receives each message.
        show_statistics: when true, also emit one line per error group.
    """
    grouped = errors.extract_and_group(search_results)
    important_reported = False

    for entry in grouped:
        if errors.is_important(entry):
            message = (
                f'Too many errors of type "{entry["err"]}" '
                f'({round(entry["perc"],2)}%)'
            )
            hint = errors.solution_of(entry['err'])
            if hint:
                message = f'{message}. {hint.capitalize()}'
            query_notify.warning(message, '!')
            important_reported = True

    if show_statistics:
        query_notify.warning('Verbose error statistics:')
        for entry in grouped:
            query_notify.warning(f'{entry["err"]}: {round(entry["perc"],2)}%', '!')

    if important_reported:
        query_notify.warning(
            'You can see detailed site check errors with a flag `--print-errors`'
        )


def extract_ids_from_page(url, logger, timeout=5) -> dict:
results = {}
# url, headers
Expand Down Expand Up @@ -693,7 +665,9 @@ async def main():
check_domains=args.with_domains,
)

notify_about_errors(results, query_notify, show_statistics=args.verbose)
errs = errors.notify_about_errors(results, query_notify, show_statistics=args.verbose)
for e in errs:
query_notify.warning(*e)

if args.reports_sorting == "data":
results = sort_report_by_data_points(results)
Expand Down
Loading
Loading