From aa667f0a48555333dc1ff5168c94c8f8f7950bef Mon Sep 17 00:00:00 2001 From: gayathrivijayakumar Date: Tue, 21 Jan 2025 12:06:37 +0530 Subject: [PATCH 1/2] FIx auth middleware and pdm.lock issue for filesystem --- unstract/filesystem/pyproject.toml | 5 +++++ x2text-service/app/authentication_middleware.py | 4 ++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/unstract/filesystem/pyproject.toml b/unstract/filesystem/pyproject.toml index bc90725da..6414b44bb 100644 --- a/unstract/filesystem/pyproject.toml +++ b/unstract/filesystem/pyproject.toml @@ -18,3 +18,8 @@ readme = "README.md" [tool.pdm.build] includes = ["src"] package-dir = "src" + +[tool.pdm.resolution.overrides] +grpcio = "1.62.3" +grpcio-tools = "1.62.3" +grpcio-health-checking = "1.62.3" diff --git a/x2text-service/app/authentication_middleware.py b/x2text-service/app/authentication_middleware.py index 241b766e4..743c9b76c 100644 --- a/x2text-service/app/authentication_middleware.py +++ b/x2text-service/app/authentication_middleware.py @@ -99,8 +99,8 @@ def get_organization_from_bearer_token( return organization_uid, organization_identifier @classmethod - def execute_query(cls, query: str) -> Any: - cursor = be_db.execute_sql(query) + def execute_query(cls, query: str, params: tuple = ()) -> Any: + cursor = be_db.execute_sql(query, params) result_row = cursor.fetchone() cursor.close() if not result_row or len(result_row) == 0: From 30ada6379b95e33ce481488c8991dcbb092cabef Mon Sep 17 00:00:00 2001 From: gayathrivijayakumar Date: Thu, 23 Jan 2025 21:57:10 +0530 Subject: [PATCH 2/2] Fixes for writing to destination connector --- .../endpoint_v2/destination.py | 15 +++++++++++---- tools/structure/src/main.py | 2 +- .../azure_cloud_storage.py | 17 ++++++++++++++--- .../filesystems/unstract_file_system.py | 19 ++++++++++++++++--- 4 files changed, 42 insertions(+), 11 deletions(-) diff --git a/backend/workflow_manager/endpoint_v2/destination.py b/backend/workflow_manager/endpoint_v2/destination.py index 45047419f..fe8e1dce6 100644 --- a/backend/workflow_manager/endpoint_v2/destination.py +++ b/backend/workflow_manager/endpoint_v2/destination.py @@ -237,10 +237,15 @@ def copy_output_to_output_directory(self) -> None: try: destination_fs.create_dir_if_not_exists(input_dir=output_directory) - # Traverse local directory and create the same structure in the # output_directory - for root, dirs, files in os.walk(destination_volume_path): + if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): + file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) + fs = file_system.get_file_storage() + dir_path = fs.walk(str(destination_volume_path)) + else: + dir_path = os.walk(destination_volume_path) + for root, dirs, files in dir_path: for dir_name in dirs: current_dir = os.path.join( output_directory, @@ -504,7 +509,8 @@ def delete_execution_directory(self) -> None: if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) file_storage = file_system.get_file_storage() - file_storage.rm(self.execution_dir, recursive=True) + if file_storage.exists(self.execution_dir): + file_storage.rm(self.execution_dir, recursive=True) else: fs: LocalFileSystem = fsspec.filesystem("file") fs.rm(self.execution_dir, recursive=True) @@ -523,7 +529,8 @@ def delete_api_storage_dir(cls, workflow_id: str, execution_id: str) -> None: if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): file_system = FileSystem(FileStorageType.API_EXECUTION) file_storage = file_system.get_file_storage() - file_storage.rm(api_storage_dir, recursive=True) + if file_storage.exists(api_storage_dir): + file_storage.rm(api_storage_dir, recursive=True) else: fs: LocalFileSystem = fsspec.filesystem("file") fs.rm(api_storage_dir, recursive=True) diff --git a/tools/structure/src/main.py b/tools/structure/src/main.py index 8918365f4..33d68e51f 100644 --- a/tools/structure/src/main.py +++ b/tools/structure/src/main.py @@ -386,7 +386,7 @@ def _summarize_and_index( self.stream_log("Indexing summarized context") if self.workflow_filestorage: summarize_file_hash: str = self.workflow_filestorage.get_hash_from_file( - file_path=summarize_file_path + path=summarize_file_path ) else: summarize_file_hash: str = ToolUtils.get_hash_from_file( diff --git a/unstract/connectors/src/unstract/connectors/filesystems/azure_cloud_storage/azure_cloud_storage.py b/unstract/connectors/src/unstract/connectors/filesystems/azure_cloud_storage/azure_cloud_storage.py index caa5ca179..c9898343e 100644 --- a/unstract/connectors/src/unstract/connectors/filesystems/azure_cloud_storage/azure_cloud_storage.py +++ b/unstract/connectors/src/unstract/connectors/filesystems/azure_cloud_storage/azure_cloud_storage.py @@ -5,8 +5,13 @@ import azure.core.exceptions as AzureException from adlfs import AzureBlobFileSystem +from backend.constants import FeatureFlag from unstract.connectors.exceptions import AzureHttpError, ConnectorError from unstract.connectors.filesystems.unstract_file_system import UnstractFileSystem +from unstract.flags.feature_flag import check_feature_flag_status + +if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): + from unstract.filesystem import FileStorageType, FileSystem logging.getLogger("azurefs").setLevel(logging.ERROR) logger = logging.getLogger(__name__) @@ -90,10 +95,16 @@ def upload_file_to_storage(self, source_path: str, destination_path: str) -> Non AzureHttpError: returns error for invalid directory """ normalized_path = os.path.normpath(destination_path) - fs = self.get_fsspec_fs() + destination_connector_fs = self.get_fsspec_fs() try: - with open(source_path, "rb") as source_file: - fs.write_bytes(normalized_path, source_file.read()) + if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): + file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) + workflow_fs = file_system.get_file_storage() + data = workflow_fs.read(path=source_path, mode="rb") + else: + with open(source_path, "rb") as source_file: + data = source_file.read() + destination_connector_fs.write_bytes(normalized_path, data) except AzureException.HttpResponseError as e: self.raise_http_exception(e=e, path=normalized_path) diff --git a/unstract/connectors/src/unstract/connectors/filesystems/unstract_file_system.py b/unstract/connectors/src/unstract/connectors/filesystems/unstract_file_system.py index 75389c772..51542357d 100644 --- a/unstract/connectors/src/unstract/connectors/filesystems/unstract_file_system.py +++ b/unstract/connectors/src/unstract/connectors/filesystems/unstract_file_system.py @@ -5,8 +5,15 @@ from fsspec import AbstractFileSystem +from backend.constants import FeatureFlag from unstract.connectors.base import UnstractConnector from unstract.connectors.enums import ConnectorMode +from unstract.flags.feature_flag import check_feature_flag_status + +if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): + from unstract.filesystem import FileStorageType, FileSystem + +logger = logging.getLogger(__name__) class UnstractFileSystem(UnstractConnector, ABC): @@ -98,6 +105,12 @@ def upload_file_to_storage(self, source_path: str, destination_path: str) -> Non uploaded """ normalized_path = os.path.normpath(destination_path) - fs = self.get_fsspec_fs() - with open(source_path, "rb") as source_file: - fs.write_bytes(normalized_path, source_file.read()) + destination_connector_fs = self.get_fsspec_fs() + if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE): + file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION) + workflow_fs = file_system.get_file_storage() + data = workflow_fs.read(path=source_path, mode="rb") + else: + with open(source_path, "rb") as source_file: + data = source_file.read() + destination_connector_fs.write_bytes(normalized_path, data)