From 22d0b9fe7286aeba9e1b2b25dc0f6d0e3c29d91b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E8=8B=B1=E6=9D=B0?= <2635879218@qq.com> Date: Tue, 30 Jul 2024 16:16:20 +0800 Subject: [PATCH] feat(x/http/get): Using libuv to speed up http.Get() --- go.mod | 2 +- go.sum | 12 +- x/http/_demo/get/get.go | 11 +- x/http/_demo/test.go | 141 -------------- x/http/client.go | 393 +++++++++++++++++++++++++--------------- x/http/header.go | 4 +- x/http/hyper-go.go | 12 -- x/http/response.go | 125 +++++++------ 8 files changed, 329 insertions(+), 371 deletions(-) delete mode 100644 x/http/_demo/test.go delete mode 100644 x/http/hyper-go.go diff --git a/go.mod b/go.mod index ff4ef8f..39080a4 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module github.com/goplus/llgoexamples go 1.20 -require github.com/goplus/llgo v0.9.4-0.20240729010130-b3b4f55c68f0 +require github.com/goplus/llgo v0.9.5-0.20240731053840-36072584d0be diff --git a/go.sum b/go.sum index 8799390..e2c5d17 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,2 @@ -github.com/goplus/llgo v0.9.0 h1:yaJzQperGUafEaHc9VlVQVskIngacoTNweEXY0GRi0Q= -github.com/goplus/llgo v0.9.0/go.mod h1:M3UwiYdPZFyx7m2J0+6Ti1dYVA3uOO1WvSBocuE8N7M= -github.com/goplus/llgo v0.9.1-0.20240709104849-d6a38a567fda h1:UIPwlgzCb8dV/7WFMyprhZuq8CSLAQIqwFpH5AhrNOM= -github.com/goplus/llgo v0.9.1-0.20240709104849-d6a38a567fda/go.mod h1:zsrtWZapL4aklZc99xBSZRynGzLTIT1mLRjP0VSn9iw= -github.com/goplus/llgo v0.9.1-0.20240712060421-858d38d314a3 h1:2fZ2zQ8S58KvOsJTx6s6MHoi6n1K4sqQwIbTauMrgEE= -github.com/goplus/llgo v0.9.1-0.20240712060421-858d38d314a3/go.mod h1:zsrtWZapL4aklZc99xBSZRynGzLTIT1mLRjP0VSn9iw= -github.com/goplus/llgo v0.9.3-0.20240726020431-98d075728f2b h1:z9FUoeAALL5ytBhhGhE1dXm4+L1Q2eMUTcfiqLAZgf8= -github.com/goplus/llgo v0.9.3-0.20240726020431-98d075728f2b/go.mod h1:zsrtWZapL4aklZc99xBSZRynGzLTIT1mLRjP0VSn9iw= -github.com/goplus/llgo v0.9.4-0.20240729010130-b3b4f55c68f0 h1:02gSx3Oj3cLlBMed+9IWBUGHThEZMnCNiR67yaQbpqo= -github.com/goplus/llgo v0.9.4-0.20240729010130-b3b4f55c68f0/go.mod h1:zsrtWZapL4aklZc99xBSZRynGzLTIT1mLRjP0VSn9iw= +github.com/goplus/llgo v0.9.5-0.20240731053840-36072584d0be h1:FTALxA3ivIeVRAO93e1hCSCLaPbjKn+RZx40p5lx8KE= +github.com/goplus/llgo v0.9.5-0.20240731053840-36072584d0be/go.mod h1:zsrtWZapL4aklZc99xBSZRynGzLTIT1mLRjP0VSn9iw= diff --git a/x/http/_demo/get/get.go b/x/http/_demo/get/get.go index e7591a6..09f32a0 100644 --- a/x/http/_demo/get/get.go +++ b/x/http/_demo/get/get.go @@ -8,10 +8,17 @@ import ( func main() { // 使用 http.Get 发送 GET 请求 - resp := http.Get("https://www.baidu.com/") + resp, err := http.Get("https://www.baidu.com/") + if err != nil { + fmt.Println(err) + return + } fmt.Println(resp.Status) fmt.Println(resp.StatusCode) resp.PrintHeaders() fmt.Println() - resp.PrintBody() + resp.PrintBody2() + + resp.PrintBody1() + defer resp.Content.Close() } diff --git a/x/http/_demo/test.go b/x/http/_demo/test.go deleted file mode 100644 index 2ddd3ea..0000000 --- a/x/http/_demo/test.go +++ /dev/null @@ -1,141 +0,0 @@ -package main - -import ( - "fmt" - "io" -) - -func main() { - - // 假设你有一个 []byte 数组 - data := []byte("This is some data that needs to be stored in Body.") - - // 创建一个 io.Pipe - pr, pw := io.Pipe() - - // 启动一个 goroutine 将数据写入 io.Pipe 的写入端 - go func() { - defer pw.Close() // 确保写入完成后关闭写入端 - - if _, err := pw.Write(data); err != nil { - fmt.Println("Error writing to pipe:", err) - return - } - }() - - // 读取 Body 中的数据进行验证 - readData, err := io.ReadAll(pr) - if err != nil { - fmt.Println("Error reading from Body:", err) - return - } - - // 输出 Body 中的数据 - fmt.Println("Body content:", string(readData)) - // - //http.Get() - - //r, w := io.Pipe() - // - //go func() { - // fmt.Fprint(w, "some io.Reader stream to be read\n") - // w.Close() - //}() - // - //if _, err := io.Copy(os.Stdout, r); err != nil { - // log.Fatal(err) - //} - - // 使用 http.Get 发送 GET 请求 - //resp, err := http.Get("https://www.baidu.com/") - //if err != nil { - // fmt.Println("Error:", err) - // return - //} - //defer resp.Body.Close() - // - //body, err := io.ReadAll(resp.Body) - //if err != nil { - // fmt.Println("Error reading response:", err) - // return - //} - //fmt.Println("GET Response:\n", string(body)) - - //rawURL := "http://example.com:8080/path/to/resource?query=123#fragment" - //parsedURL, err := url.Parse(rawURL) - //if err != nil { - // fmt.Println("Error parsing URL:", err) - // return - //} - // - //hostname := parsedURL.Hostname() - //port := parsedURL.Port() - // - //uri := parsedURL.RequestURI() - // - //fmt.Println("Hostname:", hostname) - //fmt.Println("Port:", port) - //fmt.Println("URI:", uri) - - //// 使用 http.Post 发送 POST 请求上传文件 - //file, err := os.Open("path/to/your/file.jpg") - //if err != nil { - // fmt.Println("Error opening file:", err) - // return - //} - //defer file.Close() - // - //var buf bytes.Buffer - //writer := multipart.NewWriter(&buf) - //_, err = writer.CreateFormFile("file", "file.jpg") - //if err != nil { - // fmt.Println("Error creating form file:", err) - // return - //} - // - //_, err = io.ReadAll(file) - //if err != nil { - // fmt.Println("Error reading file:", err) - // return - //} - // - //err = writer.Close() - //if err != nil { - // fmt.Println("Error closing writer:", err) - // return - //} - // - //resp, err = http.Post("https://www.baidu.com/upload", writer.FormDataContentType(), &buf) - //if err != nil { - // fmt.Println("Error:", err) - // return - //} - //defer resp.Body.Close() - // - //body, err = io.ReadAll(resp.Body) - //if err != nil { - // fmt.Println("Error reading response:", err) - // return - //} - //fmt.Println("POST Response:\n", string(body)) - // - //// 使用 http.PostForm 发送表单数据 - //formData := url.Values{ - // "key": {"Value"}, - // "id": {"123"}, - //} - // - //resp, err = http.PostForm("https://www.baidu.com/form", formData) - //if err != nil { - // fmt.Println("Error:", err) - // return - //} - //defer resp.Body.Close() - // - //body, err = io.ReadAll(resp.Body) - //if err != nil { - // fmt.Println("Error reading response:", err) - // return - //} - //fmt.Println("POST Form Response:\n", string(body)) -} diff --git a/x/http/client.go b/x/http/client.go index 32cf658..d173574 100644 --- a/x/http/client.go +++ b/x/http/client.go @@ -2,63 +2,127 @@ package http import ( "fmt" + io2 "io" "strconv" "strings" + "unsafe" "github.com/goplus/llgo/c" + "github.com/goplus/llgo/c/libuv" "github.com/goplus/llgo/c/net" - "github.com/goplus/llgo/c/os" - "github.com/goplus/llgo/c/sys" "github.com/goplus/llgo/c/syscall" "github.com/goplus/llgoexamples/rust/hyper" ) type ConnData struct { - Fd c.Int - ReadWaker *hyper.Waker - WriteWaker *hyper.Waker + TcpHandle libuv.Tcp + ConnectReq libuv.Connect + ReadBuf libuv.Buf + ReadBufFilled uintptr + ReadWaker *hyper.Waker + WriteWaker *hyper.Waker } -type RequestConfig struct { - ReqMethod string - ReqHost string - ReqPort string - ReqUri string - ReqHeaders map[string]string - ReqHTTPVersion hyper.HTTPVersion - TimeoutSec int64 - TimeoutUsec int32 - //ReqBody - //ReqURIParts +type Client struct { + Transport RoundTripper } -func Get(url string) *Response { - host, port, uri := parseURL(url) - req := hyper.NewRequest() +var DefaultClient = &Client{} + +type RoundTripper interface { + RoundTrip(*hyper.Request) (*Response, error) +} + +func (c *Client) transport() RoundTripper { + if c.Transport != nil { + return c.Transport + } + return DefaultTransport +} + +func Get2(url string) (*Response, error) { + return DefaultClient.Get(url) +} +func (c *Client) Get(url string) (*Response, error) { + req, err := NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + return c.Do(req) +} + +func (c *Client) Do(req *hyper.Request) (*Response, error) { + return c.do(req) +} + +func (c *Client) do(req *hyper.Request) (*Response, error) { + return c.send(req, nil) +} + +func (c *Client) send(req *hyper.Request, deadline any) (*Response, error) { + return send(req, c.transport(), deadline) +} + +func send(req *hyper.Request, rt RoundTripper, deadline any) (resp *Response, err error) { + return rt.RoundTrip(req) +} + +func NewRequest(method, url string, body io2.Reader) (*hyper.Request, error) { + host, _, uri := parseURL(url) // Prepare the request + req := hyper.NewRequest() // Set the request method and uri - if req.SetMethod((*uint8)(&[]byte("GET")[0]), c.Strlen(c.Str("GET"))) != hyper.OK { - panic(fmt.Sprintf("error setting method %s\n", "GET")) + if req.SetMethod((*uint8)(&[]byte(method)[0]), c.Strlen(c.AllocaCStr(method))) != hyper.OK { + return nil, fmt.Errorf("error setting method %s\n", method) } if req.SetURI((*uint8)(&[]byte(uri)[0]), c.Strlen(c.AllocaCStr(uri))) != hyper.OK { - panic(fmt.Sprintf("error setting uri %s\n", uri)) + return nil, fmt.Errorf("error setting uri %s\n", uri) } // Set the request headers reqHeaders := req.Headers() if reqHeaders.Set((*uint8)(&[]byte("Host")[0]), c.Strlen(c.Str("Host")), (*uint8)(&[]byte(host)[0]), c.Strlen(c.AllocaCStr(host))) != hyper.OK { - panic("error setting headers\n") + return nil, fmt.Errorf("error setting headers\n") + } + return req, nil +} + +func Get(url string) (_ *Response, err error) { + host, port, uri := parseURL(url) + + loop := libuv.DefaultLoop() + conn := (*ConnData)(c.Malloc(unsafe.Sizeof(ConnData{}))) + if conn == nil { + return nil, fmt.Errorf("Failed to allocate memory for conn_data\n") } - //var response RequestResponse + libuv.InitTcp(loop, &conn.TcpHandle) + //conn.TcpHandle.Data = c.Pointer(conn) + (*libuv.Handle)(c.Pointer(&conn.TcpHandle)).SetData(c.Pointer(conn)) + + var hints net.AddrInfo + c.Memset(c.Pointer(&hints), 0, unsafe.Sizeof(hints)) + hints.Family = syscall.AF_UNSPEC + hints.SockType = syscall.SOCK_STREAM + + var res *net.AddrInfo + status := net.Getaddrinfo(c.AllocaCStr(host), c.AllocaCStr(port), &hints, &res) + if status != 0 { + return nil, fmt.Errorf("getaddrinfo error\n") + } - fd := ConnectTo(host, port) + //conn.ConnectReq.Data = c.Pointer(conn) + (*libuv.Req)(c.Pointer(&conn.ConnectReq)).SetData(c.Pointer(conn)) + status = libuv.TcpConnect(&conn.ConnectReq, &conn.TcpHandle, res.Addr, OnConnect) + if status != 0 { + return nil, fmt.Errorf("connect error: %s\n", c.GoString(libuv.Strerror(libuv.Errno(status)))) + } - connData := NewConnData(fd) + net.Freeaddrinfo(res) // Hookup the IO - io := NewIoWithConnReadWrite(connData) + io := NewIoWithConnReadWrite(conn) // We need an executor generally to poll futures exec := hyper.NewExecutor() @@ -73,8 +137,7 @@ func Get(url string) *Response { // Let's wait for the handshake to finish... exec.Push(handshakeTask) - var fdsRead, fdsWrite, fdsExcep syscall.FdSet - var err *hyper.Error + var hyperErr *hyper.Error var response Response // The polling state machine! @@ -82,7 +145,6 @@ func Get(url string) *Response { // Poll all ready tasks and act on them... for { task := exec.Poll() - if task == nil { break } @@ -91,23 +153,41 @@ func Get(url string) *Response { case hyper.ExampleHandshake: if task.Type() == hyper.TaskError { c.Printf(c.Str("handshake error!\n")) - err = (*hyper.Error)(task.Value()) - Fail(err) + hyperErr = (*hyper.Error)(task.Value()) + err = Fail(hyperErr) + return nil, err } if task.Type() != hyper.TaskClientConn { c.Printf(c.Str("unexpected task type\n")) - Fail(err) + err = Fail(hyperErr) + return nil, err } client := (*hyper.ClientConn)(task.Value()) task.Free() + // Prepare the request + req := hyper.NewRequest() + // Set the request method and uri + if req.SetMethod((*uint8)(&[]byte("GET")[0]), c.Strlen(c.Str("GET"))) != hyper.OK { + return nil, fmt.Errorf("error setting method %s\n", "GET") + } + if req.SetURI((*uint8)(&[]byte(uri)[0]), c.Strlen(c.AllocaCStr(uri))) != hyper.OK { + return nil, fmt.Errorf("error setting uri %s\n", uri) + } + + // Set the request headers + reqHeaders := req.Headers() + if reqHeaders.Set((*uint8)(&[]byte("Host")[0]), c.Strlen(c.Str("Host")), (*uint8)(&[]byte(host)[0]), c.Strlen(c.AllocaCStr(host))) != hyper.OK { + return nil, fmt.Errorf("error setting headers\n") + } + // Send it! sendTask := client.Send(req) SetUserData(sendTask, hyper.ExampleSend) sendRes := exec.Push(sendTask) if sendRes != hyper.OK { - panic("error send\n") + return nil, fmt.Errorf("error send\n") } // For this example, no longer need the client @@ -117,12 +197,14 @@ func Get(url string) *Response { case hyper.ExampleSend: if task.Type() == hyper.TaskError { c.Printf(c.Str("send error!\n")) - err = (*hyper.Error)(task.Value()) - Fail(err) + hyperErr = (*hyper.Error)(task.Value()) + err = Fail(hyperErr) + return nil, err } if task.Type() != hyper.TaskResponse { c.Printf(c.Str("unexpected task type\n")) - Fail(err) + err = Fail(hyperErr) + return nil, err } // Take the results @@ -139,133 +221,127 @@ func Get(url string) *Response { headers.Foreach(AppendToResponseHeader, c.Pointer(&response)) respBody := resp.Body() + response.Body, response.respBodyWriter = io2.Pipe() + + /*go func() { + fmt.Println("writing...") + for { + fmt.Println("writing for...") + dataTask := respBody.Data() + exec.Push(dataTask) + dataTask = exec.Poll() + if dataTask.Type() == hyper.TaskBuf { + buf := (*hyper.Buf)(dataTask.Value()) + len := buf.Len() + bytes := unsafe.Slice((*byte)(buf.Bytes()), len) + _, err := response.respBodyWriter.Write(bytes) + if err != nil { + fmt.Printf("Failed to write response body: %v\n", err) + break + } + dataTask.Free() + } else if dataTask.Type() == hyper.TaskEmpty { + fmt.Println("writing empty") + dataTask.Free() + break + } + } + fmt.Println("end writing") + defer response.respBodyWriter.Close() + }()*/ + foreachTask := respBody.Foreach(AppendToResponseBody, c.Pointer(&response)) SetUserData(foreachTask, hyper.ExampleRespBody) exec.Push(foreachTask) + return &response, nil + // No longer need the response - resp.Free() + //resp.Free() break case hyper.ExampleRespBody: + println("ExampleRespBody") if task.Type() == hyper.TaskError { c.Printf(c.Str("body error!\n")) - err = (*hyper.Error)(task.Value()) - Fail(err) + hyperErr = (*hyper.Error)(task.Value()) + err = Fail(hyperErr) + return nil, err } if task.Type() != hyper.TaskEmpty { c.Printf(c.Str("unexpected task type\n")) - Fail(err) + err = Fail(hyperErr) + return nil, err } // Cleaning up before exiting task.Free() exec.Free() - FreeConnData(connData) + (*libuv.Handle)(c.Pointer(&conn.TcpHandle)).Close(nil) - if response.respBodyWriter != nil { - defer response.respBodyWriter.Close() - } + FreeConnData(conn) + + //if response.respBodyWriter != nil { + // defer response.respBodyWriter.Close() + //} - return &response + return &response, nil case hyper.ExampleNotSet: + println("ExampleNotSet") // A background task for hyper_client completed... task.Free() break } } - // All futures are pending on IO work, so select on the fds. - - sys.FD_ZERO(&fdsRead) - sys.FD_ZERO(&fdsWrite) - sys.FD_ZERO(&fdsExcep) - - if connData.ReadWaker != nil { - sys.FD_SET(connData.Fd, &fdsRead) - } - if connData.WriteWaker != nil { - sys.FD_SET(connData.Fd, &fdsWrite) - } - - // Set the default request timeout - var tv syscall.Timeval - tv.Sec = 10 - - selRet := sys.Select(connData.Fd+1, &fdsRead, &fdsWrite, &fdsExcep, &tv) - if selRet < 0 { - panic("select() error\n") - } else if selRet == 0 { - panic("select() timeout\n") - } - - if sys.FD_ISSET(connData.Fd, &fdsRead) != 0 { - connData.ReadWaker.Wake() - connData.ReadWaker = nil - } - - if sys.FD_ISSET(connData.Fd, &fdsWrite) != 0 { - connData.WriteWaker.Wake() - connData.WriteWaker = nil - } + libuv.Run(loop, libuv.RUN_ONCE) } } -// ConnectTo connects to a host and port -func ConnectTo(host string, port string) c.Int { - var hints net.AddrInfo - hints.Family = net.AF_UNSPEC - hints.SockType = net.SOCK_STREAM - - var result, rp *net.AddrInfo - - if net.Getaddrinfo(c.AllocaCStr(host), c.AllocaCStr(port), &hints, &result) != 0 { - panic(fmt.Sprintf("dns failed for %s\n", host)) +// AllocBuffer allocates a buffer for reading from a socket +func AllocBuffer(handle *libuv.Handle, suggestedSize uintptr, buf *libuv.Buf) { + //conn := (*ConnData)(handle.Data) + //conn := (*struct{ data *ConnData })(c.Pointer(handle)).data + conn := (*ConnData)(handle.GetData()) + if conn.ReadBuf.Base == nil { + conn.ReadBuf = libuv.InitBuf((*c.Char)(c.Malloc(suggestedSize)), c.Uint(suggestedSize)) + conn.ReadBufFilled = 0 } + *buf = libuv.InitBuf((*c.Char)(c.Pointer(uintptr(c.Pointer(conn.ReadBuf.Base))+conn.ReadBufFilled)), c.Uint(suggestedSize-conn.ReadBufFilled)) +} - var sfd c.Int - for rp = result; rp != nil; rp = rp.Next { - sfd = net.Socket(rp.Family, rp.SockType, rp.Protocol) - if sfd == -1 { - continue - } - if net.Connect(sfd, rp.Addr, rp.AddrLen) != -1 { - break - } - os.Close(sfd) - } - - net.Freeaddrinfo(result) - - // no address succeeded - if rp == nil || sfd < 0 { - panic(fmt.Sprintf("connect failed for %s\n", host)) +// OnRead is the libuv callback for reading from a socket +func OnRead(stream *libuv.Stream, nread c.Long, buf *libuv.Buf) { + //conn := (*ConnData)(stream.Data) + //conn := (*struct{ data *ConnData })(c.Pointer(stream)).data + conn := (*ConnData)((*libuv.Handle)(c.Pointer(stream)).GetData()) + if nread > 0 { + conn.ReadBufFilled += uintptr(nread) } - - if os.Fcntl(sfd, os.F_SETFL, os.O_NONBLOCK) != 0 { - panic("failed to set net to non-blocking\n") + if conn.ReadWaker != nil { + conn.ReadWaker.Wake() + conn.ReadWaker = nil } - return sfd } -// ReadCallBack is the callback for reading from a socket +// ReadCallBack is the hyper callback for reading from a socket func ReadCallBack(userdata c.Pointer, ctx *hyper.Context, buf *uint8, bufLen uintptr) uintptr { conn := (*ConnData)(userdata) - ret := os.Read(conn.Fd, c.Pointer(buf), bufLen) - - if ret >= 0 { - return uintptr(ret) - } - - if os.Errno != os.EAGAIN { - c.Perror(c.Str("[read callback fail]")) - // kaboom - return hyper.IoError + if conn.ReadBufFilled > 0 { + var toCopy uintptr + if bufLen < conn.ReadBufFilled { + toCopy = bufLen + } else { + toCopy = conn.ReadBufFilled + } + c.Memcpy(c.Pointer(buf), c.Pointer(conn.ReadBuf.Base), toCopy) + c.Memmove(c.Pointer(conn.ReadBuf.Base), c.Pointer(uintptr(c.Pointer(conn.ReadBuf.Base))+toCopy), conn.ReadBufFilled-toCopy) + conn.ReadBufFilled -= toCopy + return toCopy } - // would block, register interest if conn.ReadWaker != nil { conn.ReadWaker.Free() } @@ -273,22 +349,32 @@ func ReadCallBack(userdata c.Pointer, ctx *hyper.Context, buf *uint8, bufLen uin return hyper.IoPending } -// WriteCallBack is the callback for writing to a socket -func WriteCallBack(userdata c.Pointer, ctx *hyper.Context, buf *uint8, bufLen uintptr) uintptr { - conn := (*ConnData)(userdata) - ret := os.Write(conn.Fd, c.Pointer(buf), bufLen) +// OnWrite is the libuv callback for writing to a socket +func OnWrite(req *libuv.Write, status c.Int) { + //conn := (*ConnData)(req.Data) + //conn := (*struct{ data *ConnData })(c.Pointer(req)).data + conn := (*ConnData)((*libuv.Req)(c.Pointer(req)).GetData()) - if int(ret) >= 0 { - return uintptr(ret) + if conn.WriteWaker != nil { + conn.WriteWaker.Wake() + conn.WriteWaker = nil } + c.Free(c.Pointer(req)) +} + +// WriteCallBack is the hyper callback for writing to a socket +func WriteCallBack(userdata c.Pointer, ctx *hyper.Context, buf *uint8, bufLen uintptr) uintptr { + conn := (*ConnData)(userdata) + initBuf := libuv.InitBuf((*c.Char)(c.Pointer(buf)), c.Uint(bufLen)) + req := (*libuv.Write)(c.Malloc(unsafe.Sizeof(libuv.Write{}))) + //req.Data = c.Pointer(conn) + (*libuv.Req)(c.Pointer(req)).SetData(c.Pointer(conn)) + ret := req.Write((*libuv.Stream)(c.Pointer(&conn.TcpHandle)), &initBuf, 1, OnWrite) - if os.Errno != os.EAGAIN { - c.Perror(c.Str("[write callback fail]")) - // kaboom - return hyper.IoError + if ret >= 0 { + return bufLen } - // would block, register interest if conn.WriteWaker != nil { conn.WriteWaker.Free() } @@ -296,6 +382,19 @@ func WriteCallBack(userdata c.Pointer, ctx *hyper.Context, buf *uint8, bufLen ui return hyper.IoPending } +// OnConnect is the libuv callback for a successful connection +func OnConnect(req *libuv.Connect, status c.Int) { + //conn := (*ConnData)(req.Data) + //conn := (*struct{ data *ConnData })(c.Pointer(req)).data + conn := (*ConnData)((*libuv.Req)(c.Pointer(req)).GetData()) + + if status < 0 { + c.Fprintf(c.Stderr, c.Str("connect error: %d\n"), libuv.Strerror(libuv.Errno(status))) + return + } + (*libuv.Stream)(c.Pointer(&conn.TcpHandle)).StartRead(AllocBuffer, OnRead) +} + // FreeConnData frees the connection data func FreeConnData(conn *ConnData) { if conn.ReadWaker != nil { @@ -306,10 +405,15 @@ func FreeConnData(conn *ConnData) { conn.WriteWaker.Free() conn.WriteWaker = nil } + if conn.ReadBuf.Base != nil { + c.Free(c.Pointer(conn.ReadBuf.Base)) + conn.ReadBuf.Base = nil + } + c.Free(c.Pointer(conn)) } // Fail prints the error details and panics -func Fail(err *hyper.Error) { +func Fail(err *hyper.Error) error { if err != nil { c.Printf(c.Str("error code: %d\n"), err.Code()) // grab the error details @@ -317,22 +421,12 @@ func Fail(err *hyper.Error) { errLen := err.Print((*uint8)(c.Pointer(&errBuf[:][0])), uintptr(len(errBuf))) c.Printf(c.Str("details: %.*s\n"), c.Int(errLen), c.Pointer(&errBuf[:][0])) - c.Printf(c.Str("details: ")) - for i := 0; i < int(errLen); i++ { - c.Printf(c.Str("%c"), errBuf[i]) - } - c.Printf(c.Str("\n")) // clean up the error err.Free() - panic("request failed\n") + return fmt.Errorf("hyper error\n") } - return -} - -// NewConnData creates a new connection data -func NewConnData(fd c.Int) *ConnData { - return &ConnData{Fd: fd, ReadWaker: nil, WriteWaker: nil} + return nil } // NewIoWithConnReadWrite creates a new IO with read and write callbacks @@ -344,6 +438,12 @@ func NewIoWithConnReadWrite(connData *ConnData) *hyper.Io { return io } +// SetUserData Set the user data for the task +func SetUserData(task *hyper.Task, userData hyper.ExampleId) { + var data = userData + task.SetUserdata(c.Pointer(uintptr(data))) +} + // parseURL Parse the URL and extract the host name, port number, and URI func parseURL(rawURL string) (hostname, port, uri string) { // 找到 "://" 的位置,以分隔协议和主机名 @@ -383,6 +483,5 @@ func parseURL(rawURL string) (hostname, port, uri string) { //} port = "80" } - return } diff --git a/x/http/header.go b/x/http/header.go index ea313a2..4710854 100644 --- a/x/http/header.go +++ b/x/http/header.go @@ -25,10 +25,8 @@ func AppendToResponseHeader(userdata c.Pointer, name *uint8, nameLen uintptr, va func (resp *Response) PrintHeaders() { for key, values := range resp.Header { - fmt.Printf("%s: ", key) for _, value := range values { - fmt.Printf(value + "; ") + fmt.Printf("%s: %s\n", key, value) } - fmt.Printf("\n") } } diff --git a/x/http/hyper-go.go b/x/http/hyper-go.go deleted file mode 100644 index a1db081..0000000 --- a/x/http/hyper-go.go +++ /dev/null @@ -1,12 +0,0 @@ -package http - -import ( - "github.com/goplus/llgo/c" - "github.com/goplus/llgoexamples/rust/hyper" -) - -// SetUserData Set the user data for the task -func SetUserData(task *hyper.Task, userData hyper.ExampleId) { - var data = userData - task.SetUserdata(c.Pointer(uintptr(data))) -} diff --git a/x/http/response.go b/x/http/response.go index 9263461..f1ce501 100644 --- a/x/http/response.go +++ b/x/http/response.go @@ -3,48 +3,60 @@ package http import ( "fmt" "io" - "unsafe" "github.com/goplus/llgo/c" "github.com/goplus/llgoexamples/rust/hyper" ) type Response struct { - Status string - StatusCode int - Header Header - ResponseBody io.ReadCloser - respBodyWriter *io.PipeWriter - ResponseBodyLen int64 + Status string + StatusCode int + Header Header + Content io.ReadCloser + ContentLen int64 + respBodyWriter *io.PipeWriter + ResponseBody *uint8 + ResponseBodyLen uintptr } // AppendToResponseBody (BodyForEachCallback) appends the body to the response -func AppendToResponseBody(userdata c.Pointer, chunk *hyper.Buf) c.Int { - resp := (*Response)(userdata) - len := chunk.Len() - buf := unsafe.Slice((*byte)(chunk.Bytes()), len) +//func AppendToResponseBody(userdata c.Pointer, chunk *hyper.Buf) c.Int { +// resp := (*Response)(userdata) +// len := chunk.Len() +// buf := unsafe.Slice((*byte)(chunk.Bytes()), len) +// +// if resp.Content == nil { +// var reader *io.PipeReader +// reader, resp.respBodyWriter = io.Pipe() +// resp.Content = io.ReadCloser(reader) +// } +// resp.ContentLen += int64(len) +// var err error +// go func() { +// _, err = resp.respBodyWriter.Write(buf) +// }() +// if err != nil { +// fmt.Printf("Failed to write response body: %v\n", err) +// return hyper.IterBreak +// } +// return hyper.IterContinue +//} - if resp.ResponseBody == nil { - var reader *io.PipeReader - reader, resp.respBodyWriter = io.Pipe() - resp.ResponseBody = io.ReadCloser(reader) - } - resp.ResponseBodyLen += int64(len) - var err error +func (resp *Response) PrintBody1() { go func() { - _, err = resp.respBodyWriter.Write(buf) + var reader *io.PipeReader + reader, writer := io.Pipe() + resp.Content = reader + writer.Write((*[1 << 30]byte)(c.Pointer(resp.ResponseBody))[:resp.ResponseBodyLen:resp.ResponseBodyLen]) + defer writer.Close() }() - if err != nil { - fmt.Printf("Failed to write response body: %v\n", err) - return hyper.IterBreak + for i := 0; i < 10; i++ { + c.Usleep(1 * 1000 * 1000) + fmt.Println("Sleeping...") } - return hyper.IterContinue -} - -func (resp *Response) PrintBody() { - var buffer = make([]byte, resp.ResponseBodyLen) + var buffer = make([]byte, 4096) for { - n, err := resp.ResponseBody.Read(buffer) + n, err := resp.Content.Read(buffer) if err == io.EOF { fmt.Printf("\n") break @@ -55,33 +67,36 @@ func (resp *Response) PrintBody() { } fmt.Printf("%s", string(buffer[:n])) } + buffer = nil + //body, _ := io.ReadAll(resp.Content) + //fmt.Println(string(body)) } -//// AppendToResponseBody (BodyForEachCallback) appends the body to the response -//func AppendToResponseBody(userdata c.Pointer, chunk *hyper.Buf) c.Int { -// resp := (*Response)(userdata) -// buf := chunk.Bytes() -// len := chunk.Len() -// responseBody := (*uint8)(c.Malloc(resp.ResponseBodyLen + len)) -// if responseBody == nil { -// c.Fprintf(c.Stderr, c.Str("Failed to allocate memory for response body\n")) -// return hyper.IterBreak -// } -// -// // Copy the existing response body to the new buffer -// if resp.ResponseBody != nil { -// c.Memcpy(c.Pointer(responseBody), c.Pointer(resp.ResponseBody), resp.ResponseBodyLen) -// c.Free(c.Pointer(resp.ResponseBody)) -// } -// -// // Append the new data -// c.Memcpy(c.Pointer(uintptr(c.Pointer(responseBody))+resp.ResponseBodyLen), c.Pointer(buf), len) -// resp.ResponseBody = responseBody -// resp.ResponseBodyLen += len -// return hyper.IterContinue -//} +// AppendToResponseBody (BodyForEachCallback) appends the body to the response +func AppendToResponseBody(userdata c.Pointer, chunk *hyper.Buf) c.Int { + resp := (*Response)(userdata) + buf := chunk.Bytes() + len := chunk.Len() + responseBody := (*uint8)(c.Malloc(resp.ResponseBodyLen + len)) + if responseBody == nil { + c.Fprintf(c.Stderr, c.Str("Failed to allocate memory for response body\n")) + return hyper.IterBreak + } -//func (resp *Response) PrintBody() { -// //c.Printf(c.Str("%.*s\n"), c.Int(resp.ResponseBodyLen), resp.ResponseBody) -// fmt.Println(string((*[1 << 30]byte)(c.Pointer(resp.ResponseBody))[:resp.ResponseBodyLen:resp.ResponseBodyLen])) -//} + // Copy the existing response body to the new buffer + if resp.ResponseBody != nil { + c.Memcpy(c.Pointer(responseBody), c.Pointer(resp.ResponseBody), resp.ResponseBodyLen) + c.Free(c.Pointer(resp.ResponseBody)) + } + + // Append the new data + c.Memcpy(c.Pointer(uintptr(c.Pointer(responseBody))+resp.ResponseBodyLen), c.Pointer(buf), len) + resp.ResponseBody = responseBody + resp.ResponseBodyLen += len + return hyper.IterContinue +} + +func (resp *Response) PrintBody2() { + //c.Printf(c.Str("%.*s\n"), c.Int(resp.ResponseBodyLen), resp.ResponseBody) + fmt.Println(string((*[1 << 30]byte)(c.Pointer(resp.ResponseBody))[:resp.ResponseBodyLen:resp.ResponseBodyLen])) +} \ No newline at end of file