From ff8269609cdec162446aa709af66fdd6f587da30 Mon Sep 17 00:00:00 2001 From: AkkiG2401 <72610152+AkkiG2401@users.noreply.github.com> Date: Mon, 23 Oct 2023 12:37:51 +0100 Subject: [PATCH] Add Flags for Energy Profilers - Issue #311 (#313) * Add Flags for Energy Profilers - Issue #311 * Fix Black Lint Issue and Convert Profiler Functions to wrapper functions. Clean up and Improve Readability of Code. --- baler/baler.py | 58 +++++++++++- baler/modules/helper.py | 16 +++- baler/modules/profiling.py | 181 ++++++++++++++++++------------------- 3 files changed, 157 insertions(+), 98 deletions(-) diff --git a/baler/baler.py b/baler/baler.py index af8e0e39..470f4c6c 100644 --- a/baler/baler.py +++ b/baler/baler.py @@ -38,7 +38,7 @@ def main(): """Calls different functions depending on argument parsed in command line. - if --mode=newProject: call `helper.create_new_project` and create a new project sub directory with config file - - if --mode=train: call `perform_training` and train the network on given data and based on the config file + - if --mode=train: call `perform_training` and train the network on given data and based on the config file and check if profilers are enabled - if --mode=compress: call `perform_compression` and compress the given data using the model trained in `--mode=train` - if --mode=decompress: call `perform_decompression` and decompress the compressed file outputted from `--mode=compress` - if --mode=plot: call `perform_plotting` and plot the comparison between the original data and the decompressed data from `--mode=decompress`. Also plots the loss plot from the trained network. @@ -48,14 +48,29 @@ def main(): Raises: NameError: Raises error if the chosen mode does not exist. """ - config, mode, workspace_name, project_name, verbose = helper.get_arguments() + ( + config, + mode, + workspace_name, + project_name, + verbose, + pytorch_profile, + energy_profile, + ) = helper.get_arguments() project_path = os.path.join("workspaces", workspace_name, project_name) output_path = os.path.join(project_path, "output") if mode == "newProject": helper.create_new_project(workspace_name, project_name, verbose) elif mode == "train": - perform_training(output_path, config, verbose) + check_enabled_profilers( + perform_training, + pytorch_profile, + energy_profile, + output_path, + config, + verbose, + ) elif mode == "diagnose": perform_diagnostics(output_path, verbose) elif mode == "compress": @@ -76,8 +91,41 @@ def main(): ) -@pytorch_profile -@energy_profiling(project_name="baler_training", measure_power_secs=1) +def check_enabled_profilers( + f, pytorchProfile=False, energyProfile=False, *args, **kwargs +): + """ + Conditionally apply profiling based on the given boolean flags. + + Args: + f (callable): The function to be potentially profiled. + pytorchProfile (bool): Whether to apply PyTorch profiling. + energyProfile (bool): Whether to apply energy profiling. + + Returns: + result: The result of the function `f` execution. + """ + + # Placeholder function to avoid nested conditions + def identity_func(fn, *a, **kw): + return fn(*a, **kw) + + # Set the outer and inner functions based on the flags + inner_function = pytorch_profile if pytorchProfile else identity_func + outer_function = ( + ( + lambda fn: energy_profiling( + fn, project_name="baler_training", measure_power_secs=1 + ) + ) + if energyProfile + else identity_func + ) + + # Nest the profiling steps and run the function only once + return outer_function(lambda: inner_function(f, *args, **kwargs))() + + def perform_training(output_path, config, verbose: bool): """Main function calling the training functions, ran when --mode=train is selected. The three functions called are: `helper.process`, `helper.mode_init` and `helper.training`. diff --git a/baler/modules/helper.py b/baler/modules/helper.py index 5e566085..38b7be30 100644 --- a/baler/modules/helper.py +++ b/baler/modules/helper.py @@ -73,6 +73,12 @@ def get_arguments(): " 2. If workspace exists but project does not, create project in workspace.\n" " 3. If workspace does not exist, create workspace directory and project.", ) + parser.add_argument( + "--pytorchProfile", action="store_true", help="Enable PyTorch profiling" + ) + parser.add_argument( + "--energyProfile", action="store_true", help="Enable Energy profiling" + ) parser.add_argument( "--verbose", dest="verbose", action="store_true", help="Verbose mode" ) @@ -92,7 +98,15 @@ def get_arguments(): config = Config importlib.import_module(config_path).set_config(config) - return config, args.mode, workspace_name, project_name, args.verbose + return ( + config, + args.mode, + workspace_name, + project_name, + args.verbose, + args.pytorchProfile, + args.energyProfile, + ) def create_new_project( diff --git a/baler/modules/profiling.py b/baler/modules/profiling.py index fba9e47d..4613537e 100644 --- a/baler/modules/profiling.py +++ b/baler/modules/profiling.py @@ -7,110 +7,107 @@ import codecarbon -def pytorch_profile(f): - """This function perform the pytorch profiling - of CPU, GPU time and memory consumed of the function f execution. - Args: - f (_type_): decorated function +def pytorch_profile(f, *args, **kwargs): """ + This function performs PyTorch profiling of CPU, GPU time and memory + consumed by the function f execution. - def inner_function(*args, **kwargs): - """Wrapper for the function f - Returns: - _type_: _description_ - """ - if torch.cuda.is_available(): - activities = [ProfilerActivity.CPU, ProfilerActivity.CUDA] - else: - activities = [ProfilerActivity.CPU] - # Start profiler before the function will be executed - with profile( - activities=activities, - on_trace_ready=torch.profiler.tensorboard_trace_handler( - "log/baler", worker_name="worker0" - ), - schedule=torch.profiler.schedule(wait=0, warmup=0, active=1, repeat=1), - record_shapes=True, - with_stack=True, - profile_memory=True, - ) as prof: - with record_function(f"{f.__name__}"): - # Call the decorated function - val = f(*args, **kwargs) - prof.step() - prof.stop() - # Print the CPU time for each torch operation - print(prof.key_averages().table(sort_by="cpu_time_total")) - # Store the information about CPU and GPU usage - if torch.cuda.is_available(): - prof.export_stacks("profiler_stacks.json", "self_cuda_time_total") - # Store the results to the .json file - prof.export_stacks("/tmp/profiler_stacks.json", "self_cpu_time_total") - return val - - return inner_function - - -def energy_profiling(project_name, measure_power_secs): - """Energy Profiling measure the amount of electricity that - was consumed by decorated function f and amount of CO(2) emission. - It utilize the codecarbon package for tracking this information. Args: - f (_type_): decorated function + f (callable): The function to be profiled. + + Returns: + result: The result of the function `f` execution. + """ + + if torch.cuda.is_available(): + activities = [ProfilerActivity.CPU, ProfilerActivity.CUDA] + else: + activities = [ProfilerActivity.CPU] + + # Start profiler before the function will be executed + with profile( + activities=activities, + on_trace_ready=torch.profiler.tensorboard_trace_handler( + "log/baler", worker_name="worker0" + ), + schedule=torch.profiler.schedule(wait=0, warmup=0, active=1, repeat=1), + record_shapes=True, + with_stack=True, + profile_memory=True, + ) as prof: + with record_function(f"{f.__name__}"): + # Call the function + result = f(*args, **kwargs) + prof.step() + prof.stop() + + # Print the CPU time for each torch operation + print(prof.key_averages().table(sort_by="cpu_time_total")) + + # Store the information about CPU and GPU usage + if torch.cuda.is_available(): + prof.export_stacks("profiler_stacks.json", "self_cuda_time_total") + + # Store the results to the .json file + prof.export_stacks("/tmp/profiler_stacks.json", "self_cpu_time_total") + + return result + + +def energy_profiling(f, project_name, measure_power_secs, *args, **kwargs): """ + Energy Profiling measures the amount of electricity that + was consumed by the given function f and the amount of CO2 emission. + It utilizes the codecarbon package for tracking this information. - def decorator(f): - """Wrapper for the inner function f - Args: - f (_type_): _description_ - """ + Args: + f (callable): The function to be profiled. + project_name (str): The name of the project. + measure_power_secs (int): The number of seconds to measure power. - def inner_function(*args, **kwargs): - """_summary_ + Returns: + result: The result of the function `f` execution. + """ - Returns: - _type_: _description_ - """ - tracker = codecarbon.EmissionsTracker( - project_name=project_name, measure_power_secs=measure_power_secs - ) - tracker.start_task(f"{f.__name__}") - val = f(*args, **kwargs) - emissions = tracker.stop_task() - print("CO2 emission [kg]: ", emissions.emissions) - print("CO2 emission rate [kg/h]: ", 3600 * emissions.emissions_rate) - print("CPU energy consumed [kWh]: ", emissions.cpu_energy) - print("GPU energy consumed [kWh]: ", emissions.gpu_energy) - print("RAM energy consumed [kWh]: ", emissions.ram_energy) - return val + tracker = codecarbon.EmissionsTracker( + project_name=project_name, measure_power_secs=measure_power_secs + ) + tracker.start_task(f"{f.__name__}") - return inner_function + # Execute the function and get its result + result = f(*args, **kwargs) - return decorator + emissions = tracker.stop_task() + print("CO2 emission [kg]: ", emissions.emissions) + print("CO2 emission rate [kg/h]: ", 3600 * emissions.emissions_rate) + print("CPU energy consumed [kWh]: ", emissions.cpu_energy) + print("GPU energy consumed [kWh]: ", emissions.gpu_energy) + print("RAM energy consumed [kWh]: ", emissions.ram_energy) + return result -def c_profile(func): - """Profile the function func with cProfile + +def c_profile(func, *args, **kwargs): + """ + Profile the function func with cProfile. Args: - func (_type_): _description_ + func (callable): The function to be profiled. + + Returns: + result: The result of the function `func` execution. """ - def wrapper(*args, **kwargs): - """_summary_ - - Returns: - _type_: _description_ - """ - pr = cProfile.Profile() - pr.enable() - retval = func(*args, **kwargs) - pr.disable() - s = io.StringIO() - sortby = SortKey.CUMULATIVE # 'cumulative' - ps = pstats.Stats(pr, stream=s).sort_stats(sortby) - ps.print_stats() - print(s.getvalue()) - return retval - - return wrapper + pr = cProfile.Profile() + pr.enable() + # Execute the function and get its result + result = func(*args, **kwargs) + pr.disable() + + s = io.StringIO() + sortby = SortKey.CUMULATIVE + ps = pstats.Stats(pr, stream=s).sort_stats(sortby) + ps.print_stats() + print(s.getvalue()) + + return result