-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
233 lines (186 loc) · 8.74 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
from flask import Flask, flash, redirect, render_template, request
from flask_session import Session
from helpers import (hosts_config_required, make_config_file, load_config,
config_file_exists, add_host_to_config, add_client_to_host,
CONFIG_FN)
# We use the ping3 library to be able to communicate the status of hosts to the user
from ping3 import ping
# We use wakeonlan so that local machines to the server can be turned on remotely
from wakeonlan import send_magic_packet
# We use the paramiko library to send SSH commands
from paramiko import SSHClient, AutoAddPolicy, SSHException
# Create the Flask application object that every route below is registered on
app = Flask(__name__)
# Keep session data on the server's filesystem (instead of in signed cookies)
# NOTE(review): the SESSION_PERMANENT override below is disabled; confirm whether it is still wanted
# app.config["SESSION_PERMANENT"] = False
app.config["SESSION_TYPE"] = "filesystem"
# Attach the Flask-Session extension to the application
Session(app)
@app.after_request
def after_request(response):
    """Stamp every outgoing response with headers that forbid caching.

    The same response object is returned after its headers are modified.
    """
    no_cache_headers = (
        ("Cache-Control", "no-cache, no-store, must-revalidate"),
        ("Expires", 0),
        ("Pragma", "no-cache"),
    )
    for header_name, header_value in no_cache_headers:
        response.headers[header_name] = header_value
    return response
@app.route("/", methods=["GET", "POST"])
@hosts_config_required
def index():
    """Render the home page with a reachability status attached to each host.

    Every host in the loaded config gets a "ping" key set to "success",
    "failure" or "warning" before the config is handed to the template.
    """
    config = load_config()
    for position, entry in enumerate(config["hosts"]):
        try:
            delay = ping(entry["ip_address"])
        except PermissionError:
            # ping3 can raise this on some systems when run without elevated privileges
            status = "warning"
        else:
            # ping() answering None or False means no usable reply came back
            status = "failure" if delay is None or delay is False else "success"
        config["hosts"][position]["ping"] = status
    return render_template("index.html", config=config)
@app.route("/hosts-config", methods=["GET", "POST"])
def host_config():
    """Show the host form (GET) or register a new host from it (POST).

    On POST all four form fields (host_name, ssh_username, ip_address,
    mac_address) are required. The host is appended to the existing config
    file, or a new config file is created when none exists yet. After a
    successful save the host is pinged once and the outcome is flashed.

    Returns:
        The rendered form on GET or on missing fields, otherwise a redirect
        to the home page.
    """
    if request.method == "GET":
        return render_template("hosts-config.html")

    host_name = request.form.get("host_name")
    ssh_username = request.form.get("ssh_username")
    ip_address = request.form.get("ip_address")
    mac_address = request.form.get("mac_address")

    # Validate that all fields are filled
    if not all((host_name, ssh_username, ip_address, mac_address)):
        flash("Must fill all fields", "danger")
        # Same template name as the GET branch (previously spelled with a leading slash)
        return render_template("hosts-config.html")

    if config_file_exists():
        # A falsy result is reported to the user as a duplicate host name
        if not add_host_to_config(host_name, ssh_username, ip_address, mac_address):
            flash("A host with this name already exists, consider editing it instead", "danger")
            # We redirect to the homepage so that the host can be edited (TODO)
            return redirect("/")
    else:
        # If the user has no config file setup, we create it with the provided info
        make_config_file(host_name, ssh_username, ip_address, mac_address)
    flash("Host added successfully!", "success")

    # On setup of any host, we give the user a bit more information about the host's reachability
    try:
        ping_time = ping(ip_address)
    except PermissionError:
        flash("The server can't ping the host, run with elevated privileges for more functionality", "warning")
    else:
        # Compare against None/False explicitly, as index() does: a plain
        # truthiness test would treat a 0.0 second round-trip as unreachable
        if ping_time is None or ping_time is False:
            flash("Could not reach entered IP, may be incorrect", "warning")
        else:
            flash("Host is reachable by the server", "success")
    return redirect("/")
@app.route("/clients-config", methods=["GET", "POST"])
@hosts_config_required
def client_config():
    """Show the client form (GET) or attach a new client to a host (POST).

    On POST the fields host_name, client_name and ssh_command are required;
    resolution is optional and passed on as None when left empty.

    Returns:
        The rendered form on GET, a redirect back to the form when the input
        is incomplete or the client is rejected, and a redirect to the home
        page once the client has been added.
    """
    if request.method == "GET":
        # We load all host names so that the user can select which host to add the new client to
        hosts = [x["name"] for x in load_config()["hosts"]]
        return render_template("clients-config.html", hosts=hosts)

    host_name = request.form.get("host_name")
    name = request.form.get("client_name")
    # An empty or missing resolution is normalised to None
    resolution = request.form.get("resolution") or None
    ssh_command = request.form.get("ssh_command")

    # We validate that all required fields are filled
    if not host_name or not name or not ssh_command:
        flash("All required fields must be filled", "danger")
        return redirect("/clients-config")

    if not add_client_to_host(host_name, name, resolution, ssh_command):
        flash("Invalid client entered! Make sure you're not trying to add a client that already exists.", "danger")
        # The redirect must be returned; previously its result was discarded
        # and the user always ended up on the home page
        return redirect("/clients-config")

    flash("Client added successfully", "success")
    return redirect("/")
