From 90582387cd79477278db4e2d3dd8d891e9decf05 Mon Sep 17 00:00:00 2001
From: khoomeik <32777448+KhoomeiK@users.noreply.github.com>
Date: Wed, 20 Mar 2024 17:05:59 -0700
Subject: [PATCH 1/4] check duplicates

---
 harambe/core.py | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/harambe/core.py b/harambe/core.py
index 8165f1a..96649f7 100644
--- a/harambe/core.py
+++ b/harambe/core.py
@@ -63,6 +63,7 @@ def __init__(
         self._stage = stage
         self._scraper = scraper
         self._context = context or {}
+        self._saved_data = set()
 
         if not observer:
             observer = [LoggingObserver()]
@@ -81,8 +82,13 @@ async def save_data(self, *data: ScrapeResult) -> None:
         """
         url = self.page.url
         for d in data:
-            d["__url"] = url
-            await asyncio.gather(*[o.on_save_data(d) for o in self._observers])
+            d_set = frozenset(d.items())
+            if d_set not in self._saved_data:
+                self._saved_data.add(d_set)
+                d["__url"] = url
+                await asyncio.gather(*[o.on_save_data(d) for o in self._observers])
+            else:
+                continue
 
     async def enqueue(self, *urls: URL, context: Optional[Context] = None) -> None:
         """
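Why the duplicate check in PATCH 1 works: frozenset(d.items()) collapses a flat dict into an order-independent, hashable signature, so previously saved rows can be rejected with a set lookup. The catch is that every value must itself be hashable; a nested list or dict would raise TypeError. A standalone sketch of the idea (illustrative only, not the SDK API; the names seen and is_new are ours):

    # Sketch of the dedup key used in save_data above.
    # Assumes flat dicts with hashable values; a nested list/dict raises TypeError.
    seen: set[frozenset] = set()

    def is_new(row: dict) -> bool:
        signature = frozenset(row.items())  # order-independent, hashable
        if signature in seen:
            return False
        seen.add(signature)
        return True

    assert is_new({"title": "Gorilla", "page": 1})
    assert not is_new({"page": 1, "title": "Gorilla"})  # same items, different order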
From 468cda783a910a96689d3114234a3c6bf8fc3642 Mon Sep 17 00:00:00 2001
From: awtkns <32209255+awtkns@users.noreply.github.com>
Date: Thu, 21 Mar 2024 10:21:04 -0700
Subject: [PATCH 2/4] improve pagination

---
 .github/workflows/python.yml | 38 ++++++++++++++++++++++++++++++++++++
 .gitignore                   |  1 +
 harambe/core.py              | 15 +++++----------
 harambe/observer.py          | 33 +++++++++++++++++++++++++------
 pyproject.toml               |  2 +-
 5 files changed, 72 insertions(+), 17 deletions(-)

diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml
index 5da37c2..24450d1 100644
--- a/.github/workflows/python.yml
+++ b/.github/workflows/python.yml
@@ -9,6 +9,26 @@ env:
   PYTHON_VERSION: "3.11"
 
 jobs:
+  check-version:
+    runs-on: ubuntu-latest
+    outputs:
+      should_publish: ${{ steps.pre-val.outputs.should_publish }}
+    steps:
+      - uses: actions/checkout@v2
+      - name: Check if current version is published
+        id: pre-val
+        run: |
+          LOCAL_VERSION=$(grep '^version =' pyproject.toml | head -1 | awk -F '"' '{print $2}')
+          REMOTE_VERSION=$(curl -s https://pypi.org/pypi/tarsier/json | jq -r .info.version)
+
+          echo "Local version: $LOCAL_VERSION"
+          echo "Remote version: $REMOTE_VERSION"
+
+          if [ "$LOCAL_VERSION" != "$REMOTE_VERSION" ]; then
+            echo "Version $LOCAL_VERSION is not published yet"
+            echo "::set-output name=should_publish::true"
+          fi
+
   format:
     runs-on: ubuntu-latest
     steps:
@@ -38,3 +58,21 @@ jobs:
     - run: poetry install
     - name: Run pytest
       run: poetry run pytest . -v
+
+  publish:
+    needs: [ check-version, format, pytest ]
+    runs-on: ubuntu-latest
+    if: github.ref == 'refs/heads/main' && needs.check-version.outputs.should_publish == 'true'
+    steps:
+      - uses: actions/checkout@v3
+      - name: Install poetry
+        run: pipx install poetry
+      - uses: actions/setup-python@v4
+        with:
+          python-version: ${{ env.PYTHON_VERSION }}
+          cache: "poetry"
+      - run: poetry install
+      - name: Build and Publish
+        run: |
+          poetry config pypi-token.pypi ${{ secrets.PYPI_TOKEN }}
+          poetry publish --build
diff --git a/.gitignore b/.gitignore
index d2a31f5..53a47b9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -147,3 +147,4 @@ cython_debug/
 *.db
 *.bin
 /screenshots/
+/data/
diff --git a/harambe/core.py b/harambe/core.py
index 96649f7..ac8dd4b 100644
--- a/harambe/core.py
+++ b/harambe/core.py
@@ -22,7 +22,7 @@
     LocalStorageObserver,
     LoggingObserver,
     OutputObserver,
-    DownloadMeta,
+    DownloadMeta, StopPaginationObserver,
 )
 from harambe.tracker import FileDataTracker
 from harambe.types import URL, AsyncScraperType, Context, ScrapeResult, Stage
@@ -71,6 +71,7 @@ def __init__(
         if not isinstance(observer, list):
             observer = [observer]
 
+        observer.append(StopPaginationObserver())
         self._observers = observer
 
     async def save_data(self, *data: ScrapeResult) -> None:
@@ -82,13 +83,8 @@ async def save_data(self, *data: ScrapeResult) -> None:
         """
         url = self.page.url
         for d in data:
-            d_set = frozenset(d.items())
-            if d_set not in self._saved_data:
-                self._saved_data.add(d_set)
-                d["__url"] = url
-                await asyncio.gather(*[o.on_save_data(d) for o in self._observers])
-            else:
-                continue
+            d["__url"] = url
+            await asyncio.gather(*[o.on_save_data(d) for o in self._observers])
 
     async def enqueue(self, *urls: URL, context: Optional[Context] = None) -> None:
         """
@@ -139,7 +135,7 @@ async def paginate(
                 await self._scraper(
                     self, next_url, self._context
                 )  # TODO: eventually fix this to not be recursive
-        except TimeoutError:
+        except (TimeoutError, StopAsyncIteration):
             return
 
     async def capture_url(
@@ -260,7 +256,6 @@ async def run(
                     context,
                 )
             except Exception as e:
-                # TODO: Fix path for non Mr. Watkins
                 await ctx.tracing.stop(path="trace.zip")
                 await browser.close()
                 raise e
diff --git a/harambe/observer.py b/harambe/observer.py
index b3564e2..6239caa 100644
--- a/harambe/observer.py
+++ b/harambe/observer.py
@@ -18,7 +18,7 @@ async def on_queue_url(self, url: URL, context: Dict[str, Any]) -> None:
 
     @abstractmethod
     async def on_download(
-        self, download_url: str, filename: str, content: bytes
+            self, download_url: str, filename: str, content: bytes
     ) -> "DownloadMeta":
         raise NotImplementedError()
 
@@ -32,7 +32,7 @@ async def on_queue_url(self, url: URL, context: Dict[str, Any]) -> None:
         print(f"Enqueuing: {url} with context {context}")
 
     async def on_download(
-        self, download_url: str, filename: str, content: bytes
+            self, download_url: str, filename: str, content: bytes
     ) -> "DownloadMeta":
         print(f"Downloading file: {filename}")  # TODO: use logger
         return {
@@ -45,14 +45,14 @@ class LocalStorageObserver(OutputObserver):
     def __init__(self, domain: str, stage: Stage):
         self._tracker = FileDataTracker(domain, stage)
 
-    async def on_save_data(self, data: Dict[str, Any]):
+    async def on_save_data(self, data: Dict[str, Any]) -> None:
         self._tracker.save_data(data)
 
     async def on_queue_url(self, url: URL, context: Dict[str, Any]) -> None:
         self._tracker.save_data({"url": url, "context": context})
 
     async def on_download(
-        self, download_url: str, filename: str, content: bytes
+            self, download_url: str, filename: str, content: bytes
     ) -> "DownloadMeta":
         data = {
             "url": f"{download_url}/{quote(filename)}",
@@ -68,14 +68,14 @@ def __init__(self):
         self._urls: List[Tuple[URL, Context]] = []
         self._files: List[Tuple[str, bytes]] = []
 
-    async def on_save_data(self, data: Dict[str, Any]):
+    async def on_save_data(self, data: Dict[str, Any]) -> None:
         self._data.append(data)
 
     async def on_queue_url(self, url: URL, context: Dict[str, Any]) -> None:
         self._urls.append((url, context))
 
     async def on_download(
-        self, download_url: str, filename: str, content: bytes
+            self, download_url: str, filename: str, content: bytes
    ) -> "DownloadMeta":
         data = {
             "url": f"{download_url}/{quote(filename)}",
@@ -97,6 +97,27 @@ def files(self) -> List[Tuple[str, bytes]]:
         return self._files
 
 
+class StopPaginationObserver(OutputObserver):
+    def __init__(self):
+        self._saved_data = set()
+
+    async def on_save_data(self, data: dict[str, Any]):
+        self._add_data(data)
+
+    async def on_queue_url(self, url: URL, context: dict[str, Any]) -> None:
+        self._add_data(url)
+
+    async def on_download(self, download_url: str, filename: str, content: bytes) -> "DownloadMeta":
+        pass
+
+    def _add_data(self, data: Any):
+        # TODO remove __url from data?
+        d_set = frozenset(data.items() if isinstance(data, dict) else data)
+        if d_set in self._saved_data:
+            raise StopAsyncIteration()
+        self._saved_data.add(d_set)
+
+
 class DownloadMeta(TypedDict):
     url: str
     filename: str
diff --git a/pyproject.toml b/pyproject.toml
index d2dbc10..d9b0d5a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "harambe-sdk"
-version = "0.8.41"
+version = "0.8.43"
 description = "Data extraction SDK for Playwright 🐒🍌"
 authors = ["awtkns "]
 readme = "README.md"
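PATCH 2 moves the duplicate set out of the SDK and into a dedicated observer, and it stops pagination by exception: StopPaginationObserver raises StopAsyncIteration on the first repeated record, and SDK.paginate catches it alongside TimeoutError to end the recursive crawl. A self-contained sketch of that control flow, assuming pages arrive as plain lists (DedupObserver and crawl are illustrative stand-ins for the real classes in harambe):

    import asyncio

    class DedupObserver:
        """Stand-in for StopPaginationObserver: remember signatures, raise on repeats."""

        def __init__(self) -> None:
            self._seen: set[frozenset] = set()

        async def on_save_data(self, data: dict) -> None:
            sig = frozenset(data.items())
            if sig in self._seen:
                raise StopAsyncIteration  # a repeat means pagination looped back
            self._seen.add(sig)

    async def crawl(pages: list[list[dict]]) -> None:
        observer = DedupObserver()
        try:
            for page in pages:  # stand-in for the recursive paginate() calls
                for row in page:
                    await observer.on_save_data(row)
        except StopAsyncIteration:
            print("duplicate record, pagination stopped")

    # The third "page" repeats the first record, so the crawl stops there.
    asyncio.run(crawl([[{"id": 1}], [{"id": 2}], [{"id": 1}]]))

As written in this patch the observer raises on any duplicate, even one saved twice within a single page; the next patch scopes the check to duplicates seen after a pagination event.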
From ee4f86a7e15943be9a580a84b6daddedfa201194 Mon Sep 17 00:00:00 2001
From: khoomeik <32777448+KhoomeiK@users.noreply.github.com>
Date: Thu, 21 Mar 2024 13:06:46 -0700
Subject: [PATCH 3/4] stop pagination upon duplicate data + mild refactoring

---
 harambe/core.py         | 48 ++++++++++++++++++++++-------------------
 harambe/observer.py     | 30 ++++++++++++++++++---------
 tests/test_observers.py | 13 ++++++++++++-
 3 files changed, 59 insertions(+), 32 deletions(-)

diff --git a/harambe/core.py b/harambe/core.py
index ac8dd4b..df25ee1 100644
--- a/harambe/core.py
+++ b/harambe/core.py
@@ -2,7 +2,7 @@
 import tempfile
 import uuid
 from functools import wraps
-from typing import Callable, List, Optional, Protocol, Union, Awaitable
+from typing import Any, Callable, List, Optional, Protocol, Union, Awaitable
 
 import aiohttp
 from playwright.async_api import (
@@ -22,7 +22,8 @@
     LocalStorageObserver,
     LoggingObserver,
     OutputObserver,
-    DownloadMeta, StopPaginationObserver,
+    DownloadMeta,
+    StopPaginationObserver,
 )
 from harambe.tracker import FileDataTracker
 from harambe.types import URL, AsyncScraperType, Context, ScrapeResult, Stage
@@ -34,8 +35,7 @@ class AsyncScraper(Protocol):
     Note that scrapers in harambe should be functions, not classes.
     """
 
-    async def scrape(self, sdk: "SDK", url: URL, context: Context) -> None:
-        ...
+    async def scrape(self, sdk: "SDK", url: URL, context: Context) -> None: ...
 
 
 class SDK:
@@ -71,9 +71,16 @@ def __init__(
         if not isinstance(observer, list):
             observer = [observer]
 
-        observer.append(StopPaginationObserver())
+        observer.insert(0, StopPaginationObserver())
         self._observers = observer
 
+    async def _notify_observers(self, method: str, *args: Any, **kwargs: Any) -> None:
+        await getattr(self._observers[0], method)(*args, **kwargs)
+        res = await asyncio.gather(
+            *[getattr(o, method)(*args, **kwargs) for o in self._observers[1:]]
+        )
+        return res
+
     async def save_data(self, *data: ScrapeResult) -> None:
         """
         Save scraped data. This will be passed to the on_save_data callback.
@@ -84,7 +91,7 @@ async def save_data(self, *data: ScrapeResult) -> None:
         url = self.page.url
         for d in data:
             d["__url"] = url
-            await asyncio.gather(*[o.on_save_data(d) for o in self._observers])
+            await self._notify_observers("on_save_data", d)
 
     async def enqueue(self, *urls: URL, context: Optional[Context] = None) -> None:
         """
@@ -99,19 +106,17 @@ async def enqueue(self, *urls: URL, context: Optional[Context] = None) -> None:
         context["__url"] = self.page.url
         for url in urls:
-            await asyncio.gather(
-                *[o.on_queue_url(url, context) for o in self._observers]
-            )
+            await self._notify_observers("on_queue_url", url, context)
 
     async def paginate(
         self,
         get_next_page_element: Callable[..., Awaitable[URL | ElementHandle | None]],
-        timeout: int = 5000,
+        timeout: int = 2000,
     ) -> None:
         """
         Navigate to the next page of a listing.
 
-        :param sleep: seconds to sleep for before continuing
+        :param timeout: milliseconds to sleep for before continuing
        :param get_next_page_element: the url or ElementHandle of the next page
         """
         try:
             next_page = await get_next_page_element()
@@ -122,6 +127,7 @@ async def paginate(
             next_url = ""
             if isinstance(next_page, ElementHandle):
                 await next_page.click(timeout=timeout)
+                await self.page.wait_for_timeout(timeout)
                 next_url = self.page.url
 
             elif isinstance(next_page, str):
@@ -130,12 +136,16 @@ async def paginate(
                 # TODO: merge query params
                 next_url = self.page.url.split("?")[0] + next_url
                 await self.page.goto(next_url)
+                await self.page.wait_for_timeout(timeout)
 
             if next_url:
+                for o in self._observers:
+                    o.on_paginate(self.page.url)
+
                 await self._scraper(
                     self, next_url, self._context
                 )  # TODO: eventually fix this to not be recursive
-        except (TimeoutError, StopAsyncIteration):
+        except (TimeoutError, StopAsyncIteration) as e:
             return
 
     async def capture_url(
@@ -177,11 +187,8 @@ async def capture_download(
         with open(temp_file.name, "rb") as f:
             content = f.read()
 
-        res = await asyncio.gather(
-            *[
-                o.on_download(download.url, download.suggested_filename, content)
-                for o in self._observers
-            ]
+        res = await self._notify_observers(
+            "on_download", download.url, download.suggested_filename, content
         )
         return res[0]
 
@@ -197,11 +204,8 @@ async def capture_pdf(
         )  # Allow for some extra time for the page to load
         pdf_content = await self.page.pdf()
         file_name = PAGE_PDF_FILENAME
-        res = await asyncio.gather(
-            *[
-                o.on_download(self.page.url, file_name, pdf_content)
-                for o in self._observers
-            ]
+        res = await self._notify_observers(
+            "on_download", self.page.url, file_name, pdf_content
         )
         return res[0]
 
diff --git a/harambe/observer.py b/harambe/observer.py
index 6239caa..fbbd936 100644
--- a/harambe/observer.py
+++ b/harambe/observer.py
@@ -18,10 +18,13 @@ async def on_queue_url(self, url: URL, context: Dict[str, Any]) -> None:
 
     @abstractmethod
     async def on_download(
-            self, download_url: str, filename: str, content: bytes
+        self, download_url: str, filename: str, content: bytes
     ) -> "DownloadMeta":
         raise NotImplementedError()
 
+    def on_paginate(self, next_url: str) -> None:
+        pass
+
 
 class LoggingObserver(OutputObserver):
     # TODO: use logger
@@ -32,7 +35,7 @@ async def on_queue_url(self, url: URL, context: Dict[str, Any]) -> None:
         print(f"Enqueuing: {url} with context {context}")
 
     async def on_download(
-            self, download_url: str, filename: str, content: bytes
+        self, download_url: str, filename: str, content: bytes
     ) -> "DownloadMeta":
         print(f"Downloading file: {filename}")  # TODO: use logger
         return {
@@ -52,7 +55,7 @@ async def on_queue_url(self, url: URL, context: Dict[str, Any]) -> None:
         self._tracker.save_data({"url": url, "context": context})
 
     async def on_download(
-            self, download_url: str, filename: str, content: bytes
+        self, download_url: str, filename: str, content: bytes
     ) -> "DownloadMeta":
         data = {
             "url": f"{download_url}/{quote(filename)}",
@@ -75,7 +78,7 @@ async def on_queue_url(self, url: URL, context: Dict[str, Any]) -> None:
         self._urls.append((url, context))
 
     async def on_download(
-            self, download_url: str, filename: str, content: bytes
+        self, download_url: str, filename: str, content: bytes
     ) -> "DownloadMeta":
         data = {
             "url": f"{download_url}/{quote(filename)}",
@@ -100,6 +103,7 @@ def files(self) -> List[Tuple[str, bytes]]:
 class StopPaginationObserver(OutputObserver):
     def __init__(self):
         self._saved_data = set()
+        self._paginator_called = False
 
     async def on_save_data(self, data: dict[str, Any]):
         self._add_data(data)
@@ -107,13 +111,21 @@ async def on_save_data(self, data: dict[str, Any]):
     async def on_queue_url(self, url: URL, context: dict[str, Any]) -> None:
         self._add_data(url)
 
-    async def on_download(self, download_url: str, filename: str, content: bytes) -> "DownloadMeta":
-        pass
+    async def on_download(
+        self, download_url: str, filename: str, content: bytes
+    ) -> "DownloadMeta":
+        self._add_data((download_url, filename, content))
+
+    def on_paginate(self, next_url: str) -> None:
+        self._paginator_called = True
 
     def _add_data(self, data: Any):
-        # TODO remove __url from data?
-        d_set = frozenset(data.items() if isinstance(data, dict) else data)
-        if d_set in self._saved_data:
+        d_set = frozenset(
+            (item for item in data.items() if not item[0].startswith("__"))
+            if isinstance(data, dict)
+            else data
+        )
+        if self._paginator_called and d_set in self._saved_data:
             raise StopAsyncIteration()
         self._saved_data.add(d_set)
 
diff --git a/tests/test_observers.py b/tests/test_observers.py
index ad1f412..28855e5 100644
--- a/tests/test_observers.py
+++ b/tests/test_observers.py
@@ -1,6 +1,6 @@
 import pytest
 
-from harambe.observer import InMemoryObserver
+from harambe.observer import InMemoryObserver, StopPaginationObserver
 
 
 @pytest.mark.asyncio
@@ -24,3 +24,14 @@ async def in_memory_on_queue_url():
         ("https://example.com", {"foo": "bar"}),
         ("https://example.org", {"baz": "qux"}),
     ]
+
+
+@pytest.mark.asyncio
+async def test_stop_pagination_observer_duplicate_data_error():
+    observer = StopPaginationObserver()
+
+    await observer.on_save_data({"foo": "bar"})
+    observer.on_paginate("https://example.com/page2")
+
+    with pytest.raises(StopAsyncIteration):
+        await observer.on_save_data({"foo": "bar"})
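The observer ordering in PATCH 3 is the point of the refactor: StopPaginationObserver is inserted at index 0 and awaited on its own first, so a StopAsyncIteration fires before any downstream observer persists the duplicate; only then do the remaining observers run concurrently via asyncio.gather. A reduced sketch of that gatekeeper-first dispatch (Guard and Sink are hypothetical observers; only the getattr/gather shape comes from the patch):

    import asyncio

    class Guard:
        async def on_save_data(self, data: dict) -> None:
            print("guard checked first:", data)  # would raise here to veto the event

    class Sink:
        async def on_save_data(self, data: dict) -> None:
            print("sink saved:", data)

    async def notify(observers: list, method: str, *args) -> list:
        # Await the gatekeeper alone; if it raises, nothing below runs.
        await getattr(observers[0], method)(*args)
        # Then fan out to the remaining observers concurrently.
        return await asyncio.gather(*[getattr(o, method)(*args) for o in observers[1:]])

    asyncio.run(notify([Guard(), Sink(), Sink()], "on_save_data", {"id": 1}))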
From 48e4d4491a127b6c36ace1e412eefdcbeaf85798 Mon Sep 17 00:00:00 2001
From: awtkns <32209255+awtkns@users.noreply.github.com>
Date: Thu, 21 Mar 2024 15:41:32 -0700
Subject: [PATCH 4/4] 🫡 Stop pagination observer
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 harambe/core.py         | 27 ++++++++++++++++++++-----
 harambe/observer.py     | 30 ++++++++++++++++++++++++----
 pyproject.toml          |  2 +-
 tests/test_observers.py | 47 +++++++++++++++++++++++++++++++++++++++++
 4 files changed, 93 insertions(+), 13 deletions(-)

diff --git a/harambe/core.py b/harambe/core.py
index df25ee1..7a2425d 100644
--- a/harambe/core.py
+++ b/harambe/core.py
@@ -24,6 +24,7 @@
     OutputObserver,
     DownloadMeta,
     StopPaginationObserver,
+    ObservationTrigger,
 )
 from harambe.tracker import FileDataTracker
 from harambe.types import URL, AsyncScraperType, Context, ScrapeResult, Stage
@@ -74,13 +75,6 @@ def __init__(
         observer.insert(0, StopPaginationObserver())
         self._observers = observer
 
-    async def _notify_observers(self, method: str, *args: Any, **kwargs: Any) -> None:
-        await getattr(self._observers[0], method)(*args, **kwargs)
-        res = await asyncio.gather(
-            *[getattr(o, method)(*args, **kwargs) for o in self._observers[1:]]
-        )
-        return res
-
     async def save_data(self, *data: ScrapeResult) -> None:
         """
         Save scraped data. This will be passed to the on_save_data callback.
@@ -145,7 +139,7 @@ async def paginate(
                 await self._scraper(
                     self, next_url, self._context
                 )  # TODO: eventually fix this to not be recursive
-        except (TimeoutError, StopAsyncIteration) as e:
+        except (TimeoutError, StopAsyncIteration):
             return
 
     async def capture_url(
@@ -209,6 +203,23 @@ async def capture_pdf(
         )
         return res[0]
 
+    async def _notify_observers(
+        self, method: ObservationTrigger, *args: Any, **kwargs: Any
+    ) -> Any:
+        """
+        Notify all observers of an event. This will call the method on each observer that is subscribed. Note that
+        the first observer is the stop pagination observer, so it will always be called separately so that we can stop
+        pagination if needed.
+        :param method: observation trigger
+        :param args: arguments to pass to the method
+        :param kwargs: keyword arguments to pass to the method
+        :return: the result of the method call
+        """
+        await getattr(self._observers[0], method)(*args, **kwargs)
+        return await asyncio.gather(
+            *[getattr(o, method)(*args, **kwargs) for o in self._observers[1:]]
+        )
+
     @staticmethod
     async def run(
         scraper: AsyncScraperType,
diff --git a/harambe/observer.py b/harambe/observer.py
index fbbd936..787ea37 100644
--- a/harambe/observer.py
+++ b/harambe/observer.py
@@ -1,11 +1,23 @@
 from abc import abstractmethod
-from typing import Any, Dict, List, Protocol, Tuple, runtime_checkable, TypedDict
+from typing import (
+    Any,
+    Dict,
+    List,
+    Protocol,
+    Tuple,
+    runtime_checkable,
+    TypedDict,
+    Literal,
+)
 from urllib.parse import quote
 
 from harambe.tracker import FileDataTracker
 from harambe.types import URL, Context, Stage
 
+ObservationTrigger = Literal["on_save_data", "on_queue_url", "on_download", "on_paginate"]
+
+
 @runtime_checkable
 class OutputObserver(Protocol):
     @abstractmethod
@@ -22,12 +34,12 @@ async def on_queue_url(self, url: URL, context: Dict[str, Any]) -> None:
     ) -> "DownloadMeta":
         raise NotImplementedError()
 
+    @abstractmethod
     def on_paginate(self, next_url: str) -> None:
-        pass
+        raise NotImplementedError()
 
 
 class LoggingObserver(OutputObserver):
-    # TODO: use logger
     async def on_save_data(self, data: Dict[str, Any]):
         print(data)
 
@@ -43,6 +55,9 @@ async def on_download(
             "filename": filename,
         }
 
+    def on_paginate(self, next_url: str) -> None:
+        pass
+
 
 class LocalStorageObserver(OutputObserver):
     def __init__(self, domain: str, stage: Stage):
@@ -64,6 +79,9 @@ async def on_download(
         self._tracker.save_data(data)
         return data
 
+    def on_paginate(self, next_url: str) -> None:
+        pass
+
 
 class InMemoryObserver(OutputObserver):
     def __init__(self):
@@ -87,6 +105,9 @@ async def on_download(
         self._files.append((filename, content))
         return data
 
+    def on_paginate(self, next_url: str) -> None:
+        pass
+
     @property
     def data(self) -> List[Dict[str, Any]]:
         return self._data
@@ -111,10 +132,11 @@ async def on_save_data(self, data: dict[str, Any]):
     async def on_queue_url(self, url: URL, context: dict[str, Any]) -> None:
         self._add_data(url)
 
+    # noinspection PyTypeChecker
     async def on_download(
         self, download_url: str, filename: str, content: bytes
     ) -> "DownloadMeta":
-        self._add_data((download_url, filename, content))
+        self._add_data((download_url, filename))
 
     def on_paginate(self, next_url: str) -> None:
         self._paginator_called = True
diff --git a/pyproject.toml b/pyproject.toml
index d9b0d5a..1255c3b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "harambe-sdk"
-version = "0.8.43"
+version = "0.9.0"
 description = "Data extraction SDK for Playwright 🐒🍌"
 authors = ["awtkns "]
 readme = "README.md"
diff --git a/tests/test_observers.py b/tests/test_observers.py
index 28855e5..c2c0a9f 100644
--- a/tests/test_observers.py
+++ b/tests/test_observers.py
@@ -35,3 +35,50 @@ async def test_stop_pagination_observer_duplicate_data_error():
 
     with pytest.raises(StopAsyncIteration):
         await observer.on_save_data({"foo": "bar"})
+
+
+@pytest.mark.asyncio
+async def test_stop_pagination_observer_duplicate_url_error():
+    observer = StopPaginationObserver()
+
+    await observer.on_queue_url("https://example.com", {"foo": "bar"})
+    observer.on_paginate("https://example.com/page2")
+
+    with pytest.raises(StopAsyncIteration):
+        await observer.on_queue_url("https://example.com", {"foo": "bar"})
+
+
+@pytest.mark.asyncio
+async def test_stop_pagination_observer_duplicate_download_error():
+    observer = StopPaginationObserver()
+
+    await observer.on_download("https://example.com", "foo.txt", b"foo")
+    observer.on_paginate("https://example.com/page2")
+
+    with pytest.raises(StopAsyncIteration):
+        await observer.on_download("https://example.com", "foo.txt", b"foo")
+
+
+@pytest.mark.asyncio
+async def test_stop_pagination_observer_no_duplicate_data():
+    observer = StopPaginationObserver()
+    await observer.on_save_data({"foo": "bar"})
+    observer.on_paginate("https://example.com/page2")
+    await observer.on_save_data({"baz": "qux"})
+
+
+@pytest.mark.asyncio
+async def test_duplicate_data_without_pagination():
+    observer = StopPaginationObserver()
+    await observer.on_save_data({"foo": "bar"})
+    await observer.on_save_data({"foo": "bar"})
+
+    await observer.on_queue_url("https://example.com", {"foo": "bar"})
+    await observer.on_queue_url("https://example.com", {"foo": "bar"})
+
+    await observer.on_download("https://example.com", "foo.txt", b"foo")
+    await observer.on_download("https://example.com", "foo.txt", b"foo")
+
+    observer.on_paginate("https://example.com/page2")
+    with pytest.raises(StopAsyncIteration):
+        await observer.on_save_data({"foo": "bar"})
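End state of the series for observer authors: OutputObserver now demands four hooks, ObservationTrigger names them as a Literal type, and on_paginate is the only synchronous one (SDK.paginate invokes it without await). A minimal conforming observer might look like this (CountingObserver is hypothetical; only the hook signatures are taken from the final diff):

    from typing import Any, Dict

    class CountingObserver:
        """Hypothetical observer counting rows and pages against the final protocol."""

        def __init__(self) -> None:
            self.rows = 0
            self.pages = 1

        async def on_save_data(self, data: Dict[str, Any]) -> None:
            self.rows += 1

        async def on_queue_url(self, url: str, context: Dict[str, Any]) -> None:
            pass

        async def on_download(
            self, download_url: str, filename: str, content: bytes
        ) -> Dict[str, str]:
            return {"url": download_url, "filename": filename}

        def on_paginate(self, next_url: str) -> None:  # sync, unlike the other hooks
            self.pages += 1

Duplicates remain legal until the first on_paginate call; after that, a repeated record, URL, or (download_url, filename) pair raises StopAsyncIteration and ends the crawl, which is exactly what the new tests assert.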