
Commit

Reformat
euclidgame committed Nov 15, 2024
1 parent d87497b commit aca3da5
Showing 1 changed file with 26 additions and 25 deletions.
51 changes: 26 additions & 25 deletions sky/optimizer.py
@@ -12,13 +12,13 @@

from sky import check as sky_check
from sky import clouds
+ from sky import dag as dag_lib
from sky import exceptions
from sky import resources as resources_lib
from sky import sky_logging
from sky import task as task_lib
from sky.adaptors import cloudflare
from sky.adaptors import common as adaptors_common
- from sky import dag
from sky.data import storage as storage_lib
from sky.utils import env_options
from sky.utils import log_utils
@@ -30,7 +30,6 @@
if typing.TYPE_CHECKING:
import networkx as nx

- from sky import dag as dag_lib
else:
nx = adaptors_common.LazyImport('networkx')

@@ -63,7 +62,7 @@ def _create_table(field_names: List[str]) -> prettytable.PrettyTable:
return log_utils.create_table(field_names, **table_kwargs)


- def _is_dag_resources_ordered(dag: 'dag_lib.Dag') -> bool:
+ def _is_dag_resources_ordered(dag: dag_lib.Dag) -> bool:
graph = dag.get_graph()
topo_order = list(nx.topological_sort(graph))
for node in topo_order:
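
For context, _is_dag_resources_ordered walks the DAG in topological order; the per-node check itself lies outside this hunk. A minimal standalone sketch of that traversal pattern on a plain networkx graph (not the SkyPilot Dag wrapper; task names are placeholders):

    import networkx as nx

    g = nx.DiGraph()
    g.add_edges_from([('train', 'eval'), ('eval', 'report')])

    # topological_sort yields parents before children, so each node is
    # visited only after all of its upstream tasks have been seen.
    for node in nx.topological_sort(g):
        print(node, '-> downstream:', list(g.successors(node)))
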
@@ -108,7 +107,7 @@ def _egress_time(src_cloud: clouds.Cloud, dst_cloud: clouds.Cloud,
return egress_time

@staticmethod
- def optimize(dag: 'dag_lib.Dag',
+ def optimize(dag: dag_lib.Dag,
minimize: OptimizeTarget = OptimizeTarget.COST,
blocked_resources: Optional[Iterable[
resources_lib.Resources]] = None,
@@ -151,9 +150,10 @@ def optimize(dag: 'dag_lib.Dag',

@staticmethod
def _remove_storage_nodes_and_move_to_edge_attributes(
- dag: 'dag_lib.Dag',
+ dag: dag_lib.Dag,
edges_to_add: List[Tuple[task_lib.Task, task_lib.Task, task_lib.Task,
-                                      dag.TaskData]]) -> None:
+                                      dag_lib.TaskData]]
+     ) -> None:
"""Removes the storage nodes and adds the storage information to the
edges.
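
The body of this method lies outside the hunk. As an illustration of the transform the docstring describes, here is a sketch on a plain networkx DiGraph, with a hypothetical data dict standing in for the TaskData carried on the real edge:

    import networkx as nx

    def collapse_storage_node(g, src, storage, dst, data):
        # Drop the intermediate storage node and record the transfer
        # metadata directly on a src -> dst edge instead.
        g.remove_node(storage)  # also removes src->storage and storage->dst
        g.add_edge(src, dst, data=data)

    g = nx.DiGraph([('train', 'bucket'), ('bucket', 'eval')])
    collapse_storage_node(g, 'train', 'bucket', 'eval',
                          {'source_path': '/outputs', 'target_path': '/inputs',
                           'size_gb': 10})
    print(list(g.edges(data=True)))  # [('train', 'eval', {'data': {...}})]
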
@@ -184,8 +184,9 @@ def _remove_storage_nodes_and_move_to_edge_attributes(

@staticmethod
def _add_storage_nodes_for_data_transfer(
- dag: 'dag_lib.Dag'
- ) -> List[Tuple[task_lib.Task, task_lib.Task, task_lib.Task, dag.TaskData]]:
+ dag: dag_lib.Dag
+ ) -> List[Tuple[task_lib.Task, task_lib.Task, task_lib.Task,
+                 dag_lib.TaskData]]:
"""Adds special nodes for storage buckets between nodes connected by
with_data edges.
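
The forward transform can be pictured the same way: a sketch (again on a plain networkx graph, with illustrative bucket naming and a generic data attribute rather than the repository's storage API) of splicing a storage node into an existing data edge:

    import networkx as nx

    def insert_storage_node(g, src, dst):
        # Replace a direct src -> dst data edge with src -> bucket -> dst so
        # the optimizer can price the transfer through intermediate storage.
        data = g.edges[src, dst].get('data')
        bucket = 'storage-{}-{}'.format(src, dst)
        g.remove_edge(src, dst)
        g.add_edge(src, bucket, data=data)
        g.add_edge(bucket, dst, data=data)
        return bucket
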
@@ -203,11 +204,12 @@ def _add_storage_nodes_for_data_transfer(
optimizer. The transfer cost refers to egress cost when optimizing cost
and time to transfer data when optimizing time.
- # TODO(wenjie): Implement data transfer time estimation when optimizing time.
- # TODO(wenjie): Currently we only calculate the egress cost. Investigate the
- # cost for storage in those buckets and take that into account. We could
- # reuse the time estimator here.
- # TODO(tian): Using a bucket for intermediate storage is a temporary
+ # TODO(wenjie): Implement data transfer time estimation when optimizing
+ # time.
+ # TODO(wenjie): Currently we only calculate the egress cost. Investigate
+ # the cost for storage in those buckets and take that into account. We
+ # could reuse the time estimator here.
+ # TODO(tian): Using a bucket for intermediate storage is a temporary
# solution. This incurs extra cost on storage. We should consider
# implementing data streaming directly from upstream task to downstream
# task.
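
Since the TODOs note that only egress is currently charged on these edges, a toy cost model makes the intent concrete; the per-GB prices below are illustrative placeholders, not values from any cloud catalog:

    # Hypothetical per-GB egress prices; real numbers come from each
    # cloud's own pricing catalog.
    EGRESS_PRICE_PER_GB = {'aws': 0.09, 'gcp': 0.12, 'azure': 0.087}

    def egress_cost(src_cloud: str, size_gb: float) -> float:
        # Egress is billed by the cloud the data leaves, so only the source
        # cloud's rate matters in this simplified model.
        return EGRESS_PRICE_PER_GB.get(src_cloud, 0.0) * size_gb

    print(egress_cost('aws', 50.0))  # 4.5
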
@@ -222,7 +224,7 @@ def _add_storage_nodes_for_data_transfer(
Returns:
List[Tuple[task_lib.Task, task_lib.Task, task_lib.Task,
-                        dag.TaskData]]:
+                        dag_lib.TaskData]]:
- The source node
- The storage node
- The destination node
@@ -270,21 +272,20 @@ def _add_storage_nodes_for_data_transfer(

for src, storage_node, dst, data in edges_to_add:
dag.remove_edge(src, dst)
- dag.add_edge(src, storage_node).with_data(
-     source_path=data.source_path,
-     target_path='',
-     size_gb=data.size_gb)
+ dag.add_edge(src,
+              storage_node).with_data(source_path=data.source_path,
+                                      target_path='',
+                                      size_gb=data.size_gb)
dag.add_edge(storage_node,
-              dst).with_data(source_path='',
+              dst).with_data(source_path='',
target_path=data.target_path,
size_gb=data.size_gb)
logger.info(
f'Adding storage node between {src.name} and {dst.name}')
return edges_to_add


@staticmethod
- def _add_dummy_source_sink_nodes(dag: 'dag_lib.Dag'):
+ def _add_dummy_source_sink_nodes(dag: dag_lib.Dag):
"""Adds special Source and Sink nodes.
The two special nodes are for conveniently handling cases such as
@@ -324,7 +325,7 @@ def make_dummy(name):
real_sink_node >> sink # pylint: disable=pointless-statement

@staticmethod
- def _remove_dummy_source_sink_nodes(dag: 'dag_lib.Dag'):
+ def _remove_dummy_source_sink_nodes(dag: dag_lib.Dag):
"""Removes special Source and Sink nodes."""
source = [t for t in dag.tasks if t.name == _DUMMY_SOURCE_NAME]
sink = [t for t in dag.tasks if t.name == _DUMMY_SINK_NAME]
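
The dummy Source/Sink pattern itself is generic: attach one artificial source to every root task and every leaf task to one artificial sink so the optimizer always sees a single entry and a single exit. A sketch on a plain networkx graph, with placeholder node names rather than _DUMMY_SOURCE_NAME/_DUMMY_SINK_NAME:

    import networkx as nx

    def add_dummy_source_sink(g):
        # Compute roots and leaves before the dummy nodes are added.
        roots = [n for n in g.nodes if g.in_degree(n) == 0]
        leaves = [n for n in g.nodes if g.out_degree(n) == 0]
        g.add_node('__source__')
        g.add_node('__sink__')
        for r in roots:
            g.add_edge('__source__', r)
        for leaf in leaves:
            g.add_edge(leaf, '__sink__')

    g = nx.DiGraph([('preprocess', 'train'), ('train', 'eval')])
    add_dummy_source_sink(g)
    print(list(g.successors('__source__')))   # ['preprocess']
    print(list(g.predecessors('__sink__')))   # ['eval']
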
@@ -343,7 +344,7 @@ def _get_egress_info(
# TODO(wenjie): Add dummy nodes for external storage when DAG nodes read
# from buckets specified in the YAML file.
src_cloud = parent_resources.cloud
- assert isinstance(edge_data['edge'], dag.TaskEdge)
+ assert isinstance(edge_data['edge'], dag_lib.TaskEdge)
task_edge = edge_data['edge']
n_gigabytes = 0.0
if task_edge.data is not None:
@@ -1126,7 +1127,7 @@ def _print_candidates(node_to_candidate_map: _TaskToPerCloudCandidates):

@staticmethod
def _optimize_dag(
- dag: 'dag_lib.Dag',
+ dag: dag_lib.Dag,
minimize_cost: bool = True,
blocked_resources: Optional[Iterable[resources_lib.Resources]] = None,
quiet: bool = False,
@@ -1335,7 +1336,7 @@ def _filter_out_blocked_launchable_resources(
return available_resources


- def _check_specified_clouds(dag: 'dag_lib.Dag') -> None:
+ def _check_specified_clouds(dag: dag_lib.Dag) -> None:
"""Check if specified clouds are enabled in cache and refresh if needed.
Our enabled cloud list is cached in a local database, and if a user
