diff --git a/sky/clouds/cloud.py b/sky/clouds/cloud.py index d9aeca0cca2e..6e4711ac3d59 100644 --- a/sky/clouds/cloud.py +++ b/sky/clouds/cloud.py @@ -191,7 +191,15 @@ def accelerators_to_hourly_cost(self, accelerators: Dict[str, int], """Returns the hourly on-demand price for accelerators.""" raise NotImplementedError + def instance_type_to_hourly_carbon_cost(self, instance_type: str, + accelerators: Dict[str, int], + region: Optional[str], + zone: Optional[str]) -> float: + """Returns the hourly carbon cost for an instance type.""" + raise NotImplementedError + def get_egress_cost(self, num_gigabytes: float): + """Returns the egress cost. TODO: takes into account "per month" accumulation per account. diff --git a/sky/clouds/gcp.py b/sky/clouds/gcp.py index 2d539287d250..ee104d0d0846 100644 --- a/sky/clouds/gcp.py +++ b/sky/clouds/gcp.py @@ -316,6 +316,17 @@ def instance_type_to_hourly_cost(self, zone=zone, clouds='gcp') + def instance_type_to_hourly_carbon_cost(self, + instance_type: str, + accelerators: Dict[str, int], + region: Optional[str] = None, + zone: Optional[str] = None) -> float: + return service_catalog.get_hourly_carbon_cost(instance_type, + accelerators, + region=region, + zone=zone, + clouds='gcp') + def accelerators_to_hourly_cost(self, accelerators: Dict[str, int], use_spot: bool, diff --git a/sky/clouds/ibm.py b/sky/clouds/ibm.py index b8f30f666a24..c8ebc2478b20 100644 --- a/sky/clouds/ibm.py +++ b/sky/clouds/ibm.py @@ -127,6 +127,19 @@ def instance_type_to_hourly_cost(self, zone=zone, clouds='ibm') + def instance_type_to_hourly_carbon_cost(self, + instance_type: str, + accelerators: Dict[str, int], + region: Optional[str] = None, + zone: Optional[str] = None) -> float: + return service_catalog.get_hourly_carbon_cost(instance_type, + accelerators, + region=region, + zone=zone, + clouds='ibm') + + + def accelerators_to_hourly_cost(self, accelerators: Dict[str, int], use_spot: bool, diff --git a/sky/clouds/service_catalog/__init__.py b/sky/clouds/service_catalog/__init__.py index 03d62144103a..265c140bfa6b 100644 --- a/sky/clouds/service_catalog/__init__.py +++ b/sky/clouds/service_catalog/__init__.py @@ -166,6 +166,17 @@ def get_hourly_cost(instance_type: str, use_spot, region, zone) +def get_hourly_carbon_cost(instance_type: str, + accelerators: Dict[str, int], + region: Optional[str], + zone: Optional[str], + clouds: CloudFilter = None) -> float: + """Returns the hourly carbon cost of a VM instance in the given region and zone. + + """ + return _map_clouds_catalog(clouds, 'get_hourly_carbon_cost', instance_type, + accelerators, region, zone) + def get_vcpus_mem_from_instance_type( instance_type: str, clouds: CloudFilter = None) -> Tuple[Optional[float], Optional[float]]: diff --git a/sky/clouds/service_catalog/carbon_utils.py b/sky/clouds/service_catalog/carbon_utils.py new file mode 100644 index 000000000000..06d791755205 --- /dev/null +++ b/sky/clouds/service_catalog/carbon_utils.py @@ -0,0 +1,125 @@ +"""Configuration for carbon catalog.""" +import filelock +import hashlib +import os +import pandas as pd +import requests +import time + +from typing import Dict, List, NamedTuple, Optional, Tuple + +from sky.clouds import cloud_registry +from sky.utils import rich_utils +from sky.utils import ux_utils +from sky import sky_logging + +logger = sky_logging.init_logger(__name__) + +CARBON_HOSTED_CATALOG_DIR_URL = 'https://raw.githubusercontent.com/GreenAlgorithms/green-algorithms-tool/master/data' # pylint: disable=line-too-long +CARBON_CATALOG_SCHEMA_VERSION = 'latest' +CARBON_LOCAL_CATALOG_DIR = os.path.expanduser('~/.sky/catalogs/carbon') + +_CARBON_CATALOG_DIR = os.path.join(CARBON_LOCAL_CATALOG_DIR, + CARBON_CATALOG_SCHEMA_VERSION) + +_CARBON_PULL_FREQUENCY_HOURS = 7 + +def get_carbon_catalog_path(filename: str) -> str: + return os.path.join(_CARBON_CATALOG_DIR, filename) + + +def read_carbon_file(filename: str, filter_col: str, filter_val: str, + pull_frequency_hours: Optional[int] = None) -> pd.DataFrame: + """Reads the catalog from a local CSV file. + + If the file does not exist, download the up-to-date catalog that matches + the schema version. + If `pull_frequency_hours` is not None: pull the latest catalog with + possibly updated prices, if the local catalog file is older than + `pull_frequency_hours` and no changes to the local catalog file are + made after the last pull. + """ + assert filename.endswith('.csv'), 'The catalog file must be a CSV file.' + assert (pull_frequency_hours is None or + pull_frequency_hours >= 0), pull_frequency_hours + catalog_path = get_carbon_catalog_path(filename) + cloud_dir_name=os.path.dirname(filename) + cloud = "" + if len(cloud_dir_name) > 0: + cloud = cloud_registry.CLOUD_REGISTRY.from_str(cloud_dir_name) + + meta_path = os.path.join(_CARBON_CATALOG_DIR, '.meta', filename) + os.makedirs(os.path.dirname(meta_path), exist_ok=True) + + # Atomic check, to avoid conflicts with other processes. + # TODO(mraheja): remove pylint disabling when filelock version updated + # pylint: disable=abstract-class-instantiated + with filelock.FileLock(meta_path + '.lock'): + + def _need_update() -> bool: + if not os.path.exists(catalog_path): + return True + if pull_frequency_hours is None: + return False + # Check the md5 of the file to see if it has changed. + with open(catalog_path, 'rb') as f: + file_md5 = hashlib.md5(f.read()).hexdigest() + md5_filepath = meta_path + '.md5' + if os.path.exists(md5_filepath): + with open(md5_filepath, 'r') as f: + last_md5 = f.read() + if file_md5 != last_md5: + # Do not update the file if the user modified it. + return False + + last_update = os.path.getmtime(catalog_path) + return last_update + pull_frequency_hours * 3600 < time.time() + + if _need_update(): + # TODO: Cleanup hack below for better impl. + source_filename = filename + if len(cloud_dir_name) > 0 and str.startswith(filename, cloud_dir_name+'/') : + source_filename = filename[len(cloud_dir_name)+1:] + url = f'{CARBON_HOSTED_CATALOG_DIR_URL}/{CARBON_CATALOG_SCHEMA_VERSION}/{source_filename}' # pylint: disable=line-too-long + update_frequency_str = '' + if pull_frequency_hours is not None: + update_frequency_str = f' (every {pull_frequency_hours} hours)' + with rich_utils.safe_status((f'Updating {cloud} carbon file: ' + f'{filename}' + f'{update_frequency_str}')): + try: + r = requests.get(url) + r.raise_for_status() + except requests.exceptions.RequestException as e: + error_str = (f'Failed to fetch {cloud} carbon file: ' + f'{filename}. ') + if os.path.exists(catalog_path): + logger.warning( + f'{error_str}Using cached catalog files.') + # Update catalog file modification time. + os.utime(catalog_path, None) # Sets to current time + else: + logger.error( + f'{error_str}Please check your internet connection.' + ) + with ux_utils.print_exception_no_traceback(): + raise e + else: + # Download successful, save the catalog to a local file. + os.makedirs(os.path.dirname(catalog_path), exist_ok=True) + with open(catalog_path, 'w') as f: + f.write(r.text) + with open(meta_path + '.md5', 'w') as f: + f.write(hashlib.md5(r.text.encode()).hexdigest()) + try: + df = pd.read_csv(catalog_path, sep=',', skiprows=1) + # Filter out some rows + if len(filter_col) > 0: + df = df[(df[filter_col] == filter_val)] + except Exception as e: # pylint: disable=broad-except + # As users can manually modify the catalog, read_csv can fail. + logger.error(f'Failed to read {catalog_path}. ' + 'To fix: delete the csv file and try again.') + with ux_utils.print_exception_no_traceback(): + raise e + return df \ No newline at end of file diff --git a/sky/clouds/service_catalog/common.py b/sky/clouds/service_catalog/common.py index a3a668bbf3b9..5cf655aca7a4 100644 --- a/sky/clouds/service_catalog/common.py +++ b/sky/clouds/service_catalog/common.py @@ -13,6 +13,7 @@ from sky import sky_logging from sky.clouds import cloud as cloud_lib from sky.clouds import cloud_registry +from sky.clouds.service_catalog import carbon_utils from sky.clouds.service_catalog import constants from sky.utils import rich_utils from sky.utils import ux_utils @@ -23,6 +24,22 @@ constants.CATALOG_SCHEMA_VERSION) os.makedirs(_CATALOG_DIR, exist_ok=True) +# Carbon Intensity +_ci_df = carbon_utils.read_carbon_file('CI_aggregated.csv', + filter_col='', filter_val='', + pull_frequency_hours=carbon_utils._CARBON_PULL_FREQUENCY_HOURS) + +_tdp_gpu_df = carbon_utils.read_carbon_file('TDP_gpu.csv', + filter_col='', filter_val='', + pull_frequency_hours=carbon_utils._CARBON_PULL_FREQUENCY_HOURS) + +_tdp_cpu_df = carbon_utils.read_carbon_file('TDP_cpu.csv', + filter_col='', filter_val='', + pull_frequency_hours=carbon_utils._CARBON_PULL_FREQUENCY_HOURS) + +_reference_vals_df = carbon_utils.read_carbon_file('referenceValues.csv', + filter_col='', filter_val='', + pull_frequency_hours=carbon_utils._CARBON_PULL_FREQUENCY_HOURS) class InstanceTypeInfo(NamedTuple): """Instance type information. @@ -256,6 +273,123 @@ def get_hourly_cost_impl( return cheapest[price_str] +# Includes carbon cost for accelerator +def get_carbon_cost_impl( + df: pd.DataFrame, + carbon_df: pd.DataFrame, + carbon_pue_df: pd.DataFrame, + instance_type: str, + acc_name: str, + n_GPUs: int, + region: Optional[str], + zone: Optional[str], +) -> float: + """Returns the hourly carbon cost of a VM instance in the given region any zone. + + Refer to get_carbon_cost in service_catalog/__init__.py for the docstring. + """ + assert region is not None + + # INSTANCE + df = _get_instance_type(df, instance_type, region, zone) + if df.empty: + with ux_utils.print_exception_no_traceback(): + raise ValueError(f'Instance type {instance_type!r} not found.') + + # Get the first instance of the region in any zone + any_zone_instance = df.iloc[0] + + # PUE + # TODO Make const. + PUE_used = 1.67 + if not carbon_pue_df.empty: + pue = _get_value(carbon_pue_df.iloc[0]['PUE']) + if pue is not None: + PUE_used = pue + + # CPU + n_CPUcores = _get_value(df.iloc[0]['vCPUs']) + if n_CPUcores is None: + n_CPUcores = 0 + + # Finding Thermal Design Power (TDP) value per core usually 10-15W. + # TODO: Find a way to get cpu model from instance + # TODO: Create constant + tdp_cpu_df = _tdp_cpu_df[ + (_tdp_cpu_df['model'].str.lower() == "any")] + if tdp_cpu_df.empty: + powerNeeded_CPU = 0 + else: + CPUpower = tdp_cpu_df.iloc[0]['TDP_per_core'] + powerNeeded_CPU = PUE_used * n_CPUcores * CPUpower + + + # GPU + if n_GPUs > 0: + fuzzy_tdp_gpu_df = _tdp_gpu_df[ + (_tdp_gpu_df['model'].str.contains(acc_name, case=False))] + if fuzzy_tdp_gpu_df.empty: + # Default + # TODO: Create constant + fuzzy_tdp_gpu_df = _tdp_gpu_df[ + (_tdp_gpu_df['model'].str.lower() == "any")] + assert len(fuzzy_tdp_gpu_df) == 1 + + # Finding Thermal Design Power (TDP) value per core usually 10-15W. + GPUpower = fuzzy_tdp_gpu_df.iloc[0]['TDP_per_core'] + powerNeeded_GPU = PUE_used * n_GPUs * GPUpower + else: + powerNeeded_GPU = 0 + + + # MEMORY + memory = _get_value(df.iloc[0]['MemoryGiB']) + if memory is None: + memory = 0 + + memoryPower = 0.3725 + ref_vals_df = _reference_vals_df[ + (_reference_vals_df['variable'].str.lower() == "memorypower")] + if not ref_vals_df.empty: + memoryPowerVal = _get_value(ref_vals_df.iloc[0]['value']) + if memoryPowerVal is not None: + memoryPower = memoryPowerVal + + # SERVER/LOCATION + # First get the region code + carbon_df = carbon_df[(carbon_df['Name'].str.lower() == region.lower())] + regionCode = "World" + if not carbon_df.empty: + location = carbon_df.iloc[0]['location'] + if not pd.isnull(location) and len(location) > 0: + regionCode = location + + # From the region code get the carbon intensity + ci_df = _ci_df[(_ci_df['location'].str.lower() == regionCode.lower())] + if ci_df.empty: + with ux_utils.print_exception_no_traceback(): + raise ValueError(f'Carbon intesity metrics not found ' + f'for {region}.') + + carbonIntensity = ci_df.iloc[0]['carbonIntensity'] + + # Pragmatic Scaling Factor (PSF) + PSF_used = 1 + + # Power needed, in Watt + powerNeeded_core = powerNeeded_CPU + powerNeeded_GPU + powerNeeded_memory = PUE_used * (memory * memoryPower) + powerNeeded = powerNeeded_core + powerNeeded_memory + + # Energy needed, in kWh (so dividing by 1000 to convert to kW) + energyNeeded = powerNeeded * PSF_used / 1000 + + # Carbon emissions: carbonIntensity is in g per kWh, so results in gCO2 + carbonEmissions = energyNeeded * carbonIntensity + + return carbonEmissions + + def _get_value(value): if pd.isna(value): return None @@ -400,6 +534,7 @@ def get_instance_type_for_accelerator_impl( result = df[(df['AcceleratorName'].str.fullmatch(acc_name, case=False)) & (df['AcceleratorCount'] == acc_count)] result = _filter_region_zone(result, region, zone) + if len(result) == 0: fuzzy_result = df[ (df['AcceleratorName'].str.contains(acc_name, case=False)) & diff --git a/sky/clouds/service_catalog/gcp_catalog.py b/sky/clouds/service_catalog/gcp_catalog.py index 1e0828f6f5f1..10aa0efd66dd 100644 --- a/sky/clouds/service_catalog/gcp_catalog.py +++ b/sky/clouds/service_catalog/gcp_catalog.py @@ -11,6 +11,7 @@ from sky import exceptions from sky import sky_logging +from sky.clouds.service_catalog import carbon_utils from sky.clouds.service_catalog import common from sky.utils import ux_utils @@ -33,6 +34,14 @@ _quotas_df = common.read_catalog('gcp/accelerator_quota_mapping.csv', pull_frequency_hours=_PULL_FREQUENCY_HOURS) +_carbon_df = carbon_utils.read_carbon_file('gcp/cloudProviders_datacenters.csv', + filter_col='provider', filter_val='gcp', + pull_frequency_hours=_PULL_FREQUENCY_HOURS) + +_carbon_pue_df = carbon_utils.read_carbon_file('gcp/defaults_PUE.csv', + filter_col='provider', filter_val='gcp', + pull_frequency_hours=_PULL_FREQUENCY_HOURS) + # We will select from the following three CPU instance families: _DEFAULT_INSTANCE_FAMILY = [ # This is the latest general-purpose instance family as of Mar 2023. @@ -224,6 +233,28 @@ def get_hourly_cost( zone) +def get_hourly_carbon_cost( + instance_type: str, + accelerators: Dict[str, int], + region: Optional[str] = None, + zone: Optional[str] = None, +) -> float: + """Returns the hourly enery cost of a VM instance in the given region and zone. + + Refer to get_hourly_energy_cost in service_catalog/__init__.py for the docstring. + """ + assert region is not None + + if accelerators is not None and len(accelerators) > 0: + acc_name, n_GPUs = list(accelerators.items())[0] + else: + acc_name = "" + n_GPUs = 0 + return common.get_carbon_cost_impl(_df, _carbon_df, _carbon_pue_df, + instance_type, acc_name, n_GPUs, + region, zone) + + def get_vcpus_mem_from_instance_type( instance_type: str) -> Tuple[Optional[float], Optional[float]]: # The number of vCPUs and memory size provided with a TPU VM is not diff --git a/sky/clouds/service_catalog/ibm_catalog.py b/sky/clouds/service_catalog/ibm_catalog.py index b1bd84023e43..ebadd63cb9e5 100644 --- a/sky/clouds/service_catalog/ibm_catalog.py +++ b/sky/clouds/service_catalog/ibm_catalog.py @@ -8,6 +8,7 @@ from sky import sky_logging from sky.adaptors import ibm +from sky.clouds.service_catalog import carbon_utils from sky.clouds import cloud from sky.clouds.service_catalog import common @@ -19,6 +20,12 @@ _df = common.read_catalog('ibm/vms.csv') +_carbon_df = carbon_utils.read_carbon_file('gcp/cloudProviders_datacenters.csv', + filter_col='provider', filter_val='ibm') + +_carbon_pue_df = carbon_utils.read_carbon_file('gcp/defaults_PUE.csv', + filter_col='provider', filter_val='Unknown') + def instance_type_exists(instance_type: str) -> bool: return common.instance_type_exists_impl(_df, instance_type) @@ -44,6 +51,28 @@ def get_hourly_cost(instance_type: str, zone) +def get_hourly_carbon_cost( + instance_type: str, + accelerators: Dict[str, int], + region: Optional[str] = None, + zone: Optional[str] = None, +) -> float: + """Returns the hourly enery cost of a VM instance in the given region and zone. + + Refer to get_hourly_energy_cost in service_catalog/__init__.py for the docstring. + """ + assert region is not None + + if accelerators is not None and len(accelerators) > 0: + acc_name, n_GPUs = list(accelerators.items())[0] + else: + acc_name = "" + n_GPUs = 0 + return common.get_carbon_cost_impl(_df, _carbon_df, _carbon_pue_df, + instance_type, acc_name, n_GPUs, + region, zone) + + def get_vcpus_mem_from_instance_type( instance_type: str) -> Tuple[Optional[float], Optional[float]]: return common.get_vcpus_mem_from_instance_type_impl(_df, instance_type) diff --git a/sky/optimizer.py b/sky/optimizer.py index 8385430019e4..0e4c0eb34519 100644 --- a/sky/optimizer.py +++ b/sky/optimizer.py @@ -34,6 +34,8 @@ # task -> resources -> estimated cost or time. _TaskToCostMap = Dict[task_lib.Task, Dict[resources_lib.Resources, float]] +# task -> resources -> estimated carbon cost or time. +_TaskToCIMap = Dict[task_lib.Task, Dict[resources_lib.Resources, float]] # cloud -> list of resources that have the same accelerators. _PerCloudCandidates = Dict[clouds.Cloud, List[resources_lib.Resources]] # task -> per-cloud candidates @@ -44,6 +46,7 @@ class OptimizeTarget(enum.Enum): COST = 0 TIME = 1 + CARBON_INTENSITY = 2 # For logging purposes. @@ -117,6 +120,7 @@ def optimize(dag: 'dag_lib.Dag', unused_best_plan = Optimizer._optimize_objective( dag, minimize_cost=minimize == OptimizeTarget.COST, + estimate_carbon_intensity=minimize == OptimizeTarget.COST, blocked_resources=blocked_resources, quiet=quiet) finally: @@ -223,8 +227,9 @@ def _egress_cost_or_time(minimize_cost: bool, parent: task_lib.Task, def _estimate_nodes_cost_or_time( topo_order: List[task_lib.Task], minimize_cost: bool = True, + estimate_carbon_intensity: bool = False, blocked_resources: Optional[Iterable[resources_lib.Resources]] = None, - ) -> Tuple[_TaskToCostMap, _TaskToPerCloudCandidates]: + ) -> Tuple[_TaskToCostMap, _TaskToCIMap, _TaskToPerCloudCandidates]: """Estimates the cost/time of each task-resource mapping in the DAG. Note that the egress cost/time is not considered in this function. @@ -235,6 +240,8 @@ def _estimate_nodes_cost_or_time( # Cost/time of running the task on the resources. # node -> {resources -> cost/time} node_to_cost_map: _TaskToCostMap = collections.defaultdict(dict) + # node -> {resources -> carbon intensity} + node_to_ci_map: _TaskToCIMap = collections.defaultdict(dict) # node -> cloud -> list of resources that satisfy user's requirements. node_to_candidate_map: _TaskToPerCloudCandidates = {} @@ -246,6 +253,7 @@ def _estimate_nodes_cost_or_time( if node_i == 0: # Base case: a special source node. node_to_cost_map[node][list(node.get_resources())[0]] = 0 + node_to_ci_map[node][list(node.get_resources())[0]] = 0 continue # Don't print for the last node, Sink. @@ -349,6 +357,14 @@ def _estimate_nodes_cost_or_time( else: # Minimize run time. estimated_cost_or_time = estimated_runtime + + if estimate_carbon_intensity: + carbs_per_node = resources.get_carbon_cost(estimated_runtime) + estimated_carbs = carbs_per_node * node.num_nodes + node_to_ci_map[node][resources] = estimated_carbs + else: + node_to_ci_map[node][resources] = 0.0 + if do_print: logger.debug( ' estimated_runtime: {:.0f} s ({:.1f} hr)'.format( @@ -358,7 +374,7 @@ def _estimate_nodes_cost_or_time( ' estimated_cost (not incl. egress): ${:.1f}'. format(estimated_cost_or_time)) node_to_cost_map[node][resources] = estimated_cost_or_time - return node_to_cost_map, node_to_candidate_map + return node_to_cost_map, node_to_ci_map, node_to_candidate_map @staticmethod def _optimize_by_dp( @@ -631,6 +647,28 @@ def _compute_total_cost( True, pred, plan[pred], node, resources) total_cost += egress_cost return total_cost + + @staticmethod + def _compute_total_carbong_cost( + graph, + topo_order: List[task_lib.Task], + plan: Dict[task_lib.Task, resources_lib.Resources], + ) -> float: + """Estimates the total cost of running the DAG by the plan.""" + total_cost = 0 + for node in topo_order: + resources = plan[node] + if node.time_estimator_func is None: + execution_time = 1 * 3600 + else: + # The execution time of dummy nodes is always 0, + # as they have a time estimator lambda _: 0. + execution_time = node.estimate_runtime(resources) + + cost_per_node = resources.get_carbon_cost(execution_time) + total_cost += cost_per_node * node.num_nodes + + return total_cost @staticmethod def _print_egress_plan(graph, plan, minimize_cost): @@ -677,14 +715,17 @@ def print_optimized_plan( total_time: float, total_cost: float, node_to_cost_map: _TaskToCostMap, + node_to_ci_map: _TaskToCIMap, minimize_cost: bool, ): logger.info('== Optimizer ==') ordered_node_to_cost_map = collections.OrderedDict() + ordered_node_to_ci_map = collections.OrderedDict() ordered_best_plan = collections.OrderedDict() for node in topo_order: if node.name not in (_DUMMY_SOURCE_NAME, _DUMMY_SINK_NAME): ordered_node_to_cost_map[node] = node_to_cost_map[node] + ordered_node_to_ci_map[node] = node_to_ci_map[node] ordered_best_plan[node] = best_plan[node] is_trivial = all(len(v) == 1 for v in node_to_cost_map.values()) @@ -700,10 +741,20 @@ def print_optimized_plan( if (node.time_estimator_func is None and node.get_inputs() is None and node.get_outputs() is None): print_hourly_cost = True + + print_hourly_carbon_cost = False + if len(ordered_node_to_cost_map) == 1: + node = list(ordered_node_to_ci_map.keys())[0] + if (node.time_estimator_func is None and + node.get_inputs() is None and node.get_outputs() is None): + print_hourly_carbon_cost = True + if print_hourly_cost: logger.info(f'{colorama.Style.BRIGHT}Estimated cost: ' f'{colorama.Style.RESET_ALL}${total_cost:.1f} / hour\n') + # logger.info(f'{colorama.Style.BRIGHT}Estimated carbon footprint: ' + # f'{colorama.Style.RESET_ALL}${total_cost:.1f} g CO2e/ hour\n') else: logger.info(f'{colorama.Style.BRIGHT}Estimated total runtime: ' f'{colorama.Style.RESET_ALL}{total_time / 3600:.1f} ' @@ -739,6 +790,7 @@ def format_number(x): region_or_zone = resources.region else: region_or_zone = resources.zone + return [ str(cloud), resources.instance_type + spot, @@ -751,8 +803,9 @@ def format_number(x): # Print the list of resouces that the optimizer considered. resource_fields = [ 'CLOUD', 'INSTANCE', 'vCPUs', 'Mem(GB)', 'ACCELERATORS', - 'REGION/ZONE' + 'REGION/ZONE', 'CARBON FOOTPRINT(g CO2e)' ] + # Do not print Source or Sink. best_plan_rows = [[t, t.num_nodes] + _get_resources_element_list(r) for t, r in ordered_best_plan.items()] @@ -772,6 +825,7 @@ def format_number(x): num_tasks = len(ordered_node_to_cost_map) for task, v in ordered_node_to_cost_map.items(): + ci_task, ci_v = ordered_node_to_ci_map.popitem() task_str = (f'for task_lib.Task {repr(task)!r}' if num_tasks > 1 else '') plural = 's' if task.num_nodes > 1 else '' @@ -783,13 +837,18 @@ def format_number(x): # Only print 1 row per cloud. best_per_cloud: Dict[str, Tuple[resources_lib.Resources, float]] = {} + optimized_result_per_cloud_ci_val: Dict[str, Tuple[resources_lib.Resources, + float]] = {} for resources, cost in v.items(): + ci_resources, ci = ci_v.popitem() cloud = str(resources.cloud) if cloud in best_per_cloud: if cost < best_per_cloud[cloud][1]: best_per_cloud[cloud] = (resources, cost) + optimized_result_per_cloud_ci_val[cloud] = (resources, ci) else: - best_per_cloud[cloud] = (resources, cost) + best_per_cloud[cloud] = (resources, cost) + optimized_result_per_cloud_ci_val[cloud] = (resources, ci) # If the DAG has multiple tasks, the chosen resources may not be # the best resources for the task. @@ -804,7 +863,16 @@ def format_number(x): else: cost_str = f'{cost / 3600:.2f}' - row = [*_get_resources_element_list(resources), cost_str, ''] + cloud_key, ci_cloud_val = optimized_result_per_cloud_ci_val.popitem() + ci_cloud_val_len = len(ci_cloud_val) + if ci_cloud_val_len < 2: + ci_val = -1.0 + else: + ci_val = ci_cloud_val[1] + + ci_str = f'{ci_val:.2f}' + row = [*_get_resources_element_list(resources), ci_str, cost_str, ''] + if resources == best_plan[task]: # Use tick sign for the chosen resources. row[-1] = (colorama.Fore.GREEN + ' ' + u'\u2714' + @@ -852,6 +920,7 @@ def _print_candidates(node_to_candidate_map: _TaskToPerCloudCandidates): def _optimize_objective( dag: 'dag_lib.Dag', minimize_cost: bool = True, + estimate_carbon_intensity: bool = False, blocked_resources: Optional[Iterable[resources_lib.Resources]] = None, quiet: bool = False, ) -> Dict[task_lib.Task, resources_lib.Resources]: @@ -868,9 +937,12 @@ def _optimize_objective( graph = dag.get_graph() topo_order = list(nx.topological_sort(graph)) - node_to_cost_map, node_to_candidate_map = ( - Optimizer._estimate_nodes_cost_or_time(topo_order, minimize_cost, - blocked_resources)) + node_to_cost_map, node_to_ci_map, node_to_candidate_map = \ + Optimizer._estimate_nodes_cost_or_time( + topo_order, + minimize_cost, + estimate_carbon_intensity, + blocked_resources) if dag.is_chain(): best_plan, best_total_objective = Optimizer._optimize_by_dp( @@ -891,7 +963,8 @@ def _optimize_objective( if not quiet: Optimizer.print_optimized_plan(graph, topo_order, best_plan, total_time, total_cost, - node_to_cost_map, minimize_cost) + node_to_cost_map, node_to_ci_map, + minimize_cost) if not env_options.Options.MINIMIZE_LOGGING.get(): Optimizer._print_candidates(node_to_candidate_map) return best_plan @@ -908,6 +981,8 @@ def __repr__(self) -> str: def get_cost(self, seconds): return 0 + def get_carbon_cost(self, seconds): + return 0 class DummyCloud(clouds.Cloud): """A dummy Cloud that has zero egress cost from/to.""" diff --git a/sky/resources.py b/sky/resources.py index 1b70fbb00c51..577ebd1c5d1d 100644 --- a/sky/resources.py +++ b/sky/resources.py @@ -536,8 +536,8 @@ def _set_region_zone(self, region: Optional[str], raise ValueError( 'Cloud must be specified when region/zone are specified.') - # Validate whether region and zone exist in the catalog, and set the - # region if zone is specified. + # Validate whether region and zone exist in the catalog, and set + # the region if zone is specified. self._region, self._zone = self._cloud.validate_region_zone( region, zone) @@ -826,6 +826,15 @@ def get_cost(self, seconds: float) -> float: self.accelerators, self.use_spot, self._region, self._zone) return hourly_cost * hours + def get_carbon_cost(self, seconds: float) -> float: + """Returns cost in grams CO2 footprint for the runtime in seconds.""" + hours = seconds / 3600 + # Instance + accelerators. + carbs = self.cloud.instance_type_to_hourly_carbon_cost( + self._instance_type, self.accelerators, + self._region, self._zone) + return carbs * hours + def make_deploy_variables( self, cluster_name_on_cloud: str, region: clouds.Region, zones: Optional[List[clouds.Zone]]) -> Dict[str, Optional[str]]: