diff --git a/llama_parse/base.py b/llama_parse/base.py
index 8ea1118..7a089b9 100644
--- a/llama_parse/base.py
+++ b/llama_parse/base.py
@@ -32,6 +32,11 @@ _DEFAULT_SEPARATOR = "\n---\n"
 
+JOB_RESULT_URL = "/api/parsing/job/{job_id}/result/{result_type}"
+JOB_STATUS_ROUTE = "/api/parsing/job/{job_id}"
+JOB_UPLOAD_ROUTE = "/api/parsing/upload"
+
+
 class LlamaParse(BasePydanticReader):
     """A smart-parser for files."""
 
@@ -49,9 +54,11 @@ class LlamaParse(BasePydanticReader):
         default=1,
         description="The interval in seconds to check if the parsing is done.",
     )
+
     custom_client: Optional[httpx.AsyncClient] = Field(
         default=None, description="A custom HTTPX client to use for sending requests."
     )
+
     ignore_errors: bool = Field(
         default=True,
         description="Whether or not to ignore and skip errors raised during parsing.",
@@ -308,7 +315,7 @@ def validate_base_url(cls, v: str) -> str:
     @property
     def aclient(self) -> httpx.AsyncClient:
         if not self._aclient:
-            client = self.custom_client or httpx.AsyncClient()
+            self._aclient = self.custom_client or httpx.AsyncClient()
 
         # need to do this outside instantiation in case user
         # updates base_url, api_key, or max_timeout later
@@ -316,11 +323,11 @@ def aclient(self) -> httpx.AsyncClient:
         # if someone does do it and it doesn't reflect on
         # the client they'll end up pretty confused, so
         # for the sake of ergonomics...
-        client.base_url = self.base_url
-        client.headers["Authorization"] = f"Bearer {self.api_key}"
-        client.timeout = self.max_timeout
+        self._aclient.base_url = self.base_url
+        self._aclient.headers["Authorization"] = f"Bearer {self.api_key}"
+        self._aclient.timeout = self.max_timeout
 
-        return client
+        return self._aclient
 
     @asynccontextmanager
     async def client_context(self) -> AsyncGenerator[httpx.AsyncClient, None]:
@@ -370,8 +377,6 @@ async def _create_job(
         extra_info: Optional[dict] = None,
         fs: Optional[AbstractFileSystem] = None,
     ) -> str:
-        headers = {"Authorization": f"Bearer {self.api_key}"}
-        url = f"{self.base_url}/api/parsing/upload"
         files = None
         file_handle = None
         input_url = file_input if self._is_input_url(file_input) else None
@@ -589,17 +594,12 @@ async def _create_job(
             data["gpt4o_api_key"] = self.gpt4o_api_key
 
         try:
-            async with self.client_context() as client:
-                response = await client.post(
-                    url,
-                    files=files,
-                    headers=headers,
-                    data=data,
-                )
-                if not response.is_success:
-                    raise Exception(f"Failed to parse the file: {response.text}")
-                job_id = response.json()["id"]
-                return job_id
+            resp = await self.aclient.post(JOB_UPLOAD_ROUTE, files=files, data=data)  # type: ignore
+            resp.raise_for_status()  # raises httpx.HTTPStatusError if the status is not 2xx
+            return resp.json()["id"]
+        except httpx.HTTPStatusError as err:
+            msg = f"Failed to parse the file: {err.response.text}"
+            raise Exception(msg) from err  # chain the original error so its context survives
         finally:
             if file_handle is not None:
                 file_handle.close()
 
@@ -607,52 +607,51 @@ async def _get_job_result(
         self, job_id: str, result_type: str, verbose: bool = False
     ) -> Dict[str, Any]:
-        result_url = f"{self.base_url}/api/parsing/job/{job_id}/result/{result_type}"
-        status_url = f"{self.base_url}/api/parsing/job/{job_id}"
-        headers = {"Authorization": f"Bearer {self.api_key}"}
-
         start = time.time()
         tries = 0
+
+        # Grab the client once so the auth header, base_url, and timeout are
+        # not re-applied on every poll; this assumes no other coroutine
+        # mutates base_url or the other client-related config while we wait.
+        client = self.aclient
         while True:
             await asyncio.sleep(self.check_interval)
-            async with self.client_context() as client:
-                tries += 1
-
-                result = await client.get(status_url, headers=headers)
-
-                if result.status_code != 200:
-                    end = time.time()
-                    if end - start > self.max_timeout:
-                        raise Exception(f"Timeout while parsing the file: {job_id}")
-                    if verbose and tries % 10 == 0:
-                        print(".", end="", flush=True)
-
-                    await asyncio.sleep(self.check_interval)
-
-                    continue
-
-                # Allowed values "PENDING", "SUCCESS", "ERROR", "CANCELED"
-                result_json = result.json()
-                status = result_json["status"]
-                if status == "SUCCESS":
-                    parsed_result = await client.get(result_url, headers=headers)
-                    return parsed_result.json()
-                elif status == "PENDING":
-                    end = time.time()
-                    if end - start > self.max_timeout:
-                        raise Exception(f"Timeout while parsing the file: {job_id}")
-                    if verbose and tries % 10 == 0:
-                        print(".", end="", flush=True)
-
-                    await asyncio.sleep(self.check_interval)
-                else:
-                    error_code = result_json.get("error_code", "No error code found")
-                    error_message = result_json.get(
-                        "error_message", "No error message found"
-                    )
+            tries += 1
+            result = await client.get(JOB_STATUS_ROUTE.format(job_id=job_id))
+            if result.status_code != 200:
+                end = time.time()
+                if end - start > self.max_timeout:
+                    raise Exception(f"Timeout while parsing the file: {job_id}")
+                if verbose and tries % 10 == 0:
+                    print(".", end="", flush=True)
+                await asyncio.sleep(self.check_interval)
+                continue
+
+            # Allowed values "PENDING", "SUCCESS", "ERROR", "CANCELED"
+            result_json = result.json()
+            status = result_json["status"]
+            if status == "SUCCESS":
+                parsed_result = await client.get(
+                    JOB_RESULT_URL.format(job_id=job_id, result_type=result_type),
+                )
+                return parsed_result.json()
-
-                    exception_str = f"Job ID: {job_id} failed with status: {status}, Error code: {error_code}, Error message: {error_message}"
-                    raise Exception(exception_str)
+            elif status == "PENDING":
+                end = time.time()
+                if end - start > self.max_timeout:
+                    raise Exception(f"Timeout while parsing the file: {job_id}")
+                if verbose and tries % 10 == 0:
+                    print(".", end="", flush=True)
+                await asyncio.sleep(self.check_interval)
+
+            else:
+                error_code = result_json.get("error_code", "No error code found")
+                error_message = result_json.get(
+                    "error_message", "No error message found"
+                )
+
+                exception_str = f"Job ID: {job_id} failed with status: {status}, Error code: {error_code}, Error message: {error_message}"
+                raise Exception(exception_str)
 
     async def _aload_data(
         self,
@@ -817,12 +816,11 @@ async def aget_assets(
         self, json_result: List[dict], download_path: str, asset_key: str
     ) -> List[dict]:
         """Download assets (images or charts) from the parsed result."""
-        headers = {"Authorization": f"Bearer {self.api_key}"}
-
         # Make the download path
        if not os.path.exists(download_path):
             os.makedirs(download_path)
+        client = self.aclient
         try:
             assets = []
             for result in json_result:
@@ -847,18 +845,14 @@
                         asset["path"] = asset_path
                         asset["job_id"] = job_id
-                        asset["original_file_path"] = result.get("file_path", None)
-                        asset["page_number"] = page["page"]
+
                         with open(asset_path, "wb") as f:
                             asset_url = f"{self.base_url}/api/parsing/job/{job_id}/result/image/{asset_name}"
-                            async with self.client_context() as client:
-                                res = await client.get(
-                                    asset_url, headers=headers, timeout=self.max_timeout
-                                )
-                                res.raise_for_status()
-                                f.write(res.content)
+                            resp = await client.get(asset_url)
+                            resp.raise_for_status()
+                            f.write(resp.content)
 
                         assets.append(asset)
             return assets
         except Exception as e:
@@ -918,11 +912,10 @@ async def aget_xlsx(
         self, json_result: List[dict], download_path: str
     ) -> List[dict]:
         """Download xlsx from the parsed result."""
-        headers = {"Authorization": f"Bearer {self.api_key}"}
-
         # make the download path
         if not os.path.exists(download_path):
             os.makedirs(download_path)
+        client = self.aclient
         try:
             xlsx_list = []
             for result in json_result:
@@ -942,12 +935,9 @@ async def aget_xlsx(
                     xlsx_url = (
                         f"{self.base_url}/api/parsing/job/{job_id}/result/raw/xlsx"
                     )
-                    async with self.client_context() as client:
-                        res = await client.get(
-                            xlsx_url, headers=headers, timeout=self.max_timeout
-                        )
-                        res.raise_for_status()
-                        f.write(res.content)
+                    res = await client.get(xlsx_url)
+                    res.raise_for_status()
+                    f.write(res.content)
 
                 xlsx_list.append(xlsx)
             return xlsx_list
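
A minimal usage sketch of the new client handling (the API key and file path
below are placeholders, and `aload_data` is the reader's existing async entry
point). Because `aclient` is now cached and every route is relative, a
caller-supplied `custom_client` is configured once and then reused for the
upload, the status polling, and any asset downloads:

    import asyncio

    import httpx

    from llama_parse import LlamaParse

    async def main() -> None:
        async with httpx.AsyncClient() as client:
            # base_url, the auth header, and the timeout are applied to this
            # client on first access to `aclient`, then reused on every request.
            parser = LlamaParse(api_key="llx-...", custom_client=client)
            docs = await parser.aload_data("./my_file.pdf")
            print(docs[0].text[:200])

    asyncio.run(main())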
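
The upload path now leans on `raise_for_status()` rather than hand-checking
`is_success`. A standalone sketch of that error-translation pattern (the route
matches JOB_UPLOAD_ROUTE above; the payload is illustrative, and the client is
assumed to carry a base_url, as `aclient` now guarantees):

    import httpx

    async def create_job(client: httpx.AsyncClient, data: dict) -> str:
        try:
            resp = await client.post("/api/parsing/upload", data=data)
            resp.raise_for_status()  # httpx.HTTPStatusError on any non-2xx status
            return resp.json()["id"]
        except httpx.HTTPStatusError as err:
            # err.response carries the server's error body; `from err` keeps
            # the original traceback chained to the re-raised exception.
            raise Exception(f"Failed to parse the file: {err.response.text}") from err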
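
The polling loop in `_get_job_result` now grabs the cached client once, then
sleeps `check_interval` seconds between status checks until `max_timeout` is
exceeded. A condensed sketch of that loop, with the routes inlined and the
PENDING/ERROR branches trimmed for brevity:

    import asyncio
    import time

    import httpx

    async def poll_job(
        client: httpx.AsyncClient,
        job_id: str,
        result_type: str = "json",
        check_interval: float = 1.0,
        max_timeout: float = 2000.0,
    ) -> dict:
        start = time.time()
        while True:
            await asyncio.sleep(check_interval)
            status = await client.get(f"/api/parsing/job/{job_id}")
            if status.status_code == 200 and status.json()["status"] == "SUCCESS":
                result = await client.get(
                    f"/api/parsing/job/{job_id}/result/{result_type}"
                )
                return result.json()
            if time.time() - start > max_timeout:
                raise Exception(f"Timeout while parsing the file: {job_id}")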