Skip to content

Commit

Permalink
changes for BVG files
Browse files Browse the repository at this point in the history
  • Loading branch information
timroepcke committed Jul 5, 2022
1 parent 66159fb commit dcbf707
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 67 deletions.
101 changes: 45 additions & 56 deletions ebus_toolbox/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
import json
import random
import datetime
import itertools
from os import path, makedirs
from os import path
from datetime import timedelta

from ebus_toolbox.rotation import Rotation
from src.scenario import Scenario


class Schedule:
Expand Down Expand Up @@ -191,16 +191,18 @@ def get_arrival_of_last_trip(self):
sorted_rotations = sorted(self.rotations.values(), key=lambda rot: rot.arrival_time)
return sorted_rotations[-1].arrival_time

def readjust_charging_type(self, args):
def readjust_charging_type(self, args, scenario):
"""
Loads rotations with negative soc from spice_ev results and adjusts charging type from
depb to oppb and vice versa.
# todo: it would maybe make sense to change each rotations charging type at a time and not
# todo: all together, as they may influence one another
:param args: Command line arguments and/or arguments from config file.
:type args: argparse.Namespace
:param scenario: Scenario object containing results a completed simulation
:type scenario: spice_ev.Scenario
"""
negative_rotations = self.get_negative_rotations(args)
negative_rotations = self.get_negative_rotations(scenario)

print(f"Rotations {negative_rotations} have negative SoC.")
print("Adjust charging types for rotations with negative soc.")
Expand All @@ -212,42 +214,21 @@ def readjust_charging_type(self, args):
else:
self.set_charging_type("depb", args, [rot])

def get_negative_rotations(self, args):
def get_negative_rotations(self, scenario):
"""
Get rotations with negative soc from spice_ev outputs
:param args: Command line arguments and/or arguments from config file.
:type args: argparse.Namespace
:param scenario: Simulation scenario containing simulation results
including the SoC of all vehicles over time
:type scenario: spice_ev.Scenario
:return: list of negative rotation_id's
:rtype: list
:raises TypeError: If args.save_results is not set.
"""

# load any json output file of sice_ev
gcIDs = list(self.scenario["constants"]["grid_connectors"].keys())
try:
filename, ext = path.splitext(args.save_results)
except TypeError:
raise TypeError("In order to get negative totations from spice_ev results, please "
"specify 'save_results' in your input arguments.")
# in spiceEV there is one results file for each grid connector and the filename indicates
# which results file corresponds to which grid connector
# in case there is only one grid connector the filename does not mention the grid connector
if len(gcIDs) > 1:
# every gc results file contains the same info with regard to negative socs
# so just pick the first one
filename = f"{filename}_{gcIDs[0]}{ext}"
else:
filename = args.save_results

with open(filename) as f:
results = json.load(f)

# get dict of vehicles with negative soc's
try:
negative_vehicles = results["vehicles with negative soc"]
except KeyError:
negative_vehicles = scenario.negative_soc_tracker
except AttributeError:
return []
# get matching rotations
negative_rotations = []
Expand All @@ -258,11 +239,14 @@ def get_negative_rotations(self, args):
time <= v.arrival_time][0])
return negative_rotations

def generate_scenario_json(self, args):
def generate_scenario(self, args):
""" Generate scenario.json for spiceEV
:param args: Command line arguments and/or arguments from config file.
:type args: argparse.Namespace
:return: A spiceEV Scenario instance that can be run and also collects all
simulation outputs.
:rtype: spice_ev.src.Scenario
"""
# load stations file
if args.electrified_stations is None:
Expand Down Expand Up @@ -309,6 +293,19 @@ def generate_scenario_json(self, args):
"soc": args.desired_soc,
"vehicle_type": vt + "_" + ct
}
charging_stations[vehicles[v_name]['connected_charging_station']] = {
"max_power": args.gc_power_deps,
"min_power": 0.1 * args.gc_power_deps,
"parent": first_rotation.departure_name
}
number_cs = stations_dict[first_rotation.departure_name]["n_charging_stations"]
# add one grid connector for first bus station
number_cs = None if number_cs == 'None' else number_cs
grid_connectors[first_rotation.departure_name] = {
"max_power": args.gc_power_opps,
"cost": {"type": "fixed", "value": 0.3},
"number_cs": number_cs
}

