Skip to content

Commit

Permalink
feat(schematic): added synapse cache purging fds-1445 (#2660)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewelamb authored May 9, 2024
1 parent 2990dba commit 0daed1b
Show file tree
Hide file tree
Showing 7 changed files with 265 additions and 30 deletions.
5 changes: 4 additions & 1 deletion apps/schematic/api/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,12 @@ target/

#secrets
*secrets*
synapse_config.yaml
schematic_service_account_creds.json

#config files
config.yaml
schematic_api/test/data/synapse_config.yaml

#schematic downloaded files
manifests
great_expectations
Expand Down
4 changes: 2 additions & 2 deletions apps/schematic/api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ poetry install
And run schematic APIs:

```
python3 -m schematic-api
python3 -m schematic_api
```
and open your browser to here:

```
http://127.0.0.1:7080/api/v1/ui/
http://localhost:7443/api/v1/ui/
```

## Running with Docker
Expand Down
4 changes: 4 additions & 0 deletions apps/schematic/api/default_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# If true, the synapse cache is purged before running a synapse storage related endpoint
purge_synapse_cache: true
# This can be set to a specific path, or to null to let the python client decide
synapse_cache_path: '/var/tmp/synapse'
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import Callable
import tempfile


import pandas as pd
import synapseclient # type: ignore
from schematic.store.synapse import SynapseStorage, ManifestDownload, load_df # type: ignore
Expand All @@ -22,7 +21,13 @@
from schematic_api.models.file_metadata import FileMetadata
from schematic_api.models.file_metadata_array import FileMetadataArray
from schematic_api.models.file_metadata_page import FileMetadataPage
from schematic_api.controllers.utils import handle_exceptions, get_access_token
from schematic_api.controllers.utils import (
SYNAPSE_CACHE_PATH,
PURGE_SYNAPSE_CACHE,
handle_exceptions,
get_access_token,
purge_synapse_cache,
)
from schematic_api.controllers.paging import Page


Expand All @@ -49,9 +54,27 @@ def get_asset_storage_class(asset_type: str) -> Callable:
return asset_type_object


def get_asset_view_from_schematic(
def get_store(
asset_type: str, # pylint: disable=unused-argument
) -> pd.DataFrame:
) -> SynapseStorage:
"""Creates a SynapseStorage class and purges its synapse cache
Args:
asset_type (str): The type of storage class (will be used in the future)
Returns:
SynapseStorage: A synapse storage class
"""
access_token = get_access_token()
store = SynapseStorage(
access_token=access_token, synapse_cache_path=SYNAPSE_CACHE_PATH
)
if PURGE_SYNAPSE_CACHE:
purge_synapse_cache(store)
return store


def get_asset_view_from_schematic(asset_type: str) -> pd.DataFrame:
"""Gets the asset view in pandas.Dataframe form
Args:
Expand All @@ -61,8 +84,7 @@ def get_asset_view_from_schematic(
Returns:
pandas.DataFrame: The asset view
"""
access_token = get_access_token()
store = SynapseStorage(access_token=access_token)
store = get_store(asset_type)
return store.getStorageFileviewTable()


Expand Down Expand Up @@ -115,7 +137,7 @@ def get_asset_view_json(

def get_dataset_file_metadata_from_schematic(
dataset_id: str,
asset_type: str, # pylint: disable=unused-argument
asset_type: str,
file_names: list[str] | None,
use_full_file_path: bool,
) -> list[FileMetadata]:
Expand All @@ -130,8 +152,7 @@ def get_dataset_file_metadata_from_schematic(
Returns:
list[FileMetadata]: A list of file metadata
"""
access_token = get_access_token()
store = SynapseStorage(access_token=access_token)
store = get_store(asset_type)
file_tuple_list = store.getFilesInStorageDataset(
datasetId=dataset_id,
fileNames=file_names, # type: ignore
Expand Down Expand Up @@ -242,7 +263,7 @@ def load_manifest_from_synapse_metadata(


def get_dataset_manifest_from_schematic(
asset_type: str, dataset_id: str # pylint: disable=unused-argument
asset_type: str, dataset_id: str
) -> pd.DataFrame:
"""Gets a manifest in pandas.Dataframe format
Expand All @@ -254,8 +275,7 @@ def get_dataset_manifest_from_schematic(
Returns:
pandas.DataFrame: The manifest
"""
access_token = get_access_token()
store = SynapseStorage(access_token=access_token)
store = get_store(asset_type)
manifest_data = store.getDatasetManifest(
datasetId=dataset_id, downloadFile=True, newManifestName="manifest.csv"
)
Expand Down Expand Up @@ -315,9 +335,7 @@ def get_dataset_manifest_json(
return result, status


def get_manifest_from_schematic(
asset_type: str, manifest_id: str # pylint: disable=unused-argument
) -> pd.DataFrame:
def get_manifest_from_schematic(asset_type: str, manifest_id: str) -> pd.DataFrame:
"""Gets a manifest in pandas.Dataframe format
Args:
Expand All @@ -327,9 +345,11 @@ def get_manifest_from_schematic(
Returns:
pandas.DataFrame: The manifest
"""
# The storage object isn't needed but this purges the synapse cache
get_store(asset_type)
access_token = get_access_token()
store = SynapseStorage.login(access_token=access_token)
manifest_download = ManifestDownload(store, manifest_id)
synapse = SynapseStorage.login(access_token=access_token)
manifest_download = ManifestDownload(synapse, manifest_id)
manifest_data = ManifestDownload.download_manifest(
manifest_download, "manifest.csv"
)
Expand Down Expand Up @@ -380,7 +400,7 @@ def get_manifest_json(


def get_project_dataset_metadata_from_schematic(
project_id: str, asset_type: str # pylint: disable=unused-argument
project_id: str, asset_type: str
) -> list[DatasetMetadata]:
"""Gets a list of dataset metadata from the project
Expand All @@ -391,8 +411,7 @@ def get_project_dataset_metadata_from_schematic(
Returns:
list[DatasetMetadata]: A list of dataset metadata
"""
access_token = get_access_token()
store = SynapseStorage(access_token=access_token)
store = get_store(asset_type)
tuples = store.getStorageDatasetsInProject(projectId=project_id)
return [DatasetMetadata(id=item[0], name=item[1]) for item in tuples]

Expand Down Expand Up @@ -473,7 +492,7 @@ def get_project_dataset_metadata_page(

def get_project_manifest_metadata_from_schematic(
project_id: str,
asset_type: str, # pylint: disable=unused-argument
asset_type: str,
) -> list[ManifestMetadata]:
"""Gets manifest metadata from the project
Expand All @@ -484,8 +503,7 @@ def get_project_manifest_metadata_from_schematic(
Returns:
list[ManifestMetadata]: A list of manifest metadata
"""
access_token = get_access_token()
store = SynapseStorage(access_token=access_token)
store = get_store(asset_type)
manifest_tuple_list = store.getProjectManifests(projectId=project_id)
return [
ManifestMetadata(
Expand Down Expand Up @@ -575,7 +593,7 @@ def get_project_manifest_metadata_page(


def get_project_metadata_from_schematic(
asset_type: str, # pylint: disable=unused-argument
asset_type: str,
) -> list[ProjectMetadata]:
"""Gets a list of projects
Expand All @@ -585,8 +603,7 @@ def get_project_metadata_from_schematic(
Returns:
list[ProjectMetadata]: A list of project metadata
"""
access_token = get_access_token()
store = SynapseStorage(access_token=access_token)
store = get_store(asset_type)
metadata_tuple_list = store.getStorageProjects()
return [ProjectMetadata(id=item[0], name=item[1]) for item in metadata_tuple_list]

Expand Down
134 changes: 133 additions & 1 deletion apps/schematic/api/schematic_api/controllers/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""utils for multiple controllers"""
from typing import Callable, Any
from typing import Callable, Any, Optional
import urllib.request
import shutil
import tempfile
Expand All @@ -8,17 +8,40 @@
import os
import io
import json
import logging
import subprocess
from datetime import datetime, timedelta
import re
from math import ceil
import yaml

import pandas as pd
from flask import request # type: ignore
from synapseclient.core.exceptions import ( # type: ignore
SynapseNoCredentialsError,
SynapseAuthenticationError,
)
from schematic.store import SynapseStorage
from schematic.exceptions import AccessCredentialsError # type: ignore

from schematic_api.models.basic_error import BasicError

# Config for various settable global values
# Will use config.yaml if it exists, otherwise uses the example file
# config.yaml is ignored by git so can be changed locally without accidentally committing it
# To do so copy default_config.yaml to config.yaml and make changes there
if os.path.exists("config.yaml"):
with open("config.yaml", "r", encoding="utf-8") as file:
API_CONFIG = yaml.safe_load(file)
else:
with open("default_config.yaml", "r", encoding="utf-8") as file:
API_CONFIG = yaml.safe_load(file)

PURGE_SYNAPSE_CACHE = API_CONFIG["purge_synapse_cache"]
SYNAPSE_CACHE_PATH = API_CONFIG["synapse_cache_path"]

LOGGER = logging.getLogger("Synapse cache")


def save_manifest_json_string_as_csv(manifest_json_string: str) -> str:
"""Takes a manifest json string and converts it to a csv file
Expand Down Expand Up @@ -184,3 +207,112 @@ def download_schema_file_as_jsonld(schema_url: str) -> str:
raise InvalidSchemaURL(
"The provided URL could not be found", schema_url
) from error


def purge_synapse_cache(
    store: SynapseStorage,
    maximum_storage_allowed_cache_gb: float = 1,
    minute_buffer: int = 15,
) -> None:
    """
    Purge the synapse cache if it exceeds a certain size. Defaults to 1 GB.

    Args:
        store (SynapseStorage): The storage object whose synapse cache is checked
          and, if needed, purged
        maximum_storage_allowed_cache_gb (float): the maximum storage allowed
          before purging cache. Default is 1 GB.
        minute_buffer (int): All files created this amount of time or older will be deleted
    """
    # Nothing to purge if the cache directory hasn't been created yet
    if not os.path.exists(store.root_synapse_cache):
        return
    maximum_storage_allowed_cache_bytes = maximum_storage_allowed_cache_gb * (1024**3)
    # scan the cache directory and check the total size of its files
    dir_size_bytes = check_synapse_cache_size(directory=store.root_synapse_cache)
    # Check if cache is bigger than the allowed size and if so delete all files in cache
    # older than the buffer time
    if dir_size_bytes >= maximum_storage_allowed_cache_bytes:
        minutes_earlier = calculate_datetime(minute_buffer)
        num_of_deleted_files = store.syn.cache.purge(before_date=minutes_earlier)
        # lazy %-style args so the message is only formatted if INFO is enabled
        LOGGER.info(
            "%s files have been deleted from %s",
            num_of_deleted_files,
            store.root_synapse_cache,
        )
    else:
        # on AWS, OS takes around 14-17% of our ephemeral storage (20GiB)
        # instead of guessing how much space that we left, print out .synapseCache here
        LOGGER.info("the total size of .synapseCache is: %s bytes", dir_size_bytes)


def check_synapse_cache_size(directory: str) -> int:
    """Use the `du -sh` command to calculate the size of the Synapse cache.

    Args:
        directory (str): The Synapse cache directory

    Returns:
        int: The size of the Synapse directory in bytes
    """
    # Note: this command might fail on windows user.
    # But since this command is primarily for running on AWS, it is fine.
    command = ["du", "-sh", directory]
    output = subprocess.run(command, capture_output=True, check=False).stdout.decode(
        "utf-8"
    )

    # The first tab-separated field of `du` output is the human-readable size
    size = output.split("\t")[0]
    # calculate_byte_size returns an int, so this function does too
    # (the original annotation claimed float)
    return calculate_byte_size(size)


def calculate_byte_size(size_string: str) -> int:
    """
    Calculates the size in bytes of a size returned from the "du" command

    Args:
        size_string (str):
            The input must be a string such as 4B, or 1.2K.
            Sizes up to GB allowed.

    Raises:
        ValueError: When the input doesn't match the allowed patterns

    Returns:
        int: The size in bytes
    """
    # `du` reports an empty directory as a bare "0" with no unit suffix
    if size_string.isnumeric() and int(size_string) == 0:
        return 0

    # Maps each `du` unit suffix to its power of 1024
    size_dict: dict[str, int] = {"B": 0, "K": 1, "M": 2, "G": 3}

    size_letter_string = "".join(size_dict)
    # One pattern covers both integer ("4K") and decimal ("1.2K") sizes; a raw
    # string avoids the invalid-escape warning for "\." on newer Pythons
    size_match = re.match(rf"^[0-9]+(\.[0-9]+)?[{size_letter_string}]$", size_string)
    if not size_match:
        LOGGER.error("Cannot recognize the file size unit")
        raise ValueError("The size string doesn't match the allowed type:", size_string)

    size_letter = size_string[-1]
    size = float(size_string[:-1])
    multiple = 1024 ** size_dict[size_letter]
    byte_size: int = ceil(size * multiple)
    return byte_size


def calculate_datetime(
    minutes: int, input_date_time: Optional[datetime] = None
) -> datetime:
    """
    Calculates the datetime x minutes before the input date time
    If no datetime is given, the current datetime is used.

    Args:
        minutes (int): How much time to subtract from the input date time.
        input_date_time (Optional[datetime], optional): The datetime to start with. Defaults to None.

    Returns:
        datetime: The new datetime
    """
    # Fall back to "now" when the caller doesn't supply a starting point
    start_time = datetime.now() if input_date_time is None else input_date_time
    return start_time - timedelta(minutes=minutes)
Loading

0 comments on commit 0daed1b

Please sign in to comment.