From cfd00188cfdbbe1cfb6e1ae22355e919681cee2c Mon Sep 17 00:00:00 2001 From: mosc5 Date: Mon, 13 Feb 2023 10:19:59 +0100 Subject: [PATCH] run new function with multiprocessing and add config option #98 --- scenarios/default/configs/default.cfg | 3 +- scenarios/default_RS7/configs/default.cfg | 3 +- simbev/__main__.py | 2 +- simbev/simbev_class.py | 34 +++++++++++++---------- 4 files changed, 25 insertions(+), 17 deletions(-) diff --git a/scenarios/default/configs/default.cfg b/scenarios/default/configs/default.cfg index f0a3fc3..601bbe1 100644 --- a/scenarios/default/configs/default.cfg +++ b/scenarios/default/configs/default.cfg @@ -62,6 +62,7 @@ hpc_data = hpc_config.csv # simulation parameters scaling = 1 -num_threads = 4 +region_threads = 1 +car_threads = 5 seed = 3 private_only_run = false \ No newline at end of file diff --git a/scenarios/default_RS7/configs/default.cfg b/scenarios/default_RS7/configs/default.cfg index 419ba6e..fcc2ce3 100644 --- a/scenarios/default_RS7/configs/default.cfg +++ b/scenarios/default_RS7/configs/default.cfg @@ -60,6 +60,7 @@ hpc_data = hpc_config.csv # simulation parameters scaling = 1 -num_threads = 4 +region_threads = 7 +car_threads = 2 seed = 3 private_only_run = false \ No newline at end of file diff --git a/simbev/__main__.py b/simbev/__main__.py index b5562a1..bf427c8 100644 --- a/simbev/__main__.py +++ b/simbev/__main__.py @@ -25,7 +25,7 @@ def main(): # run simulation with optional timing helpers.timeitlog(simbev_obj.output_options["timing"], simbev_obj.save_directory)( - simbev_obj.run_multi + simbev_obj.run )() helpers.export_metadata(simbev_obj, cfg) diff --git a/simbev/simbev_class.py b/simbev/simbev_class.py index 4e59534..0b701cd 100644 --- a/simbev/simbev_class.py +++ b/simbev/simbev_class.py @@ -47,7 +47,8 @@ def __init__(self, data_dict, config_dict, name): self.energy_min = data_dict["energy_min"] self.private_only_run = config_dict["private_only_run"] - self.num_threads = config_dict["num_threads"] + self.region_threads = config_dict["region_threads"] + self.car_threads = config_dict["car_threads"] self.output_options = config_dict["output_options"] self.input_type = config_dict["input_type"] @@ -192,25 +193,25 @@ def _add_regions_from_dataframe(self): ) self.regions.append(new_region) - def run_multi(self): + def run(self): """Runs Simulation for multiprocessing""" print( "Scaling set to {}: 1 simulated vehicle represents {} vehicles in grid time series".format( self.scaling, self.scaling ) ) - self.num_threads = min(self.num_threads, len(self.regions)) - if self.num_threads == 1: + self.region_threads = min(self.region_threads, len(self.regions)) + if self.region_threads == 1: for region in self.regions: - grid_data = self.run(region) + grid_data = self.run_region(region) self._log_grid_data(grid_data) else: - pool = mp.Pool(processes=self.num_threads) + pool = mp.Pool(processes=self.region_threads) for region in self.regions: - pool.apply_async(self.run, (region,), callback=self._log_grid_data) + pool.apply_async(self.run_region, (region,), callback=self._log_grid_data) pool.close() pool.join() grid_time_series_all_regions = helpers.timeitlog( @@ -219,7 +220,7 @@ def run_multi(self): if self.output_options["region_plot"] or self.output_options["collective_plot"]: plot.plot_gridtimeseries_by_usecase(self, grid_time_series_all_regions) - def run(self, region): + def run_region(self, region): """Runs Simulation for single-processing Parameters @@ -233,7 +234,7 @@ def run(self, region): Returns grid-data for current region. """ - if self.num_threads == 1: + if self.region_threads == 1: print( f"===== Region: {region.id} ({region.number + 1}/{len(self.regions)}) =====" ) @@ -247,8 +248,13 @@ def run(self, region): cars_simulated = 0 # exception_count = 0 for car_type_name, car_count in region.car_dict.items(): + pool = mp.Pool(processes=self.car_threads) for car_number in range(car_count): - self.run_car(car_type_name, car_number, region, cars_simulated, region_directory) + + for region in self.regions: + pool.apply_async(self.run_car, (car_type_name, car_number, region, cars_simulated, region_directory)) + pool.close() + pool.join() cars_simulated += car_number + 1 # if self.private_only_run: # print( @@ -273,7 +279,6 @@ def run_car(self, car_type_name, car_number, region, cars_simulated, region_dire # Create new car car_type = self.car_types[car_type_name] # create new car objects - # TODO: parking parameters that change by region work_parking = ( self.work_parking[region.region_type.rs7_type] >= self.rng.random() ) @@ -305,7 +310,7 @@ def run_car(self, car_type_name, car_number, region, cars_simulated, region_dire soc_init, ) - if self.num_threads == 1: + if self.region_threads == 1 and self.car_threads == 1: print( "\r{}% {} {} / {}".format( round( @@ -332,7 +337,7 @@ def run_car(self, car_type_name, car_number, region, cars_simulated, region_dire self.simulate_car(private_car, region) car = private_car except SoCError: - exception_count += 1 + # exception_count += 1 self.simulate_car(car, region) else: self.simulate_car(car, region) @@ -666,7 +671,8 @@ def from_config(cls, config_path): "scenario_path": scenario_path, "input_type": cfg["basic"]["input_type"], "input_directory": cfg["basic"]["input_directory"], - "num_threads": cfg.getint("sim_params", "num_threads", fallback=1), + "region_threads": cfg.getint("sim_params", "region_threads", fallback=1), + "car_threads": cfg.getint("sim_params", "car_threads", fallback=1), "output_options": output_options, "private_only_run": cfg.getboolean( "sim_params", "private_only_run", fallback=False