forked from Jrohy/multi-v2ray
-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
211 lines (176 loc) · 5.97 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import urllib.request
import os
import re
from enum import Enum, unique
from OpenSSL import crypto
@unique
class Color(Enum):
"""
终端显示颜色 枚举类
"""
# 显示格式: \033[显示方式;前景色;背景色m
# 只写一个字段表示前景色,背景色默认
RED = '\033[31m' # 红色
GREEN = '\033[32m' # 绿色
YELLOW = '\033[33m' # 黄色
BLUE = '\033[34m' # 蓝色
FUCHSIA = '\033[35m' # 紫红色
CYAN = '\033[36m' # 青蓝色
WHITE = '\033[37m' # 白色
#: no color
RESET = '\033[0m' # 终端默认颜色
@unique
class StreamType(Enum):
TCP = 'tcp'
TCP_HOST = 'tcp_host'
SOCKS = 'socks'
SS = 'ss'
MTPROTO = 'mtproto'
H2 = 'h2'
WS = 'ws'
QUIC = 'quic'
KCP = 'kcp'
KCP_UTP = 'utp'
KCP_SRTP = 'srtp'
KCP_DTLS = 'dtls'
KCP_WECHAT = 'wechat'
KCP_WG = 'wireguard'
def stream_list():
return [
StreamType.KCP_WG,
StreamType.KCP_DTLS,
StreamType.KCP_WECHAT,
StreamType.KCP_UTP,
StreamType.KCP_SRTP,
StreamType.MTPROTO,
StreamType.SOCKS,
StreamType.SS
]
def header_type_list():
return ("none", "srtp", "utp", "wechat-video", "dtls", "wireguard")
def ss_method():
return ("aes-256-cfb", "aes-128-cfb", "chacha20",
"chacha20-ietf", "aes-256-gcm", "aes-128-gcm", "chacha20-poly1305")
def color_str(color: Color, str: str) -> str:
"""
返回有色字符串
"""
return '{}{}{}'.format(
color.value,
str,
Color.RESET.value
)
def is_number(s):
"""
判断是否为数字的函数
"""
try:
float(s)
return True
except ValueError:
pass
try:
import unicodedata
unicodedata.numeric(s)
return True
except (TypeError, ValueError):
pass
return False
def get_ip():
"""
获取本地ip
"""
my_ip = urllib.request.urlopen('http://api.ipify.org').read()
return bytes.decode(my_ip)
def port_is_use(port):
"""
判断端口是否占用
"""
cmd = "lsof -i:" + str(port)
result = os.popen(cmd).readlines()
return result != []
def is_email(email):
"""
判断是否是邮箱格式
"""
str=r'^[a-zA-Z0-9_-]+(\.[a-zA-Z0-9_-]+){0,4}@[a-zA-Z0-9_-]+(\.[a-zA-Z0-9_-]+){0,4}$'
return re.match(str, email)
def get_domain_by_crt_file(crt_path):
"""
通过证书文件获取域名, 证书文件有误或不存在则返回空
"""
try:
cert = crypto.load_certificate(crypto.FILETYPE_PEM, open(crt_path).read())
except:
return
return cert.get_subject().CN
def bytes_2_human_readable(number_of_bytes, precision=1):
"""
流量bytes转换为kb, mb, gb等单位
"""
if number_of_bytes < 0:
raise ValueError("!!! number_of_bytes can't be smaller than 0 !!!")
step_to_greater_unit = 1024.
number_of_bytes = float(number_of_bytes)
unit = 'bytes'
if (number_of_bytes / step_to_greater_unit) >= 1:
number_of_bytes /= step_to_greater_unit
unit = 'KB'
if (number_of_bytes / step_to_greater_unit) >= 1:
number_of_bytes /= step_to_greater_unit
unit = 'MB'
if (number_of_bytes / step_to_greater_unit) >= 1:
number_of_bytes /= step_to_greater_unit
unit = 'GB'
if (number_of_bytes / step_to_greater_unit) >= 1:
number_of_bytes /= step_to_greater_unit
unit = 'TB'
number_of_bytes = round(number_of_bytes, precision)
return str(number_of_bytes) + ' ' + unit
def gen_cert(domain):
service_name = ["v2ray", "nginx", "httpd", "apache2"]
start_cmd = "service {} start >/dev/null 2>&1"
stop_cmd = "service {} stop >/dev/null 2>&1"
if not os.path.exists("/root/.acme.sh/acme.sh"):
os.system("curl https://get.acme.sh | sh")
get_ssl_cmd = "bash /root/.acme.sh/acme.sh --issue -d " + domain + " --standalone --keylength ec-256"
for name in service_name:
os.system(stop_cmd.format(name))
os.system(get_ssl_cmd)
for name in service_name:
os.system(start_cmd.format(name))
def calcul_iptables_traffic(port):
traffic_result = os.popen("bash /usr/local/multi-v2ray/global_setting/calcul_traffic.sh {}".format(str(port))).readlines()
if traffic_result:
traffic_list = traffic_result[0].split()
upload_traffic = bytes_2_human_readable(int(traffic_list[0]), 2)
download_traffic = bytes_2_human_readable(int(traffic_list[1]), 2)
total_traffic = bytes_2_human_readable(int(traffic_list[2]), 2)
return "{0}: upload:{1} download:{2} total:{3}".format(color_str(Color.GREEN, str(port)),
color_str(Color.CYAN, upload_traffic), color_str(Color.CYAN, download_traffic), color_str(Color.CYAN, total_traffic))
def clean_iptables(port):
clean_cmd = "iptables -D {0} {1}"
check_cmd = "iptables -nvL %s --line-number|grep -w \"%s\"|awk '{print $1}'|sort -r"
input_result = os.popen(check_cmd % ("INPUT", str(port))).readlines()
for line in input_result:
os.system(clean_cmd.format("INPUT", str(line)))
output_result = os.popen(check_cmd % ("OUTPUT", str(port))).readlines()
for line in output_result:
os.system(clean_cmd.format("OUTPUT", str(line)))
def open_port():
input_cmd = "iptables -I INPUT -p {0} --dport {1} -j ACCEPT"
output_cmd = "iptables -I OUTPUT -p {0} --sport {1}"
check_cmd = "iptables -nvL --line-number|grep -w \"%s\""
from loader import Loader
group_list = Loader().profile.group_list
port_set = set([group.port for group in group_list])
for port in port_set:
port_str = str(port)
if len(os.popen(check_cmd % port_str).readlines()) > 0:
continue
os.system(input_cmd.format("tcp", port_str))
os.system(input_cmd.format("udp", port_str))
os.system(output_cmd.format("tcp", port_str))
os.system(output_cmd.format("udp", port_str))