Skip to content

Commit a429ab6

Browse files
committed
wip debugging pthreads
1 parent 160bcca commit a429ab6

File tree

2 files changed

+267
-17
lines changed

2 files changed

+267
-17
lines changed

lightbug_http_v1/epoll.mojo

Lines changed: 266 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ from memory import memcmp, UnsafePointer, stack_allocation
66
from sys.info import sizeof
77
from time import sleep
88
from lightbug_http.external.small_time import now
9-
from algorithm import parallelize
109
from memory import memset_zero, memcpy
1110
from lightbug_http._libc import get_errno, sockaddr, socklen_t, c_ssize_t
1211
from sys.ffi import external_call, c_int, c_uint, c_ushort, c_size_t
@@ -17,6 +16,17 @@ from utils import StaticTuple
1716
# Below is a straightforward translation of the Rust FAF (Fast As Fuck) code:
1817
# Courtesy of the original author @errantmind
1918
# https://github.com/errantmind/faf/blob/master/src/epoll.rs
19+
#
20+
# THREADING CHANGES:
21+
# - Removed `parallelize` usage in favor of manual pthread creation like the original Rust code
22+
# - Added pthread external function calls and structures for real threading
23+
# - Implemented pure Mojo pthread creation using external_call
24+
# - Now creates actual parallel threads like the original Rust implementation
25+
#
26+
# COMPILATION NOTES:
27+
# Pure Mojo implementation - no C files needed!
28+
# Just compile normally: mojo build your_main.mojo -o your_server
29+
# Uses external_call to pthread library functions directly
2030

2131
# ===----------------------------------------------------------------------=== #
2232
# Epoll Constants
@@ -497,6 +507,229 @@ struct cpu_set_t:
497507
"""CPU affinity set structure."""
498508
var data: StaticTuple[UInt64, CPU_SET_LEN]
499509

