forked from francescomaldonato/RL_VPP_Thesis
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathVPP_environment copy.py
1606 lines (1432 loc) · 99.3 KB
/
VPP_environment copy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import os
import numpy as np
import pandas as pd
from random import randrange
import contextlib
from gym import Env
from gym.spaces import Box, Dict, MultiDiscrete
from elvis.simulate import simulate
from elvis.utility.elvis_general import create_time_steps
from elvis.utility.elvis_general import num_time_steps
from dateutil.relativedelta import relativedelta
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px
def VPP_Scenario_config(yaml_config):
"""
Function to extrapolate the VPP simulation data from the Elvis YAML config file
"""
start_date = yaml_config["start_date"]
end_date = yaml_config["end_date"]
resolution = yaml_config["resolution"]
num_households_load = yaml_config["num_households_load"]
av_max_households_load = yaml_config["av_max_households_load"]
av_max_energy_price = yaml_config["av_max_energy_price"]
solar_power = yaml_config["rw_infrastructure"][0]["solar_inverter"]["max_power"]
wind_power = yaml_config["rw_infrastructure"][1]["wind_inverter"]["max_power"]
EV_types = yaml_config["vehicle_types"]
charging_stations_n = len(yaml_config["infrastructure"]["transformers"][0]["charging_stations"])
EVs_n = yaml_config["num_charging_events"] #per week
EVs_n_max = int(EVs_n*52.2) #(52.14 weeks in a year)
EVs_mean_soc = yaml_config["mean_soc"]
EVs_std_deviation_soc = yaml_config["std_deviation_soc"]
mean_park = yaml_config["mean_park"]
std_deviation_park =yaml_config["std_deviation_park"]
EV_load_max = 0
EV_load_rated = 0
for charging_point in yaml_config["infrastructure"]["transformers"][0]["charging_stations"]:
EV_load_max = EV_load_max + charging_point["max_power"]#kW
EV_load_rated = EV_load_rated + charging_point["rated_power"]#kW
EV_load_min = yaml_config["infrastructure"]["transformers"][0]["min_power"]
houseRWload_max = av_max_households_load
simulation_param = {
"start_date": start_date,
"end_date": end_date,
"resolution": resolution,
"num_households": num_households_load,
"solar_power": solar_power, #kw
"wind_power": wind_power, #kw
"EV_types": EV_types,
"charging_stations_n": charging_stations_n,
"EVs_n": EVs_n,
"EVs_n_max": EVs_n_max,
"mean_park": mean_park, #hours
"std_deviation_park": std_deviation_park, #std in hours
"EVs_mean_soc": EVs_mean_soc*100, #% battery on arrival
"EVs_std_deviation_soc": EVs_std_deviation_soc*100, #Std in kWh
"EV_load_max": EV_load_max, #kW
"EV_load_rated": EV_load_rated, #kW
"EV_load_min": EV_load_min, #kW
"houseRWload_max": houseRWload_max, #kW
"av_max_energy_price": av_max_energy_price #€/kWh
}
return simulation_param
class VPPEnv(Env):
"""
VPP environment class based on the openAI gym Env clss
"""
def __init__(self, VPP_data_input_path, elvis_config_file, simulation_param):
"""
Initialization function to set all the simulation parameters, variables, action and state spaces.
"""
#Loading VPP data from path
VPP_data = pd.read_csv(VPP_data_input_path)
VPP_data['time'] = pd.to_datetime(VPP_data['time'])
VPP_data = VPP_data.set_index('time')
#Costants for all episodes:
self.elvis_config_file = elvis_config_file
self.start = simulation_param["start_date"]
self.end = simulation_param["end_date"]
self.res = simulation_param["resolution"]
num_households = simulation_param["num_households"]
self.solar_power = simulation_param["solar_power"]
self.wind_power = simulation_param["wind_power"]
self.EV_types = simulation_param["EV_types"]
self.charging_stations_n = simulation_param["charging_stations_n"]
self.EVs_n = simulation_param["EVs_n"]
self.EVs_n_max = simulation_param["EVs_n_max"]
self.mean_park = simulation_param["mean_park"] #hours
self.std_deviation_park = simulation_param["std_deviation_park"] #std in hours
self.EVs_mean_soc = simulation_param["EVs_mean_soc"]# %translated to kWh
self.EVs_std_deviation_soc = simulation_param["EVs_std_deviation_soc"]# %translated to kWh
self.EV_load_max = simulation_param["EV_load_max"]
#self.EV_load_rated = simulation_param["EV_load_rated"]
self.charging_point_max_power = self.EV_load_max/self.charging_stations_n #kW
self.charging_point_rated_power = simulation_param["EV_load_rated"]/self.charging_stations_n #kW
self.charging_point_min_power = simulation_param["EV_load_min"]
self.houseRWload_max = simulation_param["houseRWload_max"] + (num_households * 1)
self.max_energy_price = simulation_param["av_max_energy_price"]
main_battery_capacity = self.EV_types[0]["battery"]["capacity"]
self.battery_max_limit = main_battery_capacity - ((main_battery_capacity/100)*0.01) #99.99kWh
self.battery_min_limit = (main_battery_capacity/100)*0.01 #0.01kWh
self.DISCHARGE_threshold = 20 #percentage of battery below with the EV can't be discharged
self.IDLE_DISCHARGE_threshold = 10 #percentage of battery below with the EV can't be discharged, kept idle (must be charged)
elvis_realisation = elvis_config_file.create_realisation(self.start, self.end, self.res)
#ELVIS Initial simulation
#To be updated each episode:
self.charging_events = elvis_realisation.charging_events
print(self.charging_events[0], '\n', '...', '\n', self.charging_events[-1], '\n')
self.current_charging_events = []
self.simul_charging_events_n = len(self.charging_events)
self.elvis_time_serie = create_time_steps(elvis_realisation.start_date, elvis_realisation.end_date, elvis_realisation.resolution)
with open(os.devnull, "w") as f, contextlib.redirect_stdout(f):
result = simulate(elvis_realisation)
VPP_data["time"] = self.elvis_time_serie
VPP_data = VPP_data.set_index("time")
#Check if environment setting power needs to be rescaled
if self.solar_power!=16 or self.wind_power!=12 or num_households!=4:
VPP_data["solar_power"] = VPP_data["solar_power"]/16 * self.solar_power
VPP_data["wind_power"] = VPP_data["wind_power"]/12 * self.wind_power
VPP_data["household_power"] = VPP_data["household_power"]/4 * num_households
VPP_data["House&RW_load"] = VPP_data["household_power"] - VPP_data["solar_power"] - VPP_data["wind_power"]
VPP_data["RW_power"] = VPP_data["solar_power"] + VPP_data["wind_power"]
VPP_data["ev_power"] = result.aggregate_load_profile(num_time_steps(elvis_realisation.start_date, elvis_realisation.end_date, elvis_realisation.resolution))
VPP_data["total_load"] = VPP_data["House&RW_load"] + VPP_data["ev_power"]
VPP_data["total_cost"] = VPP_data["total_load"] * VPP_data["EUR/kWh"] / 4
self.prices_serie = VPP_data["EUR/kWh"].values #EUR/kWh
self.houseRW_load = VPP_data["House&RW_load"].values
self.VPP_loads = pd.DataFrame({'time':self.elvis_time_serie, "House&RW_load":self.houseRW_load, "household_power":VPP_data["household_power"].values, "solar_power":VPP_data["solar_power"].values, "wind_power":VPP_data["wind_power"].values})
self.VPP_loads = self.VPP_loads.set_index("time")
self.household_consume = VPP_data["household_power"].sum()/4 #kWh
self.RW_energy = VPP_data["RW_power"].sum()/4 #kWh
HRW_array = np.array(self.houseRW_load)
self.sum_HRW_power = np.sum((HRW_array)/4) #kWh
self.HRW_overenergy = HRW_array[HRW_array>0].sum()/4 #kWh (Grid energy used)
self.HRW_underenergy = HRW_array[HRW_array<0].sum()/4 #kWh (RE-to-vehicles unused energy)
self.self_consumption = self.household_consume - self.HRW_overenergy
self.selfc_rate = (self.self_consumption / self.RW_energy) * 100
self.autarky_rate = (self.self_consumption / self.household_consume) * 100
dataset_cost_array = HRW_array * np.array(self.prices_serie)/4
self.cost_HRW_power = dataset_cost_array.sum() #€
self.overcost_HRW_power = dataset_cost_array[dataset_cost_array>0].sum() #€
self.exp_ev_en_left = self.EVs_mean_soc + (-self.HRW_underenergy/self.simul_charging_events_n)
#ELVIS
load_array = np.array(VPP_data["total_load"].values)
cost_array = np.array(VPP_data["total_cost"].values)
self.av_Elvis_total_load = np.mean(load_array) #kW
self.std_Elvis_total_load = np.std(load_array) #kW
self.sum_Elvis_total_load = load_array.sum()/4 #kWh
self.Elvis_overconsume = load_array[load_array>0].sum()/4 #kWh
self.Elvis_underconsume = -load_array[load_array<0].sum()/4 #kWh
self.Elvis_total_cost = np.sum(cost_array) #€
self.Elvis_overcost = cost_array[cost_array > 0].sum()
#Init print out
print("-DATASET: House&RW_energy_sum=kWh ", round(self.sum_HRW_power,2),
f", Grid_used_en(grid-import)={round(self.HRW_overenergy,2)}kWh",
f", autarky-rate={round(self.autarky_rate,1)}",
f", RE-to-vehicle_unused_en(grid-export)={round(self.HRW_underenergy,2)}kWh",
f", self-consump.rate={round(self.selfc_rate,1)}",
", Total_selling_cost=€ ", round(self.cost_HRW_power,2),
", Grid_cost=€ ", round(self.overcost_HRW_power,2))
print("- ELVIS.Simulation (Av.EV_SOC= ", self.EVs_mean_soc, "%):\n",
"Sum_Energy=kWh ", round(self.sum_Elvis_total_load,2),
", Grid_used_en=kWh ", round(self.Elvis_overconsume,2),
", RE-to-vehicle_unused_en=kWh ", round(self.Elvis_underconsume,2),
", Total_selling_cost=€ ", round(self.Elvis_total_cost,2),
", Grid_cost=€ ", round(self.Elvis_overcost,2),
", Charging_events= ", self.simul_charging_events_n,
"\n- Exp.VPP_goals: Grid_used_en=kWh 0, RE-to-vehicle_unused_en=kWh 0, Grid_cost=€ 0",
", Av.EV_en_left=kWh ",round(self.exp_ev_en_left,2))
#Set VPP session length
self.tot_simulation_len = len(self.elvis_time_serie)
self.vpp_length = self.tot_simulation_len
#empty list init
self.energy_resources, self.avail_EVs_id, self.ev_power, self.charging_ev_power, self.discharging_ev_power, self.overcost, self.total_cost, self.total_load, self.reward_hist = ([],[],[],[],[],[],[],[],[])
self.max_total_load = self.EV_load_max + self.houseRWload_max
self.max_cost = self.max_total_load * self.max_energy_price /4
#Setting reward functions
self.set_reward_func()
self.VPP_data = VPP_data
self.VPP_actions, self.action_truth_list, self.EVs_energy_at_leaving, self.EVs_energy_at_arrival = ([],[],[],[])
#self.lstm_states_list = []
self.av_EV_energy_left, self.std_EV_energy_left, self.sim_total_load, self.sim_av_total_load, self.sim_std_total_load, self.overconsumed_en, self.underconsumed_en, self.sim_total_cost, self.sim_overcost = [0, 0, 0, 0, 0, 0, 0, 0, 0]
self.cumulative_reward, self.load_t_reward, self.overconsume_reward, self.underconsume_reward, self.overcost_reward, self.EVs_energy_reward, self.AV_EVs_energy_reward = [0, 0, 0, 0, 0, 0, 0]
#Initializing state space parameters for the first step [0]
Init_space_Available_energy_sources = np.zeros(self.charging_stations_n, dtype=np.float32)
#Init_space_Available_EVs_id = np.zeros(self.charging_stations_n, dtype=np.int32)
Init_space_ev_power = np.zeros(1,dtype=np.float32)
Init_space_total_load = np.zeros(1,dtype=np.float32)
#Init_space_total_cost = np.zeros(1,dtype=np.float32)
Init_space_total_load[0] = self.houseRW_load[0]
#Init_space_total_cost[0] = self.houseRW_load[0] * self.prices_serie[0]/4
self.Init_space = {
'ev_power': Init_space_ev_power, #EV load range in kW, depends on the infrastructure of the simulation
'total_load': Init_space_total_load, #Total load range in kW, depends on the household and EV infrastructure of the simulation
#'total_cost': Init_space_total_cost, #Total cost (EUR) per timestep (15 min) [DELETED]
'Available_energy_sources': Init_space_Available_energy_sources, #Available energy from Charging stations (EVs connected)
#'Available_evs_id': Init_space_Available_EVs_id #Available vehicles IDs from charging stations at timestep t [DELETED]
}
self.space_0 = self.Init_space
#For plotting battery levels
self.VPP_energies = Init_space_Available_energy_sources
# Define constants for Action space options
self.IDLE = 0
self.CHARGE = 1
self.DISCHARGE = 2
#self.VPP_rated_pwr = 3.7 #kW
self.possible_actions = 3
#Action space definition
self.action_space = MultiDiscrete( self.possible_actions * np.ones(self.charging_stations_n))
#Actions set definition for action masking
dims = [self.possible_actions]*self.charging_stations_n
self.actions_set = np.arange(sum(dims))
self.invalid_actions_t = np.ones(len(self.actions_set), dtype=bool)
self.lstm_state = None
spaces = {
'ev_power': Box(low=-(self.EV_load_max), high=(self.EV_load_max), shape=(1,), dtype=np.float32), #EV load range in kW, depends on the infrastructure of the simulation
'total_load': Box(low=-(self.max_total_load) , high= (self.max_total_load), shape=(1,), dtype=np.float32), #Total load range in kW, depends on the household and EV infrastructure of the simulation
#'total_cost': Box(low=-(self.max_cost), high=(self.max_cost), shape=(1,), dtype=np.float32),#Total cost (EUR) per timestep (15 min) [DELETED]
'Available_energy_sources': Box(low=0.0, high=100, shape=(self.charging_stations_n,), dtype=np.float32),
#'Available_evs_id': Box(low=0, high=(np.iinfo(np.int32).max)-1, shape=(self.charging_stations_n,), dtype=np.int32) [DELETED]
}
dict_space = Dict(spaces)
self.observation_space = dict_space
#Set starting cond.
self.state = self.Init_space
self.done = False
def set_reward_func(self):
"""
Function to set the reward interpolating functions
"""
#Step rewards
#step EV energies -> [0, 50kWh, (75-80kWh), 100kWh]
self.battery_percentage =[0, self.EVs_mean_soc, 90, 100]
self.EVs_energy_reward_range = np.array([-300, -40, +150, 50])
#step Load reward #Normalizing Load range according to datset max load (self.max_total_load = 40 kW)
load_array = np.array([-80, -30, -15, -4, -1, 0, 0.5, 3.5, 15, 30, 80]) #kW
self.load_range = load_array
#self.load_range = (self.max_total_load/100)*np.array([-100, -35, -15, -3, -1.5, 0.1, 3, 15, 35, 100])
#[-40kW, -12, -6kW, -2kW, -0.4kW, 0, 0.04kW, 2kW, 20kW, 40kW]
self.load_reward_range = np.array([-50, -30, -15, -5, 0, 15, 0, -5, -20, -40, -80])
#FINAL REWARDS
#Av_EV_energy_left reward
#self.av_energy_left_range = [0, 50, 60, 90, 100]
self.av_energy_left_range = [0, self.EVs_mean_soc, self.exp_ev_en_left, 100]
self.av_energy_reward_range = np.array([-50000, -10000, 30000, 10000])
#Average load #Normalizing av.load according to av_Elvis_load = 6.85 kW
self.overconsume_range = (self.Elvis_overconsume/100)*np.array([0, 2, 100, 200]) #Elvis over-consume=kWh 43221.9
#av_load_label = ["0 kWh", "800 kWh", "8000 kWh", "40000 kWh"]
self.overconsume_reward_range = np.array([1000, 0, -2000, -5000])*20
#Stand_dev load #Normalizing std.load according to std_Elvis_load = 11.96
self.underconsume_range = (self.Elvis_underconsume/100)*np.array([0, 20, 100, 200]) #Elvis under-consume=kWh 14842.72
#std_load_label = ["0 kWh", "2000 kWh","15000 kWh", "30000 kWh"]
self.underconsume_reward_range = np.array([1000, 0, -3000, -5000])*5
#total COST #Normalizing total cost according to Elvis_overcost = 2115€
self.overcost_range = (self.Elvis_overcost/100)*np.array([0, 10, 30, 100, 150]) #Elvis overcost=€ 1514.69
#cost_label = ["0€", "200€", "800€", "1200€", "2000€"]
self.overcost_reward_range = np.array([2000, 100, 0, -1000, -2000])*10
def eval_reward(self, reward, step, new_ev_departures):
"""
Function to evaluate the agent reward at each timestep of the simulation
"""
#Load step state variables
total_load_t = self.total_load[step]
#energy_resources = self.energy_resources[step] #[DELETED]
#EVs reward: energies available at charging stations
EVs_energy_reward_t = 0
#Reward at each timestep for EVs available energy--> confusing for the Agent to learn the policy [DELETED]
""" for n in range(self.charging_stations_n):
#1. Check if Evs connected and evaluate reward at station n
if energy_resources[n] > 0:
EVs_energy_reward_t += np.interp(energy_resources[n], self.battery_percentage, self.EVs_energy_reward_range)
self.EVs_energy_reward += EVs_energy_reward_t """
EVs_energy_leaving_reward_t = 0
#Apply reward on energy left on vehicle WHEN leaving the station (it accentuate good/bad behaviour)
for j in range(new_ev_departures):
energy_left = self.EVs_energy_at_leaving[-1-j]
EVs_energy_leaving_reward_t += np.interp(energy_left, self.battery_percentage, self.EVs_energy_reward_range)
self.EVs_energy_reward += EVs_energy_leaving_reward_t
self.EVs_reward_hist[step-1] = EVs_energy_leaving_reward_t
#Load reward for each timestep
load_reward_t = np.interp(total_load_t, self.load_range, self.load_reward_range)
self.load_t_reward += load_reward_t
self.load_reward_hist[step-1] = load_reward_t
reward += (EVs_energy_reward_t + EVs_energy_leaving_reward_t + load_reward_t)
return reward
def eval_final_reward(self, reward):
"""
Function to evaluate the final agent reward at the end of the simulation
"""
#EVs ENERGY reward: Evaluating reward for average energy left in EV leaving
AV_EVs_energy_reward = np.interp(self.av_EV_energy_left,
self.av_energy_left_range,
self.av_energy_reward_range)
self.AV_EVs_energy_reward += AV_EVs_energy_reward
#LOAD reward:
#Overconsumed energy reward
final_overconsume_reward = np.interp(self.overconsumed_en, self.overconsume_range, self.overconsume_reward_range)
self.overconsume_reward += final_overconsume_reward
#Underconsumed energy reward
final_underconsume_reward = np.interp(self.underconsumed_en, self.underconsume_range, self.underconsume_reward_range)
self.underconsume_reward += final_underconsume_reward
#OverCOST reward:
final_overcost_reward = np.interp(self.sim_overcost, self.overcost_range, self.overcost_reward_range)
self.overcost_reward += final_overcost_reward
reward += (AV_EVs_energy_reward + final_overconsume_reward + final_underconsume_reward + final_overcost_reward)
return reward
def action_masks(self):
"""
Function to evaluate the "invalid" actions the agent should not take at each timestep depending on the step observation
(used by the Maskable PPO algorithm for training and prediction and by all the algorithms simulation for general behaviour control)
"""
#self.IDLE = 0
#self.CHARGE = 1
#self.DISCHARGE = 2
step = self.tot_simulation_len - self.vpp_length
#loding step variables
Evs_id_t = self.avail_EVs_id[step]
EVs_available = 0
for n in range(self.charging_stations_n):
if Evs_id_t[n] > 0: EVs_available+=1
Energy_sources_t = self.energy_resources[step]
houseRW_load = self.houseRW_load[step+1]
invalid_actions = []
for n in range(self.charging_stations_n):
if Evs_id_t[n] == 0:#if vehicle not present at station n:
for i in range(1, self.possible_actions): #CHARGE,DISCHARGE invalid
invalid_actions.append(n + i*self.charging_stations_n)
else:
#IF vehicle present at station n:
if Energy_sources_t[n] <= self.IDLE_DISCHARGE_threshold:
#IDLE,DISCHARGE invalid if battery below 40%
for i in range(self.IDLE, self.possible_actions, 2):
invalid_actions.append(n + i*self.charging_stations_n)
elif self.IDLE_DISCHARGE_threshold < Energy_sources_t[n] <= self.DISCHARGE_threshold:
#DISCHARGE invalid if battery below 55%
invalid_actions.append(n + self.DISCHARGE*self.charging_stations_n)
elif Energy_sources_t[n] > 91:
#CHARGE invalid if battery over 91%
invalid_actions.append(n + self.CHARGE*self.charging_stations_n)
if houseRW_load > 0: #if load positive
invalid_actions.append(n + self.CHARGE*self.charging_stations_n) #CHARGE invalid
if EVs_available == 1: #if only vehicle available:
invalid_actions.append(n) #IDLE invalid
if (n + self.DISCHARGE*self.charging_stations_n) in invalid_actions: #if DISCHARGE was invalid
invalid_actions.remove(n + self.DISCHARGE*self.charging_stations_n) #DISCHARGE valid
elif EVs_available > 1: #if not the only vehicle available:
if all( Energy_sources_t[n] >= x for x in [bat_perc for bat_perc in Energy_sources_t if bat_perc > 0]):#if vehicle with most charge
invalid_actions.append(n) #IDLE invalid
if (n + self.DISCHARGE*self.charging_stations_n) in invalid_actions: #if DISCHARGE is invalid
invalid_actions.remove(n + self.DISCHARGE*self.charging_stations_n) #DISCHARGE valid
elif houseRW_load < 0: #if load is negative
invalid_actions.append(n + self.DISCHARGE*self.charging_stations_n) #Discharge invalid
#for i in range(self.IDLE, self.possible_actions, 2):
# invalid_actions.append(n + i*self.charging_stations_n) #IDLE,DISCHARGE invalid
if EVs_available == 1: #if only vehicle available:
invalid_actions.append(n) #IDLE invalid
if (n + self.CHARGE*self.charging_stations_n) in invalid_actions: #if CHARGE was invalid
invalid_actions.remove(n + self.CHARGE*self.charging_stations_n) #CHARGE valid
elif EVs_available > 1: #if not the only vehicle available:
if all( Energy_sources_t[n] <= x for x in [bat_perc for bat_perc in Energy_sources_t if bat_perc > 0]):#if vehicle with least charge
invalid_actions.append(n) #IDLE invalid
if (n + self.CHARGE*self.charging_stations_n) in invalid_actions: #if CHARGE is invalid
invalid_actions.remove(n + self.CHARGE*self.charging_stations_n) #CHARGE valid
invalid_actions = [*set(invalid_actions)]
self.invalid_actions_t = [action not in invalid_actions for action in self.actions_set]
return self.invalid_actions_t
def apply_action_on_energy_source(self, step, Energy_sources_t_1, action, total_ev_power_t, ch_station_ideal_pwr):
"""
Function to apply the agent's chosen action (IDLE,CHARGE,DISCHARGE) to the selected charging station with an EV present.
the Adaptive power selection calculates the power to get a total zero-load according to the agent's actions chosen.
"""
#Adaptive power selection
if ch_station_ideal_pwr > 0:
if action == self.CHARGE or (Energy_sources_t_1 <= self.IDLE_DISCHARGE_threshold):
selected_power = ch_station_ideal_pwr
else: selected_power = self.charging_point_min_power
elif ch_station_ideal_pwr < 0:
if (Energy_sources_t_1 <= self.IDLE_DISCHARGE_threshold):
selected_power = self.charging_point_rated_power
elif action == self.DISCHARGE:
selected_power = -ch_station_ideal_pwr
else: selected_power = self.charging_point_min_power
else: selected_power = self.charging_point_rated_power
battery_max_limit = self.battery_max_limit #99.9 kWh
battery_min_limit = self.battery_min_limit #0.1 kWh
ev_power_t = 0
#APPLY ACTION on previous energy state:
if action == self.CHARGE:
Energy_sources_t = Energy_sources_t_1 + (selected_power * 0.25) #5 kW * 15 min = 1.25 kWh STORING ENERGY
ev_power_t += selected_power
if Energy_sources_t > battery_max_limit: #Reached max capacity (kWh)
ev_power_t -= (Energy_sources_t - battery_max_limit)/0.25
Energy_sources_t = battery_max_limit
elif action == self.IDLE:
if Energy_sources_t_1 <= self.IDLE_DISCHARGE_threshold: #if energy below the Idle-Discharge threshold (10%) --> CHARGE
Energy_sources_t = Energy_sources_t_1 + (selected_power * 0.25) #5 kW * 15 min = 1.25 kWh STORING ENERGY
ev_power_t += selected_power
if Energy_sources_t > battery_max_limit: #Reached max capacity (kWh)
ev_power_t -= (Energy_sources_t - battery_max_limit)/0.25
Energy_sources_t = battery_max_limit
#elif Energy_sources_t_1 > self.IDLE_DISCHARGE_threshold: #if energy above the Idle-Discharge threshold (10%) --> IDLE
else: Energy_sources_t = Energy_sources_t_1 #keep energy constant
elif action == self.DISCHARGE:
if Energy_sources_t_1 <= self.IDLE_DISCHARGE_threshold: #if energy below the Idle-Discharge threshold (10%) --> CHARGE
Energy_sources_t = Energy_sources_t_1 + (selected_power * 0.25) #5 kW * 15 min = 1.25 kWh STORING ENERGY
ev_power_t += selected_power
if Energy_sources_t > battery_max_limit: #Reached max capacity (kWh)
ev_power_t -= (Energy_sources_t - battery_max_limit)/0.25
Energy_sources_t = battery_max_limit
elif Energy_sources_t_1 > self.DISCHARGE_threshold: #if energy above the Discharge threshold (25%) --> DISCHARGE
Energy_sources_t = Energy_sources_t_1 - (selected_power * 0.25) #5 kW * 15 min = 1.25 kWh PUSHING ENERGY
ev_power_t -= selected_power
if Energy_sources_t < battery_min_limit: #Reached min capacity (kWh)
ev_power_t += (battery_min_limit - Energy_sources_t)/0.25
Energy_sources_t = battery_min_limit
#elif Energy_sources_t_1 <= self.DISCHARGE_threshold: #if energy below the Discharge threshold (25%) --> IDLE
else: Energy_sources_t = Energy_sources_t_1 #keep energy constant
else:
raise ValueError("Received invalid action={} which is not part of the action space".format(action))
#Charge/discharge power series update
if ev_power_t < 0:
self.discharging_ev_power[step] += ev_power_t
elif ev_power_t > 0:
self.charging_ev_power[step] += ev_power_t
total_ev_power_t += ev_power_t
return Energy_sources_t, total_ev_power_t
def step(self, action):
"""
Function to update the environment every step along the simulation. Divided in 5 main sections:
- 0. Actions analysis and Adaptive power evaluation
- 1. EVs leaving the charging stations check (apply action on those before leaving, then update the present EVs ID array and current_charging_events list)
- 2. Apply action to each station with an EVs present in the EVs ID array
- 3. New EVs arrival at the charging stations check (update the present EVs ID array, the simulation charging_events list and the current_charging_events list)
- 4. Update the VPP environment States and Values
Final_section: Evaluate final reward, data structures and performance when the simulation is over.
"""
#Reduce VPP session length by 1 step [Episode init and reset do as step 0, first step = 1]
self.vpp_length -= 1
#evaluate step
step = self.tot_simulation_len - self.vpp_length
time_step = self.elvis_time_serie[step]
#loding step variables
Evs_id_t = self.avail_EVs_id[step]
Energy_sources_t = self.energy_resources[step]
houseRW_load_t = self.houseRW_load[step]
Evs_id_t_1 = self.avail_EVs_id[step-1] #not used
Energy_sources_t_1 = self.energy_resources[step-1]
#Reward Initialization
reward = 0
#Variables inititalization for calculations
total_ev_power_t = 0
action_truth_array = np.ones(self.charging_stations_n, dtype = bool)
new_ev_departures = 0
charging_events_to_remove = []
#SECTION O. Actions analysis and Adaptive power evaluation (ch_station_ideal_pwr = 0 if no correct actions selected)
charge_actions, forced_charge_actions, discharge_actions, ch_station_ideal_pwr = (0,0,0,0)
for n in range(self.charging_stations_n):
#if Evs_id_t_1[n] > 0: EVs_available+=1
if action[n] == self.CHARGE: charge_actions+= 1
elif action[n] == self.DISCHARGE:
if Energy_sources_t_1[n] > self.DISCHARGE_threshold: discharge_actions+= 1
if Energy_sources_t_1[n] <= self.IDLE_DISCHARGE_threshold and Evs_id_t_1[n] > 0:
charge_actions+= 1
forced_charge_actions+= 1
elif action[n] == self.IDLE:
if Energy_sources_t_1[n] <= self.IDLE_DISCHARGE_threshold and Evs_id_t_1[n] > 0:
charge_actions+= 1
forced_charge_actions+= 1
if houseRW_load_t < 0 and charge_actions > 0:
ch_station_ideal_pwr = -((houseRW_load_t - discharge_actions*self.charging_point_min_power)/charge_actions)
if ch_station_ideal_pwr > self.charging_point_max_power: ch_station_ideal_pwr = self.charging_point_max_power
elif ch_station_ideal_pwr < self.charging_point_min_power: ch_station_ideal_pwr = self.charging_point_min_power
elif houseRW_load_t > 0 and discharge_actions > 0:
ch_station_ideal_pwr = -((houseRW_load_t + forced_charge_actions*self.charging_point_rated_power + (charge_actions-forced_charge_actions)*self.charging_point_min_power)/discharge_actions)
if ch_station_ideal_pwr < -self.charging_point_max_power: ch_station_ideal_pwr = -self.charging_point_max_power
elif ch_station_ideal_pwr > -self.charging_point_min_power: ch_station_ideal_pwr = -self.charging_point_min_power
#__END__ SECTION 0
#SECTION 1. Check if current connected EVs left the charging station
for charging_event in self.current_charging_events:
leaving_time_i = charging_event.leaving_time
if time_step >= leaving_time_i:
#If vehicle left, set correspondant station ID to zero
n = charging_event.station_n
energy_at_leaving_i, total_ev_power_t = self.apply_action_on_energy_source(step, Energy_sources_t_1[n], action[n], total_ev_power_t, ch_station_ideal_pwr)
self.EVs_energy_at_leaving.append(energy_at_leaving_i)
new_ev_departures += 1
Evs_id_t[n] = int(0)
charging_events_to_remove.append(charging_event)
else:
#If Vehicle still connected, correspondant station ID = EV's ID
Evs_id_t[charging_event.station_n] = charging_event.id
for charging_event in charging_events_to_remove:
self.current_charging_events.remove(charging_event)
#__END__ SECTION 1
#SECTION 2. Apply action to each station section
for n in range(self.charging_stations_n):
#1. Check Evs id present and evaluate new Energy available at station n
if Evs_id_t[n] > 0:
Energy_sources_t[n], total_ev_power_t = self.apply_action_on_energy_source(step, Energy_sources_t_1[n], action[n], total_ev_power_t, ch_station_ideal_pwr)
elif Evs_id_t[n] == 0:
#If no car is connected at station n, available energy = 0
if Energy_sources_t[n] != 0:
raise ValueError("Available_energy_sources table not matching EVs id: state={} where there is an empty station with a certain energy.".format(Energy_sources_t))
#Cheching if invalid actions performed, storing them in a table
action_code = (self.charging_stations_n*action[n])+n
action_truth_array[n] = self.invalid_actions_t[action_code]
#Punishment for invalid actions
#if action_truth_array[n] == False:
# reward += -50
if Energy_sources_t[n] < 0 or Energy_sources_t[n] > 100:
#Check if energy sources are out of range
raise ValueError("Available_energy_sources table out of ranges: state={} which is not part of the state space".format(Energy_sources_t))
#__END__ SECTION 2
#Checking free spots
ch_stations_available = []
for n in range(self.charging_stations_n):
if Evs_id_t[n] == 0: ch_stations_available.append(n)
charging_events_to_remove = []
#SECTION 3. Check if new vehicles arrive at charging stations
for charging_event in self.charging_events:
arrival_time_i = charging_event.arrival_time
#Fixing arrival time at step 0, shifted to step 1
if step == 1:
if arrival_time_i == self.elvis_time_serie[0]:
arrival_time_i = self.elvis_time_serie[1]
if arrival_time_i <= time_step:
if len(ch_stations_available)>0:
#If free stations available, pop out free ch_station from list and assign vehicle ID to station
n = ch_stations_available.pop(randrange(len(ch_stations_available)))
#if type(charging_event.id) != int:
charging_event.id = int(charging_event.id[16:])
Evs_id_t[n] = charging_event.id
vehicle_i = charging_event.vehicle_type.to_dict()
soc_i = charging_event.soc
battery_i = vehicle_i['battery']
#efficiency_i = battery_i['efficiency'] #Not implemented right now
capacity_i = battery_i['capacity'] #kWh
#capacity_i = 100 #kWh, considering only Tesla Model S
energy_i = soc_i * capacity_i #kWh
if energy_i < 0.1: #Less than min capacity (kWh) in simulation
energy_i = 0.1
Energy_sources_t[n] = energy_i
charging_event.station_n = n
self.current_charging_events.append(charging_event)
charging_events_to_remove.append(charging_event)
#break
elif arrival_time_i > time_step:
break
for charging_event in charging_events_to_remove:
self.charging_events.remove(charging_event)
self.avail_EVs_n[step] = self.charging_stations_n - len(ch_stations_available)
#__END__ SECTION 3
#SECTION 4. VPP States and Values updates
self.ev_power[step] = total_ev_power_t
self.total_load[step] = houseRW_load_t + total_ev_power_t
if self.total_load[step] > 0 and self.prices_serie[step] > 0:
self.overcost[step] = self.total_load[step] * self.prices_serie[step] / 4
else: self.overcost[step] = 0
self.total_cost[step] = self.total_load[step] * self.prices_serie[step] / 4
self.avail_EVs_id[step] = Evs_id_t
self.energy_resources[step] = Energy_sources_t
#Evaluate step reward
reward = self.eval_reward(reward, step, new_ev_departures)
#VPP Table UPDATE
self.VPP_actions.append(action)
self.action_truth_list.append(action_truth_array)
#self.lstm_states_list.append(self.lstm_state)
#States UPDATE
self.state['Available_energy_sources'] = Energy_sources_t
#self.state['Available_evs_id'] = Evs_id_t #[DELETED]
ev_power_state = np.zeros(1,dtype=np.float32)
ev_power_state[0] = total_ev_power_t
self.state['ev_power'] = ev_power_state
load_state = np.zeros(1,dtype=np.float32)
load_state[0] = self.total_load[step]
self.state['total_load'] = load_state
#cost_state = np.zeros(1,dtype=np.float32) #[DELETED]
#cost_state[0] = self.total_cost[step] #[DELETED]
#self.state['total_cost'] = cost_state #[DELETED]
#__END__ SECTION 4
#FINAL_SECTION: Check if VPP is done
if self.vpp_length <= 1:
self.done = True
self.VPP_actions.append(np.zeros(self.charging_stations_n, dtype=np.int32))
self.action_truth_list.append(np.ones(self.charging_stations_n, dtype = bool))
#self.lstm_states_list.append(None)
#Evaluating load sum (overconsumed, underconsumed), std and average up to timestep t for further rewards
for load in self.total_load:
self.sim_total_load += load/4 #kWh
if load >= 0: self.overconsumed_en += load/4 #kWh
elif load < 0: self.underconsumed_en -= load/4 #kWh
self.sim_av_total_load = np.mean(self.total_load)
self.sim_std_total_load = np.std(self.total_load)
self.sim_overcost = np.sum(self.overcost)
self.sim_total_cost = np.sum(self.total_cost)
self.av_EV_energy_left = np.mean(self.EVs_energy_at_leaving)
self.std_EV_energy_left = np.std(self.EVs_energy_at_leaving)
charging_events_n = len(self.EVs_energy_at_leaving)
charging_events_left = len(self.charging_events)
VPP_loads = self.VPP_loads #Retrieving the VPP loads Dataframe to evaluate autarky and self-consump.
VPP_loads["charging_ev_power"] = self.charging_ev_power
VPP_loads["discharging_ev_power"] = self.discharging_ev_power
#RENEWABLE-SELF-CONSUMPTION evaluation section
#Households consump. energy not covered from the Renewables
VPP_loads["house_self-consump."] = VPP_loads["household_power"] - VPP_loads["RE_power"]
VPP_loads["RE-uncovered_consump."] = VPP_loads["house_self-consump."].mask(VPP_loads["house_self-consump."].lt(0)).fillna(0) #Filter only positive values
self.house_uncovered_RE = self.VPP_loads["RE-uncovered_consump."].sum()/4 #kWh
#Energy from the Renewables directly used by the households
VPP_loads["house_self-consump."] = VPP_loads["household_power"] - VPP_loads["RE-uncovered_consump."]
self.VPP_house_selfc = VPP_loads["house_self-consump."].sum()/4 #kWh
#Energy from the Renewables exported to the grid
VPP_loads["house-unused-RE-power"] = VPP_loads["RE_power"] - VPP_loads["house_self-consump."]
VPP_loads["self_EV-charging"] = VPP_loads["charging_ev_power"] - VPP_loads["house-unused-RE-power"]
VPP_loads["RE-grid-export"] = - VPP_loads["self_EV-charging"].mask(VPP_loads["self_EV-charging"].gt(0)).fillna(0) #Filter only negative values
self.RE_grid_export = VPP_loads["RE-grid-export"].sum()/4 #kWh
#Energy from the Renewables directly stored in the EVs batteries
VPP_loads["RE-uncovered_EV-charging"] = VPP_loads["self_EV-charging"].mask(VPP_loads["self_EV-charging"].lt(0)).fillna(0) #Filter only positive values
VPP_loads["self_EV-charging"] = VPP_loads["charging_ev_power"] - VPP_loads["RE-uncovered_EV-charging"]
self.VPP_RE2battery = VPP_loads["self_EV-charging"].sum()/4 #kWh
#EV-DISCHARGING-Power-SELF-CONSUMPTION evaluation section
#Households consump. grid import (Energy not covered from the EVs discharging power and Renewables)
VPP_loads["battery-self-consump."] = VPP_loads["RE-uncovered_consump."] - (-VPP_loads["discharging_ev_power"]) #THe discharging EV power is a negative serie
VPP_loads["house-grid-import"] = VPP_loads["battery-self-consump."].mask(VPP_loads["battery-self-consump."].lt(0)).fillna(0) #Filter only positive values
self.house_grid_import = VPP_loads["house-grid-import"].sum()/4 #kWh
#Energy from the EVs discharging power used by the households
VPP_loads["battery-self-consump."] = VPP_loads["RE-uncovered_consump."] - VPP_loads["house-grid-import"]
self.VPP_battery_selfc = VPP_loads["battery-self-consump."].sum()/4 #kWh
#Energy from the EVs discharging power, exported to the grid
VPP_loads["self_battery-EV-charging"] = (-VPP_loads["discharging_ev_power"]) - VPP_loads["battery-self-consump."] #THe discharging EV power is a negative serie
VPP_loads["self_battery-EV-charging"] = VPP_loads["RE-uncovered_EV-charging"] - VPP_loads["self_battery-EV-charging"] #ChargingEVs energy not from renwables - (EVs discharging power not used for the house)
VPP_loads["battery-grid-export"] = - VPP_loads["self_battery-EV-charging"].mask(VPP_loads["self_battery-EV-charging"].gt(0)).fillna(0) #Filter only negative values
self.battery_grid_export = VPP_loads["battery-grid-export"].sum()/4 #kWh
#Energy from the grid stored in other EVs batteries
VPP_loads["grid-import_EV-charging"] = VPP_loads["self_battery-EV-charging"].mask(VPP_loads["self_battery-EV-charging"].lt(0)).fillna(0) #Filter only positive values
self.EVs_grid_import = VPP_loads["grid-import_EV-charging"].sum()/4 #kWh
#Energy from the EVs discharging power stored in other EVs batteries
VPP_loads["self_battery-EV-charging"] = VPP_loads["RE-uncovered_EV-charging"] - VPP_loads["grid-import_EV-charging"]
self.VPP_EV2battery = VPP_loads["self_battery-EV-charging"].sum()/4 #kWh
#Rates evaluation
self.VPP_energy_consumed = self.house_grid_import + self.EVs_grid_import + (self.VPP_house_selfc + self.VPP_battery_selfc + self.VPP_RE2battery + self.VPP_EV2battery)
self.VPP_autarky_rate = ((self.VPP_house_selfc + self.VPP_battery_selfc + self.VPP_RE2battery + self.VPP_EV2battery) / self.VPP_energy_consumed) * 100
self.VPP_energy_produced = self.RE_grid_export + self.battery_grid_export + (self.VPP_house_selfc + self.VPP_battery_selfc + self.VPP_RE2battery + self.VPP_EV2battery)
self.VPP_selfc_rate = ((self.VPP_house_selfc + self.VPP_battery_selfc + self.VPP_RE2battery + self.VPP_EV2battery) / self.VPP_energy_produced) * 100
#Storing the modified VPP loads Dataframe
self.VPP_loads = VPP_loads
#Final reward evaluation
reward = self.eval_final_reward(reward)
print("- VPP.Simulation results\n",
"LOAD_INFO: Sum_Energy=KWh ", round(self.sim_total_load,2),
f", Grid_used_en(grid-import)={round(self.overconsumed_en,2)}kWh",
f", Total_demand={round(self.VPP_energy_consumed,2)}kWh",
f", autarky-rate={round(self.VPP_autarky_rate,1)}",
f", RE-to-vehicle_unused_en(grid-export)={round(self.underconsumed_en,2)}kWh",
f", Total_supply={round(self.VPP_energy_produced,2)}kWh",
f", self-consump.rate={round(self.VPP_selfc_rate,1)}",
", Total_selling_cost=€ ", round(self.sim_total_cost,2),
", Grid_cost=€ ", round(self.sim_overcost,2),
"\n",
"EV_INFO: Av.EV_energy_leaving=kWh ", round(self.av_EV_energy_left,2),
", Std.EV_energy_leaving=kWh ", round(self.std_EV_energy_left,2),
", EV_departures = ", charging_events_n,
", EV_queue_left = ", charging_events_left)
else:
self.done = False
self.reward_hist[step-1] = reward
#Building final tables
if self.done == True:
self.optimized_VPP_data = pd.DataFrame({'time':self.elvis_time_serie, "rewards":self.reward_hist, "ev_power":self.ev_power, "total_load":self.total_load, "total_cost":self.total_cost, "overcost":self.overcost})
self.optimized_VPP_data = self.optimized_VPP_data.set_index("time")
self.action_truth_table = np.stack(self.action_truth_list)
self.Evs_id_table = np.stack(self.avail_EVs_id)
self.VPP_energies = np.stack(self.energy_resources)
self.VPP_table = pd.DataFrame(self.VPP_energies)
self.VPP_table["time"] = self.elvis_time_serie
self.VPP_table = self.VPP_table.set_index("time")
self.VPP_table["EVs_id"] = self.avail_EVs_id
self.VPP_table["actions"] = self.VPP_actions
self.VPP_table["mask_truth"] = self.action_truth_list
self.VPP_table["ev_charged_pwr"] = self.charging_ev_power
self.VPP_table["ev_discharged_pwr"] = self.discharging_ev_power
self.VPP_table["load"] = self.total_load
self.VPP_table["load_reward"] = self.load_reward_hist
self.VPP_table["EV_reward"] = self.EVs_reward_hist
self.VPP_table["rewards"] = self.reward_hist
#self.VPP_table["states"] = self.lstm_states_list
self.cumulative_reward = np.sum(self.reward_hist)
self.load_t_reward = np.sum(self.load_reward_hist)
self.EVs_energy_reward = np.sum(self.EVs_reward_hist)
self.quick_results = np.array([str(self.EVs_n)+"_EVs", self.underconsumed_en, self.overconsumed_en, self.sim_overcost, self.av_EV_energy_left, self.cumulative_reward])
print(f"SCORE: Cumulative_reward= {round(self.cumulative_reward,2)} - Step_rewars (load_t= {round(self.load_t_reward,2)}, EVs_energy_t= {round(self.EVs_energy_reward,2)})\n",
f"- Final_rewards (Av.EVs_energy= {round(self.AV_EVs_energy_reward,2)}, Grid_used_en= {round(self.overconsume_reward,2)}, RE-to-vehicle_unused_en= {round(self.underconsume_reward,2)}, Grid_cost= {round(self.overcost_reward,2)})")
#__END__ FINAL SECTION
#set placeholder for info
info = {}
#return step information
return self.state, reward, self.done, info
def render(self, mode = 'human'):
"""
Rendering function not implemented.
"""
#implement visualization
pass
def reset(self):
"""
Reset Environment function to be ready for new simulation. Divided in 3 main sections:
- 1. Create new ELVIS simulation for EVs charging events
- 2. Reset VPP simulation dataset series applying noise on the excrated original dataset instances (not overwriting)
- 3. Reset VPP simulation tables and lists to zero or empty to be filled
"""
#SECTION 1. Create new ELVIS simulation
elvis_config_file = self.elvis_config_file
elvis_realisation = elvis_config_file.create_realisation(self.start, self.end, self.res)
self.charging_events = elvis_realisation.charging_events
if self.current_charging_events != []:
current_charging_events = self.current_charging_events
for i in range(len(current_charging_events)):
current_charging_events[i].leaving_time = current_charging_events[i].leaving_time - relativedelta(years=1)
self.current_charging_events = current_charging_events
self.simul_charging_events_n = len(self.charging_events)
#Evaluate av.EV energy left with Elvis
Elvis_av_EV_energy_left, n_av = [0, 0]
self.EVs_energy_at_arrival = []
for charging_event in self.charging_events:
n_av += 1
vehicle_i = charging_event.vehicle_type.to_dict()
soc_i = charging_event.soc
battery_i = vehicle_i['battery']
#efficiency_i = battery_i['efficiency'] #Not implemented right now
capacity_i = battery_i['capacity'] #kWh
#capacity_i = 100 #kWh, considering only Tesla Model S
energy_i = soc_i * capacity_i #kWh
self.EVs_energy_at_arrival.append(energy_i)
charging_time = charging_event.leaving_time - charging_event.arrival_time
final_energy = energy_i + ((charging_time.total_seconds()/3600) * self.charging_point_max_power)
if final_energy > capacity_i: final_energy = capacity_i #kWh
Elvis_av_EV_energy_left = (final_energy + (n_av-1)*Elvis_av_EV_energy_left)/n_av
self.Elvis_av_EV_energy_left = Elvis_av_EV_energy_left
VPP_data = self.VPP_data
#self.prices_serie = VPP_data["EUR/kWh"].values #EUR/kWh #[DELETED]
#__END__ SECTION 1
#SECTION 2. Reset VPP simulation data applying noise on the original dataset
#Data remaining constant: VPP_data["household_power"], VPP_data["solar_power"], VPP_data["wind_power"], VPP_data["EUR/kWh"]:
mu, sigma = 0, (self.max_energy_price/100)*7 #Mean, standard deviation (self.max_energy_price= 0.13 €/kWh --> 7% = 0.0091)
price_noise = np.random.normal(mu, sigma, self.tot_simulation_len) # creating a noise with the same dimension of the dataset length
mu, sigma = 0, (self.houseRWload_max/100)*4 #Mean, standard deviation (self.houseRWload_max= 10kW --> 2% = 0.4 kW)
load_noise = np.random.normal(mu, sigma, self.tot_simulation_len) # creating a noise with the same dimension of the dataset length
self.VPP_loads["solar_power"] = VPP_data["solar_power"] - load_noise/3
self.VPP_loads["wind_power"] = VPP_data["wind_power"] - load_noise/3
self.VPP_loads["RE_power"] = self.VPP_loads["solar_power"] + self.VPP_loads["wind_power"]
self.VPP_loads["household_power"] = VPP_data["household_power"] + load_noise/3
VPP_data["House&RW_load"] = (VPP_data["household_power"] - VPP_data["solar_power"] - VPP_data["wind_power"]) + load_noise
#Updating series values from noisy table
self.prices_serie = list(VPP_data["EUR/kWh"].values + price_noise) #EUR/kWh
self.houseRW_load = VPP_data["House&RW_load"].values
with open(os.devnull, "w") as f, contextlib.redirect_stdout(f):
result = simulate(elvis_realisation)
VPP_data["ev_power"] = result.aggregate_load_profile(num_time_steps(elvis_realisation.start_date, elvis_realisation.end_date, elvis_realisation.resolution))
#VPP_data["ev_power"].plot()
VPP_data["total_load"] = VPP_data["House&RW_load"] + VPP_data["ev_power"]
VPP_data["total_cost"] = VPP_data["total_load"] * VPP_data["EUR/kWh"]/4
load_array = np.array(VPP_data["total_load"].values)
cost_array = np.array(VPP_data["total_cost"].values)
VPP_data["overcost"] = VPP_data["total_cost"]
VPP_data["overcost"].mask( VPP_data["overcost"] < 0, 0 , inplace=True)
#Elvis RE2house
VPP_data["Elvis_RE2house"] = VPP_data["House&RW_load"].mask(VPP_data["House&RW_load"].lt(0)).fillna(0) #Filter only positive values
VPP_data["Elvis_RE2house"] = self.VPP_loads["household_power"] - VPP_data["Elvis_RE2house"]
self.Elvis_RE2house_en = VPP_data["Elvis_RE2house"].sum()/4 #kWh
#Elvis Grid2EV
VPP_data["Elvis_RE2EV"] = - VPP_data["House&RW_load"].mask(VPP_data["House&RW_load"].gt(0)).fillna(0) #Filter only negative values
VPP_data["Elvis_RE2EV"] = VPP_data["ev_power"] - VPP_data["Elvis_RE2EV"]
VPP_data["Elvis_Grid2EV"] = VPP_data["Elvis_RE2EV"].mask(VPP_data["Elvis_RE2EV"].lt(0)).fillna(0) #Filter only positive values
self.Elvis_Grid2EV_en = VPP_data["Elvis_Grid2EV"].sum()/4 #kWh
#Elvis RE2EV
VPP_data["Elvis_RE2EV"] = VPP_data["ev_power"] - VPP_data["Elvis_Grid2EV"]
self.Elvis_RE2EV_en = VPP_data["Elvis_RE2EV"].sum()/4 #kWh
self.av_Elvis_total_load = np.mean(load_array) #kW
self.std_Elvis_total_load = np.std(load_array) #kW
self.sum_Elvis_total_load = load_array.sum()/4 #kWh
self.Elvis_overconsume = load_array[load_array>0].sum()/4 #kWh
self.Elvis_underconsume = -load_array[load_array<0].sum()/4 #kWh
self.Elvis_total_cost = cost_array.sum() #€
self.Elvis_overcost = cost_array[cost_array > 0].sum()
#Elvis self-consumption and autarky eval
self.Elvis_en_produced = self.Elvis_underconsume + (self.Elvis_RE2house_en + self.Elvis_RE2EV_en)
self.Elvis_selfc_rate = ((self.Elvis_RE2house_en + self.Elvis_RE2EV_en) / self.Elvis_en_produced)*100
self.Elvis_en_consumed = self.Elvis_overconsume + (self.Elvis_RE2house_en + self.Elvis_RE2EV_en)
self.Elvis_autarky_rate = ((self.Elvis_RE2house_en + self.Elvis_RE2EV_en) / self.Elvis_en_consumed)*100
#Reset environment printout:
print("- ELVIS.Simulation (Av.EV_SOC= ", self.EVs_mean_soc, "%):\n",
"Sum_Energy=kWh ", round(self.sum_Elvis_total_load,2),
f", Grid_used_en(grid-import)={round(self.Elvis_overconsume,2)}kWh",
f", Total_demand={round(self.Elvis_en_consumed,2)}kWh",
f", autarky-rate={round(self.Elvis_autarky_rate,1)}",
f", RE-to-vehicle_unused_en(grid-export)={round(self.Elvis_underconsume,2)}kWh",
f", Total_supply={round(self.Elvis_en_produced,2)}kWh",
f", self-consump.rate={round(self.Elvis_selfc_rate,1)}",
", Grid_cost=€ ", round(self.Elvis_overcost,2),
", Total_selling_cost=€ ", round(self.Elvis_total_cost,2),
", Av.EV_en_left=kWh ", round(Elvis_av_EV_energy_left,2),
", Charging_events= ", self.simul_charging_events_n,
"\n- VPP_goal_upper_limit: Grid_used_en=kWh 0, RE-to-vehicle_unused_en=kWh 0, Grid_cost=€ 0",
", Av.EV_en_left=kWh ",round(self.exp_ev_en_left,2))
#__END__ SECTION 2
#SECTION 3. Reset VPP simulation tables and lists to be filled
#Setting reward functions
self.set_reward_func()
#Reset VPP session length
self.vpp_length = self.tot_simulation_len
self.energy_resources, self.avail_EVs_id, self.avail_EVs_n, self.ev_power, self.charging_ev_power, self.discharging_ev_power , self.total_cost, self.overcost, self.total_load, self.reward_hist, self.EVs_reward_hist, self.load_reward_hist = ([],[],[],[],[],[],[],[],[],[],[],[])
#build EV series (Avail_en. and IDs)
for i in range(len(self.elvis_time_serie)):
self.energy_resources.append(np.zeros(self.charging_stations_n, dtype=np.float32))
self.avail_EVs_id.append(np.zeros(self.charging_stations_n, dtype=np.int32))
self.avail_EVs_n.append(0)
self.ev_power.append(0.0)
self.charging_ev_power.append(0.0)
self.discharging_ev_power.append(0.0)
self.total_cost.append(0.0)
self.overcost.append(0.0)
self.total_load.append(0.0)
self.reward_hist.append(0)
self.EVs_reward_hist.append(0)
self.load_reward_hist.append(0)
self.total_load[0] = self.houseRW_load[0]
self.total_cost[0] = self.total_load[0] * self.prices_serie[0]/4
self.energy_resources[0] = self.Init_space["Available_energy_sources"]
#self.avail_EVs_id[0] = self.Init_space['Available_evs_id'] #[DELETED]
self.VPP_data = VPP_data
self.optimized_VPP_data = pd.DataFrame({'time':self.elvis_time_serie, "rewards":self.reward_hist, "ev_power":self.ev_power, "total_load":self.total_load, "total_cost":self.total_cost})
self.optimized_VPP_data = self.optimized_VPP_data.set_index("time")
#self.lstm_states_list = []
self.VPP_actions, self.action_truth_list, self.EVs_energy_at_leaving= ([],[],[])
self.av_EV_energy_left, self.std_EV_energy_left, self.sim_total_load, self.sim_av_total_load, self.sim_std_total_load, self.overconsumed_en, self.underconsumed_en, self.sim_total_cost, self.sim_overcost = [0, 0, 0, 0, 0, 0, 0, 0, 0]
self.cumulative_reward, self.load_t_reward, self.overconsume_reward, self.underconsume_reward, self.overcost_reward, self.EVs_energy_reward, self.AV_EVs_energy_reward = [0, 0, 0, 0, 0, 0, 0]
#For plotting battery levels
#self.VPP_energies = self.Init_space["Available_energy_sources"] #[DELETED]
self.invalid_actions_t = np.ones(len(self.actions_set), dtype=bool)
self.VPP_table = []
self.quick_results = []
#Set starting cond.
self.state = self.Init_space
#reset vpp session time
self.vpp_length = self.tot_simulation_len
self.done = False
print("Simulating VPP....")
#__END__ SECTION 3
return self.state
def save_VPP_table(self, save_path='data/environment_optimized_output/VPP_table.csv'):
"""
Method to save the VPP optimized simulation data.
"""
self.VPP_table.to_csv(save_path)
return self.VPP_table
def plot_ELVIS_data(self):
"""
Method to plot and visualize the ELVIS simulation input data for the EVs infrastructure.
"""
#Weekly arrival distribution simulation
weekly_distribution = self.elvis_config_file.arrival_distribution
time_frame = self.elvis_time_serie[0:len(weekly_distribution)*4:4]
EV_battery_capacities,models = ([], [])
for EV_type in self.EV_types:
EV_battery_capacities.append(EV_type["battery"]["capacity"])
#brand.append()
models.append(str(EV_type['brand'])+str(EV_type['model']))
Elvis_data_fig = make_subplots(subplot_titles=('EVs arrival distribution (weekly)','Simulation parameters', 'EV models', 'Rated powers'),
rows=2, cols=2,
specs=[[{"secondary_y": False},{"type": "table"}],
[{"secondary_y": False},{"secondary_y": False}]])
Elvis_data_fig.add_trace(
go.Scatter(x=time_frame, y=weekly_distribution, name="EVs_arrival distribution"),
row=1, col=1, secondary_y=False)
table_data = [['EV_arrivals(W)','mean_park(h)','mean_park+std','mean_park-std'],[self.EVs_n, self.mean_park, self.mean_park+self.std_deviation_park, self.mean_park-self.std_deviation_park]]
Elvis_data_fig.add_trace(go.Table(
columnorder = [1,2],
columnwidth = [80,400],
header = dict(
values = [['Parameters'],
['Values']],
fill_color='#04cc98',
align=['left','center'],
),
cells=dict(
values=table_data,
fill=dict(color=['royalblue', 'white']),
align=['left', 'center'],
#height=30
)), row=1, col=2)
Elvis_data_fig.add_trace(go.Bar(x=[models[0],'arrival Av.soc','Av.soc-std','Av.soc+std'], y=[EV_battery_capacities[0], self.EVs_mean_soc, (self.EVs_mean_soc-self.EVs_std_deviation_soc), (self.EVs_mean_soc+self.EVs_std_deviation_soc)], marker_color = ['#d62728','#bcbd22','#7f7f7f','#7f7f7f']),
row=2, col=1)
rated_powers_x = ['solar max', 'wind max', 'EVs load max', 'ch.point max', 'houseRWload max']
rated_powers_y = [self.solar_power, self.wind_power, self.EV_load_max, self.charging_point_max_power, self.houseRWload_max]
marker_color = ['#95bf00', '#1ac6ff', '#ee5940', '#7f7f7f', 'orange']
Elvis_data_fig.add_trace(go.Bar(x=rated_powers_x, y=rated_powers_y, marker_color=marker_color),
row=2, col=2)
Elvis_data_fig['layout']['yaxis1'].update(title='Probability')
Elvis_data_fig['layout']['yaxis2'].update(title='Battery capacity (kWh)')
Elvis_data_fig['layout']['yaxis3'].update(title='kW')
#Elvis_data_fig['layout']['legend'].update(title=f'Cumulat.Reward= {round(self.cumulative_reward,2)}')