Integration #54
Open · wants to merge 14 commits into base: master
42 changes: 32 additions & 10 deletions autodist/autodist.py
@@ -23,7 +23,7 @@
from tensorflow.python.ops import array_ops
from tensorflow.python.util import tf_contextlib

from autodist.cluster import Cluster, SSHCluster
from autodist.cluster import Cluster, SSHCluster, ADAPTDLCluster
from autodist.const import ENV
from autodist.coordinator import Coordinator
from autodist.graph_item import GraphItem
@@ -39,7 +39,8 @@

IS_AUTODIST_WORKER = bool(ENV.AUTODIST_WORKER.val)
IS_AUTODIST_CHIEF = not IS_AUTODIST_WORKER

IS_ADAPTDL = bool(ENV.ADAPTDL.val)
logging.info(f"is chief: {IS_AUTODIST_CHIEF}, is from adaptdl: {IS_ADAPTDL}")
_DEFAULT_AUTODIST = {}


@@ -74,7 +75,10 @@ def __init__(self, resource_spec_file, strategy_builder=None):
self._remapper = None
self._built = None # Ref to the built GraphDef

self._cluster: Cluster = SSHCluster(self._resource_spec) # which can be also defined with strategy
if IS_ADAPTDL:
self._cluster: Cluster = ADAPTDLCluster(self._resource_spec)
else:
self._cluster: Cluster = SSHCluster(self._resource_spec) # which can also be defined with the strategy
self._coordinator: Coordinator

@tf_contextlib.contextmanager
@@ -97,12 +101,18 @@ def build_strategy(self):
"""
return self._strategy_builder.build(self._original_graph_item, self._resource_spec)

def _build_or_load_strategy(self):
def _build_or_load_strategy(self, load=False):
self._original_graph_item.prepare()
if IS_AUTODIST_CHIEF:
s = self.build_strategy()
s.serialize()
else:
# In AdaptDL mode, when a worker passes through this point before
# the chief has created the strategy, this should return
# nothing. Later, once the chief has created the strategy,
# the worker can load it.
Review comment:
Still not quite sure about the purpose of load and what this comment means. In L162-L163 load is always true when IS_ADAPTDL is true. Could you explain more?

@DachengLi1 (Contributor, Author) replied on Jan 11, 2021:

It is kind of subtle. Previously, the AutoDist chief ran first and generated the strategy; it spawned worker instances only after it had built the strategy, set up the cluster, etc. Now every instance runs through _build and thus calls _build_or_load_strategy. The first time, a worker gets None from this function. The second time, the worker gets the strategy from the chief. This is because Kubernetes launches instances in parallel. By the second call, it is guaranteed that the chief has already generated the strategy, because there are several collective calls in between, which are blocking.
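A minimal sketch of the two-pass flow described above, using names that appear in this diff (simplified, not part of the PR):

# Sketch of the two-pass build/load flow under AdaptDL (simplified).
def _build(self):
    # Pass 1: the chief builds and serializes the strategy; a worker that
    # arrives here early gets None because load defaults to False.
    strategy = self._build_or_load_strategy()

    # _setup() goes through blocking collective calls (cluster start-up,
    # client launch), so once it returns the chief's strategy exists.
    self._setup(strategy)

    # Pass 2: in AdaptDL mode, workers now deserialize the chief's strategy.
    if IS_ADAPTDL:
        strategy = self._build_or_load_strategy(load=True)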

if IS_ADAPTDL and not load:
return None
strategy_id = ENV.AUTODIST_STRATEGY_ID.val
assert strategy_id
s = base.Strategy.deserialize(strategy_id)
@@ -119,12 +129,22 @@ def _compile_strategy(self, strategy):

def _setup(self, strategy):
"""Prepare for the execution."""
if IS_AUTODIST_CHIEF:
# we should only have one single coordinator for one single AutoDist() instance scope,
# even though we could have multiple strategies.
self._coordinator = Coordinator(strategy=strategy, cluster=self._cluster)
self._cluster.start()
self._coordinator.launch_clients()
if not IS_ADAPTDL:
if IS_AUTODIST_CHIEF:
# we should only have one single coordinator for one single AutoDist() instance scope,
# even though we could have multiple strategies.
self._coordinator = Coordinator(strategy=strategy, cluster=self._cluster)
self._cluster.start()
self._coordinator.launch_clients()
else:
if IS_AUTODIST_CHIEF:
self._coordinator = Coordinator(strategy=strategy, cluster=self._cluster)
Review comment:
Would it be better if we created different Coordinator classes based on the cluster mode?

Author reply (DachengLi1):
Good suggestion. I tried a similar structure to what you suggest, but I think the current version is more readable in autodist.py, though more lengthy. It is easier to maintain this way, since autodist.py is the first file to look at.
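For reference, a minimal sketch of the refactoring suggested above (hypothetical class and attribute names, not part of this PR; only launch_clients_chief/launch_clients_worker and start_chief/start_worker are taken from the diff):

# Hypothetical sketch: push the chief/worker split into a Coordinator subclass
# so that _setup() needs no IS_ADAPTDL branching.
class ADAPTDLCoordinator(Coordinator):
    """Coordinator variant for AdaptDL-managed clusters."""

    def __init__(self, strategy, cluster):
        super().__init__(strategy=strategy, cluster=cluster)
        self._adaptdl_cluster = cluster  # keep our own handle to the cluster

    def launch(self):
        """Start the local tf.server and clients for this node's role."""
        if IS_AUTODIST_CHIEF:
            self._adaptdl_cluster.start_chief()
            self.launch_clients_chief()
        else:
            self._adaptdl_cluster.start_worker()
            self.launch_clients_worker()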

self._cluster.start_chief()
self._coordinator.launch_clients_chief()
else:
self._coordinator = Coordinator(strategy=strategy, cluster=self._cluster)
self._cluster.start_worker()
self._coordinator.launch_clients_worker()
logging.info('Current PID {} belongs to address {}'.format(os.getpid(), self._cluster.get_local_address()))


@@ -139,6 +159,8 @@ def _initialize_graph(self):
def _build(self):
strategy = self._build_or_load_strategy()
self._setup(strategy) # Put it before transforming to allow multiple workers to transform concurrently
if IS_ADAPTDL:
strategy = self._build_or_load_strategy(load=True)
compiled_strategy = self._compile_strategy(strategy)
graph_transformer = GraphTransformer(
compiled_strategy=compiled_strategy,
193 changes: 184 additions & 9 deletions autodist/cluster.py
@@ -40,12 +40,15 @@
from abc import ABCMeta, abstractmethod

import paramiko

from autodist.const import DEFAULT_PORT_RANGE, DEFAULT_WORKING_DIR, ENV
from autodist.resource_spec import ResourceSpec
from autodist.utils import logging

warnings.filterwarnings(action='ignore', module=paramiko.__name__)
IS_ADAPTDL = bool(ENV.ADAPTDL.val)
if IS_ADAPTDL:
import adaptdl.collective as collective
import socket


class Cluster(metaclass=ABCMeta):
@@ -135,6 +138,11 @@ def get_local_address(self):
Returns:
str: Worker ip or chief address by default.
"""
if IS_ADAPTDL:
hostname = socket.gethostname()
local_ip = socket.gethostbyname(hostname)
return local_ip
Review comment:
Since there is already a class named ADAPTDLCluster inherited from Cluster, is it necessary to insert AdaptDL-related code in the base class?

Author reply (DachengLi1):
Thanks for pointing this out. I have moved it into the ADAPTDLCluster class. Thanks!
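For reference, a minimal sketch of the override the author describes, moving the lookup into the subclass (assumed shape of the updated revision, not the exact code):

class ADAPTDLCluster(Cluster):
    def get_local_address(self):
        """Resolve the local pod IP instead of reading ENV.AUTODIST_WORKER."""
        return socket.gethostbyname(socket.gethostname())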


return ENV.AUTODIST_WORKER.val or self._chief

def get_local_worker_task_index(self):
Expand All @@ -144,6 +152,13 @@ def get_local_worker_task_index(self):
Returns:
int: Task index
"""
if IS_ADAPTDL:
logging.info(f"full address {self._full_addresses}")
logging.info(f"local address {self.get_local_address()}")
return_ = [i for i, a in enumerate(self._full_addresses) if self.get_local_address() in a][0]
logging.info(f"returning {return_}")
return return_

return [i for i, a in enumerate(self._full_addresses) if self.get_local_address() in a][0]

def get_local_session_target(self):
@@ -200,7 +215,7 @@ def start(self):
full_address, job_name, task_index
))
else: # remote
self.remote_pre_start_tf_server(address, tf_server_starter_filepath=module_file)
self.remote_pre_start_tf_server(address, tf_server_starter_filepath=module_file, chief=False)
file = os.path.join(DEFAULT_WORKING_DIR, os.path.basename(module_file))
bash = envs + envs_cuda + ['python', '-u', file] + args
logging.info("Launching tf.server on %s" % address)
@@ -209,28 +224,102 @@
# to ensure no gap for termination failure due to the empty proc list.
self.subprocesses.append(proc)

# pylint: disable=too-many-locals
def start_chief(self):
"""Start tf.servers on all nodes. AdaptDL version. Run on chief."""
# pylint: disable=import-outside-toplevel
from autodist.utils import server_starter
envs = {ENV.AUTODIST_MIN_LOG_LEVEL.name: 'ERROR'}
envs = ['{}={}'.format(k, v) for k, v in envs.items()]
module_name = server_starter.__name__
module_file = server_starter.__file__
for job_name, tasks in self.cluster_spec.items():
for task_index, full_address in enumerate(tasks):
address = full_address.split(':')[0]
args = ['--job_name=%s' % job_name, '--task_index=%d' % task_index,
'--cpu_device_num=%d' % len(self._cpu_devices[address])]
if address in self._gpu_devices:
envs_cuda = []
else:
envs_cuda = ['CUDA_VISIBLE_DEVICES=""']
if self.is_chief(address):
json.dump(self.cluster_spec, open(os.path.join(DEFAULT_WORKING_DIR, 'cluster_spec.json'), 'w+'))
cmd = envs + envs_cuda + [sys.executable, '-m', module_name] + args
# pylint: disable=subprocess-popen-preexec-fn
logging.info("cmd at chief: %s", cmd)
proc = subprocess.Popen(' '.join(cmd), shell=True, preexec_fn=os.setsid)
self.subprocesses.append(proc)
logging.debug('$ local tf.server started at {}: job_name={} task_index={}'.format(
full_address, job_name, task_index
))
self.remote_pre_start_tf_server(None, tf_server_starter_filepath=module_file, chief=True)

# pylint: disable=too-many-locals
def start_worker(self):
"""Start tf.servers on all nodes. AdaptDL version. Run on non-chief."""
# pylint: disable=import-outside-toplevel
from autodist.utils import server_starter
envs = {ENV.AUTODIST_MIN_LOG_LEVEL.name: 'ERROR'}
envs = ['{}={}'.format(k, v) for k, v in envs.items()]
module_name = server_starter.__name__
module_file = server_starter.__file__
for job_name, tasks in self.cluster_spec.items():
for task_index, full_address in enumerate(tasks):
address = full_address.split(':')[0]
hostname = socket.gethostname()
local_ip = socket.gethostbyname(hostname)
if local_ip != address:
continue
args = ['--job_name=%s' % job_name, '--task_index=%d' % task_index,
'--cpu_device_num=%d' % len(self._cpu_devices[address])]
if address in self._gpu_devices:
envs_cuda = []
else:
envs_cuda = ['CUDA_VISIBLE_DEVICES=""']
assert not self.is_chief(address)
self.remote_pre_start_tf_server(address, tf_server_starter_filepath=module_file, chief=False)

cmd = envs + envs_cuda + [sys.executable, '-m', module_name] + args
logging.info("Launching tf.server on %s" % address)
proc = self.local_exec(cmd, address)
self.subprocesses.append(proc)
assert len(self.subprocesses) <= 1

def terminate(self):
"""Terminate."""
logging.debug('Terminating cluster...')
for p in self.subprocesses:
os.killpg(os.getpgid(p.pid), signal.SIGTERM)

def remote_pre_start_tf_server(self, hostname, tf_server_starter_filepath, working_dir=DEFAULT_WORKING_DIR):
def remote_pre_start_tf_server(self, hostname, tf_server_starter_filepath, chief, working_dir=DEFAULT_WORKING_DIR):
"""
Prepare to start a TensorFlow server remotely.

Args:
hostname (str): host name or address
tf_server_starter_filepath (str): local starter file path
chief (bool): whether this process runs on the chief. Only applies in AdaptDL mode.
working_dir (str): remote working directory
"""
logging.info("Copying necessary files to %s" % hostname)
self.remote_copy(local_path=tf_server_starter_filepath, remote_path=working_dir, hostname=hostname)
self.remote_file_write(
remote_path=os.path.join(working_dir, 'cluster_spec.json'),
data=json.dumps(self.cluster_spec),
hostname=hostname,
)
if IS_ADAPTDL:
# pylint: disable=unexpected-keyword-arg
self.remote_copy(local_path=tf_server_starter_filepath, remote_path=working_dir,
hostname=hostname, chief=chief)
self.remote_file_write(
remote_path=os.path.join(working_dir, 'cluster_spec.json'),
data=json.dumps(self.cluster_spec),
hostname=hostname,
chief=chief
)
else:
assert chief is False
self.remote_copy(local_path=tf_server_starter_filepath, remote_path=working_dir, hostname=hostname)
self.remote_file_write(
remote_path=os.path.join(working_dir, 'cluster_spec.json'),
data=json.dumps(self.cluster_spec),
hostname=hostname
)

@abstractmethod
def remote_exec(self, args, hostname):
@@ -291,6 +380,7 @@ def _get_ssh_client(self, hostname):
client.load_system_host_keys()
client.set_missing_host_key_policy(paramiko.WarningPolicy)
client.connect(hostname=hostname, port=ssh_config.port, username=ssh_config.username, pkey=ssh_config.pkey)
assert not IS_ADAPTDL
yield client
client.close()

@@ -372,3 +462,88 @@ def remote_copy(self, local_path, remote_path, hostname):

with self._get_sftp_client(hostname) as sftp:
sftp.put(localpath=local_path, remotepath=os.path.join(remote_path, os.path.basename(local_path)))


class ADAPTDLCluster(Cluster):
"""An AutoDist Cluster Based on AdaptDL."""

def __init__(self, resource_spec):
assert IS_ADAPTDL
super().__init__(resource_spec)

def remote_exec(self, args, hostname):
"""
Execute a bash script remotely. Disabled (a no-op) in AdaptDL mode.

Args:
args (list): bash commands
hostname (str): host name or address

Returns:
None
"""
return

@staticmethod
def local_exec(args, hostname):
"""
Execute a bash script locally.

Args:
args (list): bash commands
hostname (str): host name or address

Returns:
Process: process handle
"""
full_cmd = ' '.join(args)
logging.info(full_cmd)
if ENV.AUTODIST_DEBUG_REMOTE.val:
return None
# pylint: disable=subprocess-popen-preexec-fn
proc = subprocess.Popen(full_cmd, shell=True, preexec_fn=os.setsid)
return proc

def remote_file_write(self, remote_path, data, hostname, **kwargs):
"""
Write a remote file.

Args:
remote_path (str): remote file path
data (str): data to be written
hostname (str): host name or address
chief (bool): whether this process is the AutoDist chief
"""
if kwargs["chief"]:
_ = collective.broadcast(data)
else:
data_ = collective.broadcast(None)
f = open(remote_path, "w")
f.write(data_)
f.close()

def remote_copy(self, local_path, remote_path, hostname, **kwargs):
"""
Copy a file to a remote directory.

Args:
local_path (str): local file path to be copied
remote_path (str): remote directory path
hostname (str): host name or address
chief (bool): whether this process is the AutoDist chief
"""
# Make sure directory exists

if kwargs["chief"]:
f = open(local_path, "r")
lines = f.readlines()
_ = collective.broadcast(lines)
f.close()
else:
lines = collective.broadcast(None)
if not os.path.isdir(remote_path):
os.mkdir(remote_path)
f = open(os.path.join(remote_path, os.path.basename(local_path)), "w")
for line in lines:
f.write(line)
f.close()
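Since adaptdl.collective.broadcast is a collective call, the chief and every worker must reach matching remote_file_write/remote_copy calls: the chief supplies the payload, workers pass None and receive it. A minimal usage sketch under that assumption (the path is hypothetical, and the collective group is assumed to be initialized by the AdaptDL launcher):

# Usage sketch: pairing the chief's broadcast with the workers' receive.
cluster = ADAPTDLCluster(resource_spec)  # resource_spec: an autodist ResourceSpec
spec_path = "/tmp/autodist/cluster_spec.json"  # hypothetical path

if IS_AUTODIST_CHIEF:  # as defined in autodist/autodist.py
    # Chief side: broadcasts the serialized cluster spec; hostname is unused here.
    cluster.remote_file_write(remote_path=spec_path,
                              data=json.dumps(cluster.cluster_spec),
                              hostname=None, chief=True)
else:
    # Worker side: receives the broadcast payload and writes it to spec_path.
    cluster.remote_file_write(remote_path=spec_path,
                              data=None, hostname=None, chief=False)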
25 changes: 23 additions & 2 deletions autodist/const.py
@@ -23,7 +23,6 @@
from enum import Enum, auto

import os

# Below consts can be modified if necessary.
# Note that if one of these consts requires frequent modification,
# it should probably be moved into `ENV`.
@@ -80,10 +79,32 @@ class ENV(Enum):
AUTODIST_INTERNAL_TF = auto(), lambda v: (v or "False") == "True" # noqa: E731
SYS_DATA_PATH = auto(), lambda v: v or "" # noqa: E731
SYS_RESOURCE_PATH = auto(), lambda v: v or "" # noqa: E731
ADAPTDL = auto(), lambda v: v or "" # noqa: E731

@property
def val(self):
"""Return the output of the lambda on the system's value in the environment."""
# pylint: disable=invalid-envvar-value, unpacking-non-sequence
# pylint: disable=invalid-envvar-value, unpacking-non-sequence, comparison-with-callable
if self.name == "AUTODIST_WORKER" and self.ADAPTDL.val:
return self.val_autodist_worker()
_, default_fn = self.value
return default_fn(os.getenv(self.name))

# pylint: disable=import-outside-toplevel
@staticmethod
def val_autodist_worker():
"""Evaluate autodist_worker in AdaptDL."""
import adaptdl.env as env
import socket
f = open(os.path.join(env.share_path(), "resource_spec.yml"))
lines = f.readlines()
line_chief = lines[1]
chief_addr = line_chief.split(":")[1].strip()
hostname = socket.gethostname()
local_ip = socket.gethostbyname(hostname)
f.close()

if chief_addr != local_ip:
return local_ip
else:
return ""