Skip to content

Commit

Permalink
Updated the script to permanently avoid the race conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
iampradiptaghosh committed Mar 9, 2018
1 parent b497a8e commit 115a198
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 106 deletions.
7 changes: 3 additions & 4 deletions nodes.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
home ubuntu-2gb-ams2-04 root PASSWORD
node1 ubuntu-2gb-sfo1-05 root PASSWORD
node2 ubuntu-s-1vcpu-3gb-lon1-01 root PASSWORD
node3 ubuntu-2gb-sfo1-03 root PASSWORD
node4 ubuntu-2gb-ams2-03 root PASSWORD
Expand Down Expand Up @@ -39,7 +40,7 @@ node38 ubuntu-s-1vcpu-3gb-lon1-02 root PASSWORD
node39 ubuntu-s-1vcpu-3gb-lon1-03 root PASSWORD
node40 ubuntu-s-1vcpu-3gb-lon1-04 root PASSWORD
node41 ubuntu-s-1vcpu-3gb-lon1-05 root PASSWORD
node42 ubuntu-s-1vcpu-3gb-lon1-06 root PASSWORD
node42 ubuntu-2gb-sfo1-04 root PASSWORD
node43 ubuntu-s-1vcpu-3gb-lon1-07 root PASSWORD
node44 ubuntu-s-1vcpu-3gb-lon1-08 root PASSWORD
node45 ubuntu-s-1vcpu-3gb-lon1-09 root PASSWORD
Expand Down Expand Up @@ -85,6 +86,4 @@ node84 ubuntu-s-1vcpu-3gb-fra1-06 root PASSWORD
node85 ubuntu-s-1vcpu-3gb-fra1-07 root PASSWORD
node86 ubuntu-s-1vcpu-3gb-fra1-08 root PASSWORD
node87 ubuntu-s-1vcpu-3gb-fra1-09 root PASSWORD
node88 ubuntu-2gb-sfo1-03 root PASSWORD
node89 ubuntu-2gb-sfo1-04 root PASSWORD
node90 ubuntu-2gb-sfo1-05 root PASSWORD
node88 ubuntu-2gb-sfo1-03 root PASSWORD
3 changes: 3 additions & 0 deletions scripts/delete_all_circe_deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,6 @@ def delete_all_circe_deployments():
if resp:
del_resp_2 = core_v1_api.delete_namespaced_service('home', namespace)
print("Service Deleted. status='%s'" % str(del_resp_2.status))

if __name__ == '__main__':
delete_all_circe_deployments()
5 changes: 4 additions & 1 deletion scripts/delete_all_profilers.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,7 @@ def delete_all_profilers():
del_resp_2 = api_2.delete_namespaced_service(key, namespace)
print("Service Deleted. status='%s'" % str(del_resp_2.status))

# At this point you should not have any of the profiler related service, pod, or deployment running
# At this point you should not have any of the profiler related service, pod, or deployment running

if __name__ == '__main__':
delete_all_profilers()
5 changes: 4 additions & 1 deletion scripts/delete_all_waves.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,7 @@ def delete_all_waves():
del_resp_2 = api_2.delete_namespaced_service(key, namespace)
print("Service Deleted. status='%s'" % str(del_resp_2.status))

# At this point you should not have any of the profiler related service, pod, or deployment running
# At this point you should not have any of the wave related service, pod, or deployment running

if __name__ == '__main__':
delete_all_waves()
53 changes: 53 additions & 0 deletions scripts/k8s_circe_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,55 @@
from kubernetes import client, config
from pprint import *

"""
This function prints out all the tasks that are not running.
If all the tasks are running: return True; else return False.
"""
def check_status_circe(dag):
    """Report whether every existing CIRCE task pod is in the Running phase.

    Loads the kubeconfig named by ``jupiter_config.KUBECONFIG_PATH`` and, for
    each task name in ``dag``, lists the pods labelled ``app=<task>`` in
    ``jupiter_config.DEPLOYMENT_NAMESPACE``. Only the first pod returned for
    a label is inspected. Every task whose pod is not Running is printed.

    Args:
        dag: Mapping whose keys are task names; the values are not used.

    Returns:
        True if no inspected pod is outside the "Running" phase, else False.
        A task whose label currently matches no pod is skipped and does not
        make the result False.
    """
    # The config file path is set in jupiter_config.py.
    config.load_kube_config(config_file=jupiter_config.KUBECONFIG_PATH)
    namespace = jupiter_config.DEPLOYMENT_NAMESPACE

    # Only the core API is needed: this function reads pod status and never
    # creates or deletes anything.
    core_v1_api = client.CoreV1Api()

    result = True
    for key in dag:
        # Kubernetes labels identify the pods that belong to each task.
        label = "app=" + key
        resp = core_v1_api.list_namespaced_pod(namespace, label_selector=label)

        if not resp.items:
            # Nothing matched this label; treated as "nothing to wait for".
            continue

        if resp.items[0].status.phase != "Running":
            print("Pod Not Running", key)
            result = False

    if result:
        print("All systems GOOOOO!!")
    else:
        print("Wait before trying again!!!!")

    return result

