-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsettings.py
146 lines (120 loc) · 4.89 KB
/
settings.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import os.path
import ssl
from tornado.options import define
from webssh.policy import (
load_host_keys, get_policy_class, check_policy_setting
)
from webssh.utils import (
to_ip_address, parse_origin_from_url, is_valid_encoding
)
# Options registered on tornado's global option registry via define().
#
# NOTE(review): get_ssl_context() in this module reads options.certfile and
# options.keyfile, but neither is registered in this block -- confirm they
# are defined elsewhere before that function is called.

# --- Listening socket / run mode ---
# help text: "bind IP address"
define('address', default='0.0.0.0', help='绑定IP地址')
# help text: "bind port"
define('port', type=int, default=80, help='绑定端口')
# help text: "Debug mode"; also gates the '*' origin policy in
# get_origin_setting().
define('debug', type=bool, default=False, help='Debug 模式')

# --- SSH host key handling (read by get_policy_setting() and
# get_host_keys_settings()) ---
define('policy', default='warning',
help='Missing host key policy, reject|autoadd|warning')
define('hostfile', default='', help='User defined host keys file')
define('syshostfile', default='', help='System wide host keys file')

# --- HTTP front-end behaviour ---
# Comma-separated list parsed by get_trusted_downstream().
define('tdstream', default='', help='Trusted downstream, separated by comma')
# Not read in this module; presumably consumed elsewhere in the project.
define('fbidhttp', type=bool, default=True,
help='Forbid public plain http incoming requests')
define('xheaders', type=bool, default=True, help='Support xheaders')
define('xsrf', type=bool, default=True, help='CSRF protection')
# Interpreted by get_origin_setting().
define('origin', default='same', help='''Origin policy,
'same': same origin policy, matches host name and port number;
'primary': primary domain policy, matches primary domain only;
'<domains>': custom domains policy, matches any domain in the <domains> list
separated by comma;
'*': wildcard policy, matches any domain, allowed in debug mode only.''')
# Forwarded as websocket_ping_interval by get_app_settings().
define('wpintvl', type=float, default=0, help='Websocket ping interval')

# --- SSH session tuning (not read in this module; presumably consumed by
# the request handlers -- verify against callers) ---
define('timeout', type=float, default=3, help='SSH connection timeout')
define('delay', type=float, default=3, help='The delay to call recycle_worker')
define('maxconn', type=int, default=20,
help='Maximum live connections (ssh sessions) per client')
# Validity of a chosen value can be checked with check_encoding_setting().
define('encoding', default='',
help='''The default character encoding of ssh servers.
Example: --encoding='utf-8' to solve the problem with some switches&routers''')
# base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Request body size limit handed to the HTTP server settings: 1 GiB, in bytes.
max_body_size = 1024 ** 3
def get_app_settings(options):
    """Assemble the application settings dict from the parsed options.

    Template and static directories are resolved relative to this module's
    own location (``<module dir>/core/html/{templates,static}``).

    :param options: object exposing ``wpintvl``, ``debug``, ``xsrf`` and the
        attributes required by ``get_origin_setting``.
    :returns: dict with the keys ``template_path``, ``static_path``,
        ``websocket_ping_interval``, ``debug``, ``xsrf_cookies`` and
        ``origin_policy``.
    :raises ValueError: propagated from ``get_origin_setting`` when the
        origin option is invalid.
    """
    html_root = os.path.join(os.path.dirname(__file__), 'core', 'html')
    return {
        'template_path': os.path.join(html_root, 'templates'),
        'static_path': os.path.join(html_root, 'static'),
        'websocket_ping_interval': options.wpintvl,
        'debug': options.debug,
        'xsrf_cookies': options.xsrf,
        'origin_policy': get_origin_setting(options),
    }
def get_server_settings(options):
    """Assemble the HTTP server settings dict from the parsed options.

    :param options: object exposing ``xheaders`` and ``tdstream``.
    :returns: dict with the keys ``xheaders``, ``max_body_size`` (the
        module-level constant) and ``trusted_downstream`` (the set produced
        by ``get_trusted_downstream``).
    """
    return {
        'xheaders': options.xheaders,
        'max_body_size': max_body_size,
        'trusted_downstream': get_trusted_downstream(options.tdstream),
    }
