Skip to content

Commit

Permalink
WIP(x/net/http/client): Implement BodyChunk
Browse files Browse the repository at this point in the history
  • Loading branch information
spongehah committed Sep 11, 2024
1 parent 0d8cc27 commit 9cf4537
Show file tree
Hide file tree
Showing 5 changed files with 504 additions and 556 deletions.
2 changes: 1 addition & 1 deletion x/net/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, d
forkReq()
}

// TODO(spongehah) timeout(send)
// TODO(spongehah) tmp timeout(send)
//stopTimer, didTimeout := setRequestCancel(req, rt, deadline)
req.timeoutch = make(chan struct{}, 1)
req.deadline = deadline
Expand Down
11 changes: 3 additions & 8 deletions x/net/http/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (r *Request) write(client *hyper.ClientConn, taskData *taskData, exec *hype
//}

// Prepare the hyper.Request
hyperReq, err := r.newHyperRequest(taskData.pc.isProxy, taskData.req.extra)
hyperReq, err := r.newHyperRequest(taskData.pc.isProxy, taskData.req.extra, taskData.req)
if err != nil {
return err
}
Expand All @@ -308,7 +308,7 @@ func (r *Request) write(client *hyper.ClientConn, taskData *taskData, exec *hype
return err
}

func (r *Request) newHyperRequest(usingProxy bool, extraHeader Header) (*hyper.Request, error) {
func (r *Request) newHyperRequest(usingProxy bool, extraHeader Header, treq *transportRequest) (*hyper.Request, error) {

Check warning on line 311 in x/net/http/request.go

View check run for this annotation

qiniu-x / golangci-lint

x/net/http/request.go#L311

flag-parameter: parameter 'usingProxy' seems to be a control flag, avoid control coupling (revive)
// Find the target host. Prefer the Host: header, but if that
// is not given, use the host from the request URL.
//
Expand Down Expand Up @@ -401,11 +401,6 @@ func (r *Request) newHyperRequest(usingProxy bool, extraHeader Header) (*hyper.R
}

// Process Body,ContentLength,Close,Trailer
//tw, err := newTransferWriter(r)
//if err != nil {
// return err
//}
//err = tw.writeHeader(w, trace)
err = r.writeHeader(reqHeaders)
if err != nil {
return nil, err
Expand Down Expand Up @@ -433,7 +428,7 @@ func (r *Request) newHyperRequest(usingProxy bool, extraHeader Header) (*hyper.R
}

// Write body and trailer
err = r.writeBody(hyperReq)
err = r.writeBody(hyperReq, treq)
if err != nil {
return nil, err
}
Expand Down
72 changes: 71 additions & 1 deletion x/net/http/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (r *Response) closeBody() {
}
}

func ReadResponse(r *io.PipeReader, req *Request, hyperResp *hyper.Response) (*Response, error) {
func ReadResponse(r *bodyChunk, req *Request, hyperResp *hyper.Response) (*Response, error) {
resp := &Response{
Request: req,
Header: make(Header),
Expand Down Expand Up @@ -117,3 +117,73 @@ func (r *Response) bodyIsWritable() bool {
_, ok := r.Body.(io.Writer)
return ok
}

func (resp *Response) checkRespBody(taskData *taskData) (needContinue bool) {
pc := taskData.pc
bodyWritable := resp.bodyIsWritable()
hasBody := taskData.req.Method != "HEAD" && resp.ContentLength != 0

if resp.Close || taskData.req.Close || resp.StatusCode <= 199 || bodyWritable {
// Don't do keep-alive on error if either party requested a close
// or we get an unexpected informational (1xx) response.
// StatusCode 100 is already handled above.
pc.alive = false
}

if !hasBody || bodyWritable {
replaced := pc.t.replaceReqCanceler(taskData.req.cancelKey, nil)

// Put the idle conn back into the pool before we send the response
// so if they process it quickly and make another request, they'll
// get this same conn. But we use the unbuffered channel 'rc'
// to guarantee that persistConn.roundTrip got out of its select
// potentially waiting for this persistConn to close.
pc.alive = pc.alive &&
replaced && pc.tryPutIdleConn()

if bodyWritable {
pc.closeErr = errCallerOwnsConn
}

select {
case taskData.resc <- responseAndError{res: resp}:
case <-taskData.callerGone:
readLoopDefer(pc, true)
return true
}
// Now that they've read from the unbuffered channel, they're safely
// out of the select that also waits on this goroutine to die, so
// we're allowed to exit now if needed (if alive is false)
readLoopDefer(pc, false)
return true
}
return false
}

func (r *Response) wrapBodyEOFSignalAndGzip(taskData *taskData) {
body := &bodyEOFSignal{
body: r.Body,
earlyCloseFn: func() error {
return nil
},
fn: func(err error) error {
isEOF := err == io.EOF
if !isEOF {
if cerr := taskData.pc.canceled(); cerr != nil {
return cerr
}
}
return err
},
}
r.Body = body
// TODO(spongehah) gzip(wrapBodyEOFSignal)
//if taskData.addedGzip && EqualFold(r.Header.Get("Content-Encoding"), "gzip") {
// println("gzip reader")
// r.Body = &gzipReader{body: body}
// r.Header.Del("Content-Encoding")
// r.Header.Del("Content-Length")
// r.ContentLength = -1
// r.Uncompressed = true
//}
}
28 changes: 14 additions & 14 deletions x/net/http/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (uste *unsupportedTEError) Error() string {
}

// msg is *Request or *Response.
func readTransfer(msg any, r *io.PipeReader) (err error) {
func readTransfer(msg any, r *bodyChunk) (err error) {

Check warning on line 101 in x/net/http/transfer.go

View check run for this annotation

qiniu-x / golangci-lint

x/net/http/transfer.go#L101

cognitive complexity 31 of func `readTransfer` is high (> 30) (gocognit)
t := &transferReader{RequestMethod: "GET"}

// Unify input
Expand Down Expand Up @@ -173,8 +173,6 @@ func readTransfer(msg any, r *io.PipeReader) (err error) {
if isResponse && noResponseBodyExpected(t.RequestMethod) || !bodyAllowedForStatus(t.StatusCode) {
t.Body = NoBody
} else {
// TODO(spongehah) ChunkReader(readTransfer)
//t.Body = &body{src: internal.NewChunkedReader(r), hdr: msg, r: r, closing: t.Close}
t.Body = &body{src: r, hdr: msg, r: r, closing: t.Close}
}
case realLength == 0:
Expand Down Expand Up @@ -665,15 +663,15 @@ func (req *Request) unwrapBody() io.Reader {
return req.Body
}

func (r *Request) writeBody(hyperReq *hyper.Request) error {
func (r *Request) writeBody(hyperReq *hyper.Request, treq *transportRequest) error {

Check warning on line 666 in x/net/http/transfer.go

View check run for this annotation

qiniu-x / golangci-lint

x/net/http/transfer.go#L666

(*Request).writeBody - result 0 (error) is always nil (unparam)
if r.Body != nil {
var body = r.unwrapBody()
hyperReqBody := hyper.NewBody()
buf := make([]byte, defaultChunkSize)
reqData := &bodyReq{
body: body,
buf: buf,
closeBody: r.closeBody,
body: body,
buf: buf,
treq: treq,
}
hyperReqBody.SetUserdata(c.Pointer(reqData))
hyperReqBody.SetDataFunc(setPostData)
Expand All @@ -683,9 +681,9 @@ func (r *Request) writeBody(hyperReq *hyper.Request) error {
}

type bodyReq struct {
body io.Reader
buf []byte
closeBody func() error
body io.Reader
buf []byte
treq *transportRequest
}

func setPostData(userdata c.Pointer, ctx *hyper.Context, chunk **hyper.Buf) c.Int {
Expand All @@ -694,10 +692,11 @@ func setPostData(userdata c.Pointer, ctx *hyper.Context, chunk **hyper.Buf) c.In
if err != nil {
if err == io.EOF {

Check warning on line 693 in x/net/http/transfer.go

View check run for this annotation

qiniu-x / golangci-lint

x/net/http/transfer.go#L693

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
*chunk = nil
req.closeBody()
req.treq.closeBody()

Check warning on line 695 in x/net/http/transfer.go

View check run for this annotation

qiniu-x / golangci-lint

x/net/http/transfer.go#L695

Error return value of `req.treq.closeBody` is not checked (errcheck)
return hyper.PollReady
}
fmt.Println("error reading request body: ", err)

Check warning on line 698 in x/net/http/transfer.go

View check run for this annotation

qiniu-x / golangci-lint

x/net/http/transfer.go#L698

use of `fmt.Println` forbidden by pattern `^(fmt\.Print(|f|ln)|print|println)$` (forbidigo)
req.treq.setError(requestBodyReadError{err})
return hyper.PollError
}
if n > 0 {
Expand All @@ -706,10 +705,11 @@ func setPostData(userdata c.Pointer, ctx *hyper.Context, chunk **hyper.Buf) c.In
}
if n == 0 {
*chunk = nil
req.closeBody()
req.treq.closeBody()

Check warning on line 708 in x/net/http/transfer.go

View check run for this annotation

qiniu-x / golangci-lint

x/net/http/transfer.go#L708

Error return value of `req.treq.closeBody` is not checked (errcheck)
return hyper.PollReady
}
req.closeBody()
fmt.Printf("error reading request body: %s\n", c.GoString(c.Strerror(os.Errno)))
req.treq.closeBody()

Check warning on line 711 in x/net/http/transfer.go

View check run for this annotation

qiniu-x / golangci-lint

x/net/http/transfer.go#L711

Error return value of `req.treq.closeBody` is not checked (errcheck)
err = fmt.Errorf("error reading request body: %s\n", c.GoString(c.Strerror(os.Errno)))
req.treq.setError(requestBodyReadError{err})
return hyper.PollError
}
Loading

0 comments on commit 9cf4537

Please sign in to comment.