From 9c4d3e26c1152510adf8661608e0f27320a6a8bb Mon Sep 17 00:00:00 2001 From: engshahrad Date: Wed, 25 Oct 2023 03:18:51 +0000 Subject: [PATCH 1/4] style check for python tests --- tests/solver/rps_cost_test.py | 335 ++++++++++++++++------------- tests/solver/rps_latency_test.py | 226 +++++++++++-------- tests/solver/testcase_generator.py | 125 ++++++----- 3 files changed, 397 insertions(+), 289 deletions(-) diff --git a/tests/solver/rps_cost_test.py b/tests/solver/rps_cost_test.py index 12e9db2..5d06150 100644 --- a/tests/solver/rps_cost_test.py +++ b/tests/solver/rps_cost_test.py @@ -5,47 +5,53 @@ import pandas as pd import random import time -import numpy as np from rpsMultiVMSolver import rpsOffloadingSolver + class TestCaseGenerator: def __init__(self, n_funcs, n_hosts): self.n_funcs = n_funcs self.n_hosts = n_hosts - # #random.seed(0) + # random.seed(0) def build(self): log_data = [] for i in range(self.n_funcs): for id in range(10): - log_data.append({ - 'function': f'Func_{i}', - 'reqID': id, - 'start': "", - 'finish': "", - 'mergingPoint': "", - 'host': "s", - 'duration': 100 - }) + log_data.append( + { + "function": f"Func_{i}", + "reqID": id, + "start": "", + "finish": "", + "mergingPoint": "", + "host": "s", + "duration": 100, + } + ) for j in range(self.n_hosts): for i in range(self.n_funcs): for id in range(10): - log_data.append({ - 'function': f'Func_{i}', - 'reqID': id, - 'start': "", - 'finish': "", - 'mergingPoint': "", - 'host': f"vm{j}", - 'duration': 100 - }) + log_data.append( + { + "function": f"Func_{i}", + "reqID": id, + "start": "", + "finish": "", + "mergingPoint": "", + "host": f"vm{j}", + "duration": 100, + } + ) df = pd.DataFrame(log_data) - df.to_csv('../log-parser/get-workflow-logs/data/TestCaseNWorkflow/generatedDataFrame.csv') - #df.to_csv('/home/pjavan/unfaasener/tests/logCollector/TestCaseNWorkflow/generatedDataFrame.csv') - + df.to_csv( + "../log-parser/get-workflow-logs/data/TestCaseNWorkflow/generatedDataFrame.csv" + ) + # df.to_csv('/home/pjavan/unfaasener/tests/logCollector/TestCaseNWorkflow/generatedDataFrame.csv') + costs_dict = {} pubSub_dict = {} slack_data = {} @@ -54,9 +60,9 @@ def build(self): predecessors = [] workflowFunctions = [] last_dec = [] - + for i in range(self.n_funcs): - func_name = f'Func_{i}' + func_name = f"Func_{i}" workflowFunctions.append(func_name) # if i == 1: # costs_dict[func_name] = {"best-case": 10000, "worst-case": 10000, "default": 10000} @@ -66,24 +72,39 @@ def build(self): # costs_dict[func_name] = {"best-case": 10000, "worst-case": 10000, "default": 10000} # elif i == 4: # costs_dict[func_name] = {"best-case": 10000, "worst-case": 10000, "default": 10000} - # else: + # else: # costs_dict[func_name] = {"best-case": 4.6399999999999997e-07, "worst-case": 4.6399999999999997e-07, "default": 4.6399999999999997e-07} - - costs_dict[func_name] = {"best-case": 4.6399999999999997e-07, "worst-case": 4.6399999999999997e-07, "default": 4.6399999999999997e-07} - + + costs_dict[func_name] = { + "best-case": 4.6399999999999997e-07, + "worst-case": 4.6399999999999997e-07, + "default": 4.6399999999999997e-07, + } + pubSub_dict[func_name] = 100000000000 - slack_data[func_name] = {"best-case": 0.0, "worst-case": 0.0, "default": 0.0} - if i < self.n_funcs-1: - slack_data[f'Func_{i}-Func_{i+1}'] = {"best-case": 0.0, "worst-case": 0.0, "default": 0.0} - successors.append([f'Func_{i+1}']) - else: successors.append([]) + slack_data[func_name] = { + "best-case": 0.0, + "worst-case": 0.0, + "default": 0.0, + } + if i < self.n_funcs - 1: + slack_data[f"Func_{i}-Func_{i+1}"] = { + "best-case": 0.0, + "worst-case": 0.0, + "default": 0.0, + } + successors.append([f"Func_{i+1}"]) + else: + successors.append([]) - if i > 0: predecessors.append([f'Func_{i-1}']) - else: predecessors.append([]) + if i > 0: + predecessors.append([f"Func_{i-1}"]) + else: + predecessors.append([]) if random.randint(1, 100) > 50: dec = [] - + for vm in range(self.n_hosts): if random.randint(1, 100) > 20: dec.append(1.0) @@ -92,34 +113,29 @@ def build(self): last_dec.append(dec) else: - last_dec.append([0.0]*self.n_hosts) - - + last_dec.append([0.0] * self.n_hosts) workflow_dict = { - "workflow": "TestCaseNWorkflow", - "workflowFunctions": workflowFunctions, - "initFunc": "Func_0", - "successors": successors, - "predecessors": predecessors, - "memory": [1/self.n_funcs]*self.n_funcs, + "workflow": "TestCaseNWorkflow", + "workflowFunctions": workflowFunctions, + "initFunc": "Func_0", + "successors": successors, + "predecessors": predecessors, + "memory": [1 / self.n_funcs] * self.n_funcs, "lastDecision_default": last_dec, "lastDecision_best-case": last_dec, "lastDecision_worst-case": last_dec, - "topics": [""] + ["dag-Profanity"]*(self.n_funcs-1) + "topics": [""] + ["dag-Profanity"] * (self.n_funcs - 1), } # print(f'LAST DEC -> {last_dec}') jsonPath = ( - "../log-parser/get-workflow-logs/data/" - + "TestCaseNWorkflow" - + ".json" + "../log-parser/get-workflow-logs/data/" + "TestCaseNWorkflow" + ".json" ) with open(jsonPath, "w") as json_file: json.dump(workflow_dict, json_file) + base_dir = "./data/TestCaseNWorkflow" - base_dir = './data/TestCaseNWorkflow' - with open(f"{base_dir}/Costs.json", "w+") as outfile: json.dump(costs_dict, outfile) @@ -133,8 +149,10 @@ def build(self): json.dump(slack_data, outfile) def delete(self): - base_dir = './data/TestCaseNWorkflow' - os.system('rm ../log-parser/get-workflow-logs/data/TestCaseNWorkflow/generatedDataFrame.csv') + base_dir = "./data/TestCaseNWorkflow" + os.system( + "rm ../log-parser/get-workflow-logs/data/TestCaseNWorkflow/generatedDataFrame.csv" + ) os.system(f"rm {base_dir}/Costs.json") os.system(f"rm {base_dir}/pubSubSize.json") os.system(f"rm {base_dir}/slackData.json") @@ -149,7 +167,6 @@ def delete(self): class TestSolver(unittest.TestCase): - workflow = "Text2SpeechCensoringWorkflow" mode = "cost" rps = 10 @@ -184,7 +201,7 @@ def test_similar2prevdecision(self): ) end = time.time() print(f"Julia Time: {end-start}") - print(f'SOLVED -> {x}') + print(f"SOLVED -> {x}") self.assertEqual(x, [[0.0], [0.0], [0.0], [0.0]]) def test_highPubsubCost(self): @@ -252,8 +269,8 @@ def test_highPubsubCost(self): "w", ) as outfile: json.dump(prevPubSubSize, outfile) - - print(f'SOLVED -> {x}') + + print(f"SOLVED -> {x}") self.assertEqual(x, [[0.0], [100.0], [100.0], [100.0]]) def test_highCost(self): @@ -323,11 +340,18 @@ def test_highCost(self): "w", ) as outfile: json.dump(prevCosts, outfile) - print(f'SOLVED -> {x}') - cost_julia = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, x) - cost_exp = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, [[0.0], [100.0], [100.0], [100.0]]) + print(f"SOLVED -> {x}") + cost_julia = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, x + ) + cost_exp = solver.calcLatencyCost( + alpha, + solver.offloadingCandidates, + availResources, + [[0.0], [100.0], [100.0], [100.0]], + ) # time.sleep(100) - self.assertLessEqual(cost_julia, 1.1*cost_exp) + self.assertLessEqual(cost_julia, 1.1 * cost_exp) # self.assertEqual(x, [[0.0], [100.0], [100.0], [100.0]]) def test_limitedVMResources(self): @@ -349,10 +373,17 @@ def test_limitedVMResources(self): ) end = time.time() print(f"Julia Time: {end-start}") - cost_julia = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, x) - cost_exp = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, [[0.0], [0.0], [0.0], [0.0]]) + cost_julia = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, x + ) + cost_exp = solver.calcLatencyCost( + alpha, + solver.offloadingCandidates, + availResources, + [[0.0], [0.0], [0.0], [0.0]], + ) # time.sleep(100) - self.assertLessEqual(cost_julia, 1.1*cost_exp) + self.assertLessEqual(cost_julia, 1.1 * cost_exp) # self.assertEqual(x, [[0.0], [0.0], [0.0], [0.0]]) def test_rps(self): @@ -395,32 +426,38 @@ def test_rps(self): end = time.time() print(f"Julia Time: {end-start}") - cost_julia = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, x) + cost_julia = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, x + ) exps = [ - # Due to adding the mu factor set to 1 by default the decisions have changed - [[0.0, 0.0], [1.0, 0.0], [49.0, 51.0], [2.0, 1.0]], - [[0.0, 0.0], [6.0, 7.0], [50.0, 50.0], [6.0, 6.0]], - [[0.0, 0.0], [6.0, 7.0], [51.0, 49.0], [6.0, 7.0]], - [[0.0, 0.0], [8.0, 7.0], [50.0, 50.0], [5.0, 6.0]], - [[0.0, 0.0], [7.0, 6.0], [49.0, 51.0], [7.0, 6.0]], - [[0.0, 0.0], [7.0, 6.0], [50.0, 50.0], [6.0, 6.0]], - [[0.0, 0.0], [7.0, 7.0], [50.0, 50.0], [6.0, 6.0]], - [[0.0, 0.0], [12.0, 12.0], [50.0, 50.0], [1.0, 1.0]], - [[0.0, 0.0], [1.0, 12.0], [50.0, 50.0], [12.0, 1.0]], - [[0.0, 0.0], [12.0, 1.0], [50.0, 50.0], [1.0, 12.0]], - [[0.0, 0.0], [7.0, 6.0], [50.0, 50.0], [6.0, 7.0]], - [[0.0, 0.0], [6.0, 7.0], [50.0, 50.0], [7.0, 6.0]], - [[0.0, 0.0], [14.0, 1.0], [48.0, 52.0], [1.0, 10.0]], - ] + # Due to adding the mu factor set to 1 by default the decisions have changed + [[0.0, 0.0], [1.0, 0.0], [49.0, 51.0], [2.0, 1.0]], + [[0.0, 0.0], [6.0, 7.0], [50.0, 50.0], [6.0, 6.0]], + [[0.0, 0.0], [6.0, 7.0], [51.0, 49.0], [6.0, 7.0]], + [[0.0, 0.0], [8.0, 7.0], [50.0, 50.0], [5.0, 6.0]], + [[0.0, 0.0], [7.0, 6.0], [49.0, 51.0], [7.0, 6.0]], + [[0.0, 0.0], [7.0, 6.0], [50.0, 50.0], [6.0, 6.0]], + [[0.0, 0.0], [7.0, 7.0], [50.0, 50.0], [6.0, 6.0]], + [[0.0, 0.0], [12.0, 12.0], [50.0, 50.0], [1.0, 1.0]], + [[0.0, 0.0], [1.0, 12.0], [50.0, 50.0], [12.0, 1.0]], + [[0.0, 0.0], [12.0, 1.0], [50.0, 50.0], [1.0, 12.0]], + [[0.0, 0.0], [7.0, 6.0], [50.0, 50.0], [6.0, 7.0]], + [[0.0, 0.0], [6.0, 7.0], [50.0, 50.0], [7.0, 6.0]], + [[0.0, 0.0], [14.0, 1.0], [48.0, 52.0], [1.0, 10.0]], + ] - cost_exp = min([ - solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, s) - for s in exps - ]) + cost_exp = min( + [ + solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, s + ) + for s in exps + ] + ) # time.sleep(100) - self.assertLessEqual(cost_julia, 1.1*cost_exp) + self.assertLessEqual(cost_julia, 1.1 * cost_exp) # self.assertIn( # x, @@ -441,7 +478,7 @@ def test_rps(self): # [[0.0, 0.0], [14.0, 1.0], [48.0, 52.0], [1.0, 10.0]], # ], # ) - + # self.assertEqual(x, [[0.0, 0.0], [6.0, 6.0], [50.0, 50.0], [7.0, 6.0]]) def test_rps2(self): @@ -531,10 +568,8 @@ def test_rpsN(self): # n_hosts_list = [1] + [5, 10, 15, 20] n_funcs_list = [5, 35] n_hosts_list = [1] + [20] - - + repeats = 1 - results = [] print(n_hosts_list) @@ -550,7 +585,7 @@ def test_rpsN(self): workflow = "TestCaseNWorkflow" toleranceWindow = 0 - availResources = [{"cores": 10, "mem_mb": 300}]*n_hosts + availResources = [{"cores": 10, "mem_mb": 300}] * n_hosts alpha = 0.1 solver = rpsOffloadingSolver() @@ -564,9 +599,8 @@ def test_rpsN(self): ) gekko_stats = [] - for k in range(repeats): - random.seed(k*10+1) + random.seed(k * 10 + 1) tgen = TestCaseGenerator(n_funcs, n_hosts) tgen.build() start = time.time() @@ -574,31 +608,35 @@ def test_rpsN(self): availResources=availResources, alpha=alpha, verbose=True ) end = time.time() - time_gekko = end-start - print(f'GEKKO = {time_gekko} -> {x_gekko}') - - if x_gekko == 'NotFound': - cost_gekko = float('inf') + time_gekko = end - start + print(f"GEKKO = {time_gekko} -> {x_gekko}") + + if x_gekko == "NotFound": + cost_gekko = float("inf") else: - cost_gekko = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, x_gekko) - - print(f'COST GEKKO {cost_gekko}') - - gekko_stats.append({ - "time_gekko": time_gekko, - "sol_gekko": x_gekko, - "cost_gekko": cost_gekko, - }) + cost_gekko = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, x_gekko + ) + + print(f"COST GEKKO {cost_gekko}") + + gekko_stats.append( + { + "time_gekko": time_gekko, + "sol_gekko": x_gekko, + "cost_gekko": cost_gekko, + } + ) tgen.delete() ####################################### - + random.seed(0) tgen = TestCaseGenerator(n_funcs, n_hosts) tgen.build() workflow = "TestCaseNWorkflow" toleranceWindow = 0 - availResources = [{"cores": 10, "mem_mb": 300}]*n_hosts + availResources = [{"cores": 10, "mem_mb": 300}] * n_hosts alpha = 0.1 solver = rpsOffloadingSolver() @@ -614,7 +652,7 @@ def test_rpsN(self): # random.seed(0) for k in range(repeats): - random.seed(k*10+1) + random.seed(k * 10 + 1) tgen = TestCaseGenerator(n_funcs, n_hosts) tgen.build() start = time.time() @@ -622,57 +660,62 @@ def test_rpsN(self): availResources=availResources, alpha=alpha, verbose=True ) end = time.time() - time_julia = end-start - - if x_julia == 'NotFound': - cost_julia = float('inf') + time_julia = end - start + + if x_julia == "NotFound": + cost_julia = float("inf") else: - cost_julia = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, x_julia) - - print(f'Julia = {time_julia} -> {x_julia}') - print(f'COST JULIA {cost_julia}') - - julia_stats.append({ - "time_julia": time_julia, - "sol_julia": x_julia, - "cost_julia": cost_julia, - }) - - tgen.delete() + cost_julia = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, x_julia + ) + + print(f"Julia = {time_julia} -> {x_julia}") + print(f"COST JULIA {cost_julia}") + + julia_stats.append( + { + "time_julia": time_julia, + "sol_julia": x_julia, + "cost_julia": cost_julia, + } + ) + tgen.delete() for k in range(repeats): - results.append({ - "n_funcs": n_funcs, - "n_hosts": n_hosts, - "time_gekko": gekko_stats[k]["time_gekko"], - "time_julia": julia_stats[k]["time_julia"], - "sol_gekko": gekko_stats[k]["sol_gekko"], - "cost_gekko": gekko_stats[k]["cost_gekko"], - "sol_julia": julia_stats[k]["sol_julia"], - "cost_julia": julia_stats[k]["cost_julia"], - }) - - - + results.append( + { + "n_funcs": n_funcs, + "n_hosts": n_hosts, + "time_gekko": gekko_stats[k]["time_gekko"], + "time_julia": julia_stats[k]["time_julia"], + "sol_gekko": gekko_stats[k]["sol_gekko"], + "cost_gekko": gekko_stats[k]["cost_gekko"], + "sol_julia": julia_stats[k]["sol_julia"], + "cost_julia": julia_stats[k]["cost_julia"], + } + ) - #print(x) + # print(x) - - print('-----------') + print("-----------") # print(results) results = pd.DataFrame(results) self.assertEqual( - len(results[(results['cost_julia']-results['cost_gekko'])/results['cost_julia'] >= (5/100)]), - 0 + len( + results[ + (results["cost_julia"] - results["cost_gekko"]) + / results["cost_julia"] + >= (5 / 100) + ] + ), + 0, ) - # pd.DataFrame(results).to_csv('/home/pjavan/unfaasener/tests/Timing_df_Gekko.csv') if __name__ == "__main__": unittest.main() - diff --git a/tests/solver/rps_latency_test.py b/tests/solver/rps_latency_test.py index 70e43d8..84dd118 100644 --- a/tests/solver/rps_latency_test.py +++ b/tests/solver/rps_latency_test.py @@ -43,23 +43,29 @@ def test_similar2prevdecision(self): end = time.time() print(f"Julia Time: {end-start}") expected = [[0.0], [0.0], [0.0], [0.0]] - cost_expected = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, expected) - print(f'EXPECTED = {expected}') - print(f'COST EXPECTED = {cost_expected}') + cost_expected = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, expected + ) + print(f"EXPECTED = {expected}") + print(f"COST EXPECTED = {cost_expected}") start = time.time() gekko_result = solver.suggestBestOffloadingMultiVMGekko( availResources=availResources, alpha=alpha, verbose=True ) end = time.time() print(f"GEKKO Time: {end-start}") - cost_gekko = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, gekko_result) - print(f'GEKKO = {gekko_result}') - print(f'COST GEKKO = {cost_gekko}') - cost_julia = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, x) - print(f'Julia = {x}') - print(f'COST Julia = {cost_julia}') - print('-----------------------------------') - self.assertLessEqual(cost_julia, 1.1*cost_gekko) + cost_gekko = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, gekko_result + ) + print(f"GEKKO = {gekko_result}") + print(f"COST GEKKO = {cost_gekko}") + cost_julia = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, x + ) + print(f"Julia = {x}") + print(f"COST Julia = {cost_julia}") + print("-----------------------------------") + self.assertLessEqual(cost_julia, 1.1 * cost_gekko) # print('-----------------------------------') # self.assertEqual(x, [[0.0], [0.0], [0.0], [0.0]]) @@ -79,30 +85,36 @@ def test_preferHigherCost(self): alpha = 0 # Due to the change of adding mu factor set to 1, the cpu for that function is increased causing the change in decisions expected = [[0.0], [0.0], [52.0], [0.0]] - cost_expected = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, expected) - print(f'EXPECTED = {expected}') - print(f'COST EXPECTED = {cost_expected}') + cost_expected = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, expected + ) + print(f"EXPECTED = {expected}") + print(f"COST EXPECTED = {cost_expected}") start = time.time() gekko_result = solver.suggestBestOffloadingMultiVMGekko( availResources=availResources, alpha=alpha, verbose=True ) end = time.time() print(f"GEKKO Time: {end-start}") - cost_gekko = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, gekko_result) - print(f'GEKKO = {gekko_result}') - print(f'COST GEKKO = {cost_gekko}') + cost_gekko = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, gekko_result + ) + print(f"GEKKO = {gekko_result}") + print(f"COST GEKKO = {cost_gekko}") start = time.time() x = solver.suggestBestOffloadingMultiVM( availResources=availResources, alpha=alpha, verbose=True ) end = time.time() print(f"Julia Time: {end-start}") - cost_julia = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, x) - print(f'Julia = {x}') - print(f'COST Julia = {cost_julia}') - print('-----------------------------------') - self.assertLessEqual(cost_julia, 1.1*cost_gekko) - #self.assertEqual(x, [[0.0], [0.0], [52.0], [0.0]]) + cost_julia = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, x + ) + print(f"Julia = {x}") + print(f"COST Julia = {cost_julia}") + print("-----------------------------------") + self.assertLessEqual(cost_julia, 1.1 * cost_gekko) + # self.assertEqual(x, [[0.0], [0.0], [52.0], [0.0]]) # Test on when the tolerance window is not limited for the user def test_unlimitedToleranceWindow(self): @@ -121,23 +133,29 @@ def test_unlimitedToleranceWindow(self): end = time.time() print(f"Julia Time: {end-start}") expected = [[0.0], [100.0], [100.0], [100.0], [100.0], [100.0]] - cost_expected = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, expected) - print(f'EXPECTED = {expected}') - print(f'COST EXPECTED = {cost_expected}') + cost_expected = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, expected + ) + print(f"EXPECTED = {expected}") + print(f"COST EXPECTED = {cost_expected}") start = time.time() gekko_result = solver.suggestBestOffloadingMultiVMGekko( availResources=availResources, alpha=alpha, verbose=True ) end = time.time() print(f"GEKKO Time: {end-start}") - cost_gekko = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, gekko_result) - print(f'GEKKO = {gekko_result}') - print(f'COST GEKKO = {cost_gekko}') - cost_julia = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, x) - print(f'Julia = {x}') - print(f'COST Julia = {cost_julia}') - print('-----------------------------------') - self.assertLessEqual(cost_julia, 1.1*cost_gekko) + cost_gekko = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, gekko_result + ) + print(f"GEKKO = {gekko_result}") + print(f"COST GEKKO = {cost_gekko}") + cost_julia = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, x + ) + print(f"Julia = {x}") + print(f"COST Julia = {cost_julia}") + print("-----------------------------------") + self.assertLessEqual(cost_julia, 1.1 * cost_gekko) # print('-----------------------------------') # self.assertEqual(x, [[0.0], [100.0], [100.0], [100.0], [100.0], [100.0]]) @@ -158,23 +176,29 @@ def test_giveReuiredtoleranceWindow1(self): end = time.time() print(f"Julia Time: {end-start}") expected = [[0.0], [100.0], [0.0], [0.0]] - cost_expected = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, expected) - print(f'EXPECTED = {expected}') - print(f'COST EXPECTED = {cost_expected}') + cost_expected = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, expected + ) + print(f"EXPECTED = {expected}") + print(f"COST EXPECTED = {cost_expected}") start = time.time() gekko_result = solver.suggestBestOffloadingMultiVMGekko( availResources=availResources, alpha=alpha, verbose=True ) end = time.time() print(f"GEKKO Time: {end-start}") - cost_gekko = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, gekko_result) - print(f'GEKKO = {gekko_result}') - print(f'COST GEKKO = {cost_gekko}') - cost_julia = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, x) - print(f'Julia = {x}') - print(f'COST Julia = {cost_julia}') - print('-----------------------------------') - self.assertLessEqual(cost_julia, 1.1*cost_gekko) + cost_gekko = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, gekko_result + ) + print(f"GEKKO = {gekko_result}") + print(f"COST GEKKO = {cost_gekko}") + cost_julia = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, x + ) + print(f"Julia = {x}") + print(f"COST Julia = {cost_julia}") + print("-----------------------------------") + self.assertLessEqual(cost_julia, 1.1 * cost_gekko) # print('-----------------------------------') # self.assertEqual(x, [[0.0], [100.0], [0.0], [0.0]]) # Test for checking the case which the toleranceWindow is less than what is required for offloading a function @@ -195,23 +219,29 @@ def test_lessThanrequiredtoleranceWindow(self): end = time.time() print(f"Julia Time: {end-start}") expected = [[0.0], [0.0], [0.0], [0.0]] - cost_expected = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, expected) - print(f'EXPECTED = {expected}') - print(f'COST EXPECTED = {cost_expected}') + cost_expected = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, expected + ) + print(f"EXPECTED = {expected}") + print(f"COST EXPECTED = {cost_expected}") start = time.time() gekko_result = solver.suggestBestOffloadingMultiVMGekko( availResources=availResources, alpha=alpha, verbose=True ) end = time.time() print(f"GEKKO Time: {end-start}") - cost_gekko = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, gekko_result) - print(f'GEKKO = {gekko_result}') - print(f'COST GEKKO = {cost_gekko}') - cost_julia = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, x) - print(f'Julia = {x}') - print(f'COST Julia = {cost_julia}') - print('-----------------------------------') - self.assertLessEqual(cost_julia, 1.1*cost_gekko) + cost_gekko = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, gekko_result + ) + print(f"GEKKO = {gekko_result}") + print(f"COST GEKKO = {cost_gekko}") + cost_julia = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, x + ) + print(f"Julia = {x}") + print(f"COST Julia = {cost_julia}") + print("-----------------------------------") + self.assertLessEqual(cost_julia, 1.1 * cost_gekko) # print('-----------------------------------') # self.assertEqual(x, [[0.0], [0.0], [0.0], [0.0]]) @@ -259,23 +289,29 @@ def test_rps(self): end = time.time() print(f"Julia Time: {end-start}") expected = [[0.0, 0.0], [0.0, 0.0], [79.0, 21.0], [0.0, 0.0]] - cost_expected = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, expected) - print(f'EXPECTED = {expected}') - print(f'COST EXPECTED = {cost_expected}') + cost_expected = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, expected + ) + print(f"EXPECTED = {expected}") + print(f"COST EXPECTED = {cost_expected}") start = time.time() gekko_result = solver.suggestBestOffloadingMultiVMGekko( availResources=availResources, alpha=alpha, verbose=True ) end = time.time() print(f"GEKKO Time: {end-start}") - cost_gekko = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, gekko_result) - print(f'GEKKO = {gekko_result}') - print(f'COST GEKKO = {cost_gekko}') - cost_julia = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, x) - print(f'Julia = {x}') - print(f'COST Julia = {cost_julia}') - print('-----------------------------------') - self.assertLessEqual(cost_julia, 1.1*cost_gekko) + cost_gekko = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, gekko_result + ) + print(f"GEKKO = {gekko_result}") + print(f"COST GEKKO = {cost_gekko}") + cost_julia = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, x + ) + print(f"Julia = {x}") + print(f"COST Julia = {cost_julia}") + print("-----------------------------------") + self.assertLessEqual(cost_julia, 1.1 * cost_gekko) # print('-----------------------------------') # self.assertEqual(x, [[0.0, 0.0], [0.0, 0.0], [79.0, 21.0], [0.0, 0.0]]) @@ -313,23 +349,29 @@ def test_rps2(self): end = time.time() print(f"Julia Time: {end-start}") expected = [[0.0], [0.0], [21.0], [0.0]] - cost_expected = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, expected) - print(f'EXPECTED = {expected}') - print(f'COST EXPECTED = {cost_expected}') + cost_expected = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, expected + ) + print(f"EXPECTED = {expected}") + print(f"COST EXPECTED = {cost_expected}") start = time.time() gekko_result = solver.suggestBestOffloadingMultiVMGekko( availResources=availResources, alpha=alpha, verbose=True ) end = time.time() print(f"GEKKO Time: {end-start}") - cost_gekko = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, gekko_result) - print(f'GEKKO = {gekko_result}') - print(f'COST GEKKO = {cost_gekko}') - cost_julia = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, x) - print(f'Julia = {x}') - print(f'COST Julia = {cost_julia}') - print('-----------------------------------') - self.assertLessEqual(cost_julia, 1.1*cost_gekko) + cost_gekko = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, gekko_result + ) + print(f"GEKKO = {gekko_result}") + print(f"COST GEKKO = {cost_gekko}") + cost_julia = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, x + ) + print(f"Julia = {x}") + print(f"COST Julia = {cost_julia}") + print("-----------------------------------") + self.assertLessEqual(cost_julia, 1.1 * cost_gekko) # print('-----------------------------------') # self.assertEqual(x, [[0.0], [0.0], [21.0], [0.0]]) @@ -377,23 +419,29 @@ def test_rps3(self): end = time.time() print(f"Julia Time: {end-start}") expected = [[0.0, 0.0], [0.0, 0.0], [84.0, 16.0], [0.0, 5.0]] - cost_expected = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, expected) - print(f'EXPECTED = {expected}') - print(f'COST EXPECTED = {cost_expected}') + cost_expected = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, expected + ) + print(f"EXPECTED = {expected}") + print(f"COST EXPECTED = {cost_expected}") start = time.time() gekko_result = solver.suggestBestOffloadingMultiVMGekko( availResources=availResources, alpha=alpha, verbose=True ) end = time.time() print(f"GEKKO Time: {end-start}") - cost_gekko = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, gekko_result) - print(f'GEKKO = {gekko_result}') - print(f'COST GEKKO = {cost_gekko}') - cost_julia = solver.calcLatencyCost(alpha, solver.offloadingCandidates, availResources, x) - print(f'Julia = {x}') - print(f'COST Julia = {cost_julia}') - print('-----------------------------------') - self.assertLessEqual(cost_julia, 1.1*cost_gekko) + cost_gekko = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, gekko_result + ) + print(f"GEKKO = {gekko_result}") + print(f"COST GEKKO = {cost_gekko}") + cost_julia = solver.calcLatencyCost( + alpha, solver.offloadingCandidates, availResources, x + ) + print(f"Julia = {x}") + print(f"COST Julia = {cost_julia}") + print("-----------------------------------") + self.assertLessEqual(cost_julia, 1.1 * cost_gekko) # print('-----------------------------------') # self.assertEqual(x, [[0.0, 0.0], [0.0, 0.0], [84.0, 16.0], [0.0, 5.0]]) diff --git a/tests/solver/testcase_generator.py b/tests/solver/testcase_generator.py index 43ba1cc..2a1be1c 100644 --- a/tests/solver/testcase_generator.py +++ b/tests/solver/testcase_generator.py @@ -1,4 +1,3 @@ -import unittest import os import json from pathlib import Path @@ -16,33 +15,39 @@ def build(self): for i in range(self.n_funcs): for id in range(10): - log_data.append({ - 'function': f'Func_{i}', - 'reqID': id, - 'start': "", - 'finish': "", - 'mergingPoint': "", - 'host': "s", - 'duration': 100 - }) + log_data.append( + { + "function": f"Func_{i}", + "reqID": id, + "start": "", + "finish": "", + "mergingPoint": "", + "host": "s", + "duration": 100, + } + ) for j in range(self.n_hosts): for i in range(self.n_funcs): for id in range(10): - log_data.append({ - 'function': f'Func_{i}', - 'reqID': id, - 'start': "", - 'finish': "", - 'mergingPoint': "", - 'host': f"vm{j}", - 'duration': 100 - }) + log_data.append( + { + "function": f"Func_{i}", + "reqID": id, + "start": "", + "finish": "", + "mergingPoint": "", + "host": f"vm{j}", + "duration": 100, + } + ) df = pd.DataFrame(log_data) - df.to_csv('../log-parser/get-workflow-logs/data/TestCaseNWorkflow/generatedDataFrame.csv') - #df.to_csv('/home/pjavan/unfaasener/tests/logCollector/TestCaseNWorkflow/generatedDataFrame.csv') - + df.to_csv( + "../log-parser/get-workflow-logs/data/TestCaseNWorkflow/generatedDataFrame.csv" + ) + # df.to_csv('/home/pjavan/unfaasener/tests/logCollector/TestCaseNWorkflow/generatedDataFrame.csv') + costs_dict = {} pubSub_dict = {} slack_data = {} @@ -51,9 +56,9 @@ def build(self): predecessors = [] workflowFunctions = [] last_dec = [] - + for i in range(self.n_funcs): - func_name = f'Func_{i}' + func_name = f"Func_{i}" workflowFunctions.append(func_name) # if i == 1: # costs_dict[func_name] = {"best-case": 10000, "worst-case": 10000, "default": 10000} @@ -63,52 +68,62 @@ def build(self): # costs_dict[func_name] = {"best-case": 10000, "worst-case": 10000, "default": 10000} # elif i == 4: # costs_dict[func_name] = {"best-case": 10000, "worst-case": 10000, "default": 10000} - # else: + # else: # costs_dict[func_name] = {"best-case": 4.6399999999999997e-07, "worst-case": 4.6399999999999997e-07, "default": 4.6399999999999997e-07} - - costs_dict[func_name] = {"best-case": 4.6399999999999997e-07, "worst-case": 4.6399999999999997e-07, "default": 4.6399999999999997e-07} - - pubSub_dict[func_name] = 100000000000 - slack_data[func_name] = {"best-case": 0.0, "worst-case": 0.0, "default": 0.0} - if i < self.n_funcs-1: - slack_data[f'Func_{i}-Func_{i+1}'] = {"best-case": 0.0, "worst-case": 0.0, "default": 0.0} - successors.append([f'Func_{i+1}']) - else: successors.append([]) - if i > 0: predecessors.append([f'Func_{i-1}']) - else: predecessors.append([]) + costs_dict[func_name] = { + "best-case": 4.6399999999999997e-07, + "worst-case": 4.6399999999999997e-07, + "default": 4.6399999999999997e-07, + } - if random.randint(1, 10) > 5: - last_dec.append([1.0]*self.n_hosts) + pubSub_dict[func_name] = 100000000000 + slack_data[func_name] = { + "best-case": 0.0, + "worst-case": 0.0, + "default": 0.0, + } + if i < self.n_funcs - 1: + slack_data[f"Func_{i}-Func_{i+1}"] = { + "best-case": 0.0, + "worst-case": 0.0, + "default": 0.0, + } + successors.append([f"Func_{i+1}"]) else: - last_dec.append([0.0]*self.n_hosts) + successors.append([]) + if i > 0: + predecessors.append([f"Func_{i-1}"]) + else: + predecessors.append([]) + if random.randint(1, 10) > 5: + last_dec.append([1.0] * self.n_hosts) + else: + last_dec.append([0.0] * self.n_hosts) workflow_dict = { - "workflow": "TestCaseNWorkflow", - "workflowFunctions": workflowFunctions, - "initFunc": "Func_0", - "successors": successors, - "predecessors": predecessors, - "memory": [1/self.n_funcs]*self.n_funcs, + "workflow": "TestCaseNWorkflow", + "workflowFunctions": workflowFunctions, + "initFunc": "Func_0", + "successors": successors, + "predecessors": predecessors, + "memory": [1 / self.n_funcs] * self.n_funcs, "lastDecision_default": last_dec, "lastDecision_best-case": last_dec, "lastDecision_worst-case": last_dec, - "topics": [""] + ["dag-Profanity"]*(self.n_funcs-1) + "topics": [""] + ["dag-Profanity"] * (self.n_funcs - 1), } jsonPath = ( - "../log-parser/get-workflow-logs/data/" - + "TestCaseNWorkflow" - + ".json" + "../log-parser/get-workflow-logs/data/" + "TestCaseNWorkflow" + ".json" ) with open(jsonPath, "w") as json_file: json.dump(workflow_dict, json_file) + base_dir = "./data/TestCaseNWorkflow" - base_dir = './data/TestCaseNWorkflow' - with open(f"{base_dir}/Costs.json", "w") as outfile: json.dump(costs_dict, outfile) @@ -122,8 +137,10 @@ def build(self): json.dump(slack_data, outfile) def delete(self): - base_dir = './data/TestCaseNWorkflow' - os.system('rm ../log-parser/get-workflow-logs/data/TestCaseNWorkflow/generatedDataFrame.csv') + base_dir = "./data/TestCaseNWorkflow" + os.system( + "rm ../log-parser/get-workflow-logs/data/TestCaseNWorkflow/generatedDataFrame.csv" + ) os.system(f"rm {base_dir}/Costs.json") os.system(f"rm {base_dir}/pubSubSize.json") os.system(f"rm {base_dir}/slackData.json") @@ -140,4 +157,4 @@ def delete(self): if __name__ == "__main__": n_funcs, n_hosts = 30, 10 tgen = TestCaseGenerator(n_funcs, n_hosts) - tgen.build() \ No newline at end of file + tgen.build() From 0c7b0ab53504896ecb2ed66f1da31ce19aae4234 Mon Sep 17 00:00:00 2001 From: engshahrad Date: Wed, 25 Oct 2023 03:36:03 +0000 Subject: [PATCH 2/4] VM -> Host (rpsMultiVMSolver -> rpsMultiHostSolver) --- initialDeploy.sh | 2 +- run_tests.sh | 2 +- scheduler/rpsCIScheduler.py | 2 +- scheduler/{rpsMultiVMSolver.jl => rpsMultiHostSolver.jl} | 0 scheduler/{rpsMultiVMSolver.py => rpsMultiHostSolver.py} | 0 tests/solver/rps_cost_test.py | 2 +- tests/solver/rps_latency_test.py | 2 +- 7 files changed, 5 insertions(+), 5 deletions(-) rename scheduler/{rpsMultiVMSolver.jl => rpsMultiHostSolver.jl} (100%) rename scheduler/{rpsMultiVMSolver.py => rpsMultiHostSolver.py} (100%) diff --git a/initialDeploy.sh b/initialDeploy.sh index 8649b21..ccd9d5b 100755 --- a/initialDeploy.sh +++ b/initialDeploy.sh @@ -16,7 +16,7 @@ python3 scheduler/resetLastDecisions.py $workflow $hostcount $solvingMode # initialize Julia Solver mkfifo ./scheduler/juliaStdin mkfifo ./scheduler/juliaStdout -julia scheduler/rpsMultiVMSolver.jl & +julia scheduler/rpsMultiHostSolver.jl & # clean the host execution agent queue python3 host-agents/execution-agent/cleanup-queue.py vmSubscriber1 vm0 diff --git a/run_tests.sh b/run_tests.sh index 8f22fc8..f45f5db 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -5,7 +5,7 @@ mkfifo ./scheduler/juliaStdin mkfifo ./scheduler/juliaStdout -julia ./scheduler/rpsMultiVMSolver.jl & +julia ./scheduler/rpsMultiHostSolver.jl & cp -a ./tests/solver/data/. ./scheduler/data/ cp -a ./tests/logCollector/. ./log-parser/get-workflow-logs/data/ diff --git a/scheduler/rpsCIScheduler.py b/scheduler/rpsCIScheduler.py index 21294fe..ecea5ea 100644 --- a/scheduler/rpsCIScheduler.py +++ b/scheduler/rpsCIScheduler.py @@ -8,7 +8,7 @@ import datetime from pathlib import Path from google.cloud import datastore -from rpsMultiVMSolver import rpsOffloadingSolver +from rpsMultiHostSolver import rpsOffloadingSolver from Estimator import Estimator from getInvocationRate import InvocationRate import sys diff --git a/scheduler/rpsMultiVMSolver.jl b/scheduler/rpsMultiHostSolver.jl similarity index 100% rename from scheduler/rpsMultiVMSolver.jl rename to scheduler/rpsMultiHostSolver.jl diff --git a/scheduler/rpsMultiVMSolver.py b/scheduler/rpsMultiHostSolver.py similarity index 100% rename from scheduler/rpsMultiVMSolver.py rename to scheduler/rpsMultiHostSolver.py diff --git a/tests/solver/rps_cost_test.py b/tests/solver/rps_cost_test.py index 5d06150..e08b6d4 100644 --- a/tests/solver/rps_cost_test.py +++ b/tests/solver/rps_cost_test.py @@ -5,7 +5,7 @@ import pandas as pd import random import time -from rpsMultiVMSolver import rpsOffloadingSolver +from rpsMultiHostSolver import rpsOffloadingSolver class TestCaseGenerator: diff --git a/tests/solver/rps_latency_test.py b/tests/solver/rps_latency_test.py index 84dd118..55ad538 100644 --- a/tests/solver/rps_latency_test.py +++ b/tests/solver/rps_latency_test.py @@ -4,7 +4,7 @@ import time import json from pathlib import Path -from rpsMultiVMSolver import rpsOffloadingSolver +from rpsMultiHostSolver import rpsOffloadingSolver class TestSolver(unittest.TestCase): From e28dcf081cd97e95a3bfa9725d26a856cae60bda Mon Sep 17 00:00:00 2001 From: engshahrad Date: Wed, 25 Oct 2023 03:56:25 +0000 Subject: [PATCH 3/4] polished rpsMultiHostSolver --- scheduler/rpsMultiHostSolver.py | 412 +++++++++++++++----------------- 1 file changed, 189 insertions(+), 223 deletions(-) diff --git a/scheduler/rpsMultiHostSolver.py b/scheduler/rpsMultiHostSolver.py index 4032edb..42305e3 100644 --- a/scheduler/rpsMultiHostSolver.py +++ b/scheduler/rpsMultiHostSolver.py @@ -6,7 +6,6 @@ from gekko import GEKKO from Estimator import Estimator import itertools -import time import subprocess # Non-linear optimzation models for cost and latency @@ -17,7 +16,9 @@ def __init__(self, solver_prog="julia"): self.solver_prog = solver_prog return - def configure(self, workflow, mode, decisionMode, toleranceWindow, rps, testingFlag): + def configure( + self, workflow, mode, decisionMode, toleranceWindow, rps, testingFlag + ): self.testingFlag = testingFlag self.rps = rps path = ( @@ -104,7 +105,7 @@ def configure(self, workflow, mode, decisionMode, toleranceWindow, rps, testingF self.readWorkflow() # self.initJuliaSolver() - + def readWorkflow(self): with open(self.jsonPath, "r") as json_file: self.workflow_json = json.load(json_file) @@ -115,7 +116,7 @@ def readWorkflow(self): self.predecessors = self.workflow_json["predecessors"] self.initial = self.workflow_json["initFunc"] self.memories = self.workflow_json["memory"] - + def getAllPaths(self): """ Returns all possible paths in the dag starting from the initial node to a terminal node @@ -311,8 +312,14 @@ def GetPubsubCost(self, offloadingCandidate, child): return cost def suggestBestOffloadingMultiVM(self, availResources, alpha, verbose): - if self.solver_prog == "gekko": return self.suggestBestOffloadingMultiVMGekko(availResources, alpha, verbose) - else: return self.suggestBestOffloadingMultiVMJulia(availResources, alpha, verbose) + if self.solver_prog == "gekko": + return self.suggestBestOffloadingMultiVMGekko( + availResources, alpha, verbose + ) + else: + return self.suggestBestOffloadingMultiVMJulia( + availResources, alpha, verbose + ) def suggestBestOffloadingMultiVMJulia(self, availResources, alpha, verbose): """ @@ -334,77 +341,82 @@ def suggestBestOffloadingMultiVMJulia(self, availResources, alpha, verbose): offloadingCandidates[function], self.testingFlag ) * ( - self.getVMexecution( - offloadingCandidates[function], VMIndex - ) + self.getVMexecution(offloadingCandidates[function], VMIndex) * 0.001 - ) * (self.getMem(offloadingCandidates[function])) - ) for VMIndex in range(len(availResources)) - ] for function in range(len(offloadingCandidates)) + ) + * (self.getMem(offloadingCandidates[function])) + ) + for VMIndex in range(len(availResources)) + ] + for function in range(len(offloadingCandidates)) ] - cpu_coeffs = [ [ - self.rps * self.estimator.get_num_per_req(offloadingCandidates[function], False) * ( - self.getVMexecution( - offloadingCandidates[function], VMIndex - ) + self.rps + * self.estimator.get_num_per_req( + offloadingCandidates[function], False + ) + * ( + self.getVMexecution(offloadingCandidates[function], VMIndex) * 0.001 - ) * float(self.muFactor) for VMIndex in range(len(availResources)) - ] for function in range(0, len(offloadingCandidates)) + ) + * float(self.muFactor) + for VMIndex in range(len(availResources)) + ] + for function in range(0, len(offloadingCandidates)) ] - + func_costs_1 = [ ((10**5) * 2) * (1 - alpha) * self.rps - * self.estimator.get_num_per_req( - offloadingCandidates[i], False - ) - * ( - self.GetServerlessCostEstimate(offloadingCandidates[i]) - ) for i in range(len(offloadingCandidates)) + * self.estimator.get_num_per_req(offloadingCandidates[i], False) + * (self.GetServerlessCostEstimate(offloadingCandidates[i])) + for i in range(len(offloadingCandidates)) ] - + func_costs_2 = [ ( (1 - alpha) * self.rps - * self.estimator.get_num_per_req( - offloadingCandidates[i], False - ) + * self.estimator.get_num_per_req(offloadingCandidates[i], False) * ( self.GetDatastoreCost("w") + self.GetDatastoreCost("d") + self.GetDatastoreCost("r") ) - ) for i in range(len(offloadingCandidates)) + ) + for i in range(len(offloadingCandidates)) ] - - matrix_prev_offloadings = [ [ - self.IsOffloaded( - offloadingCandidates[i], vm - ) for vm in range(len(availResources)) - ] for i in range(len(offloadingCandidates)) + self.IsOffloaded(offloadingCandidates[i], vm) + for vm in range(len(availResources)) + ] + for i in range(len(offloadingCandidates)) ] - + # solve json_dict = { - 'mode': 'cost', - 'locality': alpha, - 'n_hosts': len(availResources), - 'n_funcs': len(offloadingCandidates), - 'list_mem_capacity': [availResources[VMIndex]["mem_mb"] for VMIndex in range(len(availResources))], - 'list_cpu_capacity': [availResources[VMIndex]["cores"] for VMIndex in range(len(availResources))], - 'list_func_costs_1': func_costs_1, - 'list_func_costs_2': func_costs_2, - 'matrix_cpu_coeff': cpu_coeffs, - 'matrix_mem_coeff': mem_coeffs, - 'matrix_prev_offloadings': matrix_prev_offloadings, + "mode": "cost", + "locality": alpha, + "n_hosts": len(availResources), + "n_funcs": len(offloadingCandidates), + "list_mem_capacity": [ + availResources[VMIndex]["mem_mb"] + for VMIndex in range(len(availResources)) + ], + "list_cpu_capacity": [ + availResources[VMIndex]["cores"] + for VMIndex in range(len(availResources)) + ], + "list_func_costs_1": func_costs_1, + "list_func_costs_2": func_costs_2, + "matrix_cpu_coeff": cpu_coeffs, + "matrix_mem_coeff": mem_coeffs, + "matrix_prev_offloadings": matrix_prev_offloadings, #'solution': offloadingCandidatesFinal } try: @@ -412,13 +424,11 @@ def suggestBestOffloadingMultiVMJulia(self, availResources, alpha, verbose): sol = self.readJuliaOutput() self.saveNewDecision(sol) return sol - + except: print("\nJulia Failed!") return "NotFound" - - - + elif self.optimizationMode == "latency": self.getAllPaths() self.getSlackForPath() @@ -433,29 +443,33 @@ def suggestBestOffloadingMultiVMJulia(self, availResources, alpha, verbose): offloadingCandidates[function], False ) * ( - self.getVMexecution( - offloadingCandidates[function], VMIndex - ) + self.getVMexecution(offloadingCandidates[function], VMIndex) * 0.001 - ) + ) * (self.getMem(offloadingCandidates[function])) - ) for VMIndex in range(len(availResources)) - ] for function in range(len(offloadingCandidates)) + ) + for VMIndex in range(len(availResources)) + ] + for function in range(len(offloadingCandidates)) ] # Constraint on checking available number of cores for each VM cpu_coeffs = [ [ - self.rps * self.estimator.get_num_per_req(offloadingCandidates[function], False) * ( - self.getVMexecution( - offloadingCandidates[function], VMIndex - ) + self.rps + * self.estimator.get_num_per_req( + offloadingCandidates[function], False + ) + * ( + self.getVMexecution(offloadingCandidates[function], VMIndex) * 0.001 - ) * float(self.muFactor) for VMIndex in range(len(availResources)) - ] for function in range(len(offloadingCandidates)) + ) + * float(self.muFactor) + for VMIndex in range(len(availResources)) + ] + for function in range(len(offloadingCandidates)) ] - ################################################ expression_1 = [] @@ -475,12 +489,13 @@ def suggestBestOffloadingMultiVMJulia(self, availResources, alpha, verbose): for c in combinations: x_1 = [ { - "tmpIndex": (self.offloadingCandidates.index(node), c[path.index(node) - 1]), + "tmpIndex": ( + self.offloadingCandidates.index(node), + c[path.index(node) - 1], + ), "coeff": ( - self.addedExecLatency( - node, c[path.index(node) - 1] - ) - ) + self.addedExecLatency(node, c[path.index(node) - 1]) + ), } for node in path[1:] ] @@ -489,8 +504,16 @@ def suggestBestOffloadingMultiVMJulia(self, availResources, alpha, verbose): x_2 = [ { - "tmpIndex1": (self.offloadingCandidates.index(node), c[path.index(node) - 1]), - "tmpIndex2": (self.offloadingCandidates.index(path[path.index(node) + 1]), c[path.index(node)]), + "tmpIndex1": ( + self.offloadingCandidates.index(node), + c[path.index(node) - 1], + ), + "tmpIndex2": ( + self.offloadingCandidates.index( + path[path.index(node) + 1] + ), + c[path.index(node)], + ), "coeff1": ( self.getCommunicationLatency( (path[path.index(node) + 1]), @@ -517,11 +540,11 @@ def suggestBestOffloadingMultiVMJulia(self, availResources, alpha, verbose): c[path.index(node) - 1], self.decisionMode, ) - ) + ), } for node in path[1:-1] ] - #print(x_2) + # print(x_2) X_2.append(x_2) x_3 = { @@ -534,174 +557,153 @@ def suggestBestOffloadingMultiVMJulia(self, availResources, alpha, verbose): "s", self.decisionMode, ) - ) + ), } X_3.append(x_3) - - inequality_const.append((self.getCriticalPathDuration() - duration) + self.toleranceWindow) + inequality_const.append( + (self.getCriticalPathDuration() - duration) + self.toleranceWindow + ) expression_1.append(X_1) expression_2.append(X_2) expression_3.append(X_3) ################################################ - func_costs_1 = [ ((10**5) * 2) * (1 - alpha) * self.rps - * self.estimator.get_num_per_req( - offloadingCandidates[i], False - ) - * ( - self.GetServerlessCostEstimate(offloadingCandidates[i]) - ) for i in range(len(offloadingCandidates)) + * self.estimator.get_num_per_req(offloadingCandidates[i], False) + * (self.GetServerlessCostEstimate(offloadingCandidates[i])) + for i in range(len(offloadingCandidates)) ] - + func_costs_2 = [ ( (1 - alpha) * self.rps - * self.estimator.get_num_per_req( - offloadingCandidates[i], False - ) + * self.estimator.get_num_per_req(offloadingCandidates[i], False) * ( self.GetDatastoreCost("w") + self.GetDatastoreCost("d") + self.GetDatastoreCost("r") ) - ) for i in range(len(offloadingCandidates)) + ) + for i in range(len(offloadingCandidates)) ] - matrix_prev_offloadings = [ [ - self.IsOffloaded( - offloadingCandidates[i], vm - ) for vm in range(len(availResources)) - ] for i in range(len(offloadingCandidates)) + self.IsOffloaded(offloadingCandidates[i], vm) + for vm in range(len(availResources)) + ] + for i in range(len(offloadingCandidates)) ] - json_dict = { - 'mode': 'latency', - 'locality': alpha, - 'n_hosts': len(availResources), - 'n_funcs': len(offloadingCandidates), - 'list_mem_capacity': [availResources[VMIndex]["mem_mb"] for VMIndex in range(len(availResources))], - 'list_cpu_capacity': [availResources[VMIndex]["cores"] for VMIndex in range(len(availResources))], - 'expression_1': expression_1, - 'expression_2': expression_2, - 'expression_3': expression_3, - 'inequality_const': inequality_const, - 'list_func_costs_1': func_costs_1, - 'list_func_costs_2': func_costs_2, - 'matrix_cpu_coeff': cpu_coeffs, - 'matrix_mem_coeff': mem_coeffs, - 'matrix_prev_offloadings': matrix_prev_offloadings, + "mode": "latency", + "locality": alpha, + "n_hosts": len(availResources), + "n_funcs": len(offloadingCandidates), + "list_mem_capacity": [ + availResources[VMIndex]["mem_mb"] + for VMIndex in range(len(availResources)) + ], + "list_cpu_capacity": [ + availResources[VMIndex]["cores"] + for VMIndex in range(len(availResources)) + ], + "expression_1": expression_1, + "expression_2": expression_2, + "expression_3": expression_3, + "inequality_const": inequality_const, + "list_func_costs_1": func_costs_1, + "list_func_costs_2": func_costs_2, + "matrix_cpu_coeff": cpu_coeffs, + "matrix_mem_coeff": mem_coeffs, + "matrix_prev_offloadings": matrix_prev_offloadings, } try: self.createJuliaInput(json_dict) sol = self.readJuliaOutput() self.saveNewDecision(sol) - + return sol - + except: print("\nJulia Failed!") return "NotFound" - + def calcLatencyCost(self, alpha, offloadingCandidates, availResources, sol): - return sum( - [ - ( - ((10**5) * 2) - * (1 - alpha) + [ + ( + ((10**5) * 2) + * (1 - alpha) + * self.rps + * self.estimator.get_num_per_req(offloadingCandidates[i], False) + * self.GetServerlessCostEstimate(offloadingCandidates[i]) + * ( + ( + 100 + - (sum([(sol[i][vm]) for vm in range(len(availResources))])) + ) + / 100 + ) + + ( + (1 - alpha) * self.rps - * self.estimator.get_num_per_req( - offloadingCandidates[i], False + * self.estimator.get_num_per_req(offloadingCandidates[i], False) + * ( + self.GetDatastoreCost("w") + + self.GetDatastoreCost("d") + + self.GetDatastoreCost("r") ) - * self.GetServerlessCostEstimate(offloadingCandidates[i]) * ( - ( - 100 - - ( - sum( - [ - (sol[i][vm]) - for vm in range(len(availResources)) - ] - ) - ) - ) + ((sum([(sol[i][vm]) for vm in range(len(availResources))]))) / 100 ) - + ( - (1 - alpha) - * self.rps - * self.estimator.get_num_per_req( - offloadingCandidates[i], False - ) - * ( - self.GetDatastoreCost("w") - + self.GetDatastoreCost("d") - + self.GetDatastoreCost("r") - ) - * ( - ( - ( - sum( - [ - (sol[i][vm]) - for vm in range(len(availResources)) - ] - ) - ) - ) - / 100 - ) - ) ) - + sum( - [ + ) + + sum( + [ + ( ( - ( - (10**3) - * (alpha) - * ( - abs( - min(sol[i][vm], 1) - - ( - self.IsOffloaded( - offloadingCandidates[i], vm - ) + (10**3) + * (alpha) + * ( + abs( + min(sol[i][vm], 1) + - ( + self.IsOffloaded( + offloadingCandidates[i], vm ) ) ) ) ) - for vm in range(len(availResources)) - ] - ) - for i in range(len(sol)) - ] - ) - + ) + for vm in range(len(availResources)) + ] + ) + for i in range(len(sol)) + ] + ) + def createJuliaInput(self, json_dict): - self.outfile = open("./juliaStdin", 'w') + self.outfile = open("./juliaStdin", "w") json_str = json.dumps(json_dict) print(json_str, file=self.outfile) self.outfile.close() - + def readJuliaOutput(self): - self.infile = open("./juliaStdout", 'r') + self.infile = open("./juliaStdout", "r") json_str = self.infile.readline() self.infile.close() return json.loads(json_str) - + def suggestBestOffloadingMultiVMGekko(self, availResources, alpha, verbose): """ Returns a list of 0's (no offloading) and 1's (offloading) @@ -927,9 +929,9 @@ def suggestBestOffloadingMultiVMGekko(self, availResources, alpha, verbose): # solve model.options.SOLVER = 1 - + try: - model.solve(disp=False) + model.solve(disp=False) offloadingDecisionsFinal = [ [ (offloadingDecisions[j][i].value)[0] @@ -1076,12 +1078,10 @@ def suggestBestOffloadingMultiVMGekko(self, availResources, alpha, verbose): # print("") # print("======================") - # print("Expression 3") + # print("Expression 3") # print(f"{[self.offloadingCandidates.index(path[1]),c[0]]} * {self.getCommunicationLatency((path[1]),(path[0]),c[0],'s',self.decisionMode,)}") # print("======================") - - model.Equation( ( sum( @@ -1125,8 +1125,7 @@ def suggestBestOffloadingMultiVMGekko(self, availResources, alpha, verbose): self.decisionMode, ) ) - + - ( + + ( ( 1 - ( @@ -1152,8 +1151,7 @@ def suggestBestOffloadingMultiVMGekko(self, availResources, alpha, verbose): self.decisionMode, ) ) - + - ( + + ( ( tempGoalVar[ self.offloadingCandidates.index( @@ -1183,7 +1181,6 @@ def suggestBestOffloadingMultiVMGekko(self, availResources, alpha, verbose): for node in path[1:-1] ] ) - + ( ( ( @@ -1339,7 +1336,6 @@ def suggestBestOffloadingMultiVMGekko(self, availResources, alpha, verbose): # node, c[path.index(node) - 1] # )) - # print('---------------------------') # cons = ( # ( @@ -1466,37 +1462,8 @@ def suggestBestOffloadingMultiVMGekko(self, availResources, alpha, verbose): # print(f'{cons} <= {limit}') - # continue - - - - - - - - - - - - - - - - - - - - - - - - - - - - cost = sum( [ ( @@ -1630,7 +1597,7 @@ def suggestBestOffloadingMultiVMGekko(self, availResources, alpha, verbose): for i in range(len(offloadingDecisionsFinal)) ] ) - + cost1 = sum( [ ( @@ -1807,7 +1774,6 @@ def suggestBestOffloadingMultiVMGekko(self, availResources, alpha, verbose): return "NotFound" # model.open_folder() - # Saving new decisions in the Json file assigned to each workflow def saveNewDecision(self, offloadingCandidates): self.workflow_json[ From 73eee2149c976397fa2d6bcef1347cced25d639e Mon Sep 17 00:00:00 2001 From: engshahrad Date: Wed, 25 Oct 2023 04:42:38 +0000 Subject: [PATCH 4/4] bug fix for #111 --- scheduler/rpsCIScheduler.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/scheduler/rpsCIScheduler.py b/scheduler/rpsCIScheduler.py index ecea5ea..f76f98b 100644 --- a/scheduler/rpsCIScheduler.py +++ b/scheduler/rpsCIScheduler.py @@ -304,13 +304,16 @@ def resolveOffloadingSolutions(self): triggerType = "resolve" checkingFlag = True logging.info("Forced to change to resolve!!!") - print("LOCK CREATED!!!") pid = os.getpid() logging.info(str(pid)) logging.info("LOCK CREATED!!!") logging.info(str(datetime.datetime.now())) # triggerType = "resolve" - solver = CIScheduler(triggerType) + try: + solver = CIScheduler(triggerType) + except: + logging.info("Scheduler failed to run!") + print("ERROR: Scheduler failed to run!") os.remove(str(Path(os.path.dirname(os.path.abspath(__file__)))) + "/lock.txt") if ( os.path.exists(