-
Notifications
You must be signed in to change notification settings - Fork 550
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: dmatch01 <[email protected]>
- Loading branch information
Showing
10 changed files
with
459 additions
and
12 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
"""Configuration for carbon catalog.""" | ||
import filelock | ||
import hashlib | ||
import os | ||
import pandas as pd | ||
import requests | ||
import time | ||
|
||
from typing import Dict, List, NamedTuple, Optional, Tuple | ||
|
||
from sky.clouds import cloud_registry | ||
from sky.utils import rich_utils | ||
from sky.utils import ux_utils | ||
from sky import sky_logging | ||
|
||
# Module-level logger obtained from sky_logging for this module's name. | ||
logger = sky_logging.init_logger(__name__) | ||
|
||
# Base URL under which the hosted carbon catalog files are looked up; | ||
# read_carbon_file appends the schema version and the file name to it. | ||
CARBON_HOSTED_CATALOG_DIR_URL = 'https://raw.githubusercontent.com/GreenAlgorithms/green-algorithms-tool/master/data' # pylint: disable=line-too-long | ||
# Version path component used both in the remote URL and in the local | ||
# catalog directory below. | ||
CARBON_CATALOG_SCHEMA_VERSION = 'latest' | ||
# Root of the local carbon catalog cache, expanded from the user's home. | ||
CARBON_LOCAL_CATALOG_DIR = os.path.expanduser('~/.sky/catalogs/carbon') | ||
|
||
# Local directory for the selected schema version; catalog CSVs and the | ||
# '.meta' bookkeeping files written by read_carbon_file live under it. | ||
_CARBON_CATALOG_DIR = os.path.join(CARBON_LOCAL_CATALOG_DIR, | ||
CARBON_CATALOG_SCHEMA_VERSION) | ||
|
||
# NOTE(review): not referenced in the visible part of this file; presumably | ||
# the default refresh interval (in hours) passed by callers as | ||
# `pull_frequency_hours` -- confirm against call sites. | ||
_CARBON_PULL_FREQUENCY_HOURS = 7 | ||
|
||
def get_carbon_catalog_path(filename: str) -> str:
    """Returns the local path of `filename` in the carbon catalog directory.

    The path is formed by joining the versioned local catalog directory
    (`_CARBON_CATALOG_DIR`) with `filename`; nothing is created on disk.
    """
    catalog_path = os.path.join(_CARBON_CATALOG_DIR, filename)
    return catalog_path
