-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrouting.py
162 lines (137 loc) · 5.96 KB
/
routing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import streamlit as st
import duckdb
import folium
from folium.plugins import Geocoder
from geopy.geocoders import Nominatim
from streamlit_folium import st_folium
from utils import load_from_url, shortest_path, find_closest_point, create_feedback_table
# Connect to DuckDB database
conn = duckdb.connect(database='data.duckdb')
cursor = conn.cursor()
create_feedback_table(conn)
# Load nodes and edges from the database
nodes, edges = load_from_url(conn)
# Set the page configuration for the Streamlit app
st.set_page_config(page_title="SafePath")
# Display the title and description
st.write("# Welcome to SafePath!")
st.write("The calculator of the safest routes for women in Amsterdam.")
# Create two columns for user input
col1, col2 = st.columns(2)
# Initialise the Amsterdam map
m = folium.Map(location=[52.3676, 4.9041], zoom_start=13)
Geocoder().add_to(m)
geolocator = Nominatim(user_agent="SafePath")
option = st.selectbox(
"Please specify path type",
("Safe", "Fast", "Both"))
# Initialise source and destination variables
source, destination = None, None
# conn.execute("DELETE FROM shortest_path;")
# User input for the starting point
with col1:
user_input1 = st.text_input("Starting point:")
if user_input1:
location = geolocator.geocode(user_input1)
source = find_closest_point(conn, location.longitude, location.latitude)
# User input for the end point
with col2:
user_input2 = st.text_input("End point:")
if user_input2:
location = geolocator.geocode(user_input2)
destination = find_closest_point(conn, location.longitude, location.latitude)
flag = False
if source is not None and destination is not None:
# Calculate the shortest path between source and destination
path = shortest_path(conn, source[0], destination[0], option)
with col1:
st.write("")
with col2:
st.write("")
if path is not None:
path_nodes, _ = path
nodes_query = (f"SELECT id, ST_X(geom) AS longitude, ST_Y(geom) AS latitude FROM ams_walk_nodes WHERE id IN "
f"({','.join(map(str, path_nodes))})")
nodes_data = conn.execute(nodes_query).fetchall()
nodes_data = {x[0]: x for x in nodes_data}
# Add nodes as markers
for node_id in path_nodes[1:len(nodes_data) - 1]:
node = nodes_data[node_id]
folium.Marker(
location=[node[2], node[1]],
popup=f"Node {node[0]}",
tooltip=f"Node {node[0]}"
).add_to(m)
folium.Marker(
location=[source[2], source[1]],
popup=f"Start",
tooltip=f"{source[0]}",
icon=folium.Icon(color="red")
).add_to(m)
folium.Marker(
location=[destination[2], destination[1]],
popup=f"End",
tooltip=f"{destination[0]}",
icon=folium.Icon(color="green")
).add_to(m)
# Add edges as polylines and display feedback averages
total_safety, total_lighting, total_speed, total_overall, edge_count = 0, 0, 0, 0, 0
for i in range(len(path_nodes) - 1):
nodeid1 = path_nodes[i]
nodeid2 = path_nodes[i + 1]
# Query to get the average ratings for the edge between nodeid1 and nodeid2
feedback_query = f"""
SELECT
AVG(safety_rating) AS avg_safety,
AVG(lighting_rating) AS avg_lighting,
AVG(speed_rating) AS avg_speed,
AVG(overall_rating) AS avg_overall
FROM feedback
WHERE
(nodeOne = {nodeid1} AND nodeTwo = {nodeid2})
OR (nodeOne = {nodeid2} AND nodeTwo = {nodeid1});
"""
feedback_data = conn.execute(feedback_query).fetchone()
# Handle None values
avg_safety = feedback_data[0] if feedback_data[0] is not None else 0.0
avg_lighting = feedback_data[1] if feedback_data[1] is not None else 0.0
avg_speed = feedback_data[2] if feedback_data[2] is not None else 0.0
avg_overall = feedback_data[3] if feedback_data[3] is not None else 0.0
# Sum up the averages for each edge
total_safety += avg_safety
total_lighting += avg_lighting
total_speed += avg_speed
total_overall += avg_overall
edge_count += 1
# Add polyline with feedback info as popup
popup_content = f"Safety: {avg_safety:.2f}, Lighting: {avg_lighting:.2f}, Speed: {avg_speed:.2f}, Overall: {avg_overall:.2f}"
folium.PolyLine(
locations=[
[nodes_data[path_nodes[i]][2], nodes_data[path_nodes[i]][1]],
[nodes_data[path_nodes[i + 1]][2], nodes_data[path_nodes[i + 1]][1]]
],
color="blue",
popup=folium.Popup(popup_content, max_width=300)
).add_to(m)
# Calculate the average ratings across the entire path
if edge_count > 0:
flag=True
else:
flag=False
st_folium(m, width=600, height=400)
if flag is True:
avg_safety = total_safety / edge_count
avg_lighting = total_lighting / edge_count
avg_speed = total_speed / edge_count
avg_overall = total_overall / edge_count
st.write("### Average Ratings for the Selected Route")
def create_progress_bar(label, value):
bar = f"<progress value='{value * 20}' max='100' style='width: 150px'></progress>"
return f"{label}: {bar} {value:.2f}/5.00"
st.markdown(create_progress_bar("Average Safety Rating", avg_safety), unsafe_allow_html=True)
st.markdown(create_progress_bar("Average Lighting Rating", avg_lighting), unsafe_allow_html=True)
st.markdown(create_progress_bar("Average Speed Rating", avg_speed), unsafe_allow_html=True)
st.markdown(create_progress_bar("Average Overall Rating", avg_overall), unsafe_allow_html=True)
else:
st.write("No path found")
conn.close()