diff --git a/x/net/http/request.go b/x/net/http/request.go index ba97c85..58c1c82 100644 --- a/x/net/http/request.go +++ b/x/net/http/request.go @@ -35,7 +35,7 @@ type Request struct { timeout time.Duration } -func (conn *conn) readRequest(hyperReq *hyper.Request) (*Request, error) { +func (conn *conn) readRequest(server *Server, hyperReq *hyper.Request) (*Request, error) { println("readRequest called") req := Request{ Header: make(Header), @@ -135,7 +135,9 @@ func (conn *conn) readRequest(hyperReq *hyper.Request) (*Request, error) { body := hyperReq.Body() if body != nil { - req.Body, conn.bodyWriter = io.Pipe() + requestBody := newRequestBody() + conn.requestBody = requestBody + req.Body = requestBody //task := body.Foreach(getBodyChunk, c.Pointer(conn.bodyWriter), nil) task := body.Data() taskID := taskGetBody @@ -181,35 +183,28 @@ func addHeader(data unsafe.Pointer, name *byte, nameLen uintptr, value *byte, va return hyper.IterContinue } -func getBodyChunk(userdata c.Pointer, chunk *hyper.Buf) c.Int { - fmt.Printf("getBodyChunk called\n") - writer := (*io.PipeWriter)(userdata) - if writer == nil { - fmt.Printf("writer is nil\n") - return hyper.IterBreak - } - buf := chunk.Bytes() - len := chunk.Len() - bytes := unsafe.Slice(buf, len) - //debug - fmt.Printf("Writing %d bytes to response body\n", len) - fmt.Printf("Body chunk: %s\n", string(bytes)) - - go func() { - count, err := writer.Write(bytes) - fmt.Printf("Body chunk written: %d bytes\n", count) - if err != nil { - fmt.Println("Error writing to response body:", err) - writer.Close() - } - }() - // count, err := writer.Write(bytes) - // fmt.Printf("Body chunk written: %d bytes\n", count) - // if err != nil { - // fmt.Println("Error writing to response body:", err) - // writer.Close() - // return hyper.IterBreak - // } - - return hyper.IterContinue -} +// func getBodyChunk(userdata c.Pointer, chunk *hyper.Buf) c.Int { +// fmt.Printf("getBodyChunk called\n") +// writer := (*io.PipeWriter)(userdata) +// if writer == nil { +// fmt.Printf("writer is nil\n") +// return hyper.IterBreak +// } +// buf := chunk.Bytes() +// len := chunk.Len() +// bytes := unsafe.Slice(buf, len) +// //debug +// fmt.Printf("Writing %d bytes to response body\n", len) +// fmt.Printf("Body chunk: %s\n", string(bytes)) + +// go func() { +// count, err := writer.Write(bytes) +// fmt.Printf("Body chunk written: %d bytes\n", count) +// if err != nil { +// fmt.Println("Error writing to response body:", err) +// writer.Close() +// } +// }() + +// return hyper.IterContinue +// } diff --git a/x/net/http/server.go b/x/net/http/server.go index cf1df59..23ccdd1 100644 --- a/x/net/http/server.go +++ b/x/net/http/server.go @@ -3,7 +3,6 @@ package http import ( "errors" "fmt" - "io" "os" "strconv" "sync" @@ -17,6 +16,7 @@ import ( "github.com/goplus/llgo/c/syscall" "github.com/goplus/llgo/rust/hyper" "github.com/goplus/llgo/x/net" + "github.com/goplus/llgo/x/net/http/response_stream" ) type Handler interface { @@ -41,6 +41,9 @@ type Server struct { mu sync.Mutex activeConnections map[*conn]struct{} + + channels []<-chan struct{} + channelMutex sync.RWMutex } type conn struct { @@ -55,7 +58,35 @@ type conn struct { closedHandles int32 executor *hyper.Executor remoteAddr string - bodyWriter *io.PipeWriter + requestBody *requestBody +} + +type requestBody struct { + chunk []byte + readCh chan []byte + readToReadCh chan struct{} +} + +func newRequestBody() *requestBody { + return &requestBody{ + readCh: make(chan []byte, 1), + readToReadCh: make(chan struct{}, 1), + } +} + +func (rb *requestBody) Read(p []byte) (n int, err error) { + if len(rb.chunk) == 0 { + n = copy(p, rb.chunk) + rb.chunk = rb.chunk[n:] + if len(rb.chunk) > 0 { + return + } + } + rb.readToReadCh <- struct{}{} + rb.chunk = <-rb.readCh + n = copy(p, rb.chunk) + rb.chunk = rb.chunk[n:] + return } type serviceUserdata struct { @@ -308,6 +339,35 @@ func onIdle(handle *libuv.Idle) { //fmt.Println("onIdle conn called") if conn.executor != nil { task := conn.executor.Poll() + select { + case <-conn.requestBody.readToReadCh: + fmt.Println("readToReadCh signaled") + payload := (*taskData)(task.Userdata()) + if payload == nil { + fmt.Println("taskData is nil") + task.Free() + return + } + + taskID := payload.hyperTaskID + + fmt.Printf("taskID: %d\n", taskID) + + fmt.Println("taskGetBody get body task") + getBodyTask := payload.hyperBody.Data() + getBodyTask.SetUserdata(c.Pointer(payload), nil) + if getBodyTask != nil { + fmt.Println("taskGetBody push get body task") + r := payload.conn.executor.Push(getBodyTask) + fmt.Printf("taskGetBody push get body task: %d\n", r) + if r != hyper.OK { + fmt.Printf("failed to push get body task: %d\n", r) + getBodyTask.Free() + } + } + default: + fmt.Println("readToReadCh not signaled") + } for task != nil { srv.handleTask(task) task = conn.executor.Poll() @@ -315,6 +375,16 @@ func onIdle(handle *libuv.Idle) { } } + srv.channelMutex.RLock() + for _, ch := range srv.channels { + select { + case <-ch: + fmt.Printf("Received signal from channel\n") + default: + } + } + srv.channelMutex.RUnlock() + if srv.shuttingDown() { fmt.Println("Shutdown initiated, cleaning up...") handle.Stop() @@ -329,7 +399,7 @@ func serverCallback(userdata unsafe.Pointer, hyperReq *hyper.Request, channel *h return } - req, err := userData.conn.readRequest(hyperReq) + req, err := userData.conn.readRequest(userData.server, hyperReq) if err != nil { fmt.Printf("Error creating request: %v\n", err) return @@ -349,6 +419,7 @@ func serverCallback(userdata unsafe.Pointer, hyperReq *hyper.Request, channel *h } func (srv *Server) handleTask(task *hyper.Task) { + taskType := task.Type() fmt.Printf("taskType: %d\n", taskType) payload := (*taskData)(task.Userdata()) @@ -380,7 +451,7 @@ func (srv *Server) handleTask(task *hyper.Task) { fmt.Println("taskGetBody write buf") buf := (*hyper.Buf)(task.Value()) bytes := unsafe.Slice(buf.Bytes(), buf.Len()) - payload.conn.bodyWriter.Write(bytes) + payload.conn.requestBody.readCh <- bytes fmt.Println("taskGetBody wrote to bodyWriter") buf.Free() task.Free()