|
||
|
||
def read_carbon_file(filename: str, filter_col: str, filter_val: str,
                     pull_frequency_hours: Optional[int] = None) -> pd.DataFrame:
    """Reads a carbon catalog from a local CSV file, refreshing it if needed.

    If the file does not exist, download the up-to-date catalog that matches
    the schema version.
    If `pull_frequency_hours` is not None: pull the latest catalog if the
    local catalog file is older than `pull_frequency_hours` and no changes to
    the local catalog file were made after the last pull (detected by
    comparing the file's MD5 with the one recorded at download time).

    Args:
        filename: Catalog file name, relative to the carbon catalog
            directory. Must end with '.csv'. A leading directory component,
            if any, is looked up in the cloud registry (for status/log
            messages) and stripped when building the download URL.
        filter_col: Column to filter on. An empty string disables filtering.
        filter_val: Only rows whose `filter_col` equals this value are kept.
        pull_frequency_hours: Maximum age of the local file, in hours, before
            a refresh is attempted. None means never refresh an existing
            file.

    Returns:
        The catalog as a DataFrame. The first line of the CSV is skipped and
        the following line is used as the header.

    Raises:
        AssertionError: If `filename` is not a CSV name or
            `pull_frequency_hours` is negative.
        requests.exceptions.RequestException: If the download fails and no
            local copy exists to fall back on.
        Exception: Any error raised while reading or filtering the CSV is
            logged and re-raised.
    """
    assert filename.endswith('.csv'), 'The catalog file must be a CSV file.'
    assert (pull_frequency_hours is None or
            pull_frequency_hours >= 0), pull_frequency_hours
    catalog_path = get_carbon_catalog_path(filename)
    cloud_dir_name = os.path.dirname(filename)
    cloud = ""
    if cloud_dir_name:
        cloud = cloud_registry.CLOUD_REGISTRY.from_str(cloud_dir_name)

    meta_path = os.path.join(_CARBON_CATALOG_DIR, '.meta', filename)
    md5_filepath = meta_path + '.md5'
    os.makedirs(os.path.dirname(meta_path), exist_ok=True)

    # Bound the HTTP request so that a stalled connection cannot block this
    # process (and every process waiting on the file lock) forever.
    request_timeout_seconds = 30

    def _need_update() -> bool:
        """Decides whether the local catalog must be (re-)downloaded."""
        if not os.path.exists(catalog_path):
            return True
        if pull_frequency_hours is None:
            return False
        # Check the md5 of the file to see if it has changed.
        with open(catalog_path, 'rb') as f:
            file_md5 = hashlib.md5(f.read()).hexdigest()
        if os.path.exists(md5_filepath):
            with open(md5_filepath, 'r') as f:
                last_md5 = f.read()
            if file_md5 != last_md5:
                # Do not update the file if the user modified it.
                return False

        last_update = os.path.getmtime(catalog_path)
        return last_update + pull_frequency_hours * 3600 < time.time()

    # Atomic check, to avoid conflicts with other processes.
    # TODO(mraheja): remove pylint disabling when filelock version updated
    # pylint: disable=abstract-class-instantiated
    with filelock.FileLock(meta_path + '.lock'):
        if _need_update():
            # TODO: Cleanup hack below for better impl.
            # The hosted catalog has no per-cloud directories, so drop the
            # leading cloud directory from the name before building the URL.
            source_filename = filename
            if cloud_dir_name and filename.startswith(cloud_dir_name + '/'):
                source_filename = filename[len(cloud_dir_name) + 1:]
            url = (f'{CARBON_HOSTED_CATALOG_DIR_URL}/'
                   f'{CARBON_CATALOG_SCHEMA_VERSION}/{source_filename}')
            update_frequency_str = ''
            if pull_frequency_hours is not None:
                update_frequency_str = f' (every {pull_frequency_hours} hours)'
            with rich_utils.safe_status((f'Updating {cloud} carbon file: '
                                         f'{filename}'
                                         f'{update_frequency_str}')):
                try:
                    r = requests.get(url, timeout=request_timeout_seconds)
                    r.raise_for_status()
                except requests.exceptions.RequestException as e:
                    error_str = (f'Failed to fetch {cloud} carbon file: '
                                 f'{filename}. ')
                    if os.path.exists(catalog_path):
                        logger.warning(
                            f'{error_str}Using cached catalog files.')
                        # Update catalog file modification time so that the
                        # download is not retried on every call.
                        os.utime(catalog_path, None)  # Sets to current time
                    else:
                        logger.error(
                            f'{error_str}Please check your internet connection.'
                        )
                        with ux_utils.print_exception_no_traceback():
                            raise e
                else:
                    # Download successful, save the catalog to a local file.
                    # Write exactly the bytes that are hashed: text-mode
                    # writes may re-encode or translate newlines, which would
                    # make the stored md5 never match the file on disk and
                    # permanently disable future refreshes.
                    content = r.text.encode()
                    os.makedirs(os.path.dirname(catalog_path), exist_ok=True)
                    with open(catalog_path, 'wb') as f:
                        f.write(content)
                    with open(md5_filepath, 'w') as f:
                        f.write(hashlib.md5(content).hexdigest())
        try:
            # The first line of the file is skipped; the header is read from
            # the line that follows it.
            df = pd.read_csv(catalog_path, sep=',', skiprows=1)
            # Filter out some rows
            if filter_col:
                df = df[(df[filter_col] == filter_val)]
        except Exception as e:  # pylint: disable=broad-except
            # As users can manually modify the catalog, read_csv can fail.
            logger.error(f'Failed to read {catalog_path}. '
                         'To fix: delete the csv file and try again.')
            with ux_utils.print_exception_no_traceback():
                raise e
    return df
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.