Refactoring, test coverage increased to 60%
soxoj committed Dec 8, 2024
Parent: 4b13177 · Commit: 781c8c0
Showing 14 changed files with 192 additions and 155 deletions.
15 changes: 12 additions & 3 deletions .github/workflows/python-package.yml
@@ -16,7 +16,8 @@ jobs:
python-version: ["3.10", "3.11", "3.12"]

steps:
-      - uses: actions/checkout@v2
+      - name: Checkout
+        uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
@@ -26,6 +27,14 @@ jobs:
python -m pip install --upgrade pip
python -m pip install poetry
python -m poetry install --with dev
-      - name: Test with pytest
+      - name: Test with Coverage and Pytest (Fail if coverage is low)
        run: |
-          poetry run pytest --reruns 3 --reruns-delay 5
+          coverage run -m pytest --reruns 3 --reruns-delay 5
+          # Fail if coverage is less than 60%
+          coverage report --fail-under=60
+          coverage html
+      - name: Upload coverage report
+        uses: actions/upload-artifact@v3
+        with:
+          name: htmlcov
+          path: htmlcov
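
The same gate can be reproduced locally with the commands the workflow runs: `coverage run -m pytest --reruns 3 --reruns-delay 5`, then `coverage report --fail-under=60` (exits non-zero when coverage is below 60%), and `coverage html`, which writes the `htmlcov/` directory that the upload step publishes as an artifact.
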
111 changes: 62 additions & 49 deletions maigret/checking.py
@@ -31,7 +31,7 @@
AsyncioSimpleExecutor,
AsyncioProgressbarQueueExecutor,
)
-from .result import QueryResult, QueryStatus
+from .result import MaigretCheckResult, MaigretCheckStatus
from .sites import MaigretDatabase, MaigretSite
from .types import QueryOptions, QueryResultWrapper
from .utils import ascii_data_display, get_random_user_agent
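
The rename touches every consumer of the result classes; as a sketch, a hypothetical downstream import (not part of this diff) would change like this:

    # Before this commit (hypothetical third-party code):
    from maigret.result import QueryResult, QueryStatus
    # After:
    from maigret.result import MaigretCheckResult, MaigretCheckStatus
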
@@ -322,7 +322,7 @@ def process_site_result(
break

def build_result(status, **kwargs):
-return QueryResult(
+return MaigretCheckResult(
username,
site_name,
url,
@@ -334,11 +334,11 @@ def build_result(status, **kwargs):

if check_error:
logger.warning(check_error)
-result = QueryResult(
+result = MaigretCheckResult(
username,
site_name,
url,
-QueryStatus.UNKNOWN,
+MaigretCheckStatus.UNKNOWN,
query_time=response_time,
error=check_error,
context=str(CheckError),
@@ -350,25 +350,25 @@
[(absence_flag in html_text) for absence_flag in site.absence_strs]
)
if not is_absence_detected and is_presense_detected:
-result = build_result(QueryStatus.CLAIMED)
+result = build_result(MaigretCheckStatus.CLAIMED)
else:
-result = build_result(QueryStatus.AVAILABLE)
+result = build_result(MaigretCheckStatus.AVAILABLE)
elif check_type in "status_code":
# Checks if the status code of the response is 2XX
if 200 <= status_code < 300:
-result = build_result(QueryStatus.CLAIMED)
+result = build_result(MaigretCheckStatus.CLAIMED)
else:
-result = build_result(QueryStatus.AVAILABLE)
+result = build_result(MaigretCheckStatus.AVAILABLE)
elif check_type == "response_url":
# For this detection method, we have turned off the redirect.
# So, there is no need to check the response URL: it will always
# match the request. Instead, we will ensure that the response
# code indicates that the request was successful (i.e. no 404, or
# forward to some odd redirect).
if 200 <= status_code < 300 and is_presense_detected:
-result = build_result(QueryStatus.CLAIMED)
+result = build_result(MaigretCheckStatus.CLAIMED)
else:
-result = build_result(QueryStatus.AVAILABLE)
+result = build_result(MaigretCheckStatus.AVAILABLE)
else:
# It should be impossible to ever get here...
raise ValueError(
@@ -377,33 +377,11 @@ def build_result(status, **kwargs):

extracted_ids_data = {}

-    if is_parsing_enabled and result.status == QueryStatus.CLAIMED:
-        try:
-            extracted_ids_data = extract(html_text)
-        except Exception as e:
-            logger.warning(f"Error while parsing {site.name}: {e}", exc_info=True)
-
+    if is_parsing_enabled and result.status == MaigretCheckStatus.CLAIMED:
+        extracted_ids_data = extract_ids_data(html_text, logger, site)
         if extracted_ids_data:
-            new_usernames = {}
-            for k, v in extracted_ids_data.items():
-                if "username" in k and not "usernames" in k:
-                    new_usernames[v] = "username"
-                elif "usernames" in k:
-                    try:
-                        tree = ast.literal_eval(v)
-                        if type(tree) == list:
-                            for n in tree:
-                                new_usernames[n] = "username"
-                    except Exception as e:
-                        logger.warning(e)
-                if k in SUPPORTED_IDS:
-                    new_usernames[v] = k
-
-            results_info["ids_usernames"] = new_usernames
-            links = ascii_data_display(extracted_ids_data.get("links", "[]"))
-            if "website" in extracted_ids_data:
-                links.append(extracted_ids_data["website"])
-            results_info["ids_links"] = links
+            new_usernames = parse_usernames(extracted_ids_data, logger)
+            results_info = update_results_info(results_info, extracted_ids_data, new_usernames)
result.ids_data = extracted_ids_data

# Save status of request
@@ -462,29 +440,29 @@ def make_site_result(
# site check is disabled
if site.disabled and not options['forced']:
logger.debug(f"Site {site.name} is disabled, skipping...")
results_site["status"] = QueryResult(
results_site["status"] = MaigretCheckResult(
username,
site.name,
url,
-QueryStatus.ILLEGAL,
+MaigretCheckStatus.ILLEGAL,
error=CheckError("Check is disabled"),
)
# current username type could not be applied
elif site.type != options["id_type"]:
results_site["status"] = QueryResult(
results_site["status"] = MaigretCheckResult(
username,
site.name,
url,
-QueryStatus.ILLEGAL,
+MaigretCheckStatus.ILLEGAL,
error=CheckError('Unsupported identifier type', f'Want "{site.type}"'),
)
# username is not allowed.
elif site.regex_check and re.search(site.regex_check, username) is None:
results_site["status"] = QueryResult(
results_site["status"] = MaigretCheckResult(
username,
site.name,
url,
-QueryStatus.ILLEGAL,
+MaigretCheckStatus.ILLEGAL,
error=CheckError(
'Unsupported username format', f'Want "{site.regex_check}"'
),
@@ -731,11 +709,11 @@ async def maigret(
continue
default_result: QueryResultWrapper = {
'site': site,
-'status': QueryResult(
+'status': MaigretCheckResult(
username,
sitename,
'',
-QueryStatus.UNKNOWN,
+MaigretCheckStatus.UNKNOWN,
error=CheckError('Request failed'),
),
}
@@ -819,8 +797,8 @@ async def site_self_check(
}

check_data = [
-(site.username_claimed, QueryStatus.CLAIMED),
-(site.username_unclaimed, QueryStatus.AVAILABLE),
+(site.username_claimed, MaigretCheckStatus.CLAIMED),
+(site.username_unclaimed, MaigretCheckStatus.AVAILABLE),
]

logger.info(f"Checking {site.name}...")
@@ -859,7 +837,7 @@ async def site_self_check(
site_status = result.status

if site_status != status:
-if site_status == QueryStatus.UNKNOWN:
+if site_status == MaigretCheckStatus.UNKNOWN:
msgs = site.absence_strs
etype = site.check_type
logger.warning(
@@ -871,9 +849,9 @@
if skip_errors:
pass
# don't disable in case of available username
-elif status == QueryStatus.CLAIMED:
+elif status == MaigretCheckStatus.CLAIMED:
changes["disabled"] = True
-elif status == QueryStatus.CLAIMED:
+elif status == MaigretCheckStatus.CLAIMED:
logger.warning(
f"Not found `{username}` in {site.name}, must be claimed"
)
@@ -960,3 +938,38 @@ def disabled_count(lst):
print(f"Unchecked sites verified: {unchecked_old_count - unchecked_new_count}")

return total_disabled != 0 or unchecked_new_count != unchecked_old_count


+def extract_ids_data(html_text, logger, site) -> Dict:
+    try:
+        return extract(html_text)
+    except Exception as e:
+        logger.warning(f"Error while parsing {site.name}: {e}", exc_info=True)
+        return {}
+
+
+def parse_usernames(extracted_ids_data, logger) -> Dict:
+    new_usernames = {}
+    for k, v in extracted_ids_data.items():
+        if "username" in k and not "usernames" in k:
+            new_usernames[v] = "username"
+        elif "usernames" in k:
+            try:
+                tree = ast.literal_eval(v)
+                if type(tree) == list:
+                    for n in tree:
+                        new_usernames[n] = "username"
+            except Exception as e:
+                logger.warning(e)
+        if k in SUPPORTED_IDS:
+            new_usernames[v] = k
+    return new_usernames
+
+
+def update_results_info(results_info, extracted_ids_data, new_usernames):
+    results_info["ids_usernames"] = new_usernames
+    links = ascii_data_display(extracted_ids_data.get("links", "[]"))
+    if "website" in extracted_ids_data:
+        links.append(extracted_ids_data["website"])
+    results_info["ids_links"] = links
+    return results_info
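
A quick sketch of the extracted parse_usernames helper on sample data (illustrative only; assumes neither key is in SUPPORTED_IDS):

    import logging
    from maigret.checking import parse_usernames

    sample = {
        "twitter_username": "alice",      # "username" key -> value recorded as a username
        "usernames": "['bob', 'carol']",  # stringified list -> parsed via ast.literal_eval
    }
    print(parse_usernames(sample, logging.getLogger(__name__)))
    # roughly: {'alice': 'username', 'bob': 'username', 'carol': 'username'}
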
48 changes: 45 additions & 3 deletions maigret/errors.py
@@ -1,6 +1,6 @@
-from typing import Dict, List, Any
+from typing import Dict, List, Any, Tuple

-from .result import QueryResult
+from .result import MaigretCheckResult
from .types import QueryResultWrapper


@@ -114,7 +114,7 @@ def extract_and_group(search_res: QueryResultWrapper) -> List[Dict[str, Any]]:
errors_counts: Dict[str, int] = {}
for r in search_res.values():
if r and isinstance(r, dict) and r.get('status'):
-if not isinstance(r['status'], QueryResult):
+if not isinstance(r['status'], MaigretCheckResult):
continue

err = r['status'].error
@@ -133,3 +133,45 @@ def extract_and_group(search_res: QueryResultWrapper) -> List[Dict[str, Any]]:
)

return counts


+def notify_about_errors(
+    search_results: QueryResultWrapper, query_notify, show_statistics=False
+) -> List[Tuple]:
+    """
+    Prepare error notifications in search results, text + symbol,
+    to be displayed by notify object.
+    Example:
+    [
+        ("Too many errors of type "timeout" (50.0%)", "!")
+        ("Verbose error statistics:", "-")
+    ]
+    """
+    results = []
+
+    errs = extract_and_group(search_results)
+    was_errs_displayed = False
+    for e in errs:
+        if not is_important(e):
+            continue
+        text = f'Too many errors of type "{e["err"]}" ({round(e["perc"],2)}%)'
+        solution = solution_of(e['err'])
+        if solution:
+            text = '. '.join([text, solution.capitalize()])
+
+        results.append((text, '!'))
+        was_errs_displayed = True
+
+    if show_statistics:
+        results.append(('Verbose error statistics:', '-'))
+        for e in errs:
+            text = f'{e["err"]}: {round(e["perc"],2)}%'
+            results.append((text, '!'))
+
+    if was_errs_displayed:
+        results.append(
+            ('You can see detailed site check errors with a flag `--print-errors`', '-')
+        )
+
+    return results
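
After this refactoring the function returns (text, symbol) tuples instead of printing them, so a caller drains them through the notify object — this mirrors the maigret.py change below (search_results and query_notify assumed in scope):

    for text, symbol in notify_about_errors(search_results, query_notify):
        query_notify.warning(text, symbol)
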
32 changes: 3 additions & 29 deletions maigret/maigret.py
@@ -45,34 +45,6 @@
from .permutator import Permute


-def notify_about_errors(
-    search_results: QueryResultWrapper, query_notify, show_statistics=False
-):
-    errs = errors.extract_and_group(search_results)
-    was_errs_displayed = False
-    for e in errs:
-        if not errors.is_important(e):
-            continue
-        text = f'Too many errors of type "{e["err"]}" ({round(e["perc"],2)}%)'
-        solution = errors.solution_of(e['err'])
-        if solution:
-            text = '. '.join([text, solution.capitalize()])
-
-        query_notify.warning(text, '!')
-        was_errs_displayed = True
-
-    if show_statistics:
-        query_notify.warning(f'Verbose error statistics:')
-        for e in errs:
-            text = f'{e["err"]}: {round(e["perc"],2)}%'
-            query_notify.warning(text, '!')
-
-    if was_errs_displayed:
-        query_notify.warning(
-            'You can see detailed site check errors with a flag `--print-errors`'
-        )


def extract_ids_from_page(url, logger, timeout=5) -> dict:
results = {}
# url, headers
@@ -693,7 +665,9 @@ async def main():
check_domains=args.with_domains,
)

-    notify_about_errors(results, query_notify, show_statistics=args.verbose)
+    errs = errors.notify_about_errors(results, query_notify, show_statistics=args.verbose)
+    for e in errs:
+        query_notify.warning(*e)

if args.reports_sorting == "data":
results = sort_report_by_data_points(results)
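
Returning notification tuples also makes the error reporting unit-testable without capturing stdout, in line with the commit's coverage goal. A minimal hypothetical test, assuming extract_and_group yields an empty grouping for an empty result set:

    from maigret.errors import notify_about_errors

    def test_no_errors_yields_no_notifications():
        # No results -> no warnings, no statistics, no hint line
        assert notify_about_errors({}, query_notify=None) == []
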