Skip to content

Commit

Permalink
feat(x/net/http): Implement RequestBody logic
Browse files Browse the repository at this point in the history
Signed-off-by: hackerchai <[email protected]>
  • Loading branch information
hackerchai committed Sep 9, 2024
1 parent 650cb6c commit 34ace8f
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 38 deletions.
63 changes: 29 additions & 34 deletions x/net/http/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Request struct {
timeout time.Duration
}

func (conn *conn) readRequest(hyperReq *hyper.Request) (*Request, error) {
func (conn *conn) readRequest(server *Server, hyperReq *hyper.Request) (*Request, error) {
println("readRequest called")
req := Request{
Header: make(Header),
Expand Down Expand Up @@ -135,7 +135,9 @@ func (conn *conn) readRequest(hyperReq *hyper.Request) (*Request, error) {

body := hyperReq.Body()
if body != nil {
req.Body, conn.bodyWriter = io.Pipe()
requestBody := newRequestBody()
conn.requestBody = requestBody
req.Body = requestBody
//task := body.Foreach(getBodyChunk, c.Pointer(conn.bodyWriter), nil)
task := body.Data()
taskID := taskGetBody
Expand Down Expand Up @@ -181,35 +183,28 @@ func addHeader(data unsafe.Pointer, name *byte, nameLen uintptr, value *byte, va
return hyper.IterContinue
}

func getBodyChunk(userdata c.Pointer, chunk *hyper.Buf) c.Int {
fmt.Printf("getBodyChunk called\n")
writer := (*io.PipeWriter)(userdata)
if writer == nil {
fmt.Printf("writer is nil\n")
return hyper.IterBreak
}
buf := chunk.Bytes()
len := chunk.Len()
bytes := unsafe.Slice(buf, len)
//debug
fmt.Printf("Writing %d bytes to response body\n", len)
fmt.Printf("Body chunk: %s\n", string(bytes))

go func() {
count, err := writer.Write(bytes)
fmt.Printf("Body chunk written: %d bytes\n", count)
if err != nil {
fmt.Println("Error writing to response body:", err)
writer.Close()
}
}()
// count, err := writer.Write(bytes)
// fmt.Printf("Body chunk written: %d bytes\n", count)
// if err != nil {
// fmt.Println("Error writing to response body:", err)
// writer.Close()
// return hyper.IterBreak
// }

return hyper.IterContinue
}
// func getBodyChunk(userdata c.Pointer, chunk *hyper.Buf) c.Int {
// fmt.Printf("getBodyChunk called\n")
// writer := (*io.PipeWriter)(userdata)
// if writer == nil {
// fmt.Printf("writer is nil\n")
// return hyper.IterBreak
// }
// buf := chunk.Bytes()
// len := chunk.Len()
// bytes := unsafe.Slice(buf, len)
// //debug
// fmt.Printf("Writing %d bytes to response body\n", len)
// fmt.Printf("Body chunk: %s\n", string(bytes))

// go func() {
// count, err := writer.Write(bytes)
// fmt.Printf("Body chunk written: %d bytes\n", count)
// if err != nil {
// fmt.Println("Error writing to response body:", err)
// writer.Close()
// }
// }()

// return hyper.IterContinue
// }
79 changes: 75 additions & 4 deletions x/net/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package http
import (
"errors"
"fmt"
"io"
"os"
"strconv"
"sync"
Expand All @@ -17,6 +16,7 @@ import (
"github.com/goplus/llgo/c/syscall"
"github.com/goplus/llgo/rust/hyper"
"github.com/goplus/llgo/x/net"

Check failure on line 18 in x/net/http/server.go

View workflow job for this annotation

GitHub Actions / build

no required module provides package github.com/goplus/llgo/x/net; to add it:
"github.com/goplus/llgo/x/net/http/response_stream"

Check failure on line 19 in x/net/http/server.go

View workflow job for this annotation

GitHub Actions / build

no required module provides package github.com/goplus/llgo/x/net/http/response_stream; to add it:
)

type Handler interface {
Expand All @@ -41,6 +41,9 @@ type Server struct {

mu sync.Mutex
activeConnections map[*conn]struct{}

channels []<-chan struct{}
channelMutex sync.RWMutex
}

type conn struct {
Expand All @@ -55,7 +58,35 @@ type conn struct {
closedHandles int32
executor *hyper.Executor
remoteAddr string
bodyWriter *io.PipeWriter
requestBody *requestBody
}

type requestBody struct {
chunk []byte
readCh chan []byte
readToReadCh chan struct{}
}

func newRequestBody() *requestBody {
return &requestBody{
readCh: make(chan []byte, 1),
readToReadCh: make(chan struct{}, 1),
}
}

func (rb *requestBody) Read(p []byte) (n int, err error) {
if len(rb.chunk) == 0 {
n = copy(p, rb.chunk)
rb.chunk = rb.chunk[n:]
if len(rb.chunk) > 0 {
return
}
}
rb.readToReadCh <- struct{}{}
rb.chunk = <-rb.readCh
n = copy(p, rb.chunk)
rb.chunk = rb.chunk[n:]
return
}

type serviceUserdata struct {
Expand Down Expand Up @@ -308,13 +339,52 @@ func onIdle(handle *libuv.Idle) {
//fmt.Println("onIdle conn called")
if conn.executor != nil {
task := conn.executor.Poll()
select {
case <-conn.requestBody.readToReadCh:
fmt.Println("readToReadCh signaled")
payload := (*taskData)(task.Userdata())
if payload == nil {
fmt.Println("taskData is nil")
task.Free()
return
}

taskID := payload.hyperTaskID

fmt.Printf("taskID: %d\n", taskID)

fmt.Println("taskGetBody get body task")
getBodyTask := payload.hyperBody.Data()
getBodyTask.SetUserdata(c.Pointer(payload), nil)
if getBodyTask != nil {
fmt.Println("taskGetBody push get body task")
r := payload.conn.executor.Push(getBodyTask)
fmt.Printf("taskGetBody push get body task: %d\n", r)
if r != hyper.OK {
fmt.Printf("failed to push get body task: %d\n", r)
getBodyTask.Free()
}
}
default:
fmt.Println("readToReadCh not signaled")
}
for task != nil {
srv.handleTask(task)
task = conn.executor.Poll()
}
}
}

srv.channelMutex.RLock()
for _, ch := range srv.channels {
select {
case <-ch:
fmt.Printf("Received signal from channel\n")
default:
}
}
srv.channelMutex.RUnlock()

if srv.shuttingDown() {
fmt.Println("Shutdown initiated, cleaning up...")
handle.Stop()
Expand All @@ -329,7 +399,7 @@ func serverCallback(userdata unsafe.Pointer, hyperReq *hyper.Request, channel *h
return
}

req, err := userData.conn.readRequest(hyperReq)
req, err := userData.conn.readRequest(userData.server, hyperReq)
if err != nil {
fmt.Printf("Error creating request: %v\n", err)
return
Expand All @@ -349,6 +419,7 @@ func serverCallback(userdata unsafe.Pointer, hyperReq *hyper.Request, channel *h
}

func (srv *Server) handleTask(task *hyper.Task) {

taskType := task.Type()
fmt.Printf("taskType: %d\n", taskType)
payload := (*taskData)(task.Userdata())
Expand Down Expand Up @@ -380,7 +451,7 @@ func (srv *Server) handleTask(task *hyper.Task) {
fmt.Println("taskGetBody write buf")
buf := (*hyper.Buf)(task.Value())
bytes := unsafe.Slice(buf.Bytes(), buf.Len())
payload.conn.bodyWriter.Write(bytes)
payload.conn.requestBody.readCh <- bytes
fmt.Println("taskGetBody wrote to bodyWriter")
buf.Free()
task.Free()
Expand Down

0 comments on commit 34ace8f

Please sign in to comment.