Skip to content

Commit

Permalink
Async generator-executor for site checks (#1978)
Browse files Browse the repository at this point in the history
  • Loading branch information
soxoj authored Dec 17, 2024
1 parent 36ce285 commit 97e5f60
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 28 deletions.
37 changes: 16 additions & 21 deletions maigret/checking.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@
from . import errors
from .activation import ParsingActivator, import_aiohttp_cookies
from .errors import CheckError
from .executors import (
AsyncExecutor,
AsyncioSimpleExecutor,
AsyncioProgressbarQueueExecutor,
)
from .executors import AsyncioQueueGeneratorExecutor
from .result import MaigretCheckResult, MaigretCheckStatus
from .sites import MaigretDatabase, MaigretSite
from .types import QueryOptions, QueryResultWrapper
Expand Down Expand Up @@ -670,18 +666,13 @@ async def maigret(
await debug_ip_request(clearweb_checker, logger)

# setup parallel executor
executor: Optional[AsyncExecutor] = None
if no_progressbar:
# TODO: switch to AsyncioProgressbarQueueExecutor with progress object mock
executor = AsyncioSimpleExecutor(logger=logger)
else:
executor = AsyncioProgressbarQueueExecutor(
logger=logger,
in_parallel=max_connections,
timeout=timeout + 0.5,
*args,
**kwargs,
)
executor = AsyncioQueueGeneratorExecutor(
logger=logger,
in_parallel=max_connections,
timeout=timeout + 0.5,
*args,
**kwargs,
)

# make options objects for all the requests
options: QueryOptions = {}
Expand Down Expand Up @@ -728,13 +719,17 @@ async def maigret(
},
)

cur_results = await executor.run(tasks_dict.values())

# wait for executor timeout errors
await asyncio.sleep(1)
cur_results = []
with alive_bar(
len(tasks_dict), title="Searching", force_tty=True, disable=no_progressbar
) as progress:
async for result in executor.run(tasks_dict.values()):
cur_results.append(result)
progress()

all_results.update(cur_results)

# rerun for failed sites
sites = get_failed_sites(dict(cur_results))
attempts -= 1

Expand Down
70 changes: 69 additions & 1 deletion maigret/executors.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import sys
import time
from typing import Any, Iterable, List
from typing import Any, Iterable, List, Callable

import alive_progress
from alive_progress import alive_bar
Expand All @@ -19,6 +19,7 @@ def create_task_func():


class AsyncExecutor:
# Deprecated: will be removed soon, don't use it
def __init__(self, *args, **kwargs):
self.logger = kwargs['logger']

Expand All @@ -34,6 +35,7 @@ async def _run(self, tasks: Iterable[QueryDraft]):


class AsyncioSimpleExecutor(AsyncExecutor):
# Deprecated: will be removed soon, don't use it
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.semaphore = asyncio.Semaphore(kwargs.get('in_parallel', 100))
Expand All @@ -48,6 +50,7 @@ async def sem_task(f, args, kwargs):


class AsyncioProgressbarExecutor(AsyncExecutor):
# Deprecated: will be removed soon, don't use it
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

Expand All @@ -71,6 +74,7 @@ async def track_task(task):


class AsyncioProgressbarSemaphoreExecutor(AsyncExecutor):
# Deprecated: will be removed soon, don't use it
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.semaphore = asyncio.Semaphore(kwargs.get('in_parallel', 1))
Expand Down Expand Up @@ -174,3 +178,67 @@ async def _run(self, queries: Iterable[QueryDraft]):
w.cancel()

return self.results


class AsyncioQueueGeneratorExecutor:
# Deprecated: will be removed soon, don't use it
def __init__(self, *args, **kwargs):
self.workers_count = kwargs.get('in_parallel', 10)
self.queue = asyncio.Queue()
self.timeout = kwargs.get('timeout')
self.logger = kwargs['logger']
self._results = asyncio.Queue()
self._stop_signal = object()

async def worker(self):
"""Process tasks from the queue and put results into the results queue."""
while True:
task = await self.queue.get()
if task is self._stop_signal:
self.queue.task_done()
break

try:
f, args, kwargs = task
query_future = f(*args, **kwargs)
query_task = create_task_func()(query_future)

try:
result = await asyncio.wait_for(query_task, timeout=self.timeout)
except asyncio.TimeoutError:
result = kwargs.get('default')
await self._results.put(result)
except Exception as e:
self.logger.error(f"Error in worker: {e}")
finally:
self.queue.task_done()

async def run(self, queries: Iterable[Callable[..., Any]]):
"""Run workers to process queries in parallel."""
start_time = time.time()

# Add tasks to the queue
for t in queries:
await self.queue.put(t)

# Create workers
workers = [
asyncio.create_task(self.worker()) for _ in range(self.workers_count)
]

# Add stop signals
for _ in range(self.workers_count):
await self.queue.put(self._stop_signal)

try:
while any(w.done() is False for w in workers) or not self._results.empty():
try:
result = await asyncio.wait_for(self._results.get(), timeout=1)
yield result
except asyncio.TimeoutError:
pass
finally:
# Ensure all workers are awaited
await asyncio.gather(*workers)
self.execution_time = time.time() - start_time
self.logger.debug(f"Spent time: {self.execution_time}")
4 changes: 3 additions & 1 deletion maigret/maigret.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,9 @@ async def main():
if args.web is not None:
from maigret.web.app import app

port = args.web if args.web else 5000 # args.web is either the specified port or 5000 by default
port = (
args.web if args.web else 5000
) # args.web is either the specified port or 5000 by default
app.run(port=port)
return

Expand Down
5 changes: 3 additions & 2 deletions maigret/resources/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -7218,7 +7218,8 @@
"url": "https://gramho.com/explore-hashtag/{username}",
"source": "Instagram",
"usernameClaimed": "adam",
"usernameUnclaimed": "noonewouldeverusethis7"
"usernameUnclaimed": "noonewouldeverusethis7",
"disabled": true
},
"Gravatar": {
"tags": [
Expand Down Expand Up @@ -17476,7 +17477,7 @@
"method": "vimeo"
},
"headers": {
"Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3MzM5Njc3MjAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbCwianRpIjoiNGJkNDE4NzktM2VhOS00ZWRiLWIzZDUtNjAyNjQ3YjMyNTVhIn0.kPbKREujSfYsisyF0pS_HskTapRlHBfVLRw4cis1ezk"
"Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3MzQxMTc1NDAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbCwianRpIjoiNDc4Y2ZhZGUtZjI0Yy00MDVkLTliYWItN2RlNGEzNGM4MzI5In0.guN7Fg8dqq7EYdckrJ-6Rdkj_5MOl6FaC4YUSOceDpU"
},
"urlProbe": "https://api.vimeo.com/users/{username}?fields=name%2Cgender%2Cbio%2Curi%2Clink%2Cbackground_video%2Clocation_details%2Cpictures%2Cverified%2Cmetadata.public_videos.total%2Cavailable_for_hire%2Ccan_work_remotely%2Cmetadata.connections.videos.total%2Cmetadata.connections.albums.total%2Cmetadata.connections.followers.total%2Cmetadata.connections.following.total%2Cmetadata.public_videos.total%2Cmetadata.connections.vimeo_experts.is_enrolled%2Ctotal_collection_count%2Ccreated_time%2Cprofile_preferences%2Cmembership%2Cclients%2Cskills%2Cproject_types%2Crates%2Ccategories%2Cis_expert%2Cprofile_discovery%2Cwebsites%2Ccontact_emails&fetch_user_profile=1",
"checkType": "status_code",
Expand Down
1 change: 1 addition & 0 deletions maigret/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ def extract_username_dialog(url):
)
return entered_username if entered_username else supposed_username