@app.route("/wakeup", methods=["GET"])
@hosts_config_required
def wakeup():
    """A sort of API to send magic packets to a host located in the config file.

    Expects the query parameter "host-label" naming a configured host. The
    outcome is reported through flashed messages.

    Returns:
        A redirect to the home page in every case.
    """
    host_label = request.args.get("host-label")
    # We validate that the parameter is given
    if not host_label:
        flash("Expected a host label but did not receive any", "danger")
        return redirect("/")

    # We make sure the host is in our config (single pass over the host list)
    hosts = load_config()["hosts"]
    host = next((x for x in hosts if x["name"] == host_label), None)
    if host is None:
        flash("This host doesn't exist!", "danger")
        return redirect("/")

    # No `return` inside `finally` here: that construct silently discarded
    # every exception other than ValueError, hiding send failures from the user
    try:
        send_magic_packet(host["mac_address"])
    except ValueError:
        flash("Invalid MAC address! Are you sure the host is configured correctly?", "danger")
    except OSError as e:
        # Socket-level failure while broadcasting the packet
        flash(f"Could not send magic packet: {e}", "danger")
    else:
        flash("Sent magic packet!", "success")
    return redirect("/")
@app.route("/command", methods=["GET"])
@hosts_config_required
def run_client_command():
    """A sort of API to run SSH commands on a host located in the config file.

    Expects the query parameters "host-label" (a configured host) and
    "client" (a client configured on that host). The client's stored
    ssh_command is executed on the host as the host's ssh_username, and the
    outcome, including anything written to stderr, is flashed to the user.

    Returns:
        A redirect to the home page in every case.
    """
    host_label = request.args.get("host-label")
    client_label = request.args.get("client")
    # We validate that both parameters are given
    if not host_label:
        flash("Expected a host label but did not receive any", "danger")
        return redirect("/")
    if not client_label:
        flash("Expected a client label but did not receive any", "danger")
        return redirect("/")

    # We make sure the host is in our config
    hosts = load_config()["hosts"]
    host = next((x for x in hosts if x["name"] == host_label), None)
    if host is None:
        flash("This host doesn't exist!", "danger")
        return redirect("/")

    # We make sure the client is in our config
    client_entry = next((x for x in host["clients"] if x["name"] == client_label), None)
    if client_entry is None:
        flash("This client doesn't exist!", "danger")
        return redirect("/")

    # We grab the command associated with the client
    ssh_command = client_entry["ssh_command"]
    user = host["ssh_username"]
    ip_address = host["ip_address"]

    # We set up a paramiko SSH connection
    # We assume the user has already copied over their SSH keys from the machine
    # that runs this flask app to the targeted host
    ssh = SSHClient()
    ssh.load_system_host_keys()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without verification;
    # confirm this is acceptable for the networks this app is deployed on
    ssh.set_missing_host_key_policy(AutoAddPolicy())
    try:
        # Connect and run the command
        ssh.connect(ip_address, username=user)
        _stdin, _stdout, stderr = ssh.exec_command(ssh_command)
        errors = list(stderr)
        if not errors:
            flash("Command ran successfully", "success")
        else:
            # In case of any errors, we display them directly in the web interface
            flash("Command execution may have failed", "danger")
            flash(f"Command sent: {ssh_command}", "warning")
            for error in errors:
                flash(error, "warning")
    except SSHException as e:
        flash(f"Error: {str(e)}", "danger")
        flash("Make sure you've copied over SSH keys.", "danger")
    except OSError as e:
        # connect() reports socket-level failures (e.g. unreachable host) this way
        flash(f"Error: {str(e)}", "danger")
        flash("Make sure the host is powered on and reachable.", "danger")
    finally:
        # Always release the connection, whether or not the command ran
        ssh.close()
    return redirect("/")
@app.route("/about")
def about():
    """Serve the "about" page from its template."""
    page = render_template("/about.html")
    return page
@app.route("/README")
def readme():
    """Render the project's README.md inside the README template.

    The file is read from "./README.md", i.e. relative to the process's
    current working directory.
    """
    # Decode as UTF-8 explicitly so the result does not depend on the
    # platform's default text encoding
    with open("./README.md", encoding="utf-8") as readme_file:
        readme_text = readme_file.read()
    # The readme template renders any markdown appropriately
    return render_template("/README.html", readme=readme_text)