Implement review changes.
PaulScheerRLI committed Jun 13, 2024
1 parent abe3bec commit d4d1ce3
Showing 11 changed files with 164 additions and 154 deletions.
74 changes: 27 additions & 47 deletions simba/data_container.py
@@ -8,20 +8,28 @@

import pandas as pd

from simba import util
from simba import util, ids
from simba.ids import INCLINE, LEVEL_OF_LOADING, SPEED, T_AMB, TEMPERATURE, CONSUMPTION


class DataContainer:
def __init__(self):
# Dictionary of dict[VehicleTypeName][ChargingType] containing the vehicle_info dictionary
self.vehicle_types_data: Dict[str, any] = {}
# Dictionary of dict[consumption_lookup_name] containing a consumption lookup
self.consumption_data: Dict[str, pd.DataFrame] = {}
# Dictionary of dict[hour_of_day] containing temperature in °C
self.temperature_data: Dict[int, float] = {}
# Dictionary of dict[hour_of_day] containing level of loading [-]
self.level_of_loading_data: Dict[int, float] = {}
# Dictionary of dict[station_name] containing information about electrification
self.stations_data: Dict[str, dict] = {}
# Dictionary containing various information about investment costs and the grid operator
self.cost_parameters_data: Dict[str, dict] = {}
# Dictionary containing all stations and their geo location (lng,lat,elevation)
self.station_geo_data: Dict[str, dict] = {}

# List of trip dictionaries containing trip information like arrival time and station,
# departure time and station, distance, and more
self.trip_data: [dict] = []

def fill_with_args(self, args: argparse.Namespace) -> 'DataContainer':
@@ -57,8 +65,8 @@ def add_trip_data_from_csv(self, file_path: Path) -> 'DataContainer':
trip_d = dict(trip)
trip_d["arrival_time"] = datetime.datetime.fromisoformat(trip["arrival_time"])
trip_d["departure_time"] = datetime.datetime.fromisoformat(trip["departure_time"])
trip_d[LEVEL_OF_LOADING] = cast_float_or_none(trip.get(LEVEL_OF_LOADING))
trip_d[TEMPERATURE] = cast_float_or_none(trip.get(TEMPERATURE))
trip_d[LEVEL_OF_LOADING] = util.cast_float_or_none(trip.get(LEVEL_OF_LOADING))
trip_d[TEMPERATURE] = util.cast_float_or_none(trip.get(TEMPERATURE))
trip_d["distance"] = float(trip["distance"])
self.trip_data.append(trip_d)
return self
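The loader methods return the container itself, so they can be chained. A minimal sketch of how the CSV loaders touched in this file might be combined; the file paths are placeholders, not from the commit:

    from pathlib import Path
    from simba.data_container import DataContainer

    data = (
        DataContainer()
        .add_trip_data_from_csv(Path("data/trips.csv"))
        .add_station_geo_data_from_csv(Path("data/all_stations.csv"))
        .add_level_of_loading_data_from_csv(Path("data/level_of_loading.csv"))
        .add_temperature_data_from_csv(Path("data/temperature.csv"))
    )
    print(len(data.trip_data), "trips loaded")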
@@ -123,23 +131,27 @@ def add_station_geo_data_from_csv(self, file_path: Path) -> 'DataContainer':
# this data is stored in the schedule and passed to the trips, which use the information
# for consumption calculation. Missing station data is handled with default values.
self.station_geo_data = dict()
line_num = None
try:
with open(file_path, "r", encoding='utf-8') as f:
delim = util.get_csv_delim(file_path)
reader = csv.DictReader(f, delimiter=delim)
for row in reader:
for line_num, row in enumerate(reader):
self.station_geo_data[str(row['Endhaltestelle'])] = {
"elevation": float(row['elevation']),
"lat": float(row.get('lat', 0)),
"long": float(row.get('long', 0)),
ids.ELEVATION: float(row[ids.ELEVATION]),
ids.LATITUDE: float(row.get(ids.LATITUDE, 0)),
ids.LONGITUDE: float(row.get(ids.LONGITUDE, 0)),
}
except (FileNotFoundError, KeyError):
logging.log(msg=f"External csv file {file_path} not found or not named properly. "
"(Needed column names are 'Endhaltestelle' and 'elevation')",
level=100)
raise
except ValueError:
logging.log(msg=f"External csv file {file_path} should only contain numeric data.",
line_num += 2  # account for the CSV header row and zero-based enumeration
logging.log(msg=f"Can't parse numeric data in line {line_num} from file {file_path}.",
level=100)
raise
return self
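For reference, a tiny station file this parser would accept: 'Endhaltestelle' and 'elevation' are required, 'lat' and 'lng' fall back to 0. The values and the comma delimiter are assumptions, not from the commit:

    # all_stations.csv (invented):
    #   Endhaltestelle,elevation,lat,lng
    #   Hauptbahnhof,34.0,52.52,13.37
    from pathlib import Path
    from simba.data_container import DataContainer

    geo = DataContainer().add_station_geo_data_from_csv(Path("all_stations.csv")).station_geo_data
    # geo == {"Hauptbahnhof": {"elevation": 34.0, "lat": 52.52, "lng": 13.37}}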

def add_level_of_loading_data(self, data: dict) -> 'DataContainer':
@@ -156,7 +168,7 @@ def add_level_of_loading_data(self, data: dict) -> 'DataContainer':
def add_level_of_loading_data_from_csv(self, file_path: Path) -> 'DataContainer':
index = "hour"
column = "level_of_loading"
level_of_loading_data_dict = get_dict_from_csv(column, file_path, index)
level_of_loading_data_dict = util.get_dict_from_csv(column, file_path, index)
self.add_level_of_loading_data(level_of_loading_data_dict)
return self

@@ -179,7 +191,7 @@ def add_temperature_data_from_csv(self, file_path: Path) -> 'DataContainer':
"""
index = "hour"
column = "temperature"
temperature_data_dict = get_dict_from_csv(column, file_path, index)
temperature_data_dict = util.get_dict_from_csv(column, file_path, index)
self.add_temperature_data(temperature_data_dict)
return self
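Both hourly lookups are built by util.get_dict_from_csv, assuming the relocated helper keeps the (column, file_path, index) signature of the implementation removed further down; keys and values are cast to float. With an invented temperature file:

    from simba import util

    # data/temperature.csv (invented):
    #   hour,temperature
    #   0,4.5
    #   1,3.9
    lookup = util.get_dict_from_csv("temperature", "data/temperature.csv", "hour")
    # lookup == {0.0: 4.5, 1.0: 3.9}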

@@ -260,7 +272,7 @@ def get_json_from_file(file_path: Path, data_type: str) -> any:
with open(file_path, encoding='utf-8') as f:
return util.uncomment_json_file(f)
except FileNotFoundError:
raise FileNotFoundError(f"Path to {data_type} ({file_path}) "
raise FileNotFoundError(f"Path to {file_path} for {data_type} "
"does not exist. Exiting...")

def add_consumption_data_from_vehicle_type_linked_files(self):
@@ -291,9 +303,9 @@ def add_consumption_data(self, data_name, df: pd.DataFrame) -> 'DataContainer':
:type df: pd.DataFrame
:return: DataContainer instance with added consumption data
"""

for expected_col in [INCLINE, T_AMB, LEVEL_OF_LOADING, SPEED, CONSUMPTION]:
assert expected_col in df.columns, f"Consumption data is missing {expected_col}"
missing_cols = [c for c in [INCLINE, T_AMB, LEVEL_OF_LOADING, SPEED, CONSUMPTION] if
c not in df.columns]
assert not missing_cols, f"Consumption data is missing {', '.join(missing_cols)}"
assert data_name not in self.consumption_data, f"{data_name} already exists in data"
self.consumption_data[data_name] = df
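With the new check, a lookup table only needs the five columns named in simba.ids; a minimal frame that passes it, using made-up numbers and a hypothetical lookup name:

    import pandas as pd

    from simba.data_container import DataContainer
    from simba.ids import INCLINE, T_AMB, LEVEL_OF_LOADING, SPEED, CONSUMPTION

    container = DataContainer()
    df = pd.DataFrame({
        INCLINE: [0.0],
        T_AMB: [20.0],
        LEVEL_OF_LOADING: [0.5],
        SPEED: [30.0],
        CONSUMPTION: [1.2],
    })
    container.add_consumption_data("default_consumption", df)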

@@ -317,35 +329,3 @@ def get_values_from_nested_key(key, data: dict) -> list:
yield from get_values_from_nested_key(key, value)


def get_dict_from_csv(column, file_path, index):
""" Get a dictonary with the key of a numeric index and the value of a numeric column
:param column: column name for dictionary values. Content needs to be castable to float
:type column: str
:param file_path: file path
:type file_path: str or Path
:param index: column name of the index / keys of the dictionary.
Content needs to be castable to float
:return: dictionary with numeric keys of index and numeric values of column
"""
output = dict()
with open(file_path, "r") as f:
delim = util.get_csv_delim(file_path)
reader = csv.DictReader(f, delimiter=delim)
for row in reader:
output[float(row[index])] = float(row[column])
return output


def cast_float_or_none(val: any) -> any:
""" Cast a value to float. If a ValueError or TypeError is raised, None is returned
:param val: value to cast
:type val: any
:return: casted value
"""

try:
return float(val)
except (ValueError, TypeError):
return None
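Both module-level helpers move to simba.util (the call sites above already use the util. prefix). Assuming the relocated cast_float_or_none keeps the body removed here, its behavior is:

    from simba import util

    util.cast_float_or_none("0.5")   # -> 0.5
    util.cast_float_or_none("")      # -> None (ValueError is swallowed)
    util.cast_float_or_none(None)    # -> None (TypeError is swallowed)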
3 changes: 3 additions & 0 deletions simba/ids.py
@@ -6,3 +6,6 @@
CONSUMPTION = "consumption_kwh_per_km"
VEHICLE_TYPE = "vehicle_type"
TEMPERATURE = "temperature"
LONGITUDE = "lng"
LATITUDE = "lat"
ELEVATION = "elevation"
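These constants replace the bare geo-column strings in data_container.py; a short illustration of the intended access pattern, with a made-up station record shaped like the ones built above:

    from simba import ids

    station = {"elevation": 34.0, "lat": 52.52, "lng": 13.37}
    longitude = station[ids.LONGITUDE]   # instead of the string literal "lng"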
2 changes: 1 addition & 1 deletion simba/optimizer_util.py
@@ -491,7 +491,7 @@ def get_groups_from_events(events, impossible_stations=None, could_not_be_electr
could_not_be_electrified.update([event.rotation.id])

groups = list(zip(event_groups, station_subsets))
# each event group should have events and stations. If not something went wrong.
# each event group should have events and stations. If not, something went wrong.
filtered_groups = list(filter(lambda x: len(x[0]) != 0 and len(x[1]) != 0, groups))
if len(filtered_groups) != len(groups):
if optimizer:
3 changes: 0 additions & 3 deletions simba/report.py
@@ -5,11 +5,8 @@
from typing import Iterable

import matplotlib.pyplot as plt
import matplotlib
from spice_ev.report import aggregate_global_results, plot, generate_reports

matplotlib.use('Agg')


def open_for_csv(filepath):
""" Create a file handle to write to.
2 changes: 1 addition & 1 deletion simba/rotation.py
@@ -113,7 +113,7 @@ def earliest_departure_next_rot(self):
def min_standing_time(self):
"""Minimum duration of standing time in minutes.
No consideration of depot buffer time or charging curve
No consideration of depot buffer time or charging curve.
:return: Minimum duration of standing time in minutes.
"""
91 changes: 43 additions & 48 deletions simba/schedule.py
@@ -8,6 +8,7 @@
from typing import Dict, Type, Iterable

import simba.rotation
import spice_ev.strategy
from simba.consumption import Consumption
from simba.data_container import DataContainer
from simba import util, optimizer_util
@@ -20,17 +21,23 @@


class SocDispatcher:
"""Dispatches the right initial SoC for every vehicle id at scenario generation.
"""Initializes vehicles with specific SoCs at the start of their rotations.
Used for specific vehicle initialization for example when coupling tools."""
The first rotation of a vehicle is initialized; later rotations have their desired_soc at
departure changed.
Used for specific vehicle initialization, for example, when coupling tools."""

def __init__(self,
default_soc_deps: float,
default_soc_opps: float,
# vehicle_socs stores the departure soc of a rotation as a dict of the previous
# trip, since this is how the SpiceEV scenario is generated.
# The first trip of a vehicle has no previous trip and therefore is None
vehicle_socs: Dict[str, Type[Dict["simba.trip.Trip", float]]] = None):

def __init__(self, default_soc_deps, default_soc_opps, vehicle_socs=None):
"""
:param default_soc_deps: default desired SoC at departure for depot charger
:param default_soc_opps: default desired SoC at departure for opportunity charger
:param vehicle_socs: stores the desired departure SoC dict with the keys
[vehicle_id][previous_trip], since this is how the SpiceEV scenario is generated.
The first trip of a vehicle has no previous trip. In this case, the trip key is None.
:type vehicle_socs: Dict[str, Type[Dict["simba.trip.Trip", float]]]
:return: None
"""
self.default_soc_deps = default_soc_deps
self.default_soc_opps = default_soc_opps
self.vehicle_socs = {}
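A minimal sketch of constructing the dispatcher when coupling to another tool; the None key for a vehicle's first trip follows the docstring above, while the vehicle id and SoC numbers are invented:

    from simba.schedule import SocDispatcher

    dispatcher = SocDispatcher(
        default_soc_deps=1.0,
        default_soc_opps=1.0,
        vehicle_socs={
            "AB_depb_1": {None: 0.85},   # first rotation of this vehicle starts at 85 %
        },
    )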
@@ -102,17 +109,13 @@ def from_datacontainer(cls, data: DataContainer, args):

for trip in data.trip_data:
rotation_id = trip['rotation_id']
# trip gets reference to station data and calculates height diff during trip
# initialization. Could also get the height difference from here on
# get average hour of trip if level of loading or temperature has to be read from
# auxiliary tabular data

# get average hour of trip and parse to string, since tabular data has strings
# as keys
hour = (trip["departure_time"] +
(trip["arrival_time"] - trip["departure_time"]) / 2).hour
# Get height difference from station_data

# Get height difference from station_data
trip["height_difference"] = schedule.get_height_difference(
trip["departure_name"], trip["arrival_name"])

@@ -223,7 +226,7 @@ def run(self, args, mode="distributed"):
"""
# Make sure all rotations have an assigned vehicle
assert all([rot.vehicle_id is not None for rot in self.rotations.values()])
assert mode in ["distributed", "greedy"]
assert mode in spice_ev.strategy.STRATEGIES
scenario = self.generate_scenario(args)

logging.info("Running SpiceEV...")
@@ -346,21 +349,20 @@ def assign_vehicles_w_min_recharge_soc(self):

self.vehicle_type_counts = vehicle_type_counts

def assign_vehicles_for_django(self, eflips_output: Iterable[dict]):
"""Assign vehicles based on eflips outputs
def assign_vehicles_custom(self, vehicle_assigns: Iterable[dict]):
""" Assign vehicles on a custom basis.
eflips couples vehicles and returns for every rotation the departure soc and vehicle id.
This is included into simba by assigning new vehicles with the respective values. I.e. in
simba every rotation gets a new vehicle.
:param eflips_output: output from eflips meant for simba. Iterable contains
rotation_id, vehicle_id and start_soc for each rotation
:type eflips_output: iterable of dataclass "simba_input"
Assign vehicles based on a data source containing all rotations, their vehicle ids and
desired start SoCs.
:param vehicle_assigns: Iterable of dicts with the rotation id, vehicle id and start SoC
for each rotation (keys "rot", "v_id" and "soc")
:type vehicle_assigns: Iterable[dict]
:raises KeyError: If not every rotation has a vehicle assigned to it
"""
eflips_rot_dict = {d["rot"]: {"v_id": d["v_id"], "soc": d["soc"]} for d in eflips_output}
unique_vids = {d["v_id"] for d in eflips_output}
eflips_rot_dict = {d["rot"]: {"v_id": d["v_id"], "soc": d["soc"]} for d in vehicle_assigns}
unique_vids = {d["v_id"] for d in vehicle_assigns}
vehicle_socs = {v_id: dict() for v_id in unique_vids}
eflips_vid_dict = {v_id: sorted([d["rot"] for d in eflips_output
eflips_vid_dict = {v_id: sorted([d["rot"] for d in vehicle_assigns
if d["v_id"] == v_id],
key=lambda r_id: self.rotations[r_id].departure_time)
for v_id in unique_vids}
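A hypothetical call, given an already initialised Schedule instance named schedule whose rotations include "101" and "102"; the key names match the comprehensions above, the ids and SoCs are invented:

    schedule.assign_vehicles_custom([
        {"rot": "101", "v_id": "AB_depb_1", "soc": 0.90},
        {"rot": "102", "v_id": "AB_depb_1", "soc": 0.68},
    ])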
@@ -402,17 +404,20 @@ def init_soc_dispatcher(self, args):
default_soc_opps=args.desired_soc_opps)

def assign_only_new_vehicles(self):
""" Assign new vehicle IDs to rotations
""" Assign a new vehicle to every rotation.
Iterate over all rotations and add a vehicle for each rotation. Vehicles are named on the
basis of their vehicle_type, charging type and the current number of vehicles with this
vehicle_type / charging type combination.
"""
# count number of vehicles per type
# used for unique vehicle id e.g. vehicletype_chargingtype_id
# Initialize counting of all vehicle_type / charging type combinations
vehicle_type_counts = {f'{vehicle_type}_{charging_type}': 0
for vehicle_type, charging_types in self.vehicle_types.items()
for charging_type in charging_types.keys()}
rotations = sorted(self.rotations.values(), key=lambda rot: rot.departure_time)
for rot in rotations:
vt_ct = f"{rot.vehicle_type}_{rot.charging_type}"
# no vehicle available for dispatch, generate new one
# Generate a new vehicle
vehicle_type_counts[vt_ct] += 1
rot.vehicle_id = f"{vt_ct}_{vehicle_type_counts[vt_ct]}"
self.vehicle_type_counts = vehicle_type_counts
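The resulting ids follow a vehicletype_chargingtype_count pattern; reproducing the counting with assumed type names "AB" and "depb":

    vt_ct = "AB_depb"                     # assumed vehicle type "AB", charging type "depb"
    vehicle_type_counts = {vt_ct: 0}
    for rotation in ("rot_A", "rot_B"):   # stand-ins for the sorted rotations
        vehicle_type_counts[vt_ct] += 1
        print(rotation, "->", f"{vt_ct}_{vehicle_type_counts[vt_ct]}")
    # rot_A -> AB_depb_1
    # rot_B -> AB_depb_2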
@@ -587,6 +592,7 @@ def calculate_rotation_consumption(self, rotation: Rotation):

def calculate_trip_consumption(self, trip: Trip):
""" Compute consumption for this trip.
:param trip: trip to calculate consumption for
:type trip: Trip
:return: Consumption of trip [kWh]
@@ -682,25 +688,24 @@ def get_common_stations(self, only_opps=True):
def get_height_difference(self, departure_name, arrival_name):
""" Get the height difference of two stations.
Defaults to 0 if height data is not found
:param departure_name: Departure station
:type departure_name: str
:param arrival_name: Arrival station
:type arrival_name: str
:return: Height difference
:return: Height difference. Defaults to 0 if height data is not found.
:rtype: float
"""
if isinstance(self.station_data, dict):
station = departure_name
station_name = departure_name
try:
start_height = self.station_data[station]["elevation"]
station = arrival_name
end_height = self.station_data[arrival_name]["elevation"]
start_height = self.station_data[station_name]["elevation"]
station_name = arrival_name
end_height = self.station_data[station_name]["elevation"]
return end_height - start_height
except KeyError:
logging.error(f"No elevation data found for {station}. Height Difference set to 0")
logging.error(f"No elevation data found for {station_name}. Height difference set to 0")
else:
logging.error("No Station Data found for schedule. Height Difference set to 0")
logging.error("No station data found for schedule. Height difference set to 0")
return 0

def get_negative_rotations(self, scenario):
@@ -1081,16 +1086,6 @@ def generate_scenario(self, args):
json.dump(self.scenario, f, indent=2)
return Scenario(self.scenario, Path())

@classmethod
def get_dict_from_csv(cls, column, file_path, index):
output = dict()
with open(file_path, "r") as f:
delim = util.get_csv_delim(file_path)
reader = csv.DictReader(f, delimiter=delim)
for row in reader:
output[float(row[index])] = float(row[column])
return output


def update_csv_file_info(file_info, gc_name):
"""
1 change: 1 addition & 0 deletions simba/simulate.py
@@ -142,6 +142,7 @@ def sim_greedy(schedule, scenario, args, _i):# Noqa

@staticmethod
def sim(schedule, scenario, args, _i):# Noqa
# Base simulation function for external access.
scenario = schedule.run(args, mode="distributed")
return schedule, scenario
