Skip to content

Commit

Permalink
WIP Sustainability
Browse files Browse the repository at this point in the history
Signed-off-by: dmatch01 <[email protected]>
  • Loading branch information
dmatch01 committed Sep 27, 2023
1 parent 8184f2a commit 196b940
Show file tree
Hide file tree
Showing 10 changed files with 458 additions and 11 deletions.
8 changes: 8 additions & 0 deletions sky/clouds/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,15 @@ def accelerators_to_hourly_cost(self, accelerators: Dict[str, int],
"""Returns the hourly on-demand price for accelerators."""
raise NotImplementedError

def instance_type_to_hourly_carbon_cost(self, instance_type: str,
accelerators: Dict[str, int],
region: Optional[str],
zone: Optional[str]) -> float:
"""Returns the hourly carbon cost for an instance type."""
raise NotImplementedError

def get_egress_cost(self, num_gigabytes: float):

"""Returns the egress cost.
TODO: takes into account "per month" accumulation per account.
Expand Down
11 changes: 11 additions & 0 deletions sky/clouds/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,17 @@ def instance_type_to_hourly_cost(self,
zone=zone,
clouds='gcp')

def instance_type_to_hourly_carbon_cost(self,
instance_type: str,
accelerators: Dict[str, int],
region: Optional[str] = None,
zone: Optional[str] = None) -> float:
return service_catalog.get_hourly_carbon_cost(instance_type,
accelerators,
region=region,
zone=zone,
clouds='gcp')

def accelerators_to_hourly_cost(self,
accelerators: Dict[str, int],
use_spot: bool,
Expand Down
13 changes: 13 additions & 0 deletions sky/clouds/ibm.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,19 @@ def instance_type_to_hourly_cost(self,
zone=zone,
clouds='ibm')

def instance_type_to_hourly_carbon_cost(self,
instance_type: str,
accelerators: Dict[str, int],
region: Optional[str] = None,
zone: Optional[str] = None) -> float:
return service_catalog.get_hourly_carbon_cost(instance_type,
accelerators,
region=region,
zone=zone,
clouds='ibm')



def accelerators_to_hourly_cost(self,
accelerators: Dict[str, int],
use_spot: bool,
Expand Down
11 changes: 11 additions & 0 deletions sky/clouds/service_catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,17 @@ def get_hourly_cost(instance_type: str,
use_spot, region, zone)


def get_hourly_carbon_cost(instance_type: str,
accelerators: Dict[str, int],
region: Optional[str],
zone: Optional[str],
clouds: CloudFilter = None) -> float:
"""Returns the hourly carbon cost of a VM instance in the given region and zone.
"""
return _map_clouds_catalog(clouds, 'get_hourly_carbon_cost', instance_type,
accelerators, region, zone)

def get_vcpus_mem_from_instance_type(
instance_type: str,
clouds: CloudFilter = None) -> Tuple[Optional[float], Optional[float]]:
Expand Down
125 changes: 125 additions & 0 deletions sky/clouds/service_catalog/carbon_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
"""Configuration for carbon catalog."""
import filelock
import hashlib
import os
import pandas as pd
import requests
import time

from typing import Dict, List, NamedTuple, Optional, Tuple

from sky.clouds import cloud_registry
from sky.utils import rich_utils
from sky.utils import ux_utils
from sky import sky_logging

logger = sky_logging.init_logger(__name__)

CARBON_HOSTED_CATALOG_DIR_URL = 'https://raw.githubusercontent.com/GreenAlgorithms/green-algorithms-tool/master/data' # pylint: disable=line-too-long
CARBON_CATALOG_SCHEMA_VERSION = 'latest'
CARBON_LOCAL_CATALOG_DIR = os.path.expanduser('~/.sky/catalogs/carbon')

_CARBON_CATALOG_DIR = os.path.join(CARBON_LOCAL_CATALOG_DIR,
CARBON_CATALOG_SCHEMA_VERSION)

_CARBON_PULL_FREQUENCY_HOURS = 7

def get_carbon_catalog_path(filename: str) -> str:
return os.path.join(_CARBON_CATALOG_DIR, filename)


def read_carbon_file(filename: str, filter_col: str, filter_val: str,
pull_frequency_hours: Optional[int] = None) -> pd.DataFrame:
"""Reads the catalog from a local CSV file.
If the file does not exist, download the up-to-date catalog that matches
the schema version.
If `pull_frequency_hours` is not None: pull the latest catalog with
possibly updated prices, if the local catalog file is older than
`pull_frequency_hours` and no changes to the local catalog file are
made after the last pull.
"""
assert filename.endswith('.csv'), 'The catalog file must be a CSV file.'
assert (pull_frequency_hours is None or
pull_frequency_hours >= 0), pull_frequency_hours
catalog_path = get_carbon_catalog_path(filename)
cloud_dir_name=os.path.dirname(filename)
cloud = ""
if len(cloud_dir_name) > 0:
cloud = cloud_registry.CLOUD_REGISTRY.from_str(cloud_dir_name)

meta_path = os.path.join(_CARBON_CATALOG_DIR, '.meta', filename)
os.makedirs(os.path.dirname(meta_path), exist_ok=True)

# Atomic check, to avoid conflicts with other processes.
# TODO(mraheja): remove pylint disabling when filelock version updated
# pylint: disable=abstract-class-instantiated
with filelock.FileLock(meta_path + '.lock'):

def _need_update() -> bool:
if not os.path.exists(catalog_path):
return True
if pull_frequency_hours is None:
return False
# Check the md5 of the file to see if it has changed.
with open(catalog_path, 'rb') as f:
file_md5 = hashlib.md5(f.read()).hexdigest()
md5_filepath = meta_path + '.md5'
if os.path.exists(md5_filepath):
with open(md5_filepath, 'r') as f:
last_md5 = f.read()
if file_md5 != last_md5:
# Do not update the file if the user modified it.
return False

last_update = os.path.getmtime(catalog_path)
return last_update + pull_frequency_hours * 3600 < time.time()

if _need_update():
# TODO: Cleanup hack below for better impl.
source_filename = filename
if len(cloud_dir_name) > 0 and str.startswith(filename, cloud_dir_name+'/') :
source_filename = filename[len(cloud_dir_name)+1:]
url = f'{CARBON_HOSTED_CATALOG_DIR_URL}/{CARBON_CATALOG_SCHEMA_VERSION}/{source_filename}' # pylint: disable=line-too-long
update_frequency_str = ''
if pull_frequency_hours is not None:
update_frequency_str = f' (every {pull_frequency_hours} hours)'
with rich_utils.safe_status((f'Updating {cloud} carbon file: '
f'{filename}'
f'{update_frequency_str}')):
try:
r = requests.get(url)
r.raise_for_status()
except requests.exceptions.RequestException as e:
error_str = (f'Failed to fetch {cloud} carbon file: '
f'{filename}. ')
if os.path.exists(catalog_path):
logger.warning(
f'{error_str}Using cached catalog files.')
# Update catalog file modification time.
os.utime(catalog_path, None) # Sets to current time
else:
logger.error(
f'{error_str}Please check your internet connection.'
)
with ux_utils.print_exception_no_traceback():
raise e
else:
# Download successful, save the catalog to a local file.
os.makedirs(os.path.dirname(catalog_path), exist_ok=True)
with open(catalog_path, 'w') as f:
f.write(r.text)
with open(meta_path + '.md5', 'w') as f:
f.write(hashlib.md5(r.text.encode()).hexdigest())
try:
df = pd.read_csv(catalog_path, sep=',', skiprows=1)
# Filter out some rows
if len(filter_col) > 0:
df = df[(df[filter_col] == filter_val)]
except Exception as e: # pylint: disable=broad-except
# As users can manually modify the catalog, read_csv can fail.
logger.error(f'Failed to read {catalog_path}. '
'To fix: delete the csv file and try again.')
with ux_utils.print_exception_no_traceback():
raise e
return df
135 changes: 135 additions & 0 deletions sky/clouds/service_catalog/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from sky import sky_logging
from sky.clouds import cloud as cloud_lib
from sky.clouds import cloud_registry
from sky.clouds.service_catalog import carbon_utils
from sky.clouds.service_catalog import constants
from sky.utils import rich_utils
from sky.utils import ux_utils
Expand All @@ -23,6 +24,22 @@
constants.CATALOG_SCHEMA_VERSION)
os.makedirs(_CATALOG_DIR, exist_ok=True)

# Carbon Intensity
_ci_df = carbon_utils.read_carbon_file('CI_aggregated.csv',
filter_col='', filter_val='',
pull_frequency_hours=carbon_utils._CARBON_PULL_FREQUENCY_HOURS)

_tdp_gpu_df = carbon_utils.read_carbon_file('TDP_gpu.csv',
filter_col='', filter_val='',
pull_frequency_hours=carbon_utils._CARBON_PULL_FREQUENCY_HOURS)

_tdp_cpu_df = carbon_utils.read_carbon_file('TDP_cpu.csv',
filter_col='', filter_val='',
pull_frequency_hours=carbon_utils._CARBON_PULL_FREQUENCY_HOURS)

_reference_vals_df = carbon_utils.read_carbon_file('referenceValues.csv',
filter_col='', filter_val='',
pull_frequency_hours=carbon_utils._CARBON_PULL_FREQUENCY_HOURS)

class InstanceTypeInfo(NamedTuple):
"""Instance type information.
Expand Down Expand Up @@ -256,6 +273,123 @@ def get_hourly_cost_impl(
return cheapest[price_str]


# Includes carbon cost for accelerator
def get_carbon_cost_impl(
df: pd.DataFrame,
carbon_df: pd.DataFrame,
carbon_pue_df: pd.DataFrame,
instance_type: str,
acc_name: str,
n_GPUs: int,
region: Optional[str],
zone: Optional[str],
) -> float:
"""Returns the hourly carbon cost of a VM instance in the given region any zone.
Refer to get_carbon_cost in service_catalog/__init__.py for the docstring.
"""
assert region is not None

# INSTANCE
df = _get_instance_type(df, instance_type, region, zone)
if df.empty:
with ux_utils.print_exception_no_traceback():
raise ValueError(f'Instance type {instance_type!r} not found.')

# Get the first instance of the region in any zone
any_zone_instance = df.iloc[0]

# PUE
# TODO Make const.
PUE_used = 1.67
if not carbon_pue_df.empty:
pue = _get_value(carbon_pue_df.iloc[0]['PUE'])
if pue is not None:
PUE_used = pue

# CPU
n_CPUcores = _get_value(df.iloc[0]['vCPUs'])
if n_CPUcores is None:
n_CPUcores = 0

# Finding Thermal Design Power (TDP) value per core usually 10-15W.
# TODO: Find a way to get cpu model from instance
# TODO: Create constant
tdp_cpu_df = _tdp_cpu_df[
(_tdp_cpu_df['model'].str.lower() == "any")]
if tdp_cpu_df.empty:
powerNeeded_CPU = 0
else:
CPUpower = tdp_cpu_df.iloc[0]['TDP_per_core']
powerNeeded_CPU = PUE_used * n_CPUcores * CPUpower


# GPU
if n_GPUs > 0:
fuzzy_tdp_gpu_df = _tdp_gpu_df[
(_tdp_gpu_df['model'].str.contains(acc_name, case=False))]
if fuzzy_tdp_gpu_df.empty:
# Default
# TODO: Create constant
fuzzy_tdp_gpu_df = _tdp_gpu_df[
(_tdp_gpu_df['model'].str.lower() == "any")]
assert len(fuzzy_tdp_gpu_df) == 1

# Finding Thermal Design Power (TDP) value per core usually 10-15W.
GPUpower = fuzzy_tdp_gpu_df.iloc[0]['TDP_per_core']
powerNeeded_GPU = PUE_used * n_GPUs * GPUpower
else:
powerNeeded_GPU = 0


# MEMORY
memory = _get_value(df.iloc[0]['MemoryGiB'])
if memory is None:
memory = 0

memoryPower = 0.3725
ref_vals_df = _reference_vals_df[
(_reference_vals_df['variable'].str.lower() == "memorypower")]
if not ref_vals_df.empty:
memoryPowerVal = _get_value(ref_vals_df.iloc[0]['value'])
if memoryPowerVal is not None:
memoryPower = memoryPowerVal

# SERVER/LOCATION
# First get the region code
carbon_df = carbon_df[(carbon_df['Name'].str.lower() == region.lower())]
regionCode = "World"
if not carbon_df.empty:
location = carbon_df.iloc[0]['location']
if not pd.isnull(location) and len(location) > 0:
regionCode = location

# From the region code get the carbon intensity
ci_df = _ci_df[(_ci_df['location'].str.lower() == regionCode.lower())]
if ci_df.empty:
with ux_utils.print_exception_no_traceback():
raise ValueError(f'Carbon intesity metrics not found '
f'for {region}.')

carbonIntensity = ci_df.iloc[0]['carbonIntensity']

# Pragmatic Scaling Factor (PSF)
PSF_used = 1

# Power needed, in Watt
powerNeeded_core = powerNeeded_CPU + powerNeeded_GPU
powerNeeded_memory = PUE_used * (memory * memoryPower)
powerNeeded = powerNeeded_core + powerNeeded_memory

# Energy needed, in kWh (so dividing by 1000 to convert to kW)
energyNeeded = powerNeeded * PSF_used / 1000

# Carbon emissions: carbonIntensity is in g per kWh, so results in gCO2
carbonEmissions = energyNeeded * carbonIntensity

return carbonEmissions


def _get_value(value):
if pd.isna(value):
return None
Expand Down Expand Up @@ -400,6 +534,7 @@ def get_instance_type_for_accelerator_impl(
result = df[(df['AcceleratorName'].str.fullmatch(acc_name, case=False)) &
(df['AcceleratorCount'] == acc_count)]
result = _filter_region_zone(result, region, zone)

if len(result) == 0:
fuzzy_result = df[
(df['AcceleratorName'].str.contains(acc_name, case=False)) &
Expand Down
Loading

0 comments on commit 196b940

Please sign in to comment.