From dcbf7077a88596891f7b3d514b1b4883a75e2852 Mon Sep 17 00:00:00 2001 From: timroepcke Date: Tue, 5 Jul 2022 11:10:13 +0200 Subject: [PATCH] changes for BVG files --- ebus_toolbox/schedule.py | 101 +++++++++++++++++---------------------- ebus_toolbox/simulate.py | 29 ++++++----- 2 files changed, 63 insertions(+), 67 deletions(-) diff --git a/ebus_toolbox/schedule.py b/ebus_toolbox/schedule.py index 66b4d889..e6fd2678 100644 --- a/ebus_toolbox/schedule.py +++ b/ebus_toolbox/schedule.py @@ -2,11 +2,11 @@ import json import random import datetime -import itertools -from os import path, makedirs +from os import path from datetime import timedelta from ebus_toolbox.rotation import Rotation +from src.scenario import Scenario class Schedule: @@ -191,7 +191,7 @@ def get_arrival_of_last_trip(self): sorted_rotations = sorted(self.rotations.values(), key=lambda rot: rot.arrival_time) return sorted_rotations[-1].arrival_time - def readjust_charging_type(self, args): + def readjust_charging_type(self, args, scenario): """ Loads rotations with negative soc from spice_ev results and adjusts charging type from depb to oppb and vice versa. @@ -199,8 +199,10 @@ def readjust_charging_type(self, args): # todo: all together, as they may influence one another :param args: Command line arguments and/or arguments from config file. :type args: argparse.Namespace + :param scenario: Scenario object containing results a completed simulation + :type scenario: spice_ev.Scenario """ - negative_rotations = self.get_negative_rotations(args) + negative_rotations = self.get_negative_rotations(scenario) print(f"Rotations {negative_rotations} have negative SoC.") print("Adjust charging types for rotations with negative soc.") @@ -212,42 +214,21 @@ def readjust_charging_type(self, args): else: self.set_charging_type("depb", args, [rot]) - def get_negative_rotations(self, args): + def get_negative_rotations(self, scenario): """ Get rotations with negative soc from spice_ev outputs - :param args: Command line arguments and/or arguments from config file. - :type args: argparse.Namespace + :param scenario: Simulation scenario containing simulation results + including the SoC of all vehicles over time + :type scenario: spice_ev.Scenario :return: list of negative rotation_id's :rtype: list - - :raises TypeError: If args.save_results is not set. """ - # load any json output file of sice_ev - gcIDs = list(self.scenario["constants"]["grid_connectors"].keys()) - try: - filename, ext = path.splitext(args.save_results) - except TypeError: - raise TypeError("In order to get negative totations from spice_ev results, please " - "specify 'save_results' in your input arguments.") - # in spiceEV there is one results file for each grid connector and the filename indicates - # which results file corresponds to which grid connector - # in case there is only one grid connector the filename does not mention the grid connector - if len(gcIDs) > 1: - # every gc results file contains the same info with regard to negative socs - # so just pick the first one - filename = f"{filename}_{gcIDs[0]}{ext}" - else: - filename = args.save_results - - with open(filename) as f: - results = json.load(f) - # get dict of vehicles with negative soc's try: - negative_vehicles = results["vehicles with negative soc"] - except KeyError: + negative_vehicles = scenario.negative_soc_tracker + except AttributeError: return [] # get matching rotations negative_rotations = [] @@ -258,11 +239,14 @@ def get_negative_rotations(self, args): time <= v.arrival_time][0]) return negative_rotations - def generate_scenario_json(self, args): + def generate_scenario(self, args): """ Generate scenario.json for spiceEV :param args: Command line arguments and/or arguments from config file. :type args: argparse.Namespace + :return: A spiceEV Scenario instance that can be run and also collects all + simulation outputs. + :rtype: spice_ev.src.Scenario """ # load stations file if args.electrified_stations is None: @@ -309,6 +293,19 @@ def generate_scenario_json(self, args): "soc": args.desired_soc, "vehicle_type": vt + "_" + ct } + charging_stations[vehicles[v_name]['connected_charging_station']] = { + "max_power": args.gc_power_deps, + "min_power": 0.1 * args.gc_power_deps, + "parent": first_rotation.departure_name + } + number_cs = stations_dict[first_rotation.departure_name]["n_charging_stations"] + # add one grid connector for first bus station + number_cs = None if number_cs == 'None' else number_cs + grid_connectors[first_rotation.departure_name] = { + "max_power": args.gc_power_opps, + "cost": {"type": "fixed", "value": 0.3}, + "number_cs": number_cs + } for i, v in enumerate(rotation_ids): departure_event_in_input = True @@ -574,7 +571,7 @@ def generate_scenario_json(self, args): "charging_curve": [[0, max_power], [1, max_power]] } # create final dict - j = { + self.scenario = { "scenario": { "start_time": start.isoformat(), "interval": interval.days * 24 * 60 + interval.seconds // 60, @@ -589,36 +586,28 @@ def generate_scenario_json(self, args): }, "events": events } - # Write JSON - self.scenario = j - makedirs(args.output_directory, exist_ok=True) - with open(args.input, 'w+') as f: - json.dump(j, f, indent=2) - def generate_rotations_overview(self, args): + return Scenario(self.scenario, args.output_directory) + + def generate_rotations_overview(self, scenario, args): rotation_infos = [] - negative_rotations = self.get_negative_rotations(args) + + negative_rotations = self.get_negative_rotations(scenario) + + interval = timedelta(minutes=args.interval) sim_start_time = \ self.get_departure_of_first_trip() - timedelta(minutes=args.signal_time_dif) for id, rotation in self.rotations.items(): # get SOC timeseries for this rotation - socs = [] vehicle_id = rotation.vehicle_id - with open(path.join(args.output_directory, 'simulation_soc_spiceEV.csv')) as f: - reader = csv.DictReader(f) - - interval = timedelta(minutes=args.interval) - read_start_time = rotation.departure_time - duration = rotation.arrival_time-rotation.departure_time - - start_reading = (read_start_time - sim_start_time) // interval - end_reading = start_reading + (duration // interval) - - reader = itertools.islice(reader, start_reading, end_reading) - for line in reader: - socs.append(line[vehicle_id]) + # get soc timeseries for current rotation + vehicle_soc = scenario.vehicle_socs[vehicle_id] + start_idx = (rotation.departure_time - sim_start_time) // interval + end_idx = start_idx + ((rotation.arrival_time-rotation.departure_time) // interval) + rotation_soc_ts = vehicle_soc[start_idx:end_idx] + rotation_soc_ts = [1 if x is None else x for x in rotation_soc_ts] rotation_info = { "rotation_id": id, @@ -631,8 +620,8 @@ def generate_rotations_overview(self, args): "total_consumption_[kWh]": rotation.consumption, "distance": rotation.distance, "charging_type": rotation.charging_type, - "SOC_at_arrival": socs[-1], - "Minumum_SOC": min(socs), + "SOC_at_arrival": rotation_soc_ts[-1], + "Minumum_SOC": min(rotation_soc_ts), "Negative_SOC": 1 if id in negative_rotations else 0 } rotation_infos.append(rotation_info) diff --git a/ebus_toolbox/simulate.py b/ebus_toolbox/simulate.py index 7ff35548..01514512 100644 --- a/ebus_toolbox/simulate.py +++ b/ebus_toolbox/simulate.py @@ -5,8 +5,6 @@ from ebus_toolbox.schedule import Schedule from ebus_toolbox.trip import Trip from ebus_toolbox import report # , optimizer -# SPICE EV SIMULATE -import simulate as spice_ev def simulate(args): @@ -32,26 +30,35 @@ def simulate(args): schedule.set_charging_type(preferred_ct=args.preferred_charging_type, args=args) for i in range(args.iterations): - # construct szenario and simulate in spice ev until optimizer is happy - # if optimizer None, quit after single iteration + # (re)calculate the change in SoC for every trip + # charging types may have changed which may impact battery capacity + # while mileage is assumed to stay constant schedule.delta_soc_all_trips() + + # each rotation is assigned a vehicle ID schedule.assign_vehicles() - # RUN SPICE EV - # write trips to csv in spiceEV format - schedule.generate_scenario_json(args) + scenario = schedule.generate_scenario(args) print("Running Spice EV...") with warnings.catch_warnings(): warnings.simplefilter('ignore', UserWarning) - spice_ev.simulate(args) + # for analyzes + # if flag_sensitivity == 1: + # for ix in range(1000): + # args = run_sensitivity(args, ix) + # scenario.run('distributed', vars(args).copy()) + # else: + # scenario.run('distributed', vars(args).copy()) + + scenario.run('distributed', vars(args).copy()) print(f"Spice EV simulation complete. (Iteration {i})") if i < args.iterations - 1: # TODO: replace with optimizer step in the future - schedule.readjust_charging_type(args) + schedule.readjust_charging_type(args, scenario) - print(f"Rotations {schedule.get_negative_rotations(args)} have negative SoC.") + print(f"Rotations {schedule.get_negative_rotations(scenario)} have negative SoC.") # create report - report.generate(schedule, args) + report.generate(schedule, scenario, args)