-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpwn.py
131 lines (114 loc) · 4.19 KB
/
pwn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#!/usr/bin/env python3
# _*_ coding: utf-8 _*_
import socket
import select
import sys
class context:
log_level = "info"
@staticmethod
def log_level(level):
context.log_level = level
def log(message, level="info", length=None, interactive_mode=False, is_input=False):
log_levels = {
"debug": 10,
"info": 20,
"warning": 30,
"error": 40
}
current_log_level = log_levels.get(context.log_level, 20)
message_log_level = log_levels.get(level, 20)
if message_log_level < current_log_level:
return # 不打印低于当前日志级别的消息
colors = {
"info": "\033[94m",
"debug": "\033[92m",
"warning": "\033[93m",
"error": "\033[91m",
"input": "\033[95m",
"output": "\033[96m"
}
reset = "\033[0m"
action = "Received" if not is_input else "Sent"
colored_action = f"{colors.get('input' if is_input else 'output', '')}{action}{reset}"
prefix = ""
if interactive_mode:
prefix = "[IN]" if is_input else "[OUT]"
colored_prefix = f"{colors.get('input' if is_input else 'output', '')}{prefix}{reset}"
colored_level_tag = f"{colors.get(level, '')}[{level.upper()}]{reset}"
message_format = f"{colored_level_tag} {colored_prefix} {colored_action} {length} bytes:"
else:
colored_level_tag = f"{colors.get(level, '')}[{level.upper()}]{reset}"
message_format = f"{colored_level_tag} {colored_action} {length} bytes:" if length is not None else f"{colored_level_tag}"
if length is not None:
print(message_format)
for line in message.splitlines():
print(f" {line}")
else:
for line in message.splitlines():
print(f"{message_format} {line}")
class remote:
def __init__(self, HOST, PORT):
self.sh = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sh.connect((HOST, PORT))
log(f"Connected to {HOST}:{PORT}", "info")
def send(self, msg):
assert isinstance(msg, bytes)
self.sh.send(msg)
log(msg, "debug", len(msg), interactive_mode=True, is_input=True)
def sendline(self, msg: bytes):
self.send(msg + b'\n')
def recv(self, num: int):
data = self.sh.recv(num)
if context.log_level == "debug":
log(data, "debug", len(data), interactive_mode=True)
return data
def recvuntil(self, msg):
assert isinstance(msg, bytes)
tmp = b''
while msg not in tmp:
tmp += self.sh.recv(1)
if context.log_level == "debug":
log(tmp, "debug", len(tmp), interactive_mode=True)
return tmp
def recvline(self):
return self.recvuntil(b'\n')
def recvline_contains(self, keyword):
assert isinstance(keyword, bytes)
line = b''
while True:
line = self.recvline()
if keyword in line:
return line
def sendafter(self, delim, msg):
assert isinstance(delim, bytes) and isinstance(msg, bytes)
self.recvuntil(delim)
self.send(msg)
def close(self):
self.sh.close()
log("Connection closed", "info")
def interactive(self):
# 特殊染色
print("\033[93m[Switched]\033[0m to interactive mode")
try:
while True:
# 使用float()确保timeout参数是浮点数类型
ready = select.select([self.sh, sys.stdin], [], [], float(0.1))
if self.sh in ready[0]:
data = self.sh.recv(4096)
if not data:
log("Connection closed by remote host", "info")
break
log(data, "info", len(data), interactive_mode=True)
if sys.stdin in ready[0]:
input_data = input().encode() + b'\n'
self.send(input_data)
except KeyboardInterrupt:
log("Interactive session ended by user", "info")
finally:
self.close()
if __name__ == "__main__":
context.log_level("debug") # 生效于非交互模式
r = remote("titan.picoctf.net", 52525)
# r.interactive()
r.sendline(b"hello")
r.close()