510+
# ===----------------------------------------------------------------------=== #
511+
# pthread types and constants
512+
# ===----------------------------------------------------------------------=== #
513+
514+
alias PTHREAD_CREATE_DETACHED = 1
515+
516+
# pthread_t is typically a pointer type on most systems
517+
@register_passable("trivial")
518+
struct pthread_t:
519+
var data: UInt64
520+
521+
fn __init__(out self):
522+
self.data = 0
523+
524+
@register_passable("trivial")
525+
struct pthread_attr_t:
526+
var data: StaticTuple[UInt8, 56] # Size for Linux x86_64
527+
528+
fn __init__(out self):
529+
var zero_data = StaticTuple[UInt8, 56]()
530+
for i in range(56):
531+
zero_data[i] = 0
532+
self.data = zero_data
533+
534+
# ===----------------------------------------------------------------------=== #
535+
# pthread external function calls
536+
# ===----------------------------------------------------------------------=== #
537+
538+
fn _pthread_create(
539+
thread: UnsafePointer[pthread_t],
540+
attr: UnsafePointer[pthread_attr_t],
541+
start_routine: UnsafePointer[NoneType],
542+
arg: UnsafePointer[NoneType]
543+
) -> c_int:
544+
"""Create a new thread."""
545+
return external_call["pthread_create", c_int,
546+
UnsafePointer[pthread_t],
547+
UnsafePointer[pthread_attr_t],
548+
UnsafePointer[NoneType],
549+
UnsafePointer[NoneType]
550+
](thread, attr, start_routine, arg)
551+
552+
fn _pthread_attr_init(attr: UnsafePointer[pthread_attr_t]) -> c_int:
553+
"""Initialize thread attributes."""
554+
return external_call["pthread_attr_init", c_int, UnsafePointer[pthread_attr_t]](attr)
555+
556+
fn _pthread_attr_setdetachstate(attr: UnsafePointer[pthread_attr_t], detachstate: c_int) -> c_int:
557+
"""Set thread detach state."""
558+
return external_call["pthread_attr_setdetachstate", c_int, UnsafePointer[pthread_attr_t], c_int](attr, detachstate)
559+
560+
fn _pthread_attr_setstacksize(attr: UnsafePointer[pthread_attr_t], stacksize: c_size_t) -> c_int:
561+
"""Set thread stack size."""
562+
return external_call["pthread_attr_setstacksize", c_int, UnsafePointer[pthread_attr_t], c_size_t](attr, stacksize)
563+
564+
fn _pthread_attr_destroy(attr: UnsafePointer[pthread_attr_t]) -> c_int:
565+
"""Destroy thread attributes."""
566+
return external_call["pthread_attr_destroy", c_int, UnsafePointer[pthread_attr_t]](attr)
567+
568+
fn _usleep(usec: c_uint) -> c_int:
569+
"""Sleep for microseconds."""
570+
return external_call["usleep", c_int, c_uint](usec)
571+
572+
# ===----------------------------------------------------------------------=== #
573+
# Thread argument structure and helper functions
574+
# ===----------------------------------------------------------------------=== #
575+
576+
@fieldwise_init
577+
@register_passable("trivial")
578+
struct ThreadArgs:
579+
"""Arguments to pass to worker thread."""
580+
var port: UInt16
581+
var core: Int
582+
var num_cpu_cores: Int
583+
var num_workers_inited_ptr: UnsafePointer[Int]
584+
585+
fn thread_sleep_millis(millis: Int):
586+
"""Sleep for specified milliseconds."""
587+
var _ = _usleep(c_uint(millis * 1000)) # usleep takes microseconds
588+
589+
fn create_actual_pthread(
590+
port: UInt16,
591+
core: Int,
592+
num_cpu_cores: Int,
593+
num_workers_inited_ptr: UnsafePointer[Int],
594+
cb: fn(UnsafePointer[UInt8], Int, UnsafePointer[UInt8], Int, UnsafePointer[UInt8], UnsafePointer[UInt8]) -> Int,
595+
thread_args_ptr: UnsafePointer[ThreadArgs]
596+
) raises -> Bool:
597+
"""Create pthread using pure Mojo external calls - replicating pthread_wrapper.c logic."""
598+
599+
# Store thread arguments at the provided pointer
600+
thread_args_ptr[] = ThreadArgs(port, core, num_cpu_cores, num_workers_inited_ptr)
601+
602+
# Create pthread using direct external calls, passing thread_args_ptr as argument
603+
return create_pthread_direct_with_args(core, 8 * 1024 * 1024, thread_args_ptr) # 8MB stack like Rust
604+
605+
fn create_pthread_direct(thread_id: Int, stack_size: Int) raises -> Bool:
606+
"""Create pthread using pure Mojo external calls to pthread library."""
607+
608+
# Create pthread structures
609+
var thread = pthread_t()
610+
var thread_ptr = UnsafePointer(to=thread)
611+
612+
var attr = pthread_attr_t()
613+
var attr_ptr = UnsafePointer(to=attr)
614+
615+
# Initialize pthread attributes (direct external call to pthread_attr_init)
616+
var result = _pthread_attr_init(attr_ptr)
617+
if result != 0:
618+
raise Error("pthread_attr_init failed")
619+
620+
# Set stack size (direct external call to pthread_attr_setstacksize)
621+
result = _pthread_attr_setstacksize(attr_ptr, c_size_t(stack_size))
622+
if result != 0:
623+
var _ = _pthread_attr_destroy(attr_ptr)
624+
raise Error("pthread_attr_setstacksize failed")
625+
626+
# Set detached state (direct external call to pthread_attr_setdetachstate)
627+
result = _pthread_attr_setdetachstate(attr_ptr, PTHREAD_CREATE_DETACHED)
628+
if result != 0:
629+
var _ = _pthread_attr_destroy(attr_ptr)
630+
raise Error("pthread_attr_setdetachstate failed")
631+
632+
# Get function pointer to our exported Mojo worker function
633+
var worker_fn_ptr = get_worker_function_pointer()
634+
635+
# Create thread argument (thread_id as void pointer)
636+
var thread_arg = UnsafePointer(to=thread_id).bitcast[NoneType]()
637+
638+
# Create the pthread directly with external call to pthread_create
639+
result = _pthread_create(thread_ptr, attr_ptr, worker_fn_ptr, thread_arg)
640+
641+
fn create_pthread_direct_with_args(thread_id: Int, stack_size: Int, thread_args_ptr: UnsafePointer[ThreadArgs]) raises -> Bool:
642+
"""Create pthread using pure Mojo external calls to pthread library with thread args."""
643+
644+
# Create pthread structures
645+
var thread = pthread_t()
646+
var thread_ptr = UnsafePointer(to=thread)
647+
648+
var attr = pthread_attr_t()
649+
var attr_ptr = UnsafePointer(to=attr)
650+
651+
# Initialize pthread attributes (direct external call to pthread_attr_init)
652+
var result = _pthread_attr_init(attr_ptr)
653+
if result != 0:
654+
raise Error("pthread_attr_init failed")
655+
656+
# Set stack size (direct external call to pthread_attr_setstacksize)
657+
result = _pthread_attr_setstacksize(attr_ptr, c_size_t(stack_size))
658+
if result != 0:
659+
var _ = _pthread_attr_destroy(attr_ptr)
660+
raise Error("pthread_attr_setstacksize failed")
661+
662+
# Set detached state (direct external call to pthread_attr_setdetachstate)
663+
result = _pthread_attr_setdetachstate(attr_ptr, PTHREAD_CREATE_DETACHED)
664+
if result != 0:
665+
var _ = _pthread_attr_destroy(attr_ptr)
666+
raise Error("pthread_attr_setdetachstate failed")
667+
668+
# Get function pointer to our exported Mojo worker function
669+
var worker_fn_ptr = get_worker_function_pointer()
670+
671+
# Pass thread_args_ptr as argument
672+
var thread_arg = thread_args_ptr.bitcast[NoneType]()
673+
674+
# Create the pthread directly with external call to pthread_create
675+
result = _pthread_create(thread_ptr, attr_ptr, worker_fn_ptr, thread_arg)
676+
677+
var _ = _pthread_attr_destroy(attr_ptr)
678+
679+
if result != 0:
680+
print("pthread_create failed with code:", result)
681+
return False
682+
683+
print("Successfully created pthread for thread_id:", thread_id)
684+
return True
685+
686+
# Exported worker function that pthread can call
687+
@export("mojo_pthread_worker_entry")
688+
fn mojo_pthread_worker_entry(arg: UnsafePointer[NoneType]) -> UnsafePointer[NoneType]:
689+
"""Exported function that serves as pthread entry point."""
690+
try:
691+
# Extract ThreadArgs pointer from argument
692+
var thread_args_ptr = arg.bitcast[ThreadArgs]()
693+
var args = thread_args_ptr[]
694+
var thread_id = args.core
695+
# print("Pthread started for thread_id:", thread_id)
696+
697+
# Unshare file descriptor table like Rust code
698+
unshare(CLONE_FILES)
699+
set_current_thread_cpu_affinity_to(args.core)
700+
701+
# Call the main worker function
702+
threaded_worker(
703+
args.port,
704+
example_callback,
705+
args.core,
706+
args.num_cpu_cores,
707+
args.num_workers_inited_ptr
708+
)
709+
710+
except e:
711+
print("Pthread worker error:", e)
712+
713+
return UnsafePointer[NoneType]()
714+
715+
fn get_worker_function_pointer() -> UnsafePointer[NoneType]:
716+
"""Get function pointer to our exported worker function."""
717+
# Use dlsym to get the address of our exported function
718+
var handle = external_call["dlopen", UnsafePointer[NoneType], UnsafePointer[UInt8], c_int](
719+
UnsafePointer[UInt8](), # NULL for main program
720+
0x00002 # RTLD_NOW
721+
)
722+
723+
var func_name = "mojo_pthread_worker_entry"
724+
var func_ptr = external_call["dlsym", UnsafePointer[NoneType], UnsafePointer[NoneType], UnsafePointer[UInt8]](
725+
handle,
726+
func_name.unsafe_ptr()
727+
)
728+
729+
return func_ptr
730+
731+
# Old functions removed - now using pure Mojo pthread implementation above
732+
500733
# ===----------------------------------------------------------------------=== #
501734
# External C Functions
502735
# ===----------------------------------------------------------------------=== #
@@ -1134,7 +1367,7 @@ fn set_so_busy_poll(fd: Int, timeout_us: Int) raises:
11341367
# ===----------------------------------------------------------------------=== #
11351368

