-
Notifications
You must be signed in to change notification settings - Fork 0
/
helper_functions.py
421 lines (336 loc) · 15.6 KB
/
helper_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
import json
import requests
import os
import math
import folium
import streamlit as st
import streamlit_folium as st_folium
import pandas as pd
import numpy as np
import polars as pl
from haversine import haversine
from geopy.geocoders import Nominatim
from sklearn.cluster import DBSCAN
from data.charger_data import color_map
# --------------------------------------------------
# DATA HELPERS
# --------------------------------------------------
def process_data():
'''
Process the charger map data and return a dataframe
'''
# Read file containing location details
df = pd.read_json('data/charger_map_data.json')
# If latitude and longitude are not present or empty string, drop the row
df = df.dropna(subset=["latitude", "longitude"])
df = df[df["latitude"] != ""]
df = df[df["longitude"] != ""]
# Create coords column using lat and lng
df["coords"] = list(zip(df["latitude"], df["longitude"]))
return df
# --------------------------------------------------
# MAPS HELPERS
# --------------------------------------------------
def get_coordinates(city_name):
'''
Take a city name and return the coordinates of the city
'''
geolocator = Nominatim(user_agent="reverse_geocoding_example")
location = geolocator.geocode(city_name, exactly_one=True)
if location:
latitude = location.latitude
longitude = location.longitude
return latitude, longitude
else:
return None
def euclidean_distance(point1, point2):
'''
Find Euclidean distance between two points
'''
return math.sqrt(sum((float(a) - float(b)) ** 2 for a, b in zip(point1, point2)))
def find_closest_points(x, Y, n):
'''
Find n closest points to x from list Y
'''
distances = []
for y in Y:
idx = y[0]
point = (float(y[1]), float(y[2]))
distance = euclidean_distance(x, point)
distances.append((idx, distance))
# Sort based on distance
distances.sort(key = lambda x : x[1])
# Get closest points (indices)
closest_points = [distance[0] for distance in distances]
return closest_points[:n]
def find_maps_distance(origin, destination, maps_api_key):
'''
This function takes in the origin and destination coordinates and returns the distance between them.
'''
url = f"https://maps.googleapis.com/maps/api/distancematrix/json?origins={origin}&destinations={destination}&units=metric&key={maps_api_key}"
response = requests.get(url).json()
return response
def find_nearest_coordinate(given_coordinate, maps_api_key, n = 5):
'''
This function takes in a DataFrame, a given coordinate and returns the details of the
coordinates from the DataFrame nearest to the given coordinate.
'''
# Get Google Maps API Key
maps_api_key = os.environ.get("GOOGLE_API_KEY")
# Create DataFrame by processing the data
df = process_data()
# Get closest 20 points (indices) by Euclidean Distance
closest_points = find_closest_points(given_coordinate, list(zip(list(df.index), list(df['latitude']), list(df['longitude']))), 20)
# Convert destination coordinate to string
origin = str(given_coordinate[0]) + ',' + str(given_coordinate[1])
# Values to be returned
indices = []
addresses = []
distances = []
durations = []
# Get distance and duration from origin to each of the closest points using Google Maps API
for point in closest_points:
destination = str(df.loc[point]['latitude']) + ',' + str(df.loc[point]['longitude'])
response = find_maps_distance(origin, destination, maps_api_key)
address = "".join(response['destination_addresses'])
distance = response['rows'][0]['elements'][0]['distance']['text']
duration = response['rows'][0]['elements'][0]['duration']['text']
indices.append(point)
addresses.append(address)
distances.append(distance)
durations.append(duration)
return indices[:n], distances[:n], durations[:n], addresses[:n]
def display_city_chargers(city):
'''
Take a dataframe and city name as input and display the chargers in that city
'''
# Create DataFrame by processing the data
df = process_data()
# Filter the dataframe to only include chargers in the cities
df = df[df["city"].isin(city)]
# If there is only one city, generate a map centered at the city
if len(city) == 1:
center_coords = get_coordinates(city)
zoom_start = 10
# If there are multiple cities, generate a map centered at India
else:
center_coords = (22.845137, 78.672679)
zoom_start = 5
map = folium.Map(location=center_coords, zoom_start=zoom_start)
# Visualize the chargers on the map
for i, row in df.iterrows():
color = color_map[row.charger_type]
coords_x, coords_y = df.loc[i, "coords"]
if coords_x:
coords_x = round(float(coords_x), 4)
if coords_y:
coords_y = round(float(coords_y), 4)
folium.Marker(location=df.loc[i, "coords"],
icon=folium.Icon(color=color),
tooltip=df.loc[i, "charger_type"],
popup=df.loc[i, "address"]).add_to(
map
)
# Render Folium map in Streamlit
return st_folium.st_folium(map, width=725)
def display_chargers_by_location(location):
'''
Take a location as input and display the chargers near that location
'''
# Get Google Maps API Key
maps_api_key = os.environ.get("GOOGLE_API_KEY")
# Create DataFrame by processing the data
df = process_data()
# Get coordinates of the location
given_coordinate = get_coordinates(location)
# Get nearest chargers
indices, distances, durations, addresses = find_nearest_coordinate(
given_coordinate, maps_api_key,
)
display_chargers_df = []
# Display chargers
map = folium.Map(location=given_coordinate, zoom_start=12)
i = 0
for idx, distance, duration, address in zip(indices, distances, durations, addresses):
color = color_map[df.loc[idx, "charger_type"]]
# Create Popups
invisible_character = "⠀"
popup_distance = invisible_character.join(distance.split(" "))
popup_duration = invisible_character.join(duration.split(" "))
coords_x, coords_y = df.loc[idx, "coords"]
if coords_x:
coords_x = round(float(coords_x), 4)
if coords_y:
coords_y = round(float(coords_y), 4)
folium.Marker(location=df.loc[idx, "coords"],
icon=folium.Icon(color=color),
tooltip=df.loc[idx, "charger_type"],
popup=f"{i}\n{popup_distance}\n{popup_duration}").add_to(map)
i += 1
# Add entered location to map in white color
folium.Marker(location=given_coordinate,
icon=folium.Icon(color="red"),
tooltip="Entered Location",
popup=location
).add_to(map)
# Add values to display_chargers_df
display_chargers_df.append({
"Distance": distance,
"Duration": duration,
"Address": address,
"Charger Type": df.loc[idx]["charger_type"]
})
st_folium.st_folium(map, width=725)
st.dataframe(pd.DataFrame(display_chargers_df))
def cluster_by_distance(points, radius):
'''
Using points and specified radius, forms clusters based on density
'''
distances = []
n = len(points)
for i in range(n):
distances.append([])
for j in range(i + 1, n):
distances[-1].append(haversine(points[i], points[j])) #unit is km
# visited = [0]*n
clusters = []
for i in range(n):
clusters.append([])
count = 0
for d in distances[i]:
count += 1
if d <= radius:
clusters[-1].append(points[i+count])
return clusters
def display_user_requested_chargers():
'''
Perform clustering on user requested chargers and display the results
'''
# Read csv file containing user requested chargers
df = pd.read_csv("data/user_requested_chargers.csv")
city_coords = (12.9725881014472, 77.59406890113576)
colors = ['red', 'blue', 'green', 'purple', 'orange', 'darkred', 'beige', 'darkblue', 'darkgreen', 'cadetblue', 'darkpurple', 'pink', 'lightblue', 'lightgreen', 'black']
n = len(colors)
coordinates = df.values.tolist()
st.write("### Locations requested by users ")
# Create streamlit columns
col1, col2, col3 = st.columns(3)
with col1:
st.write("##### Shown below is a map of user recommended locations that was generated by us for this demonstration.")
user_requests_map = folium.Map(location=(12.9725881014472, 77.6406890113576), zoom_start=12, control_scale = True)
for latitude, longitude in coordinates:
folium.Marker(location=(latitude, longitude), icon=folium.Icon()).add_to(user_requests_map)
st_folium.st_folium(user_requests_map, width=725, key="user_requests_map")
with col2:
st.write("##### Here, you can see the values that we generated.")
st.dataframe(df, width=300, height=725)
with col3:
# Add new value
st.write("##### You can add new values to the table below. This will help us improve our algorithm.")
new_latitude = st.text_input("Latitude")
new_longitude = st.text_input("Longitude")
if st.button("Add"):
if new_latitude and new_longitude:
new_row = pd.DataFrame.from_dict({"latitude": [new_latitude], "longitude": [new_longitude]})
df = pd.concat([df, new_row])
df.to_csv("user_requested_chargers.csv", index=False)
st.success("Added new value")
st.experimental_rerun()
else:
st.error("Please enter both latitude and longitude")
st.write("### Configuring values for the alogorithm")
st.write("##### The user can change these values according to their needs. We have set the default values to the ones that we found to be the most optimal.")
# Configuration - Input boxes for radius and min samples
radius = st.number_input("Radius", min_value=0.1, max_value=10.0, value=0.5, step=0.1)
min_samples = st.number_input("Minimum Samples", min_value=5, max_value=50, value=15, step=5)
st.write("### Clusters formed based on distances (Conventional Clustering)")
st.write("##### Since the earth is curved, Euclidean distance is not the most appropriate. So, we have computed Haversine distance, that takes into account the curvature of the earth to find distances between all the points. Using this we have used a conventional clustering algorithm to form clusters.")
clusters_1 = [x for x in cluster_by_distance(coordinates.copy(), radius) if len(x) >= min_samples]
distance_based_clusters_map = folium.Map(location=city_coords, zoom_start=12)
for i in range(len(clusters_1)):
cluster = clusters_1[i]
color = colors[i % n]
for point in cluster:
x, y = point[0], point[1]
folium.Marker(location= (x, y), icon=folium.Icon(color=color)).add_to(distance_based_clusters_map)
st_folium.st_folium(distance_based_clusters_map, width=725, key="distance_based_clusters_map")
st.write("### Clusters formed based on densities (DBSCAN Algorithm)")
st.write("##### As you saw, the results of the conventional clustering algorithm were not that great. To improve on this, we used the DBSCAN algorithm to construct better clusters.")
st.warning("Note: The grey markers don't belong to any cluster.")
epsilon = 0.5 / 6371.0
db = DBSCAN(eps=epsilon, min_samples=min_samples, algorithm='ball_tree', metric='haversine').fit(np.radians(df.to_numpy()))
labels = db.labels_
density_based_clusters_map = folium.Map(location=city_coords, zoom_start=12, control_scale = True)
for i in range(len(labels)):
if labels[i] == -1:
color = 'lightgray'
folium.Marker(location= coordinates[i], icon=folium.Icon(color=color)).add_to(density_based_clusters_map)
else:
color = colors[labels[i] % n]
folium.Marker(location= coordinates[i], icon=folium.Icon(color=color)).add_to(density_based_clusters_map)
st_folium.st_folium(density_based_clusters_map, width=725, key="density_based_clusters_map")
st.write("### Final Result - Removing points not part of any cluster")
st.write("##### After removing the points that don't belong to any cluster we obtain this final result. This can be used by EV charger companies to place their chargers in optimal locations.")
density_based_clusters_map_1 = folium.Map(location=city_coords, zoom_start=12, control_scale = True)
for i in range(len(labels)):
if labels[i] == -1:
continue
else:
color = colors[labels[i] % n]
folium.Marker(location= coordinates[i], icon=folium.Icon(color=color)).add_to(density_based_clusters_map_1)
st_folium.st_folium(density_based_clusters_map_1, width=725, key="density_based_clusters_map")
def st_filter_template(df, attribute, default_all=False):
'''
Streamlit filter template
'''
container = st.container()
all = st.checkbox(f"Select all {attribute}", value=default_all)
values = list(df[attribute].unique().sort())
if all:
selected_options = container.multiselect(
"Select one or more options:", values, values
)
else:
selected_options = container.multiselect("Select one or more options:", values)
return selected_options
def display_charger_consumption_data():
'''
Display an artificially charger consumption dataset with filter and pivot table functionality
'''
data = pl.read_csv("data/charger_consumption_data.csv")
# Drop the first column
data = data.drop("")
# Hide Columns
hide_columns = st.multiselect("Choose columns to hide", data.columns, default=[])
# Filter Columns
# TODO: Add support for datatypes other than categorical
filter_columns = st.multiselect(
"Choose columns to filter", data.columns, default=[]
)
filters = {}
for column in filter_columns:
filters[column] = st_filter_template(data, column)
for column, values in filters.items():
data = data.filter(pl.col(column).is_in(values))
data = data.drop(hide_columns).to_pandas()
st.dataframe(data)
# Add a column having values just 1
data["count"] = 1
# Pivot Table
st.write("### Pivot Table")
st.write("##### Users can create pivot tables according to their needs by selecting the index, values, columns and aggregation function.")
st.write("##### For example, if you want to see the average uptime of each chargers with rating type 2, you can select the index as charger_type, values as uptime, columns as type_2_rating and aggregation function as mean.")
pivot = {"index": "charger_type", "values": [], "columns": [], "aggfunc": ""}
pivot["values"] = st.multiselect("Choose values", data.columns, default=["daily_usage_2"])
pivot["columns"] = st.multiselect("Choose columns", data.columns, default=["type_2_rating"])
pivot["aggfunc"] = st.selectbox("Choose aggregation function", ["mean", "sum"], index=0)
if pivot["index"] and pivot["values"] and pivot["columns"]:
st.dataframe(
pd.pivot_table(
data,
index=pivot["index"],
values=pivot["values"],
columns=pivot["columns"],
aggfunc=pivot["aggfunc"],
)
)