def get_host_keys_settings(options):
    """Load the user and system host key stores named by the options.

    :param options: object exposing ``hostfile`` and ``syshostfile``.
    :returns: dict with the keys ``host_keys``, ``system_host_keys`` and
        ``host_keys_filename`` (the path the user host keys were loaded
        from).
    """
    # No explicit user file: use 'known_hosts', a relative path that is
    # therefore resolved against the process working directory.
    host_keys_filename = (
        options.hostfile if options.hostfile else os.path.join('known_hosts')
    )
    user_keys = load_host_keys(host_keys_filename)

    # No explicit system file: use the current user's OpenSSH known_hosts.
    system_filename = (
        options.syshostfile
        if options.syshostfile
        else os.path.expanduser('~/.ssh/known_hosts')
    )
    system_keys = load_host_keys(system_filename)

    return {
        'host_keys': user_keys,
        'system_host_keys': system_keys,
        'host_keys_filename': host_keys_filename,
    }
def get_policy_setting(options, host_keys_settings):
    """Return an instance of the missing-host-key policy named in options.

    :param options: object exposing ``policy`` (the policy name).
    :param host_keys_settings: dict as returned by
        ``get_host_keys_settings``; handed to ``check_policy_setting``
        together with the policy class.
    :returns: a new instance of the resolved policy class.
    """
    policy_cls = get_policy_class(options.policy)
    # The check runs against the class, before any instance is created.
    check_policy_setting(policy_cls, host_keys_settings)
    policy = policy_cls()
    return policy
def get_ssl_context(options):
    """Build the server-side SSL context described by the options.

    :param options: object that may expose ``certfile`` and ``keyfile``
        (paths to the certificate chain and private key).
    :returns: a configured ``ssl.SSLContext``, or ``None`` when neither
        option is set (or registered), meaning SSL is not configured.
    :raises ValueError: if only one of the two options is given, or if
        either path does not refer to an existing file.
    """
    # Read with a default: an options object raises AttributeError for a
    # name that was never registered, and this module does not register
    # 'certfile'/'keyfile' itself. Treat "not registered" as "not set".
    certfile = getattr(options, 'certfile', None)
    keyfile = getattr(options, 'keyfile', None)

    if not certfile and not keyfile:
        return None
    if not certfile:
        raise ValueError('certfile is not provided')
    if not keyfile:
        raise ValueError('keyfile is not provided')

    # Fail early with a clear message instead of an opaque SSL error.
    for path in (certfile, keyfile):
        if not os.path.isfile(path):
            raise ValueError('File {!r} does not exist'.format(path))

    # Purpose.CLIENT_AUTH yields a context suited to server-side sockets.
    ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    ssl_ctx.load_cert_chain(certfile, keyfile)
    return ssl_ctx
def get_trusted_downstream(tdstream):
    """Parse a comma-separated address list into a set of strings.

    Entries are stripped of surrounding whitespace and empty entries are
    ignored. Each remaining entry is passed to ``to_ip_address`` before
    being accepted.

    :param tdstream: comma-separated string of addresses (may be empty).
    :returns: set of the stripped, non-empty entries.
    """
    candidates = [piece.strip() for piece in tdstream.split(',')]
    trusted = set()
    for address in filter(None, candidates):
        # Return value is discarded; the call presumably raises on a
        # malformed address -- confirm against webssh.utils.
        to_ip_address(address)
        trusted.add(address)
    return trusted
def get_origin_setting(options):
    """Translate the ``origin`` option into the origin policy value.

    :param options: object exposing ``origin`` (str) and ``debug`` (bool).
    :returns: ``'*'`` (debug mode only), ``'same'``, ``'primary'``, or a
        set of origins obtained by passing each comma-separated entry of
        the lower-cased option to ``parse_origin_from_url``.
    :raises ValueError: if the wildcard is requested outside debug mode, or
        if no entry of a custom list yields a truthy origin.
    """
    requested = options.origin

    if requested == '*':
        if options.debug:
            return '*'
        raise ValueError(
            'Wildcard origin policy is only allowed in debug mode.'
        )

    policy = requested.lower()
    if policy in ('same', 'primary'):
        return policy

    # Custom list: keep only the entries that parse to a truthy origin.
    parsed = (parse_origin_from_url(url) for url in policy.split(','))
    origins = {entry for entry in parsed if entry}
    if origins:
        return origins
    raise ValueError('Empty origin list')
def check_encoding_setting(encoding):
    """Reject an encoding name that ``is_valid_encoding`` does not accept.

    A falsy value (for example the empty-string default) is accepted without
    being checked.

    :param encoding: encoding name to validate, or a falsy value.
    :raises ValueError: if ``encoding`` is truthy and not valid.
    """
    if not encoding:
        return
    if is_valid_encoding(encoding):
        return
    raise ValueError('Unknown character encoding {!r}.'.format(encoding))