Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re: Support for cloud storage connectors #172 Added Implementation for GCP connector #197

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,372 changes: 1,397 additions & 975 deletions channel/poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions channel/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package-mode = false
python = "^3.11"
python-dotenv = "^1.0.0"
sqlalchemy = "^2.0.25"
google-cloud-storage = "^2.18.2"

[tool.poetry.group.test.dependencies]
pytest = "^8.2.2"
Expand Down
5,492 changes: 2,805 additions & 2,687 deletions indexer/poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions indexer/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ psycopg = {extras = ["binary", "pool"], version = "^3.1.19"}
pymupdf = "^1.24.5"
langchain-community = "^0.2.7"
r2r = "^0.2.78"
google-cloud-storage = "^2.18.2"

[tool.poetry.group.dev.dependencies]
lib = {path = "../jb-lib", develop = true}
Expand Down
1 change: 1 addition & 0 deletions jb-lib/lib/file_storage/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .azure import AzureAsyncStorage, AzureSyncStorage
from .local import LocalAsyncStorage, LocalSyncStorage
from .gcp import GcpAsyncStorage, GcpSyncStorage
from .storage import Storage
from .handler import StorageHandler
2 changes: 2 additions & 0 deletions jb-lib/lib/file_storage/gcp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .gcp_storage import GcpAsyncStorage
from .gcp_sync_storage import GcpSyncStorage
101 changes: 101 additions & 0 deletions jb-lib/lib/file_storage/gcp/gcp_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import os
from typing import Union, Optional
from datetime import datetime, timedelta, timezone
import logging
from google.cloud import storage
import aiofiles

logger = logging.getLogger("storage")

class GcpAsyncStorage:
__client__ = None
tmp_folder = "/tmp/jb_files"

def __init__(self):
logger.info("Initializing GCP Storage")

project_id = 'indian-legal-bert'
self.__bucket_name__ = 'jugalbandi'
os.environ['GOOGLE_APPLICATION_CREDENTIALS']='/Users/sunandhitab/Downloads/indian-legal-bert-72a5c6f931f1.json'
if not project_id or not self.__bucket_name__:
print(project_id, self.__bucket_name__)
raise ValueError(
"GCPAsyncStorage client not initialized. Missing project_id or bucket_name"
)

self.__client__ = storage.Client(project=project_id)
os.makedirs(self.tmp_folder, exist_ok=True)

async def write_file(
self,
file_path: str,
file_content: Union[str, bytes],
mime_type: Optional[str] = None,
):
if not self.__client__:
raise Exception("GCPAsyncStorage client not initialized")

blob_name = file_path
bucket = self.__client__.bucket(self.__bucket_name__)
blob = bucket.blob(blob_name)

# Determine MIME type if not provided
if mime_type is None:
mime_type = (
"audio/mpeg" if file_path.lower().endswith(".mp3") else "application/octet-stream"
)

# Upload the blob
await asyncio.to_thread(blob.upload_from_string, file_content, content_type=mime_type)

async def _download_file_to_temp_storage(
self, file_path: Union[str, os.PathLike]
) -> Union[str, os.PathLike]:
if not self.__client__:
raise Exception("GCPAsyncStorage client not initialized")

blob_name = file_path
bucket = self.__client__.bucket(self.__bucket_name__)
blob = bucket.blob(blob_name)

tmp_file_path = os.path.join(self.tmp_folder, file_path)

# Create directory if it doesn't exist
os.makedirs(os.path.dirname(tmp_file_path), exist_ok=True)

async with aiofiles.open(tmp_file_path, 'wb') as my_blob:
await asyncio.to_thread(blob.download_to_file, my_blob)

return tmp_file_path

def public_url(self, file_path: str) -> str:
if not self.__client__:
raise Exception("GCPAsyncStorage client not initialized")

blob_name = file_path
bucket = self.__client__.bucket(self.__bucket_name__)
blob = bucket.blob(blob_name)

# Generate a signed URL that expires in 1 day
url = blob.generate_signed_url(
version="v4",
expiration=timedelta(days=1),
method="GET"
)