for i, v in enumerate(rotation_ids):
departure_event_in_input = True
Expand Down Expand Up @@ -574,7 +571,7 @@ def generate_scenario_json(self, args):
"charging_curve": [[0, max_power], [1, max_power]]
}
# create final dict
j = {
self.scenario = {
"scenario": {
"start_time": start.isoformat(),
"interval": interval.days * 24 * 60 + interval.seconds // 60,
Expand All @@ -589,36 +586,28 @@ def generate_scenario_json(self, args):
},
"events": events
}
# Write JSON
self.scenario = j
makedirs(args.output_directory, exist_ok=True)
with open(args.input, 'w+') as f:
json.dump(j, f, indent=2)

def generate_rotations_overview(self, args):
return Scenario(self.scenario, args.output_directory)

def generate_rotations_overview(self, scenario, args):
rotation_infos = []
negative_rotations = self.get_negative_rotations(args)

negative_rotations = self.get_negative_rotations(scenario)

interval = timedelta(minutes=args.interval)
sim_start_time = \
self.get_departure_of_first_trip() - timedelta(minutes=args.signal_time_dif)

for id, rotation in self.rotations.items():
# get SOC timeseries for this rotation
socs = []
vehicle_id = rotation.vehicle_id
with open(path.join(args.output_directory, 'simulation_soc_spiceEV.csv')) as f:
reader = csv.DictReader(f)

interval = timedelta(minutes=args.interval)
read_start_time = rotation.departure_time
duration = rotation.arrival_time-rotation.departure_time

start_reading = (read_start_time - sim_start_time) // interval
end_reading = start_reading + (duration // interval)

reader = itertools.islice(reader, start_reading, end_reading)

for line in reader:
socs.append(line[vehicle_id])
# get soc timeseries for current rotation
vehicle_soc = scenario.vehicle_socs[vehicle_id]
start_idx = (rotation.departure_time - sim_start_time) // interval
end_idx = start_idx + ((rotation.arrival_time-rotation.departure_time) // interval)
rotation_soc_ts = vehicle_soc[start_idx:end_idx]
rotation_soc_ts = [1 if x is None else x for x in rotation_soc_ts]

rotation_info = {
"rotation_id": id,
Expand All @@ -631,8 +620,8 @@ def generate_rotations_overview(self, args):
"total_consumption_[kWh]": rotation.consumption,
"distance": rotation.distance,
"charging_type": rotation.charging_type,
"SOC_at_arrival": socs[-1],
"Minumum_SOC": min(socs),
"SOC_at_arrival": rotation_soc_ts[-1],
"Minumum_SOC": min(rotation_soc_ts),
"Negative_SOC": 1 if id in negative_rotations else 0
}
rotation_infos.append(rotation_info)
Expand Down
29 changes: 18 additions & 11 deletions ebus_toolbox/simulate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
from ebus_toolbox.schedule import Schedule
from ebus_toolbox.trip import Trip
from ebus_toolbox import report # , optimizer
# SPICE EV SIMULATE
import simulate as spice_ev


def simulate(args):
Expand All @@ -32,26 +30,35 @@ def simulate(args):
schedule.set_charging_type(preferred_ct=args.preferred_charging_type, args=args)

for i in range(args.iterations):
# construct szenario and simulate in spice ev until optimizer is happy
# if optimizer None, quit after single iteration
# (re)calculate the change in SoC for every trip
# charging types may have changed which may impact battery capacity
# while mileage is assumed to stay constant
schedule.delta_soc_all_trips()

# each rotation is assigned a vehicle ID
schedule.assign_vehicles()

# RUN SPICE EV
# write trips to csv in spiceEV format
schedule.generate_scenario_json(args)
scenario = schedule.generate_scenario(args)

print("Running Spice EV...")
with warnings.catch_warnings():
warnings.simplefilter('ignore', UserWarning)
spice_ev.simulate(args)
# for analyzes
# if flag_sensitivity == 1:
# for ix in range(1000):
# args = run_sensitivity(args, ix)
# scenario.run('distributed', vars(args).copy())
# else:
# scenario.run('distributed', vars(args).copy())

scenario.run('distributed', vars(args).copy())
print(f"Spice EV simulation complete. (Iteration {i})")

if i < args.iterations - 1:
# TODO: replace with optimizer step in the future
schedule.readjust_charging_type(args)
schedule.readjust_charging_type(args, scenario)

print(f"Rotations {schedule.get_negative_rotations(args)} have negative SoC.")
print(f"Rotations {schedule.get_negative_rotations(scenario)} have negative SoC.")

# create report
report.generate(schedule, args)
report.generate(schedule, scenario, args)

0 comments on commit dcbf707

Please sign in to comment.