# if __name__ == '__main__':
def k8s_circe_scheduler(dag_info , temp_info):

Expand Down Expand Up @@ -180,6 +229,10 @@ def k8s_circe_scheduler(dag_info , temp_info):
resp = k8s_beta.create_namespaced_deployment(body = dep, namespace = namespace)
print("Deployment created. status = '%s'" % str(resp.status))

while 1:
if check_status_circe(dag):
break
time.sleep(30)

home_dep = write_home_specs(image = jupiter_config.HOME_IMAGE,
host = jupiter_config.HOME_NODE,
Expand Down
56 changes: 1 addition & 55 deletions scripts/k8s_jupiter_deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,65 +26,11 @@
import requests
import json
from pprint import *
from utilities import *


static_mapping = False

"""
read the dag from the file input
"""
def k8s_read_dag(dag_info_file):
    """Read the DAG description from a text file.

    The first line of the file holds the number of task lines to read. Each
    following line is split on single spaces: the first field is the task
    name and the remaining fields are stored, in order, as that task's list.
    Reading stops once the announced number of lines has been consumed.

    Args:
        dag_info_file: Path of the DAG description file.

    Returns:
        A list ``[first_task, dag]`` where ``first_task`` is the name on the
        first task line and ``dag`` maps each task name to the list of the
        remaining fields on its line. If the file has no task lines the list
        contains only the (empty) ``dag`` dict.

    Raises:
        ValueError: If the first line is not an integer.
    """
    dag_info = []
    dag = {}

    # The context manager closes the file even if parsing raises.
    with open(dag_info_file, 'r') as config_file:
        dag_size = int(config_file.readline())

        for i, line in enumerate(config_file, 1):
            dag_line = line.strip().split(" ")
            task = dag_line[0]
            if i == 1:
                # The first task listed is reported separately to callers.
                dag_info.append(task)
            dag.setdefault(task, []).extend(dag_line[1:])
            if i == dag_size:
                break

    dag_info.append(dag)
    return dag_info


def k8s_get_nodes(node_info_file):
    """Read the node list file into a dict keyed by node name.

    Each non-blank line is split on single spaces: the first field is the
    node name and the remaining fields are stored, in order, as that node's
    list. If a node name appears on several lines, the fields accumulate.

    Args:
        node_info_file: Path of the node list file.

    Returns:
        Dict mapping each node name to the list of the remaining fields on
        its line(s).
    """
    nodes = {}

    # The context manager closes the file even if parsing raises.
    with open(node_info_file, "r") as node_file:
        for line in node_file:
            node_line = line.strip().split(" ")
            if not node_line[0]:
                # A blank line would otherwise register a node named ''.
                continue
            nodes.setdefault(node_line[0], []).extend(node_line[1:])

    return nodes


def k8s_get_hosts(dag_info_file, node_info_file, mapping):
    """Combine the DAG description with the host details of each task.

    Reads the DAG with ``k8s_read_dag`` and the node table with
    ``k8s_get_nodes``, then builds a ``hosts`` dict in which every task in
    ``mapping`` maps to ``[task, *fields of its assigned node]``. An entry
    for ``'home'`` is added the same way from the node named ``'home'``.

    Args:
        dag_info_file: Path of the DAG description file.
        node_info_file: Path of the node list file.
        mapping: Dict mapping each task name to the name of its node.

    Returns:
        The list returned by ``k8s_read_dag`` with the ``hosts`` dict
        appended as its last element.

    Raises:
        KeyError: If a task is mapped to a node missing from the node file,
            or if the node file has no ``'home'`` entry.
    """
    dag_info = k8s_read_dag(dag_info_file)
    nodes = k8s_get_nodes(node_info_file)

    hosts = {}

    for task in mapping:
        node_name = mapping[task]
        if node_name not in nodes:
            # Fail with the offending names rather than an opaque TypeError
            # from extending a list with None.
            raise KeyError(
                "task %r is mapped to node %r, which is not listed in %s"
                % (task, node_name, node_info_file)
            )
        # Entry layout: the task name followed by its node's fields.
        entry = hosts.setdefault(task, [])
        entry.append(task)
        entry.extend(nodes[node_name])

    if 'home' not in nodes:
        raise KeyError("node 'home' is not listed in %s" % (node_info_file,))
    home_entry = hosts.setdefault('home', [])
    home_entry.append('home')
    home_entry.extend(nodes['home'])

    dag_info.append(hosts)
    return dag_info


if __name__ == '__main__':

Expand Down
57 changes: 55 additions & 2 deletions scripts/k8s_profiler_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,56 @@
import os
import jupiter_config

def check_status_profilers():
    """Report whether every existing profiler pod is in the Running phase.

    Reads the node list from ``jupiter_config.HERE + 'nodes.txt'`` via
    ``read_node_list``, loads the kubeconfig named by
    ``jupiter_config.KUBECONFIG_PATH`` and, for each node, lists the pods
    labelled ``app=<node>profiler`` in ``jupiter_config.PROFILER_NAMESPACE``.
    Only the first pod returned for a label is inspected. Every node whose
    profiler pod is not Running is printed.

    Returns:
        True if no inspected pod is outside the "Running" phase, else False.
        A node whose label currently matches no pod is skipped and does not
        make the result False.
    """
    path1 = jupiter_config.HERE + 'nodes.txt'
    nodes = read_node_list(path1)

    # The config file path is set in jupiter_config.py.
    config.load_kube_config(config_file=jupiter_config.KUBECONFIG_PATH)
    namespace = jupiter_config.PROFILER_NAMESPACE

    # Only the core API is needed: this function reads pod status and never
    # creates or deletes anything.
    core_v1_api = client.CoreV1Api()

    result = True
    for key in nodes:
        # Kubernetes labels identify the profiler pod of each node.
        label = "app=" + key + "profiler"
        resp = core_v1_api.list_namespaced_pod(namespace, label_selector=label)

        if not resp.items:
            # Nothing matched this label; treated as "nothing to wait for".
            # NOTE(review): the scheduler appears to create the
            # "homeprofiler" deployment only after this check passes, so
            # counting a missing pod as a failure could block forever -
            # confirm before tightening.
            continue

        if resp.items[0].status.phase != "Running":
            print("Pod Not Running", key)
            result = False

    if result:
        print("All systems GOOOOO!!")
    else:
        print("Wait before trying again!!!!")

    return result


# if __name__ == '__main__':
def k8s_profiler_scheduler():
"""
This loads the task graph and node list
Expand Down Expand Up @@ -145,7 +192,11 @@ def k8s_profiler_scheduler():


# have to somehow make sure that the worker nodes are on and working by this time
time.sleep(30)
while 1:
if check_status_profilers():
break
time.sleep(30)

home_dep = write_profiler_specs(name = 'home', label = "homeprofiler",
image = jupiter_config.PROFILER_HOME_IMAGE,
host = jupiter_config.HOME_NODE,
Expand All @@ -158,3 +209,5 @@ def k8s_profiler_scheduler():
pprint(service_ips)
return(service_ips)

if __name__ == '__main__':
k8s_profiler_scheduler()
63 changes: 62 additions & 1 deletion scripts/k8s_wave_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,61 @@
from pprint import *
import os
import jupiter_config
from k8s_get_service_ips import *

"""
This function prints out all the waves that are not running.
If all the waves are running: return True; else return False.
"""
def check_status_waves():
    """Report whether every existing WAVE pod is in the Running phase.

    Reads the node list from ``jupiter_config.HERE + 'nodes.txt'`` via
    ``read_node_list``, loads the kubeconfig named by
    ``jupiter_config.KUBECONFIG_PATH`` and, for each node, lists the pods
    labelled ``app=wave_<node>`` in ``jupiter_config.WAVE_NAMESPACE``.
    Only the first pod returned for a label is inspected. Every node whose
    wave pod is not Running is printed.

    Returns:
        True if no inspected pod is outside the "Running" phase, else False.
        A node whose label currently matches no pod is skipped and does not
        make the result False.
    """
    path1 = jupiter_config.HERE + 'nodes.txt'
    nodes = read_node_list(path1)

    # The config file path is set in jupiter_config.py.
    config.load_kube_config(config_file=jupiter_config.KUBECONFIG_PATH)
    namespace = jupiter_config.WAVE_NAMESPACE

    # Only the core API is needed: this function reads pod status and never
    # creates or deletes anything.
    core_v1_api = client.CoreV1Api()

    result = True
    for key in nodes:
        # Kubernetes labels identify the wave pod of each node.
        label = "app=wave_" + key
        resp = core_v1_api.list_namespaced_pod(namespace, label_selector=label)

        if not resp.items:
            # Nothing matched this label; treated as "nothing to wait for".
            # NOTE(review): the scheduler appears to create the "wave_home"
            # deployment only after this check passes, so counting a missing
            # pod as a failure could block forever - confirm before
            # tightening.
            continue

        if resp.items[0].status.phase != "Running":
            print("Pod Not Running", key)
            result = False

    if result:
        print("All systems GOOOOO!!")
    else:
        print("Wait before trying again!!!!")

    return result

# if __name__ == '__main__':
def k8s_wave_scheduler(profiler_ips):
"""
Expand Down Expand Up @@ -135,7 +187,11 @@ def k8s_wave_scheduler(profiler_ips):


# TODO: have to make sure that the worker nodes are on and working by this time
time.sleep(60)
while 1:
if check_status_waves():
break
time.sleep(30)

home_dep = write_wave_specs(name = 'home', label = "wave_home",
image = jupiter_config.WAVE_HOME_IMAGE,
host = jupiter_config.HOME_NODE, all_node = nexthost_names,
Expand All @@ -148,3 +204,8 @@ def k8s_wave_scheduler(profiler_ips):
print("Home deployment created. status = '%s'" % str(resp.status))

pprint(service_ips)


if __name__ == '__main__':
profiler_ips = get_all_profilers()
k8s_wave_scheduler(profiler_ips)
Loading

0 comments on commit 115a198

Please sign in to comment.