-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathprogram_logic.py
315 lines (272 loc) · 14.2 KB
/
program_logic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
#!/usr/bin/python
"""
###########################
# PSEUDO CODE FOR PROGRAM #
###########################
-> Start at if __name__ == "__main__" <-
The logic example behind program in python:
1. try Create socket else "could not create socket".
2. Call to htons() - Host to Network short.
3. try Bind print "bind done" else "bind failed" to a port -> exit
4. listen for connection in while true - repeat itself "waiting for incoming connections"
5. try accept connections; print "connection accepted" except error - print "accept failed" -> exit
6. recv(1) 1 byte and store in memory.
7. if msg[0] == '\n': begin login process.
elif msg[0] == '\x0f': Send 'You are close'
else: Send 'Meh'
8. login process:
I. Calls get_string 3 times: heap allocates memory. strcpy the usernames to different heap allocated memory areas.
+ Prints the decimal value of the each username's prefix number with +0x01 to it's value.
II. Prints the vars it received from the msg (Need a NULL byte).
III.strcmp the usernames 3 times with 3 pre-defined usernames from the program in this order: 'root', 'root', 'toor'
IV. malloc is called with 0x40 as an argument.
V. memcpy "check\x00" to the the address malloc returned with size of 0x6, the size of "check" + NULL byte.
VI. OR return value of strcmp with '0x00'. saving result to EAX.
VII.Prints 'Success iteration X: %x' ; %x = The result from OR (0 means success in strcmp. Otherwise, strcmp failed.
VIII.Frees all 4 allocated memory in the heap in reverse order. ; 'check', 'toor', 'root', and lastly the first 'root' in msg.
IX. CMP [EBP+return_value_of_strcmp], 0
-> if cmp is successful: Sets byte to AL
X. AL moves to EAX
XI. return from login to main.
9. EAX moves to stack-0x44 (it can be either 0x00 or 0x01)
10. cmp stack-0x44 with 0:
if stack-0x44 == 0: Jumps to sending 'failed' message reply.
if stack-0x44 != 0: jumps to sending 'successful' message reply.
Conclusion:
A program that demonstrates an authentication process.
A client will send a msg that will contain:
a character(msg[0])
kind of like an authentication token (represented as msg[0])
3 usernames with a prefix number before each one.
Between each usernames there should be a padding of 43 chars. Each username should end with NULL byte or else program will break and wont allow access.
"""
import socket
from core.colors import *
import os
PORT = 8888
def start():
os.system('cls' if os.name == 'nt' else 'clear')
print(f"{bold}{underline}This is a PSEUDO CODE for the file program.\n{end}"
"It explains the program work flow while parsing the message from the client.\n"
"To use this code and check how the program parses the message:\n"
f" 1. {underline}Start the program{end} (It will bind to port {bold}{PORT}{end} and listen for connections.\n"
f" 2. {underline}Send the message{end} to the binded address using the provided python script (connect.py), feel free to change the message and see how the program reacts.\n"
f" 3. This code will {underline}parse the message{end} and output the steps that the program uses to parse the message, as well the different functions it uses to allocate and free memory.\n"
f" 4. This code will {underline}send the different replies{end} depending on the message sent.\n"
f"Notes:\n"
f" - Every indented output is an actual output from the program. The rest of the output is information for helping in understanding the process\n"
f" - Check comments in the code, the i64 file (IDA) and the README.md for even more details about the program.\n")
input(f"{run} Press any key to continue...")
os.system('cls' if os.name == 'nt' else 'clear')
def create_socket(ip, port):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((ip, port))
print(f" {good} Socket binded at {underline}{ip}{end} on port {underline}{port}{end}...")
except Exception as e:
print(f"{bad} Could not create socket, exiting...\n", e)
exit()
while True:
try:
s.listen(1)
print(f" {run} Waiting for incoming connections...")
conn, addr = s.accept()
main(conn, addr)
except Exception as e:
print(f"{bad} Accept Failed...\n", e)
def rotate(inp, d) -> str:
"""Slices strings to the right by d amount"""
Rfirst = inp[0: len(inp) - d]
Rsecond = inp[len(inp) - d:]
result = Rsecond + Rfirst
return result
def htons(port):
"""Converts the unsigned short integer from host byte order to network byte order."""
host_byte_order = hex(port)
# Prints Host Byte Order:
print(f"{info} Host Byte Order For PORT '{port}': {underline}{host_byte_order}{end}")
and_bitwise_result = hex(port & 0xffff) # AND bitwise operation with 0xffff:
port_after_ror = rotate(and_bitwise_result[2:], 2) # ROR operation with and_bitwise_result
print(f"{info} _htons returns byte shifted PORT number as: {port_after_ror}")
def main(conn, addr):
print(f" {good} Connection established with {underline}{addr[0]}{end}\n")
garbage_char = conn.recv(1).decode()
print(f"{info} Program moves with zero-extend (padding with zeros) first char: {garbage_char}")
msg = conn.recv(1024).decode()
print(f"{info} Main calls {underline}_malloc{end}.")
print(f"{info} _malloc will return a pointer to the second character of the message: {hex(msg.encode()[0])}")
if msg[0] == '\n':
reply = login_block(msg)
print(f"""{info} login will return the final value for determining if the auth process was successful or not.
main will proceed to send the correct message to the client regarding the success or failure of the authentication.""")
if reply == "Successful": conn.send(reply.encode())
elif reply == "Failed": conn.send(reply.encode())
print(f"{info} Sent: {reply}.")
elif msg[0] == '\x0f': you_are_close(conn)
else: meh(conn)
print(f"{info} Returning to accepting connections...")
def login_block(msg):
"""This is the main block of the program.
Here, the program authenticates the client's msg with what it expects."""
print(f"{info} Running in {underline}login{end} function now...")
"""Login will call get_string 3 times with these 3 arguments:
(1) Pointer to empty space (later be used for _strcpy destination.
(2) Address to msg at index[1].
(3) The result of MOVZX with garbage_char."""
iter = 1
username_prefix_index = 1
index = 2
slice = 6
result_iteration_1 = get_string(msg, iter, username_prefix_index, index, slice)
# Subs result_iteration_1 with (3) - the result of MOVZX with garbage_char.
# That value ^^ will be used
iter = 2
username_prefix_index = 50
index = 51
slice = 55
result_iteration_2 = get_string(msg, iter, username_prefix_index, index, slice)
print(f"{info} Adds results of both iterations to somewhere in stack.")
# Subs (3) with results of both iterations.
"""Ex.: Until now we had 2 iteration with get_string.
Both of them returned the value of adding 0x01 to the username_prefix_char (0x30 + 0x01 = 0x31)
Adding both results will sum 0x62
Sub this result with the result of (3) MOVZX with garbage_char will return us new value that will be used for the next iteration in get_string.
This value will assist get_string to get the right effective address of usernames in the msg for malloc, strcpy and the next prefix_char in the msg."""
iter = 3
username_prefix_index = 99
index = 100
slice = 104
result_iteration_3 = get_string(msg, iter, username_prefix_index, index, slice)
# Again subs results with (3)
# Prints the 3 variables given in msg
print(f"{info} Prints variables as they were provided to the strcpy:")
string = " var_a: "
index_in_msg = 2
slice = 6
printf_vars(msg, string, index_in_msg, slice)
string = " var_b: "
index_in_msg = 51
slice = 55
printf_vars(msg, string, index_in_msg, slice)
string = " var_c: "
index_in_msg = 100
slice = 104
printf_vars(msg, string, index_in_msg, slice)
"""This should result in the following:
var_a: root<NULL>
var_b: root<NULL>
var_c: toor<NULL>"""
"""strcompares: Before the comparison process begins program will:
mov [ebp+return_value_of_strcmp], 0 # Zeros address"""
nIteration = "1"
addr_of_var1_in_heap_after_strcpy = msg[2:6]
aRoot = "root"
returned_value = strcmp(addr_of_var1_in_heap_after_strcpy, aRoot)
# strcmp will return value 0x00 to EAX if the comparison is successful.
print(f"""{info} Logical Inclusive OR [EBP+return_value_of_strcmp], EAX # [ebp+return_value_of_strcmp] = 0x00 (got zeroed above)
OR 0x00 value with return value of strcmp.
if strcmp returned 0, meaning value from msg and expected value is ==
if strcmp returned not 0, meaning they are not equal.
{info} Prints the value of the OR:""")
printf_success(nIteration, returned_value)
nIteration = "2"
addr_of_var2_in_heap_after_strcpy = msg[51:55]
bRoot = "root"
returned_value = strcmp(addr_of_var2_in_heap_after_strcpy, bRoot)
# strcmp will return value 0x00 to EAX if the comparison is successful.
"""Logical Inclusive OR [EBP+return_value_of_strcmp], EAX"""
printf_success(nIteration, returned_value)
nIteration = "3"
addr_of_var3_in_heap_after_strcpy = msg[100:104]
toor = "toor"
returned_value = strcmp(addr_of_var3_in_heap_after_strcpy, toor)
# strcmp will return value 0x00 to EAX if the comparison is successful.
"""Logical Inclusive OR [EBP+return_value_of_strcmp], EAX"""
printf_success(nIteration, returned_value)
print(" ")
"""malloc is called with the size of 0x40 as argument and returns an address for the memcpy next"""
"""After malloc returned an empty address, memcpy is called with these arguments:
arg[0]: 0x804ae80 --> 0x0
arg[1]: 0x8048dff ("check")
arg[2]: 0x6
; memcpy "check\x00" with size of 6 to 0x804ae80"""
free_address_for_check = malloc(40)
check_addr = "0x8048dff" # "check" is stored in this address
len_check = 6
memcpy(free_address_for_check, check_addr, len_check)
addr_of_check_in_heap = "0x804ae80 --> (\"check\")"
"""After comparing the 3 usernames login will call free 4 times
the free is reversed (perhaps a vulnerable to unlink exploit)
first free is on the address of 'check'
second free is on the address of the third username in msg 'toor'
third free is on the address of the second username in msg 'root'
fourth free is on the address of the first username in msg 'root'"""
free(addr_of_check_in_heap)
free(addr_of_var3_in_heap_after_strcpy)
free(addr_of_var2_in_heap_after_strcpy)
free(addr_of_var1_in_heap_after_strcpy)
print(" ")
"""CMP [EBP+return_value_of_strcmp], 0"""
al = False
if returned_value == "0": al = True
"""After all 3 frees are done, a CMP instructions is executed on the returned_value of strcmp with 0x00
SETZ AL ; Sets byte if Zero (ZF=1) (Sets AL as 0x01 if comparison is equal.
AL moves to EAX"""
eax = al
if eax:
reply = "Successful"
return reply
else:
reply = "Failed"
return reply
def memcpy(addr, check_addr, size):
print(f"{info} Memcpy is copying the string in {check_addr} in size of {size} to {addr}.\n")
def malloc(size):
print(f"{info} Malloc is called with size of {size}")
print(f"{good} Returned an empty address for memcpy.")
return "0x804ae80"
def free(addr):
print(f"{info} Frees {addr}...")
def printf_success(nIteration, returned_value):
print(f" [*] Success (iteration {nIteration}): {returned_value}")
def strcmp(string_addr, username):
if string_addr == username: return "0"
else: return "1"
def get_string(msg, iter, username_prefix_index, index, slice):
print(f"{info} Running {underline}get_string(){end} {iter} times...")
"""get_string is called 3 times in the login block.
Checks the packet's form.
If the packet is correctly formed it will:
malloc the username entered after the prefix char
strcpy ONLY the usernames until NULL byte (if entered)
Adds 0x01 to prefix char and returns the result (Ex: 0x30 + 0x01 = 0x31)"""
print(f" [*] Byte as hex: {hex(ord(msg[username_prefix_index]))}\n")
# malloc(msg[1]) # msg[1] is username prefix number <- returns empty address
print(f"{info} Loads Effective Address of '{msg[username_prefix_index]}' + 1.")
print(f"{info} Address loaded points to '{msg[index:slice]}'.")
# strcpy(empty_address, address_to_msg[2]) # Copies everything until NULL byte to empty memory space in the heap.
print(f"{run} strcpy from '{msg[index]}' in the string until NULL byte if provided to heap.")
print(f"{info} Loads Effective Address of {msg[username_prefix_index]} after adding 0x01 to value (address of [ {hex(ord(msg[username_prefix_index]))} + 1 ] is loaded.")
print(f" [*] Returning: {ord(msg[username_prefix_index])} + 0x01 # Decimal Value of prefix char + 0x01")
print(f"{info} Returns result of ['{msg[username_prefix_index]}'] + 0x01\n")
return
def printf_vars(msg, string, index_in_msg, slice):
"""Will be executed 3 times. each time with different addresses to print.
This will print everything from the given starting address until the NULL byte if entered.
"""
# printf(address given:NULL byte or EOF)
print(f"{string}{msg[index_in_msg:slice]}")
def you_are_close(conn):
reply = "You're Close!"
with conn: conn.send(reply.encode())
def meh(conn):
reply = "Meh.."
with conn: conn.send(reply.encode())
if __name__ == "__main__":
start()
HOSTNAME = socket.gethostname()
IP = socket.gethostbyname(HOSTNAME)
print(f"{info} Main calls to {underline}_htons{end}.")
htons(PORT)
print(f"{info} Main calls to {underline}_socket{end}.")
create_socket(IP, PORT)