11361369
@no_inline
1137-
fn go(port: UInt16, cb: fn(UnsafePointer[UInt8], Int, UnsafePointer[UInt8], Int, UnsafePointer[UInt8], UnsafePointer[UInt8]) -> Int, mut NUM_WORKERS_INITED: Int, HTTP_DATE: AlignedHttpDate):
1370+
fn go(port: UInt16, cb: fn(UnsafePointer[UInt8], Int, UnsafePointer[UInt8], Int, UnsafePointer[UInt8], UnsafePointer[UInt8]) -> Int, HTTP_DATE: AlignedHttpDate, num_workers_inited_storage: Int):
11381371
# Set higher process priority (requires root on most systems)
11391372
try:
11401373
setpriority(PRIO_PROCESS, 0, -19)
@@ -1150,19 +1383,35 @@ fn go(port: UInt16, cb: fn(UnsafePointer[UInt8], Int, UnsafePointer[UInt8], Int,
11501383
var num_cpu_cores = num_logical_cores()
11511384
print("Starting", num_cpu_cores, "worker threads")
11521385

1153-
# In Mojo, we use parallelize instead of manual thread creation
1154-
@parameter
1155-
fn worker_task(core: Int):
1386+
# Create threads manually like in the original Rust code
1387+
# var num_workers_inited_storage = Int(0)
1388+
var num_workers_inited_ptr = UnsafePointer(to=num_workers_inited_storage)
1389+
1390+
# Allocate thread arguments storage on stack (one per core)
1391+
var thread_args_storage = StaticTuple[ThreadArgs, 64]()
1392+
1393+
# Create actual pthreads like in the original Rust code
1394+
for core in range(num_cpu_cores):
11561395
try:
1157-
# Unshare file descriptor table
1158-
unshare(CLONE_FILES)
1159-
set_current_thread_cpu_affinity_to(core)
1160-
threaded_worker(port, cb, core, num_cpu_cores, NUM_WORKERS_INITED)
1396+
print("Creating pthread worker for core", core)
1397+
var thread_args_ptr = UnsafePointer(to=thread_args_storage[core])
1398+
var success = create_actual_pthread(
1399+
port,
1400+
core,
1401+
num_cpu_cores,
1402+
num_workers_inited_ptr,
1403+
cb,
1404+
thread_args_ptr
1405+
)
1406+
1407+
if not success:
1408+
print("Failed to create pthread for core", core)
1409+
1410+
# Sleep to ensure workers are initialized in sequence like Rust code
1411+
thread_sleep_millis(5)
1412+
11611413
except e:
1162-
print("Worker", core, "error:", e)
1163-
1164-
# Start worker tasks
1165-
parallelize[worker_task](num_cpu_cores)
1414+
print("Error creating pthread for core", core, ":", e)
11661415

11671416
# Main date update loop
11681417
while True:
@@ -1180,7 +1429,7 @@ fn threaded_worker(
11801429
cb: fn(UnsafePointer[UInt8], Int, UnsafePointer[UInt8], Int, UnsafePointer[UInt8], UnsafePointer[UInt8]) -> Int,
11811430
cpu_core: Int,
11821431
num_cpu_cores: Int,
1183-
mut NUM_WORKERS_INITED: Int
1432+
num_workers_inited_ptr: UnsafePointer[Int]
11841433
):
11851434
"""Worker thread function using real syscalls."""
11861435

@@ -1190,9 +1439,10 @@ fn threaded_worker(
11901439
setup_connection(listener_fd)
11911440

11921441
# Synchronization for REUSEPORT_CBPF attachment
1193-
NUM_WORKERS_INITED += 1
1442+
# Atomically increment the workers initialized counter
1443+
num_workers_inited_ptr[] += 1
11941444
if cpu_core == 0:
1195-
while NUM_WORKERS_INITED < num_cpu_cores:
1445+
while num_workers_inited_ptr[] < num_cpu_cores:
11961446
nanosleep(0.000001) # 1 microsecond
11971447
attach_reuseport_cbpf(listener_fd)
11981448

test.mojo

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,4 @@ fn main():
99

1010
# Start the server
1111
var NUM_WORKERS_INITED = 0
12-
go(8080, example_callback, NUM_WORKERS_INITED, HTTP_DATE)
12+
go(8080, example_callback, HTTP_DATE, NUM_WORKERS_INITED)

0 commit comments

Comments
 (0)