Updated Docker image versions
Removed version
Added pandasai integration
Added support for multiple csv/excel files in the src
Added plotting code
Added image rendering
Updated .gitignore
Added structured file upload in FE
Fixed structured data loading via data-dir in FE
Fixed datasrc deletion for structured in BE
Added FE QnA UI for structured query
Added structured constant
Added where clause
Fixed window height for answers
S1LV3RJ1NX committed Nov 29, 2024
1 parent d9b4269 commit 6ad8197
Showing 44 changed files with 2,339 additions and 117 deletions.
5 changes: 5 additions & 0 deletions .dockerignore
@@ -25,3 +25,8 @@ qdrant_db/
qdrant_storage/
sample-data/
user_data/
*.log
cache/
*.db
*.parquet
exports/
5 changes: 5 additions & 0 deletions .gitignore
@@ -30,3 +30,8 @@ user_data/
pgdata/
*.bak
models_config.yaml
*.log
cache/
*.db
*.parquet
exports/
5 changes: 5 additions & 0 deletions .tfyignore
@@ -33,3 +33,8 @@ node_modules/
sample-data/
.github/
docs/
*.log
cache/
*.parquet
*.db
exports/
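The ignore patterns added above (`*.log`, `cache/`, `*.db`, `*.parquet`, `exports/`) presumably cover the artifacts PandasAI produces at query time (its query cache and exported charts), plus any intermediate database or parquet files from structured ingestion.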
11 changes: 11 additions & 0 deletions Makefile
@@ -0,0 +1,11 @@
build:
	docker compose --env-file compose.env.bak up --build -d

down:
	docker compose --env-file compose.env.bak down

up:
	docker compose --env-file compose.env.bak up -d

deploy:
	python -m deployment.deploy --workspace_fqn ${ws_fqn} --application_set_name ${app_set_name} --ml_repo ${ml_repo} --base_domain_url ${domain} --dockerhub-images-registry ${registry} --secrets-base ${secrets_base}
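The `deploy` target reads its parameters from `make` variables, so an invocation would look something like `make deploy ws_fqn=<workspace-fqn> app_set_name=<application-set> ml_repo=<ml-repo> domain=<base-domain-url> registry=<dockerhub-registry> secrets_base=<secrets-base>` (all values shown are placeholders).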
1 change: 1 addition & 0 deletions backend/constants.py
@@ -39,3 +39,4 @@ class DataSourceType(StrEnum):
    TRUEFOUNDRY = "truefoundry"
    LOCAL = "localdir"
    WEB = "web"
    STRUCTURED = "structured"
3 changes: 3 additions & 0 deletions backend/modules/dataloaders/__init__.py
@@ -1,11 +1,14 @@
from backend.constants import DataSourceType
from backend.modules.dataloaders.loader import register_dataloader
from backend.modules.dataloaders.local_dir_loader import LocalDirLoader
from backend.modules.dataloaders.structured_loader import StructuredLoader
from backend.modules.dataloaders.web_loader import WebLoader
from backend.settings import settings

register_dataloader(DataSourceType.LOCAL, LocalDirLoader)
register_dataloader(DataSourceType.WEB, WebLoader)
register_dataloader(DataSourceType.STRUCTURED, StructuredLoader)

if settings.TFY_API_KEY:
    from backend.modules.dataloaders.truefoundry_loader import TrueFoundryLoader

239 changes: 239 additions & 0 deletions backend/modules/dataloaders/structured_loader.py
@@ -0,0 +1,239 @@
import os
import time
from typing import Any, AsyncGenerator, Dict, List, Tuple

import pandas as pd
from pandasai import Agent
from truefoundry.ml import get_client as get_tfy_client

from backend.logger import logger
from backend.modules.dataloaders.loader import BaseDataLoader
from backend.types import DataIngestionMode, DataSource, LoadedDataPoint
from backend.utils import unzip_file


class CacheItem:
    """Class to hold cached items with a creation timestamp"""

    def __init__(self, items: List[Any]):
        self.items = items
        self.timestamp = time.time()


class StructuredLoader(BaseDataLoader):
    """
    Load structured data from various sources (CSV, Excel, and databases)
    """

    _instance = None
    CACHE_TTL = 300  # 5 minutes in seconds

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(StructuredLoader, cls).__new__(cls)
            cls._instance.dataframes = {}  # Cache for lists of dataframes
            cls._instance.agents = {}  # Cache for PandasAI agents
            cls._instance._last_cleanup = time.time()
        return cls._instance

    def _load_file(self, filepath: str) -> pd.DataFrame:
        """Load data from a CSV or Excel file"""
        file_extension = os.path.splitext(filepath)[1].lower()

        if file_extension == ".csv":
            return pd.read_csv(filepath)
        elif file_extension in [".xlsx", ".xls"]:
            return pd.read_excel(filepath)
        else:
            raise ValueError(f"Unsupported file type: {file_extension}")

    def _load_files_from_directory(
        self, directory: str
    ) -> List[Tuple[str, pd.DataFrame]]:
        """Load all structured data files from a directory"""
        dataframes = []
        files = [
            f for f in os.listdir(directory) if f.endswith((".csv", ".xlsx", ".xls"))
        ]

        for file in files:
            file_path = os.path.join(directory, file)
            try:
                df = self._load_file(file_path)
                dataframes.append((file, df))
            except Exception as e:
                logger.warning(f"Failed to load file {file}: {e}")
                continue

        return dataframes

    def _cleanup_cache(self):
        """Remove expired items from the cache"""
        current_time = time.time()

        # Only run cleanup once a minute to avoid overly frequent checks
        if current_time - self._last_cleanup < 60:
            return

        expired_dfs = [
            fqn
            for fqn, item in self.dataframes.items()
            if current_time - item.timestamp > self.CACHE_TTL
        ]
        expired_agents = [
            fqn
            for fqn, item in self.agents.items()
            if current_time - item.timestamp > self.CACHE_TTL
        ]

        # Remove expired items
        for fqn in expired_dfs:
            logger.debug(f"Removing expired dataframe from cache: {fqn}")
            del self.dataframes[fqn]

        for fqn in expired_agents:
            logger.debug(f"Removing expired agent from cache: {fqn}")
            del self.agents[fqn]

        self._last_cleanup = current_time

    def _cache_items(self, data_source_fqn: str, df: pd.DataFrame, agent: Agent):
        """Cache a dataframe and an agent with timestamps"""
        # Wrap both in lists so they match what get_dataframes/get_agent expect
        self.dataframes[data_source_fqn] = CacheItem([df])
        self.agents[data_source_fqn] = CacheItem([agent])

    async def load_filtered_data(
        self,
        data_source: DataSource,
        dest_dir: str,
        previous_snapshot: Dict[str, str],
        batch_size: int,
        data_ingestion_mode: DataIngestionMode,
    ) -> AsyncGenerator[List[LoadedDataPoint], None]:
        """Load structured data from the source"""
        self._cleanup_cache()
        source_type = self._detect_source_type(data_source.uri)

        try:
            if source_type in ["csv", "excel"]:
                loaded_files = []  # List to store (filename, dataframe) tuples
                working_dir = None
                if data_source.uri.startswith("data-dir:"):
                    # Handle remote (TrueFoundry) files
                    try:
                        tfy_client = get_tfy_client()
                        dataset = tfy_client.get_data_directory_by_fqn(data_source.uri)
                        working_dir = dataset.download(path=dest_dir)
                        logger.debug(f"Data directory download info: {working_dir}")

                        if os.path.exists(os.path.join(working_dir, "files")):
                            working_dir = os.path.join(working_dir, "files")

                        # Handle zip files
                        for file_name in os.listdir(working_dir):
                            if file_name.endswith(".zip"):
                                unzip_file(
                                    file_path=os.path.join(working_dir, file_name),
                                    dest_dir=working_dir,
                                )

                        loaded_files = self._load_files_from_directory(working_dir)

                    except Exception as e:
                        logger.exception(f"Error downloading data directory: {str(e)}")
                        raise ValueError(f"Failed to download data directory: {str(e)}")

                else:
                    # Handle local files
                    working_dir = (
                        data_source.uri
                        if os.path.isdir(data_source.uri)
                        else os.path.dirname(data_source.uri)
                    )
                    loaded_files = self._load_files_from_directory(working_dir)

                if not loaded_files:
                    raise Exception("No valid structured data files found")

                # Cache the dataframes
                self.dataframes[data_source.fqn] = CacheItem(
                    [df for _, df in loaded_files]
                )

                # Create a LoadedDataPoint for each file
                data_points = []
                for filename, _ in loaded_files:
                    file_path = os.path.join(working_dir, filename)
                    data_point_hash = (
                        f"{os.path.getsize(file_path)}:{dataset.updated_at}"
                        if data_source.uri.startswith("data-dir:")
                        else str(os.lstat(file_path))
                    )

                    data_points.append(
                        LoadedDataPoint(
                            data_point_hash=data_point_hash,
                            data_point_uri=filename,
                            data_source_fqn=data_source.fqn,
                            local_filepath=file_path,
                            file_extension=os.path.splitext(filename)[1],
                            metadata={"structured_type": source_type},
                        )
                    )

                yield data_points

            elif source_type in ["sql", "gsheet"]:
                # Handle SQL and Google Sheets sources as a single data point
                data_point = LoadedDataPoint(
                    data_point_hash=str(hash(data_source.uri)),
                    data_point_uri=data_source.uri,
                    data_source_fqn=data_source.fqn,
                    local_filepath=data_source.uri,
                    metadata={"structured_type": source_type},
                )
                yield [data_point]

        except Exception as e:
            logger.exception(f"Error loading structured data: {e}")
            raise

    def _detect_source_type(self, uri: str) -> str:
        """Detect the type of structured data source"""
        # For TrueFoundry data directories
        if uri.startswith("data-dir:"):
            return "csv"  # Default to CSV for data-dir

        # For local directories
        if os.path.isdir(uri):
            files = [
                f for f in os.listdir(uri) if f.endswith((".csv", ".xlsx", ".xls"))
            ]
            if not files:
                raise ValueError(f"No structured data files found in directory: {uri}")
            return "csv" if files[0].endswith(".csv") else "excel"

        # For direct file or connection paths
        if uri.endswith(".csv"):
            return "csv"
        elif uri.endswith((".xlsx", ".xls")):
            return "excel"
        elif uri.startswith(("postgresql://", "mysql://", "sqlite://")):
            return "sql"
        elif "docs.google.com/spreadsheets" in uri:
            return "gsheet"
        else:
            raise ValueError(f"Unsupported structured data source: {uri}")

    def get_dataframes(self, data_source_fqn: str) -> List[pd.DataFrame]:
        """Get the list of cached dataframes for a data source"""
        self._cleanup_cache()
        cached_item = self.dataframes.get(data_source_fqn)
        return cached_item.items if cached_item else None

    def get_agent(self, data_source_fqn: str) -> Agent:
        """Get the cached agent for a data source"""
        self._cleanup_cache()
        cached_item = self.agents.get(data_source_fqn)
        return cached_item.items[0] if cached_item else None
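For orientation, here is a minimal sketch of how the cached dataframes are presumably consumed by the structured query path; the data source FQN, the LocalLLM connection details, and the question are placeholder assumptions, not part of this commit:

```python
# Hypothetical usage sketch; FQN, endpoint, and question are placeholders.
from pandasai import Agent
from pandasai.llm.local_llm import LocalLLM

from backend.modules.dataloaders.structured_loader import StructuredLoader

loader = StructuredLoader()  # singleton: all callers share the same TTL cache
dfs = loader.get_dataframes("data-dir:truefoundry/my-ml-repo/sales-data")  # placeholder FQN

if dfs:
    # An OpenAI-compatible endpoint is assumed here; in the app this LLM presumably
    # comes from ModelGateway.get_pandas_ai_model_from_model_config (see below).
    llm = LocalLLM(api_base="http://localhost:8000/v1", model="my-model", api_key="placeholder")
    agent = Agent(dfs, config={"llm": llm})
    print(agent.chat("How many rows does each file contain?"))
```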
23 changes: 23 additions & 0 deletions backend/modules/model_gateway/model_gateway.py
@@ -7,6 +7,7 @@
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_openai import OpenAIEmbeddings
from langchain_openai.chat_models import ChatOpenAI
from pandasai.llm.local_llm import LocalLLM

from backend.logger import logger
from backend.modules.model_gateway.audio_processing_svc import AudioProcessingSvc
@@ -41,6 +42,7 @@ def __init__(self):
        """
        self._embedder_cache = create_cache()
        self._llm_cache = create_cache()
        self._pandas_ai_cache = create_cache()
        self._reranker_cache = create_cache()
        self._audio_cache = create_cache()

@@ -270,6 +272,27 @@ def get_llm_from_model_config(

        return self._llm_cache[cache_key]

    def get_pandas_ai_model_from_model_config(self, model_config: ModelConfig):
        """
        Get a PandasAI model instance for the specified model configuration.
        """
        cache_key = model_config.name
        if cache_key not in self._pandas_ai_cache:
            if model_config.name not in self.model_name_to_provider_config:
                raise ValueError(
                    f"Model {model_config.name} not registered in the model gateway."
                )

            provider_config = self.model_name_to_provider_config[model_config.name]
            api_key = self._get_api_key(provider_config)
            model_id = "/".join(model_config.name.split("/")[1:])

            self._pandas_ai_cache[cache_key] = LocalLLM(
                api_base=provider_config.base_url, model=model_id, api_key=api_key
            )

        return self._pandas_ai_cache[cache_key]

    def get_reranker_from_model_config(self, model_name: str, top_k: int = 3):
        """
        Get a reranker model instance for the specified model configuration. Uses caching to avoid
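The `model_id` derivation above assumes registered model names follow the gateway's `provider/model` convention; a small illustration with a made-up name:

```python
# Made-up model name, only to illustrate the split performed in the gateway.
model_name = "my-provider/llama-3-8b-instruct"
model_id = "/".join(model_name.split("/")[1:])
print(model_id)  # prints "llama-3-8b-instruct"
```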
4 changes: 4 additions & 0 deletions backend/modules/query_controllers/__init__.py
@@ -3,6 +3,10 @@
    MultiModalRAGQueryController,
)
from backend.modules.query_controllers.query_controller import register_query_controller
from backend.modules.query_controllers.structured.controller import (
    StructuredQueryController,
)

register_query_controller("basic-rag", BasicRAGQueryController)
register_query_controller("multimodal", MultiModalRAGQueryController)
register_query_controller("structured", StructuredQueryController)
