Skip to content

Commit

Permalink
Upgrade MPIJob schema to v1 (#10) (#11)
Browse files Browse the repository at this point in the history
* upgraded mpijob version to v1

* ...

* fixed to yaml

* fix gpus and replicas funcs

* fix - enrich worker pod template

Co-authored-by: sahare92 <[email protected]>
  • Loading branch information
dinal and sahare92 authored Jul 28, 2020
1 parent 8d18c6b commit d3edfcd
Showing 1 changed file with 126 additions and 56 deletions.
182 changes: 126 additions & 56 deletions v3io_gputils/mpijob.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,70 @@
from kubernetes import client, config
from kubernetes.client.rest import ApiException

_mpijob_template = {
'apiVersion': 'kubeflow.org/v1alpha1',
'kind': 'MPIJob',
'metadata': {
'name': '',
'namespace': 'default-tenant'
},
'spec': {
'replicas': 1,
'template': {
'spec': {
'containers': [{
'image': 'iguaziodocker/horovod:0.1.1',
'name': '',
'command': [],
'volumeMounts': [{'name': 'v3io', 'mountPath': '/User'}],
'workingDir': '/User',
'securityContext': {
'capabilities': {'add': ['IPC_LOCK']}},
'resources': {
'limits': {'nvidia.com/gpu': 1}}}],
'volumes': [{
'name': 'v3io',
'flexVolume': {
'driver': 'v3io/fuse',
'options': {
'container': 'users',
'subPath': '',
'accessKey': '',
}

class MPIJobPodTemplateType(object):
    """Names of the pod-template roles handled in this module.

    The values are plain strings; MpiJob uses them as the keys of its
    per-role pod-template mapping.
    """

    launcher = 'Launcher'
    worker = 'Worker'

    @staticmethod
    def all():
        """Return a fresh list holding every role name, launcher first."""
        roles = MPIJobPodTemplateType
        return list((roles.launcher, roles.worker))


# Pod template for the launcher role: a single container plus the v3io fuse
# volume. Empty strings and the empty command list are placeholders that
# MpiJob overwrites on its own deep copy. Unlike the worker template, this one
# declares no volumeMounts and no resource limits.
_mpijob_launcher_pod_template = {
    'spec': {
        'containers': [{
            'image': 'iguaziodocker/horovod:0.1.1',
            'name': '',
            'command': [],
            'workingDir': '/User',
            'securityContext': {'capabilities': {'add': ['IPC_LOCK']}},
        }],
        'volumes': [{
            'name': 'v3io',
            'flexVolume': {
                'driver': 'v3io/fuse',
                'options': {'container': 'users', 'subPath': '', 'accessKey': ''},
            },
        }],
    },
}

# Pod template for the worker role: a single container that mounts the v3io
# fuse volume at /User and requests one nvidia.com/gpu by default. Empty
# strings and the empty command list are placeholders that MpiJob overwrites
# on its own deep copy.
# The stray `}}}}` closer left over from the superseded single-template
# literal is dropped so the braces balance.
_mpijob_worker_pod_template = {
    'spec': {
        'containers': [{
            'image': 'iguaziodocker/horovod:0.1.1',
            'name': '',
            'command': [],
            'volumeMounts': [{'name': 'v3io', 'mountPath': '/User'}],
            'workingDir': '/User',
            'securityContext': {'capabilities': {'add': ['IPC_LOCK']}},
            'resources': {'limits': {'nvidia.com/gpu': 1}},
        }],
        'volumes': [{
            'name': 'v3io',
            'flexVolume': {
                'driver': 'v3io/fuse',
                'options': {'container': 'users', 'subPath': '', 'accessKey': ''},
            },
        }],
    },
}


class MpiJob:
Expand All @@ -53,37 +85,69 @@ class MpiJob:
"""
group = 'kubeflow.org'
version = 'v1alpha1'
version = 'v1'
plural = 'mpijobs'

def __init__(self, name, image=None, command=None,
             replicas=1, namespace='default-tenant'):
    """Build the launcher/worker pod templates and the MPIJob v1 body.

    :param name: job name; also written as the container name in both
        pod templates and into the job metadata.
    :param image: optional container image applied to both pod templates.
    :param command: optional list of arguments; when given, the worker
        container command becomes ``['mpirun', 'python'] + command``.
    :param replicas: worker replica count stored in the job spec.
    :param namespace: namespace written into the job metadata.

    The v3io access key and user name are read from the
    ``V3IO_ACCESS_KEY`` and ``V3IO_USERNAME`` environment variables
    (empty string when unset).
    """
    self.api_instance = None
    self.name = name
    self.namespace = namespace

    # Private deep copies: per-job edits must never leak into the
    # module-level template literals.
    self._pod_templates = {
        MPIJobPodTemplateType.launcher: deepcopy(_mpijob_launcher_pod_template),
        MPIJobPodTemplateType.worker: deepcopy(_mpijob_worker_pod_template)
    }

    self._update_container('name', name, MPIJobPodTemplateType.all())
    if image:
        self._update_container('image', image, MPIJobPodTemplateType.all())
    if command:
        self._update_container('command', ['mpirun', 'python'] + command, [MPIJobPodTemplateType.worker])

    self._update_access_token(environ.get('V3IO_ACCESS_KEY', ''), MPIJobPodTemplateType.all())
    self._update_running_user(environ.get('V3IO_USERNAME', ''), MPIJobPodTemplateType.all())

    # Built last; the job body references (does not copy) the templates
    # prepared above, so later template edits show up in it too.
    self._struct = self._generate_mpi_job_template(name, namespace, replicas)

def _generate_mpi_job_template(self, name, namespace, worker_replicas):
    """Assemble the MPIJob (kubeflow.org/v1) body around this job's pod templates.

    The returned dict references the template dicts held in
    ``self._pod_templates`` rather than copying them.
    """
    templates = self._pod_templates
    launcher_spec = {'template': templates[MPIJobPodTemplateType.launcher]}
    worker_spec = {
        'replicas': worker_replicas,
        'template': templates[MPIJobPodTemplateType.worker],
    }
    return {
        'apiVersion': 'kubeflow.org/v1',
        'kind': 'MPIJob',
        'metadata': {'name': name, 'namespace': namespace},
        'spec': {
            'slotsPerWorker': 1,
            'mpiReplicaSpecs': {'Launcher': launcher_spec, 'Worker': worker_spec},
        },
    }

def _update_container(self, key, value, template_types):
    """Set ``key`` to ``value`` on the first container of each listed pod template."""
    for kind in template_types:
        first_container = self._pod_templates[kind]['spec']['containers'][0]
        first_container[key] = value

def _update_access_token(self, token, template_types):
    """Store ``token`` as the flexVolume ``accessKey`` of the first volume in each listed template."""
    for kind in template_types:
        flex_options = self._pod_templates[kind]['spec']['volumes'][0]['flexVolume']['options']
        flex_options['accessKey'] = token

def _update_running_user(self, username, template_types):
    """Point the flexVolume ``subPath`` of each listed template at ``'/' + username``."""
    user_path = '/' + username
    for kind in template_types:
        flex_options = self._pod_templates[kind]['spec']['volumes'][0]['flexVolume']['options']
        flex_options['subPath'] = user_path

def _update_volumes(self, volumes, template_types):
    """Replace the volume list of each listed pod template with ``volumes``.

    The list lives under the template's ``'spec'`` key, next to
    ``'containers'``. Writing it at the template's top level (as before)
    left the real volume list untouched and added a stray ``'volumes'`` key.
    """
    for template_type in template_types:
        self._pod_templates[template_type]['spec']['volumes'] = volumes

def volume(self, mount='/User', volpath='~/', access_key=''):
self._update_container('volumeMounts', [{'name': 'v3io', 'mountPath': mount}])
self._update_container('volumeMounts', [{'name': 'v3io', 'mountPath': mount}], MPIJobPodTemplateType.all())

if volpath.startswith('~/'):
v3io_home = environ.get('V3IO_HOME', '')
Expand All @@ -101,26 +165,31 @@ def volume(self, mount='/User', volpath='~/', access_key=''):
}
}}

self._struct['spec']['template']['spec']['volumes'] = [vol]
self._update_volumes([vol], MPIJobPodTemplateType.all())
return self

def gpus(self, num, gpu_type='nvidia.com/gpu'):
    """Set the worker container's resources to a single limit of ``num`` x ``gpu_type``.

    Only the worker template is changed. The whole ``resources`` entry is
    replaced, so any earlier limits are dropped.

    :param num: number of devices to put in the limit.
    :param gpu_type: resource name used as the limit key.
    :return: ``self``, so calls can be chained.
    """
    # The stale two-argument _update_container call is dropped: the helper
    # now requires the list of template types as a third argument.
    self._update_container('resources', {'limits': {gpu_type: num}}, [MPIJobPodTemplateType.worker])
    return self

def replicas(self, replicas_num):
    """Set the number of worker replicas in the generated job body.

    :param replicas_num: value stored under the worker entry of
        ``spec.mpiReplicaSpecs``.
    :return: ``self``, so calls can be chained.
    """
    # Only the worker entry of mpiReplicaSpecs carries the count in the body
    # built by _generate_mpi_job_template. The stale write to a top-level
    # spec['replicas'] is dropped.
    self._struct['spec']['mpiReplicaSpecs'][MPIJobPodTemplateType.worker]['replicas'] = replicas_num
    return self

def working_dir(self, working_dir):
    """Set ``workingDir`` on the first container of every pod template.

    :param working_dir: directory path written to each container.
    :return: ``self``, so calls can be chained.
    """
    # The stale two-argument _update_container call is dropped: the helper
    # now requires the list of template types as a third argument.
    self._update_container('workingDir', working_dir, MPIJobPodTemplateType.all())
    return self

def to_dict(self):
    """Return the job body dict held by this instance (the live object, not a copy)."""
    job_body = self._struct
    return job_body

def to_yaml(self):
    """Serialize the job body to block-style YAML, keeping key order.

    Objects that appear more than once in the body are written out in full
    each time instead of as YAML anchors/aliases.

    :return: the YAML document as a string.
    """

    # A private subclass keeps alias suppression local to this call.
    # Assigning ignore_aliases on yaml.dumper.SafeDumper itself would change
    # every other SafeDumper user in the process.
    class _NoAliasDumper(yaml.dumper.SafeDumper):
        def ignore_aliases(self, data):
            return True

    return yaml.dump(self.to_dict(), default_flow_style=False, sort_keys=False, Dumper=_NoAliasDumper)

def submit(self):
config.load_incluster_config()
Expand All @@ -143,6 +212,7 @@ def delete(self):
except ApiException as e:
print("Exception when calling CustomObjectsApi->delete_namespaced_custom_object: %s\\n" % e)


def split_path(mntpath=''):
if mntpath[0] == '/':
mntpath = mntpath[1:]
Expand Down

0 comments on commit d3edfcd

Please sign in to comment.