Enable support for custom filesystem #117

Merged (12 commits) on Sep 10, 2024
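
This PR threads an optional fsspec filesystem through load_data / aload_data / _create_job, so input files can be read from any AbstractFileSystem implementation rather than only the local disk. A minimal sketch of the new call pattern, mirroring the test added in this PR; the API key placeholder and the commented-out s3fs usage are assumptions for illustration, not part of the change:

from fsspec.implementations.local import LocalFileSystem
from llama_parse import LlamaParse

parser = LlamaParse(api_key="llx-...", result_type="markdown")

# Explicit local filesystem; equivalent to the previous default behaviour.
fs = LocalFileSystem()
docs = parser.load_data("attention_is_all_you_need.pdf", fs=fs)

# Any other fsspec implementation should plug in the same way, since the
# file is now opened via fs.open(...) inside _create_job, e.g. (assumed):
# import s3fs
# docs = parser.load_data("my-bucket/report.pdf", fs=s3fs.S3FileSystem())
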
1 change: 1 addition & 0 deletions .gitignore
@@ -2,3 +2,4 @@
__pycache__/
*.pyc
.DS_Store
.idea
72 changes: 48 additions & 24 deletions llama_parse/base.py
@@ -1,15 +1,20 @@
import os
import asyncio
from io import TextIOWrapper

import httpx
import mimetypes
import time
from pathlib import Path
from typing import List, Optional, Union
from pathlib import Path, PurePath
from typing import Any, Dict, List, Optional, Union

from fsspec import AbstractFileSystem
from fsspec.spec import AbstractBufferedFile
from llama_index.core.async_utils import run_jobs
from llama_index.core.bridge.pydantic import Field, validator
from llama_index.core.constants import DEFAULT_BASE_URL
from llama_index.core.readers.base import BasePydanticReader
from llama_index.core.readers.file.base import get_default_fs
from llama_index.core.schema import Document
from llama_parse.utils import (
nest_asyncio_err,
@@ -143,25 +148,31 @@ def validate_base_url(cls, v: str) -> str:

# upload a document and get back a job_id
async def _create_job(
self, file_path: str, extra_info: Optional[dict] = None
self,
file_path: Union[str, PurePath],
extra_info: Optional[Dict[str, Any]] = None,
fs: Optional[AbstractFileSystem] = None,
) -> str:
file_path = str(file_path)
file_ext = os.path.splitext(file_path)[1]
str_file_path = file_path
if isinstance(file_path, PurePath):
str_file_path = file_path.name
file_ext = os.path.splitext(str_file_path)[1]
if file_ext not in SUPPORTED_FILE_TYPES:
raise Exception(
f"Currently, only the following file types are supported: {SUPPORTED_FILE_TYPES}\n"
f"Current file type: {file_ext}"
)

extra_info = extra_info or {}
extra_info["file_path"] = file_path
extra_info["file_path"] = str_file_path

headers = {"Authorization": f"Bearer {self.api_key}"}

# load data, set the mime type
with open(file_path, "rb") as f:
mime_type = mimetypes.guess_type(file_path)[0]
files = {"file": (f.name, f, mime_type)}
fs = fs or get_default_fs()
with fs.open(file_path, "rb") as f:
mime_type = mimetypes.guess_type(str_file_path)[0]
files = {"file": (self.__get_filename(f), f, mime_type)}
@logan-markewich (Contributor) commented on Apr 14, 2024:

I don't think this works; at least locally, it doesn't work for me.

Since we already have the path, can't we use that?

The PR author (Contributor) replied:

I am using it in production right now and it is working properly. What is the error that you are getting locally?

The issue with the path is that I've seen situations in production where people upload a file with .txt but the file is actually a .csv and it doesn't work.
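
(For context on the extension concern above: the stdlib mimetypes.guess_type only inspects the file name, never the file contents, so a mislabelled upload keeps its claimed type. A minimal illustration with hypothetical file names:)

import mimetypes

# guess_type looks only at the extension, not at the bytes in the file,
# so a CSV that was uploaded as "data.txt" is still reported as text/plain.
print(mimetypes.guess_type("data.txt"))  # ('text/plain', None)
print(mimetypes.guess_type("data.csv"))  # ('text/csv', None)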

Contributor replied:

:sigh: this still doesn't work guys lol

>>> from llama_parse import LlamaParse
>>> documents = LlamaParse(api_key="llx-...").load_data("2023.acl-srw.0.pdf")
Error while parsing the file '2023.acl-srw.0.pdf': '_io.BufferedReader' object has no attribute 'full_name'
>>> 
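
The traceback above arises because a plain local open(path, "rb") returns an io.BufferedReader, which exposes .name but not .full_name, and the TextIOWrapper check in __get_filename never matches a binary handle. A hedged sketch of a lookup that tolerates both local and fsspec file objects; this is only one way to handle it, not necessarily the fix that was ultimately merged:

from typing import Any

def get_filename(f: Any) -> str:
    # fsspec buffered files carry the path in `full_name`; local file
    # objects (BufferedReader, TextIOWrapper) carry it in `name`.
    name = getattr(f, "full_name", None) or getattr(f, "name", None)
    if name is None:
        raise ValueError("Could not determine a file name from the file object")
    return str(name)
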


# send the request, start job
url = f"{self.base_url}/api/parsing/upload"
@@ -192,9 +203,15 @@ async def _create_job(
job_id = response.json()["id"]
return job_id

@staticmethod
def __get_filename(f: Union[TextIOWrapper, AbstractBufferedFile]) -> str:
if isinstance(f, TextIOWrapper):
return f.name
return f.full_name

async def _get_job_result(
self, job_id: str, result_type: str, verbose: bool = False
) -> dict:
) -> Dict[str, Any]:
result_url = f"{self.base_url}/api/parsing/job/{job_id}/result/{result_type}"
status_url = f"{self.base_url}/api/parsing/job/{job_id}"
headers = {"Authorization": f"Bearer {self.api_key}"}
@@ -233,18 +250,16 @@ async def _get_job_result(

await asyncio.sleep(self.check_interval)

continue
else:
raise Exception(
f"Failed to parse the file: {job_id}, status: {status}"
)

async def _aload_data(
self, file_path: str, extra_info: Optional[dict] = None, verbose: bool = False
self,
file_path: Union[str, PurePath],
extra_info: Optional[Dict[str, Any]] = None,
fs: Optional[AbstractFileSystem] = None,
verbose: bool = False,
) -> List[Document]:
"""Load data from the input path."""
try:
job_id = await self._create_job(file_path, extra_info=extra_info)
job_id = await self._create_job(file_path, extra_info=extra_info, fs=fs)
if verbose:
print("Started parsing the file under job_id %s" % job_id)

@@ -271,18 +286,22 @@ async def _aload_data(
raise e

async def aload_data(
self, file_path: Union[List[str], str], extra_info: Optional[dict] = None
self,
file_path: Union[List[str], str, PurePath, List[PurePath]],
extra_info: Optional[Dict[str, Any]] = None,
fs: Optional[AbstractFileSystem] = None,
) -> List[Document]:
"""Load data from the input path."""
if isinstance(file_path, (str, Path)):
if isinstance(file_path, (str, PurePath)):
return await self._aload_data(
file_path, extra_info=extra_info, verbose=self.verbose
file_path, extra_info=extra_info, fs=fs, verbose=self.verbose
)
elif isinstance(file_path, list):
jobs = [
self._aload_data(
f,
extra_info=extra_info,
fs=fs,
verbose=self.verbose and not self.show_progress,
)
for f in file_path
@@ -308,11 +327,14 @@ async def aload_data(
)

def load_data(
self, file_path: Union[List[str], str], extra_info: Optional[dict] = None
self,
file_path: Union[List[str], str, PurePath, List[PurePath]],
extra_info: Optional[Dict[str, Any]] = None,
fs: Optional[AbstractFileSystem] = None,
) -> List[Document]:
"""Load data from the input path."""
try:
return asyncio.run(self.aload_data(file_path, extra_info))
return asyncio.run(self.aload_data(file_path, extra_info, fs=fs))
except RuntimeError as e:
if nest_asyncio_err in str(e):
raise RuntimeError(nest_asyncio_msg)
@@ -380,7 +402,9 @@ def get_json_result(
else:
raise e

def get_images(self, json_result: List[dict], download_path: str) -> List[dict]:
def get_images(
self, json_result: List[Dict[str, Any]], download_path: str
) -> List[Dict[str, Any]]:
"""Download images from the parsed result."""
headers = {"Authorization": f"Bearer {self.api_key}"}

16 changes: 16 additions & 0 deletions tests/test_reader.py
@@ -1,5 +1,7 @@
import os
import pytest
from fsspec.implementations.local import LocalFileSystem

from llama_parse import LlamaParse


@@ -33,6 +35,20 @@ def test_simple_page_markdown() -> None:
assert len(result[0].text) > 0


@pytest.mark.skipif(
os.environ.get("LLAMA_CLOUD_API_KEY", "") == "",
reason="LLAMA_CLOUD_API_KEY not set",
)
def test_simple_page_with_custom_fs() -> None:
parser = LlamaParse(result_type="markdown")
fs = LocalFileSystem()
filepath = os.path.join(
os.path.dirname(__file__), "test_files/attention_is_all_you_need.pdf"
)
result = parser.load_data(filepath, fs=fs)
assert len(result) == 1


@pytest.mark.skipif(
os.environ.get("LLAMA_CLOUD_API_KEY", "") == "",
reason="LLAMA_CLOUD_API_KEY not set",