# TODO: replace with checking.py/SimpleAiohttpChecker call
@staticmethod
async def get_html_response_to_compare(
url: str, session: ClientSession = None, redirects=False, headers: Dict = None
Expand Down
33 changes: 33 additions & 0 deletions tests/test_executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
AsyncioProgressbarExecutor,
AsyncioProgressbarSemaphoreExecutor,
AsyncioProgressbarQueueExecutor,
AsyncioQueueGeneratorExecutor,
)

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -76,3 +77,35 @@ async def test_asyncio_progressbar_queue_executor():
assert await executor.run(tasks) == [0, 3, 6, 9, 1, 4, 7, 2, 5, 8]
assert executor.execution_time > 0.2
assert executor.execution_time < 0.4


@pytest.mark.asyncio
async def test_asyncio_queue_generator_executor():
tasks = [(func, [n], {}) for n in range(10)]

executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=2)
results = [result async for result in executor.run(tasks)]
assert results == [0, 1, 3, 2, 4, 6, 7, 5, 9, 8]
assert executor.execution_time > 0.5
assert executor.execution_time < 0.6

executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=3)
results = [result async for result in executor.run(tasks)]
assert results == [0, 3, 1, 4, 6, 2, 7, 9, 5, 8]
assert executor.execution_time > 0.4
assert executor.execution_time < 0.5

executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=5)
results = [result async for result in executor.run(tasks)]
assert results in (
[0, 3, 6, 1, 4, 7, 9, 2, 5, 8],
[0, 3, 6, 1, 4, 9, 7, 2, 5, 8],
)
assert executor.execution_time > 0.3
assert executor.execution_time < 0.4

executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=10)
results = [result async for result in executor.run(tasks)]
assert results == [0, 3, 6, 9, 1, 4, 7, 2, 5, 8]
assert executor.execution_time > 0.2
assert executor.execution_time < 0.3
5 changes: 2 additions & 3 deletions tests/test_submit.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from maigret.submit import Submitter, MaigretSite, MaigretEngine
from unittest.mock import MagicMock, patch
from maigret.submit import Submitter
from aiohttp import ClientSession
from maigret.sites import MaigretDatabase
from maigret.settings import Settings
import logging


Expand Down

0 comments on commit 97e5f60

Please sign in to comment.