feat(python-sdk): add async scraping and crawling methods using aiohttp and asyncio

RutamBhagat committed Dec 11, 2024
1 parent f877fbf commit 8a69b4e
Showing 3 changed files with 164 additions and 2 deletions.
158 changes: 158 additions & 0 deletions apps/python-sdk/firecrawl/firecrawl.py
@@ -18,6 +18,8 @@
import requests
import pydantic
import websockets
import aiohttp
import asyncio

logger : logging.Logger = logging.getLogger("firecrawl")

@@ -106,6 +108,70 @@ def scrape_url(self, url: str, params: Optional[Dict[str, Any]] = None) -> Any:
        else:
            self._handle_error(response, 'scrape URL')


    async def async_scrape_url(self, url: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """
        Async version of scrape_url using aiohttp.
        """
        headers = self._prepare_headers()
        scrape_params = {'url': url}

        if params:
            extract = params.get('extract', {})
            if extract:
                if 'schema' in extract and hasattr(extract['schema'], 'schema'):
                    extract['schema'] = extract['schema'].schema()
                scrape_params['extract'] = extract

            for key, value in params.items():
                if key not in ['extract']:
                    scrape_params[key] = value

        endpoint = f'/v1/scrape'
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f'{self.api_url}{endpoint}',
                headers=headers,
                json=scrape_params
            ) as response:
                if response.status == 200:
                    response_data = await response.json()
                    if response_data['success'] and 'data' in response_data:
                        return response_data['data']
                    elif "error" in response_data:
                        raise Exception(f'Failed to scrape URL. Error: {response_data["error"]}')
                    else:
                        raise Exception(f'Failed to scrape URL. Error: {response_data}')
                else:
                    await self._async_handle_error(response, 'scrape URL')
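
A minimal usage sketch for the new coroutine (not part of the diff). It assumes the existing FirecrawlApp constructor; the API key, URL, and "formats" param are illustrative.

import asyncio

from firecrawl import FirecrawlApp

app = FirecrawlApp(api_key="fc-YOUR-KEY")  # placeholder key

async def main():
    # The request runs on aiohttp instead of blocking on requests.post.
    data = await app.async_scrape_url(
        "https://example.com",
        params={"formats": ["markdown"]},
    )
    print(data.get("markdown", "")[:200])

asyncio.run(main())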


    async def async_batch_scrape_urls(self, urls: list[str],
                                      params: Optional[Dict[str, Any]] = None,
                                      poll_interval: Optional[int] = 2,
                                      idempotency_key: Optional[str] = None) -> Any:
        """
        Async version of batch_scrape_urls using aiohttp.
        """
        headers = self._prepare_headers(idempotency_key)
        json_data = {'urls': urls}
        if params:
            json_data.update(params)

        async with aiohttp.ClientSession() as session:
            async with session.post(
                f'{self.api_url}/v1/batch/scrape',
                headers=headers,
                json=json_data
            ) as response:
                if response.status == 200:
                    response_data = await response.json()
                    id = response_data.get('id')
                    return await self._async_monitor_job_status(id, headers, poll_interval)
                else:
                    await self._async_handle_error(response, 'start batch scrape job')
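
A sketch of the batch variant under the same assumptions. The coroutine submits the job and then polls through _async_monitor_job_status until a terminal status, returning the final status payload.

import asyncio

from firecrawl import FirecrawlApp

app = FirecrawlApp(api_key="fc-YOUR-KEY")  # placeholder key

async def main():
    # Waits (without blocking the event loop) until the batch scrape job completes.
    result = await app.async_batch_scrape_urls(
        ["https://example.com", "https://example.org"],
        params={"formats": ["markdown"]},
        poll_interval=2,
    )
    print(result["status"], len(result.get("data", [])))

asyncio.run(main())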


    def search(self, query: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """
        Perform a search using the Firecrawl API.
@@ -123,6 +189,7 @@ def search(self, query: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """
        raise NotImplementedError("Search is not supported in v1.")


    def crawl_url(self, url: str,
                  params: Optional[Dict[str, Any]] = None,
                  poll_interval: Optional[int] = 2,
@@ -163,6 +230,32 @@ def crawl_url(self, url: str,
            self._handle_error(response, 'start crawl job')


    async def async_crawl_url(self, url: str,
                              params: Optional[Dict[str, Any]] = None,
                              poll_interval: Optional[int] = 2,
                              idempotency_key: Optional[str] = None) -> Any:
        """
        Async version of crawl_url using aiohttp.
        """
        headers = self._prepare_headers(idempotency_key)
        json_data = {'url': url}
        if params:
            json_data.update(params)

        async with aiohttp.ClientSession() as session:
            async with session.post(
                f'{self.api_url}/v1/crawl',
                headers=headers,
                json=json_data
            ) as response:
                if response.status == 200:
                    response_data = await response.json()
                    id = response_data.get('id')
                    return await self._async_monitor_job_status(id, headers, poll_interval)
                else:
                    await self._async_handle_error(response, 'start crawl job')
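
A sketch of the crawl coroutine under the same assumptions; sharing one event loop is what makes it easy to start and poll several crawl jobs concurrently. Note that a synchronous method with the same async_crawl_url name remains below, so this assumes the coroutine is the definition that ends up bound on the class.

import asyncio

from firecrawl import FirecrawlApp

app = FirecrawlApp(api_key="fc-YOUR-KEY")  # placeholder key

async def main():
    # Two crawl jobs submitted and polled concurrently on one event loop.
    first, second = await asyncio.gather(
        app.async_crawl_url("https://example.com", params={"limit": 5}),
        app.async_crawl_url("https://example.org", params={"limit": 5}),
    )
    for status in (first, second):
        print(status["status"], len(status.get("data", [])))

asyncio.run(main())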


    def async_crawl_url(self, url: str, params: Optional[Dict[str, Any]] = None, idempotency_key: Optional[str] = None) -> Dict[str, Any]:
        """
        Initiate a crawl job asynchronously.
@@ -603,6 +696,48 @@ def _delete_request(self, url: str,
                return response
        return response


    async def _async_monitor_job_status(self, id: str, headers: Dict[str, str], poll_interval: int) -> Any:
        """
        Async version of _monitor_job_status using aiohttp.
        """
        async with aiohttp.ClientSession() as session:
            while True:
                async with session.get(
                    f'{self.api_url}/v1/crawl/{id}',
                    headers=headers
                ) as response:
                    if response.status == 200:
                        status_data = await response.json()
                        if status_data['status'] == 'completed':
                            if 'data' in status_data:
                                data = status_data['data']
                                while 'next' in status_data:
                                    next_url = status_data.get('next')
                                    if not next_url:
                                        logger.warning("Expected 'next' URL is missing.")
                                        break
                                    try:
                                        async with session.get(next_url, headers=headers) as next_response:
                                            if next_response.status != 200:
                                                logger.error(f"Failed to fetch next page: {next_response.status}")
                                                break
                                            next_data = await next_response.json()
                                            data.extend(next_data.get('data', []))
                                            status_data = next_data
                                    except Exception as e:
                                        logger.error(f"Error during pagination request: {e}")
                                        break
                                status_data['data'] = data
                            return status_data
                        elif status_data['status'] in ['active', 'paused', 'pending', 'queued', 'waiting', 'scraping']:
                            await asyncio.sleep(max(poll_interval, 2))
                        else:
                            raise Exception(f'Job failed or was stopped. Status: {status_data["status"]}')
                    else:
                        await self._async_handle_error(response, 'check job status')
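
For reference, a hedged sketch of the paginated status payload the loop above expects; the field names follow the v1 crawl status response, and the concrete values are invented.

# First page of a completed job: 'next' points at the following page of results.
page_one = {
    "status": "completed",
    "data": [{"markdown": "# first page"}],
    "next": "https://api.firecrawl.dev/v1/crawl/<job-id>?skip=1",
}
# Last page: no 'next' key, so the pagination loop stops and the merged 'data'
# list from every page is written back onto the payload that gets returned.
page_two = {
    "status": "completed",
    "data": [{"markdown": "# second page"}],
}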


    def _monitor_job_status(self, id: str, headers: Dict[str, str], poll_interval: int) -> Any:
        """
        Monitor the status of a crawl job until completion.
@@ -642,6 +777,29 @@ def _monitor_job_status(self, id: str, headers: Dict[str, str], poll_interval: int
            else:
                self._handle_error(status_response, 'check crawl status')


    async def _async_handle_error(self, response: aiohttp.ClientResponse, action: str) -> None:
        """
        Async version of _handle_error for aiohttp responses.
        """
        error_data = await response.json()
        error_message = error_data.get('error', 'No error message provided.')
        error_details = error_data.get('details', 'No additional error details provided.')

        if response.status == 402:
            message = f"Payment Required: Failed to {action}. {error_message} - {error_details}"
        elif response.status == 408:
            message = f"Request Timeout: Failed to {action} as the request timed out. {error_message} - {error_details}"
        elif response.status == 409:
            message = f"Conflict: Failed to {action} due to a conflict. {error_message} - {error_details}"
        elif response.status == 500:
            message = f"Internal Server Error: Failed to {action}. {error_message} - {error_details}"
        else:
            message = f"Unexpected error during {action}: Status code {response.status}. {error_message} - {error_details}"

        raise aiohttp.ClientError(message)
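
The async error helper wraps API failures in aiohttp.ClientError, so callers catch that type; a sketch under the same assumptions as the examples above.

import asyncio

import aiohttp

from firecrawl import FirecrawlApp

app = FirecrawlApp(api_key="fc-YOUR-KEY")  # placeholder key

async def main():
    try:
        await app.async_scrape_url("https://example.com")
    except aiohttp.ClientError as exc:
        # Any non-200 API response is routed through _async_handle_error,
        # which raises a ClientError carrying the status code and message.
        print(f"Scrape failed: {exc}")

asyncio.run(main())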


    def _handle_error(self, response: requests.Response, action: str) -> None:
        """
        Handle errors from API responses.
4 changes: 3 additions & 1 deletion apps/python-sdk/pyproject.toml
@@ -12,7 +12,9 @@ dependencies = [
"requests",
"python-dotenv",
"websockets",
"nest-asyncio"
"nest-asyncio",
"aiohttp>=3.8.0",
"asyncio>=3.4.3",
]
authors = [{name = "Mendable.ai",email = "[email protected]"}]
maintainers = [{name = "Mendable.ai",email = "[email protected]"}]
4 changes: 3 additions & 1 deletion apps/python-sdk/requirements.txt
@@ -2,4 +2,6 @@ requests
pytest
python-dotenv
websockets
nest-asyncio
nest-asyncio
aiohttp
asyncio
