Skip to content

Commit 3ea3708

Browse files
committed
wip fixing segfault
1 parent a429ab6 commit 3ea3708

File tree

2 files changed

+122
-65
lines changed

2 files changed

+122
-65
lines changed

lightbug_http_v1/epoll.mojo

Lines changed: 103 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -596,9 +596,7 @@ fn create_actual_pthread(
596596
) raises -> Bool:
597597
"""Create pthread using pure Mojo external calls - replicating pthread_wrapper.c logic."""
598598

599-
# Store thread arguments at the provided pointer
600-
thread_args_ptr[] = ThreadArgs(port, core, num_cpu_cores, num_workers_inited_ptr)
601-
599+
# Thread arguments are already initialized by caller
602600
# Create pthread using direct external calls, passing thread_args_ptr as argument
603601
return create_pthread_direct_with_args(core, 8 * 1024 * 1024, thread_args_ptr) # 8MB stack like Rust
604602

@@ -688,14 +686,22 @@ fn create_pthread_direct_with_args(thread_id: Int, stack_size: Int, thread_args_
688686
fn mojo_pthread_worker_entry(arg: UnsafePointer[NoneType]) -> UnsafePointer[NoneType]:
689687
"""Exported function that serves as pthread entry point."""
690688
try:
689+
if not arg:
690+
print("Error: NULL thread argument")
691+
return UnsafePointer[NoneType]()
692+
691693
# Extract ThreadArgs pointer from argument
692694
var thread_args_ptr = arg.bitcast[ThreadArgs]()
693695
var args = thread_args_ptr[]
694696
var thread_id = args.core
695-
# print("Pthread started for thread_id:", thread_id)
697+
print("Pthread started for thread_id:", thread_id)
696698

697699
# Unshare file descriptor table like Rust code
698-
unshare(CLONE_FILES)
700+
try:
701+
unshare(CLONE_FILES)
702+
except e:
703+
print("Warning: unshare failed:", e)
704+
699705
set_current_thread_cpu_affinity_to(args.core)
700706

701707
# Call the main worker function
@@ -714,19 +720,15 @@ fn mojo_pthread_worker_entry(arg: UnsafePointer[NoneType]) -> UnsafePointer[None
714720

715721
fn get_worker_function_pointer() -> UnsafePointer[NoneType]:
716722
"""Get function pointer to our exported worker function."""
717-
# Use dlsym to get the address of our exported function
718-
var handle = external_call["dlopen", UnsafePointer[NoneType], UnsafePointer[UInt8], c_int](
719-
UnsafePointer[UInt8](), # NULL for main program
720-
0x00002 # RTLD_NOW
721-
)
722-
723-
var func_name = "mojo_pthread_worker_entry"
724-
var func_ptr = external_call["dlsym", UnsafePointer[NoneType], UnsafePointer[NoneType], UnsafePointer[UInt8]](
725-
handle,
726-
func_name.unsafe_ptr()
723+
# Get address of the exported function
724+
# This should work with the @export decorator
725+
return external_call["dlsym", UnsafePointer[NoneType], UnsafePointer[NoneType], UnsafePointer[UInt8]](
726+
external_call["dlopen", UnsafePointer[NoneType], UnsafePointer[UInt8], c_int](
727+
UnsafePointer[UInt8](), # NULL for main program
728+
2 # RTLD_NOW
729+
),
730+
String("mojo_pthread_worker_entry").unsafe_ptr()
727731
)
728-
729-
return func_ptr
730732

731733
# Old functions removed - now using pure Mojo pthread implementation above
732734

@@ -1387,14 +1389,23 @@ fn go(port: UInt16, cb: fn(UnsafePointer[UInt8], Int, UnsafePointer[UInt8], Int,
13871389
# var num_workers_inited_storage = Int(0)
13881390
var num_workers_inited_ptr = UnsafePointer(to=num_workers_inited_storage)
13891391

1390-
# Allocate thread arguments storage on stack (one per core)
1391-
var thread_args_storage = StaticTuple[ThreadArgs, 64]()
1392+
# Allocate thread arguments storage on heap (persistent memory)
1393+
var thread_args_storage = UnsafePointer[ThreadArgs].alloc(num_cpu_cores)
13921394

13931395
# Create actual pthreads like in the original Rust code
13941396
for core in range(num_cpu_cores):
13951397
try:
13961398
print("Creating pthread worker for core", core)
1397-
var thread_args_ptr = UnsafePointer(to=thread_args_storage[core])
1399+
var thread_args_ptr = thread_args_storage + core
1400+
1401+
# Initialize the thread args properly
1402+
thread_args_ptr[] = ThreadArgs(
1403+
UInt16(port),
1404+
core,
1405+
num_cpu_cores,
1406+
num_workers_inited_ptr
1407+
)
1408+
13981409
var success = create_actual_pthread(
13991410
port,
14001411
core,
@@ -1452,18 +1463,28 @@ fn threaded_worker(
14521463
var epoll_event_listener = epoll_event(EPOLLIN, epoll_data(UInt64(listener_fd)))
14531464
epoll_ctl(epfd, EPOLL_CTL_ADD, listener_fd, UnsafePointer(to=epoll_event_listener))
14541465

1455-
# Initialize buffers and state
1466+
# Initialize buffers and state with error checking
14561467
var epoll_events = AlignedEpollEvents(AlignedEpollEventsTuple())
1457-
memset_zero(rebind[UnsafePointer[UInt8]](UnsafePointer(to=epoll_events)), sizeof[AlignedEpollEvents]())
1468+
var epoll_events_ptr = UnsafePointer(to=epoll_events)
1469+
if epoll_events_ptr:
1470+
memset_zero(rebind[UnsafePointer[UInt8]](epoll_events_ptr), sizeof[AlignedEpollEvents]())
14581471

14591472
var saved_event = epoll_event(EPOLLIN, epoll_data(0))
14601473

14611474
var reqbuf = ReqBufAligned(ReqBufAlignedTuple())
1462-
memset_zero(rebind[UnsafePointer[UInt8]](UnsafePointer(to=reqbuf)), sizeof[ReqBufAligned]())
1475+
var reqbuf_ptr = UnsafePointer(to=reqbuf)
1476+
if reqbuf_ptr:
1477+
memset_zero(rebind[UnsafePointer[UInt8]](reqbuf_ptr), sizeof[ReqBufAligned]())
14631478

1464-
# Request buffer position tracking arrays
1479+
# Request buffer position tracking arrays with bounds checking
14651480
var reqbuf_cur_addr = StaticTuple[Int, MAX_CONN]()
14661481
var reqbuf_start_address = Int(UnsafePointer(to=reqbuf.data).bitcast[Int]())
1482+
1483+
# Validate buffer address is reasonable
1484+
if reqbuf_start_address == 0:
1485+
print("Error: Invalid request buffer address")
1486+
return
1487+
14671488
for i in range(MAX_CONN):
14681489
reqbuf_cur_addr[i] = reqbuf_start_address + i * REQ_BUF_SIZE
14691490

@@ -1472,7 +1493,9 @@ fn threaded_worker(
14721493
reqbuf_residual[i] = 0
14731494

14741495
var resbuf = ResBufAligned(ResBufAlignedTuple())
1475-
memset_zero(rebind[UnsafePointer[UInt8]](UnsafePointer(to=resbuf)), sizeof[ResBufAligned]())
1496+
var resbuf_ptr = UnsafePointer(to=resbuf)
1497+
if resbuf_ptr:
1498+
memset_zero(rebind[UnsafePointer[UInt8]](resbuf_ptr), sizeof[ResBufAligned]())
14761499
var resbuf_start_address = UnsafePointer(to=resbuf.data)
14771500

14781501
var epoll_wait_type = -1 # EPOLL_TIMEOUT_BLOCKING
@@ -1498,49 +1521,64 @@ fn threaded_worker(
14981521

14991522
if cur_fd == listener_fd:
15001523
# Handle new connection
1501-
var incoming_fd = accept_connection(listener_fd)
1502-
1503-
if incoming_fd >= 0 and incoming_fd < MAX_CONN:
1504-
var req_buf_start_address = reqbuf_start_address + incoming_fd * REQ_BUF_SIZE
1505-
reqbuf_cur_addr[incoming_fd] = req_buf_start_address
1506-
reqbuf_residual[incoming_fd] = 0
1507-
setup_connection(incoming_fd)
1508-
saved_event.data.u64 = UInt64(incoming_fd)
1509-
epoll_ctl(epfd, EPOLL_CTL_ADD, incoming_fd, UnsafePointer(to=saved_event))
1510-
else:
1511-
close_connection(epfd, cur_fd)
1512-
else:
1513-
# Handle existing connection data
1514-
var req_buf_start_address = reqbuf_start_address + cur_fd * REQ_BUF_SIZE
1515-
var req_buf_cur_position = reqbuf_cur_addr[cur_fd]
1516-
var residual = reqbuf_residual[cur_fd]
1517-
1518-
var buffer_remaining = REQ_BUF_SIZE - (req_buf_cur_position - req_buf_start_address)
1519-
var read = recv_from(cur_fd, rebind[UnsafePointer[UInt8]](UnsafePointer(to=req_buf_cur_position)), buffer_remaining)
1520-
1521-
if read > 0:
1522-
# Process the received data (simplified)
1523-
var response = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\n\r\nHello, World!"
1524-
var response_len = len(response)
1525-
1526-
# Copy response to buffer
1527-
var response_ptr = response.unsafe_ptr()
1528-
var dest_ptr = rebind[UnsafePointer[UInt8]](resbuf_start_address)
1529-
memcpy(dest_ptr, response_ptr, response_len)
1530-
1531-
var wrote = send_to(cur_fd, dest_ptr, response_len)
1524+
try:
1525+
var incoming_fd = accept_connection(listener_fd)
15321526

1533-
# Reset buffer state
1534-
reqbuf_cur_addr[cur_fd] = req_buf_start_address
1535-
reqbuf_residual[cur_fd] = 0
1536-
1537-
if wrote != response_len:
1527+
if incoming_fd >= 0 and incoming_fd < MAX_CONN:
1528+
var req_buf_start_address = reqbuf_start_address + incoming_fd * REQ_BUF_SIZE
1529+
reqbuf_cur_addr[incoming_fd] = req_buf_start_address
1530+
reqbuf_residual[incoming_fd] = 0
1531+
setup_connection(incoming_fd)
1532+
saved_event.data.u64 = UInt64(incoming_fd)
1533+
epoll_ctl(epfd, EPOLL_CTL_ADD, incoming_fd, UnsafePointer(to=saved_event))
1534+
else:
1535+
if incoming_fd >= 0:
1536+
_ = sys_close(incoming_fd) # Close the fd if it's valid but out of range
1537+
except e:
1538+
print("Error accepting connection:", e)
1539+
else:
1540+
# Handle existing connection data with bounds checking
1541+
if cur_fd >= 0 and cur_fd < MAX_CONN:
1542+
try:
1543+
var req_buf_start_address = reqbuf_start_address + cur_fd * REQ_BUF_SIZE
1544+
var req_buf_cur_position = reqbuf_cur_addr[cur_fd]
1545+
var residual = reqbuf_residual[cur_fd]
1546+
1547+
var buffer_remaining = REQ_BUF_SIZE - (req_buf_cur_position - req_buf_start_address)
1548+
if buffer_remaining > 0:
1549+
var read = recv_from(cur_fd, rebind[UnsafePointer[UInt8]](UnsafePointer(to=req_buf_cur_position)), buffer_remaining)
1550+
1551+
if read > 0:
1552+
# Process the received data (simplified)
1553+
var response = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\n\r\nHello, World!"
1554+
var response_len = len(response)
1555+
1556+
# Copy response to buffer
1557+
var response_ptr = response.unsafe_ptr()
1558+
var dest_ptr = rebind[UnsafePointer[UInt8]](resbuf_start_address)
1559+
memcpy(dest_ptr, response_ptr, response_len)
1560+
1561+
var wrote = send_to(cur_fd, dest_ptr, response_len)
1562+
1563+
# Reset buffer state
1564+
reqbuf_cur_addr[cur_fd] = req_buf_start_address
1565+
reqbuf_residual[cur_fd] = 0
1566+
1567+
if wrote != response_len:
1568+
close_connection(epfd, cur_fd)
1569+
else:
1570+
# Connection closed or error
1571+
reqbuf_cur_addr[cur_fd] = req_buf_start_address
1572+
reqbuf_residual[cur_fd] = 0
1573+
close_connection(epfd, cur_fd)
1574+
else:
1575+
# Buffer full, close connection
1576+
close_connection(epfd, cur_fd)
1577+
except e:
1578+
print("Error handling connection data:", e)
15381579
close_connection(epfd, cur_fd)
15391580
else:
1540-
# Connection closed or error
1541-
reqbuf_cur_addr[cur_fd] = req_buf_start_address
1542-
reqbuf_residual[cur_fd] = 0
1543-
close_connection(epfd, cur_fd)
1581+
print("Invalid file descriptor:", cur_fd)
15441582

15451583
except e:
15461584
print("Worker thread error:", e)

test_epoll_simple.mojo

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#!/usr/bin/env mojo
2+
3+
from lightbug_http_v1.epoll import go, example_callback, AlignedHttpDate, StaticTuple
4+
5+
fn main():
6+
"""Simple test to check if the epoll server starts without segfaulting."""
7+
print("Testing epoll server...")
8+
9+
# Initialize HTTP date structure
10+
var http_date = AlignedHttpDate(StaticTuple[UInt8, 35]())
11+
12+
# Initialize worker counter
13+
var num_workers_inited_storage = 0
14+
15+
try:
16+
# Start server on port 8080 with minimal setup
17+
go(8080, example_callback, http_date, num_workers_inited_storage)
18+
except e:
19+
print("Error starting server:", e)

0 commit comments

Comments
 (0)