Skip to content

Commit

Permalink
Merge pull request #4 from reworkd/paginate
Browse files Browse the repository at this point in the history
Paginate
  • Loading branch information
awtkns authored Mar 21, 2024
2 parents 30fee74 + 48e4d44 commit 01cc243
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 27 deletions.
38 changes: 38 additions & 0 deletions .github/workflows/python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,26 @@ env:
PYTHON_VERSION: "3.11"

jobs:
check-version:
runs-on: ubuntu-latest
outputs:
should_publish: ${{ steps.pre-val.outputs.should_publish }}
steps:
- uses: actions/checkout@v2
- name: Check if current version is published
id: pre-val
run: |
LOCAL_VERSION=$(grep '^version =' pyproject.toml | head -1 | awk -F '"' '{print $2}')
REMOTE_VERSION=$(curl -s https://pypi.org/pypi/tarsier/json | jq -r .info.version)
echo "Local version: $LOCAL_VERSION"
echo "Remote version: $REMOTE_VERSION"
if [ "$LOCAL_VERSION" != "$REMOTE_VERSION" ]; then
echo "Version $LOCAL_VERSION is not published yet"
echo "::set-output name=should_publish::true"
fi
format:
runs-on: ubuntu-latest
steps:
Expand Down Expand Up @@ -38,3 +58,21 @@ jobs:
- run: poetry install
- name: Run pytest
run: poetry run pytest . -v

  # Build and upload the package to PyPI with Poetry. Runs only on main and
  # only when check-version reported an unpublished local version bump; also
  # requires the lint (format) and test (pytest) jobs to have succeeded.
  publish:
    needs: [ check-version, format, pytest ]
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main' && needs.check-version.outputs.should_publish == 'true'
    steps:
      - uses: actions/checkout@v3
      - name: Install poetry
        run: pipx install poetry
      - uses: actions/setup-python@v4
        with:
          python-version: ${{ env.PYTHON_VERSION }}
          # Cache poetry's virtualenv between runs
          cache: "poetry"
      - run: poetry install
      - name: Build and Publish
        run: |
          # Authenticate with the repo-secret token, then build sdist/wheel
          # and upload in one step
          poetry config pypi-token.pypi ${{ secrets.PYPI_TOKEN }}
          poetry publish --build
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,4 @@ cython_debug/
*.db
*.bin
/screenshots/
/data/
58 changes: 37 additions & 21 deletions harambe/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import tempfile
import uuid
from functools import wraps
from typing import Callable, List, Optional, Protocol, Union, Awaitable
from typing import Any, Callable, List, Optional, Protocol, Union, Awaitable

import aiohttp
from playwright.async_api import (
Expand All @@ -23,6 +23,8 @@
LoggingObserver,
OutputObserver,
DownloadMeta,
StopPaginationObserver,
ObservationTrigger,
)
from harambe.tracker import FileDataTracker
from harambe.types import URL, AsyncScraperType, Context, ScrapeResult, Stage
Expand All @@ -34,8 +36,7 @@ class AsyncScraper(Protocol):
Note that scrapers in harambe should be functions, not classes.
"""

async def scrape(self, sdk: "SDK", url: URL, context: Context) -> None:
...
async def scrape(self, sdk: "SDK", url: URL, context: Context) -> None: ...


class SDK:
Expand Down Expand Up @@ -63,13 +64,15 @@ def __init__(
self._stage = stage
self._scraper = scraper
self._context = context or {}
self._saved_data = set()

if not observer:
observer = [LoggingObserver()]

if not isinstance(observer, list):
observer = [observer]

observer.insert(0, StopPaginationObserver())
self._observers = observer

async def save_data(self, *data: ScrapeResult) -> None:
Expand All @@ -82,7 +85,7 @@ async def save_data(self, *data: ScrapeResult) -> None:
url = self.page.url
for d in data:
d["__url"] = url
await asyncio.gather(*[o.on_save_data(d) for o in self._observers])
await self._notify_observers("on_save_data", d)

async def enqueue(self, *urls: URL, context: Optional[Context] = None) -> None:
"""
Expand All @@ -97,19 +100,17 @@ async def enqueue(self, *urls: URL, context: Optional[Context] = None) -> None:
context["__url"] = self.page.url

for url in urls:
await asyncio.gather(
*[o.on_queue_url(url, context) for o in self._observers]
)
await self._notify_observers("on_queue_url", url, context)

async def paginate(
self,
get_next_page_element: Callable[..., Awaitable[URL | ElementHandle | None]],
timeout: int = 5000,
timeout: int = 2000,
) -> None:
"""
Navigate to the next page of a listing.
:param sleep: seconds to sleep for before continuing
:param timeout: milliseconds to sleep for before continuing
:param get_next_page_element: the url or ElementHandle of the next page
"""
try:
Expand All @@ -120,6 +121,7 @@ async def paginate(
next_url = ""
if isinstance(next_page, ElementHandle):
await next_page.click(timeout=timeout)
await self.page.wait_for_timeout(timeout)
next_url = self.page.url

elif isinstance(next_page, str):
Expand All @@ -128,12 +130,16 @@ async def paginate(
# TODO: merge query params
next_url = self.page.url.split("?")[0] + next_url
await self.page.goto(next_url)
await self.page.wait_for_timeout(timeout)

if next_url:
for o in self._observers:
o.on_paginate(self.page.url)

await self._scraper(
self, next_url, self._context
) # TODO: eventually fix this to not be recursive
except TimeoutError:
except (TimeoutError, StopAsyncIteration):
return

async def capture_url(
Expand Down Expand Up @@ -175,11 +181,8 @@ async def capture_download(
with open(temp_file.name, "rb") as f:
content = f.read()

res = await asyncio.gather(
*[
o.on_download(download.url, download.suggested_filename, content)
for o in self._observers
]
res = await self._notify_observers(
"on_download", download.url, download.suggested_filename, content
)
return res[0]

Expand All @@ -195,14 +198,28 @@ async def capture_pdf(
) # Allow for some extra time for the page to load
pdf_content = await self.page.pdf()
file_name = PAGE_PDF_FILENAME
res = await asyncio.gather(
*[
o.on_download(self.page.url, file_name, pdf_content)
for o in self._observers
]
res = await self._notify_observers(
"on_download", self.page.url, file_name, pdf_content
)
return res[0]

async def _notify_observers(
self, method: ObservationTrigger, *args: Any, **kwargs: Any
) -> Any:
"""
Notify all observers of an event. This will call the method on each observer that is subscribed. Note that
the first observer is the stop pagination observer, so it will always be called separately so that we can stop
pagination if needed.
:param method: observation trigger
:param args: arguments to pass to the method
:param kwargs: keyword arguments to pass to the method
:return: the result of the method call
"""
await getattr(self._observers[0], method)(*args, **kwargs)
return await asyncio.gather(
*[getattr(o, method)(*args, **kwargs) for o in self._observers[1:]]
)

@staticmethod
async def run(
scraper: AsyncScraperType,
Expand Down Expand Up @@ -254,7 +271,6 @@ async def run(
context,
)
except Exception as e:
# TODO: Fix path for non Mr. Watkins
await ctx.tracing.stop(path="trace.zip")
await browser.close()
raise e
Expand Down
63 changes: 59 additions & 4 deletions harambe/observer.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
from abc import abstractmethod
from typing import Any, Dict, List, Protocol, Tuple, runtime_checkable, TypedDict
from typing import (
Any,
Dict,
List,
Protocol,
Tuple,
runtime_checkable,
TypedDict,
Literal,
)
from urllib.parse import quote

from harambe.tracker import FileDataTracker
from harambe.types import URL, Context, Stage


ObservationTrigger = Literal["on_save_data", "on_queue_url", "on_download", "on_paginate"]


@runtime_checkable
class OutputObserver(Protocol):
@abstractmethod
Expand All @@ -22,9 +34,12 @@ async def on_download(
) -> "DownloadMeta":
raise NotImplementedError()

@abstractmethod
def on_paginate(self, next_url: str) -> None:
raise NotImplementedError()


class LoggingObserver(OutputObserver):
# TODO: use logger
async def on_save_data(self, data: Dict[str, Any]):
print(data)

Expand All @@ -40,12 +55,15 @@ async def on_download(
"filename": filename,
}

def on_paginate(self, next_url: str) -> None:
pass


class LocalStorageObserver(OutputObserver):
def __init__(self, domain: str, stage: Stage):
self._tracker = FileDataTracker(domain, stage)

async def on_save_data(self, data: Dict[str, Any]):
async def on_save_data(self, data: Dict[str, Any]) -> None:
self._tracker.save_data(data)

async def on_queue_url(self, url: URL, context: Dict[str, Any]) -> None:
Expand All @@ -61,14 +79,17 @@ async def on_download(
self._tracker.save_data(data)
return data

def on_paginate(self, next_url: str) -> None:
pass


class InMemoryObserver(OutputObserver):
def __init__(self):
self._data: List[Dict[str, Any]] = []
self._urls: List[Tuple[URL, Context]] = []
self._files: List[Tuple[str, bytes]] = []

async def on_save_data(self, data: Dict[str, Any]):
async def on_save_data(self, data: Dict[str, Any]) -> None:
self._data.append(data)

async def on_queue_url(self, url: URL, context: Dict[str, Any]) -> None:
Expand All @@ -84,6 +105,9 @@ async def on_download(
self._files.append((filename, content))
return data

def on_paginate(self, next_url: str) -> None:
pass

@property
def data(self) -> List[Dict[str, Any]]:
return self._data
Expand All @@ -97,6 +121,37 @@ def files(self) -> List[Tuple[str, bytes]]:
return self._files


class StopPaginationObserver(OutputObserver):
    """
    Observer that detects repeated output across paginated pages.

    Every saved datum, enqueued URL, and download is fingerprinted. Once
    pagination has started (``on_paginate`` was called), observing an
    already-seen fingerprint raises ``StopAsyncIteration``, which the SDK's
    ``paginate`` catches to stop recursing through pages.
    """

    def __init__(self) -> None:
        # Fingerprints of everything observed so far.
        self._saved_data: set[Any] = set()
        # True once the scraper has paginated; duplicates before then are OK.
        self._paginator_called = False

    async def on_save_data(self, data: dict[str, Any]) -> None:
        self._add_data(data)

    async def on_queue_url(self, url: URL, context: dict[str, Any]) -> None:
        self._add_data(url)

    # noinspection PyTypeChecker
    async def on_download(
        self, download_url: str, filename: str, content: bytes
    ) -> "DownloadMeta":
        self._add_data((download_url, filename))

    def on_paginate(self, next_url: str) -> None:
        self._paginator_called = True

    def _add_data(self, data: Any) -> None:
        """
        Record a fingerprint for ``data``; raise ``StopAsyncIteration`` if it
        was already seen after pagination began.

        Dicts are fingerprinted by their non-dunder items (keys such as
        "__url" are per-page bookkeeping and would never repeat). Other
        values (URL strings, ``(url, filename)`` tuples) are stored as-is:
        passing them to ``frozenset(...)`` would iterate them element-wise,
        so e.g. two anagram URLs — or the same tuple with its elements
        swapped — would collide and stop pagination spuriously.
        """
        if isinstance(data, dict):
            fingerprint: Any = frozenset(
                item for item in data.items() if not item[0].startswith("__")
            )
        else:
            fingerprint = data
        if self._paginator_called and fingerprint in self._saved_data:
            raise StopAsyncIteration()
        self._saved_data.add(fingerprint)


class DownloadMeta(TypedDict):
    """Metadata describing a captured download, returned by on_download hooks."""

    # URL the file was downloaded from
    url: str
    # Suggested/derived name of the downloaded file
    filename: str
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "harambe-sdk"
version = "0.8.41"
version = "0.9.0"
description = "Data extraction SDK for Playwright 🐒🍌"
authors = ["awtkns <[email protected]>"]
readme = "README.md"
Expand Down
Loading

0 comments on commit 01cc243

Please sign in to comment.