return url

# Example usage
async def main():
storage = GcpAsyncStorage()
await storage.write_file('example.txt', 'Hello, World!')
tmp_path = await storage._download_file_to_temp_storage('example.txt')
print(f"File downloaded to: {tmp_path}")
url = storage.public_url('example.txt')
print(f"Public URL: {url}")

if __name__ == "__main__":
import asyncio
from dotenv import load_dotenv
load_dotenv()
asyncio.run(main())
92 changes: 92 additions & 0 deletions jb-lib/lib/file_storage/gcp/gcp_sync_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import os
from typing import Union, Optional
from datetime import timedelta
import logging
from google.cloud import storage

logger = logging.getLogger("storage")

class GcpSyncStorage:
__client__ = None
tmp_folder = "/tmp/jb_files"

def __init__(self):
logger.info("Initializing GCP Storage")

project_id = 'indian-legal-bert'
self.__bucket_name__ = 'jugalbandi'
os.environ['GOOGLE_APPLICATION_CREDENTIALS']='/Users/sunandhitab/Downloads/indian-legal-bert-72a5c6f931f1.json'

if not project_id or not self.__bucket_name__:
raise ValueError(
"GCPStorage client not initialized. Missing project_id or bucket_name"
)

self.__client__ = storage.Client(project=project_id)
os.makedirs(self.tmp_folder, exist_ok=True)

def write_file(
self,
file_path: str,
file_content: Union[str, bytes],
mime_type: Optional[str] = None,
):
if not self.__client__:
raise Exception("GCPStorage client not initialized")

blob_name = file_path
bucket = self.__client__.bucket(self.__bucket_name__)
blob = bucket.blob(blob_name)

if mime_type is None:
mime_type = (
"audio/mpeg" if file_path.lower().endswith(".mp3") else "application/octet-stream"
)

# Use synchronous method to upload
blob.upload_from_string(file_content, content_type=mime_type)

def download_file_to_temp_storage(
self, file_path: Union[str, os.PathLike]
) -> Union[str, os.PathLike]:
if not self.__client__:
raise Exception("GCPStorage client not initialized")

blob_name = file_path
bucket = self.__client__.bucket(self.__bucket_name__)
blob = bucket.blob(blob_name)

tmp_file_path = os.path.join(self.tmp_folder, file_path)
os.makedirs(os.path.dirname(tmp_file_path), exist_ok=True)

# Download the file to the temporary location
with open(tmp_file_path, 'wb') as my_blob:
blob.download_to_file(my_blob)

return tmp_file_path

def public_url(self, file_path: str) -> str:
if not self.__client__:
raise Exception("GCPStorage client not initialized")

blob_name = file_path
bucket = self.__client__.bucket(self.__bucket_name__)
blob = bucket.blob(blob_name)

return blob.generate_signed_url(
version="v4",
expiration=timedelta(days=1),
method="GET"
)

# Example usage
def main():
storage = GcpSyncStorage()
storage.write_file('example.txt', 'Hello, World!')
tmp_path = storage.download_file_to_temp_storage('example.txt')
print(f"File downloaded to: {tmp_path}")
url = storage.public_url('example.txt')
print(f"Public URL: {url}")

if __name__ == "__main__":
main()
3 changes: 3 additions & 0 deletions jb-lib/lib/file_storage/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
from .storage import SyncStorage, AsyncStorage
from .local import LocalAsyncStorage, LocalSyncStorage
from .azure import AzureAsyncStorage, AzureSyncStorage
from .gcp import GcpAsyncStorage,GcpSyncStorage

STORAGE_REGISTRY: Dict[str, Type[AsyncStorage]] = {
"local": LocalAsyncStorage,
"azure": AzureAsyncStorage,
"gcp": GcpAsyncStorage,
}

SYNC_STORAGE_REGISTRY: Dict[str, Type[SyncStorage]] = {
"local": LocalSyncStorage,
"azure": AzureSyncStorage,
"gcp": GcpSyncStorage,
}
Loading
Loading