# lookingglass.py
# Forked from ramnode/LookingGlass.
import flask
import re
import sh
# Flask application object; the route decorators below register on it.
app = flask.Flask(__name__)
# Load the baseline settings from instance/default.cfg first.
app.config.from_pyfile('instance/default.cfg')
# Then layer on the file named by the CONFIG_FILE environment variable.
# silent=True is passed so that a missing override does not abort startup.
app.config.from_envvar('CONFIG_FILE', silent=True)
def get_and_validate_host(request):
    """Return the request's ``host`` parameter if it is safe to use.

    The value is accepted only when it consists entirely of letters,
    digits, ``-``, ``.`` and ``:`` (enough for hostnames, IPv4 and IPv6
    literals) and does not start with ``-``.

    request -- object whose ``args.get('host')`` yields the raw value.

    Returns the validated string, or None when the parameter is missing,
    empty, or rejected.
    """
    target = request.args.get('host')
    if target is None:
        return None
    # The value is handed to command-line tools as a positional argument;
    # a leading '-' would be parsed by them as an option instead of a host.
    if target.startswith('-'):
        return None
    # Raw string avoids invalid-escape warnings; \Z (unlike $) does not
    # tolerate a trailing newline after the last allowed character.
    if re.match(r"[a-zA-Z0-9\-.:]+\Z", target) is None:
        return None
    return target
def execute(encoder, command, *args, **kwargs):
    """Run ``command(*args, **kwargs)`` and wrap its output per ``encoder``.

    encoder -- "raw" streams escaped output chunks as they are produced;
               "json" runs the command to completion and returns a JSON
               object with "output" and "status" keys.
    command -- callable to run (the routes in this file pass ``sh`` commands).

    Any other encoder value yields the escaped text
    "Error, invalid encoder" without running the command.
    """
    def raw_stream_generator():
        """Yield HTML-escaped output chunks as the command produces them."""
        try:
            for chunk in command(*args, _iter=True, **kwargs):
                # NOTE(review): flask.escape is not available in every
                # Flask release -- confirm against the pinned version.
                yield flask.escape(chunk)
        except sh.ErrorReturnCode:
            # Deliberate best effort: whatever was already streamed stays
            # on the page; a non-zero exit simply ends the stream.
            pass

    if encoder == "raw":
        return flask.Response(raw_stream_generator(), mimetype='text/html')

    if encoder == "json":
        try:
            result = command(*args, **kwargs)
        except sh.ErrorReturnCode as exc:
            output = exc.stdout
            # The exception carries the captured stdout as bytes; formatting
            # it directly would put the b'...' repr into the JSON payload.
            if isinstance(output, bytes):
                output = output.decode("utf-8", "replace")
            status = exc.exit_code
        else:
            output = "{0}".format(result)
            status = 0
        return flask.jsonify({
            "output": output,
            "status": status
        })

    return flask.escape("Error, invalid encoder")
def error_response(encoder, error):
    """Build the failure reply carrying ``error`` in the form ``encoder`` asks for.

    For the "json" encoder the reply is a JSON object with the message under
    "output" and a "status" of 1; for anything else it is the escaped text.
    """
    if encoder != "json":
        return flask.escape(error)
    payload = {"output": error, "status": 1}
    return flask.jsonify(payload)
@app.route("/<encoder>/host")
def api_host(encoder):
    """Run the ``host`` command on the validated ``host`` query parameter."""
    host = get_and_validate_host(flask.request)
    if host is not None:
        return execute(encoder, sh.host, host)
    return error_response(encoder, "Error, invalid host")
@app.route("/<encoder>/mtr<version>")
def api_mtr(encoder, version):
    """Run ``mtr`` in report mode against the validated ``host`` parameter.

    ``version`` comes from the URL and must be '4' or '6'; it is passed to
    the command as ``-4`` / ``-6``. The host is checked before the version.
    """
    host = get_and_validate_host(flask.request)
    if host is None:
        return error_response(encoder, "Error, invalid host")
    if version != '4' and version != '6':
        return error_response(encoder, "Error, invalid ip version")
    ip_flag = '-' + version
    return execute(
        encoder, sh.mtr, ip_flag, '--report', '--report-wide', host, c=4
    )
@app.route("/<encoder>/ping<version>")
def api_ping(encoder, version):
    """Ping the validated ``host`` parameter with ``ping`` or ``ping6``.

    ``version`` comes from the URL: '4' selects ``sh.ping``, '6' selects
    ``sh.ping6``; anything else is reported as an invalid ip version.
    The host is checked before the version.
    """
    host = get_and_validate_host(flask.request)
    if host is None:
        return error_response(encoder, "Error, invalid host")
    if version not in ('4', '6'):
        return error_response(encoder, "Error, invalid ip version")
    # Resolve only the binary that is actually needed for this request.
    pinger = sh.ping if version == '4' else sh.ping6
    return execute(encoder, pinger, host, c=4, i=1, w=8)
@app.route("/<encoder>/traceroute<version>")
def api_traceroute(encoder, version):
    """Run ``traceroute`` against the validated ``host`` parameter.

    ``version`` comes from the URL and must be '4' or '6'; it is passed to
    the command as ``-4`` / ``-6``, along with the ``A`` and ``w`` keyword
    options. The host is checked before the version.
    """
    host = get_and_validate_host(flask.request)
    if host is None:
        return error_response(encoder, "Error, invalid host")
    if version != '4' and version != '6':
        return error_response(encoder, "Error, invalid ip version")
    options = {"A": True, "w": 2}
    return execute(encoder, sh.traceroute, '-' + version, host, **options)
@app.route("/")
def index():
    """Render ``index.html`` with values taken from the app configuration."""
    cfg = app.config
    # Template variable name -> configured value.
    context = {
        "site_name": cfg["SITE_NAME"],
        "theme": cfg["THEME"],
        "site_location": cfg["SITE_LOCATION"],
        "test_ipv4": cfg["TEST_IPV4"],
        "test_ipv6": cfg["TEST_IPV6"],
        "test_files": cfg["TEST_FILES"],
        "additional_lgs": cfg["ADDITIONAL_LG_LIST"],
    }
    return flask.render_template('index.html', **context)
if __name__ == "__main__":
    # Start the app via app.run() only when this file is executed directly.
    app.run()