diff --git a/pkg/common/test/mock/network.go b/pkg/common/test/mock/network.go index fe671ae56..b3ae54bb8 100644 --- a/pkg/common/test/mock/network.go +++ b/pkg/common/test/mock/network.go @@ -325,7 +325,8 @@ func (m *Conn) AddCloseCallback(callback netpoll.CloseCallback) error { } type StreamConn struct { - Data []byte + HasReleased bool + Data []byte } func NewStreamConn() *StreamConn { @@ -354,7 +355,8 @@ func (m *StreamConn) Skip(n int) error { } func (m *StreamConn) Release() error { - panic("implement me") + m.HasReleased = true + return nil } func (m *StreamConn) Len() int { diff --git a/pkg/common/test/mock/network_test.go b/pkg/common/test/mock/network_test.go index 4c9c4cf5b..34b0769db 100644 --- a/pkg/common/test/mock/network_test.go +++ b/pkg/common/test/mock/network_test.go @@ -174,13 +174,13 @@ func TestStreamConn(t *testing.T) { assert.DeepEqual(t, cap(conn.Data), conn.Len()) err = conn.Skip(conn.Len() + 1) assert.DeepEqual(t, "not enough data", err.Error()) + err = conn.Release() + assert.DeepEqual(t, nil, err) + assert.DeepEqual(t, true, conn.HasReleased) }) t.Run("TestNotImplement", func(t *testing.T) { conn := NewStreamConn() - assert.Panic(t, func() { - conn.Release() - }) assert.Panic(t, func() { conn.ReadByte() }) diff --git a/pkg/protocol/client/client_test.go b/pkg/protocol/client/client_test.go index 49e7e7df6..2babab4ab 100644 --- a/pkg/protocol/client/client_test.go +++ b/pkg/protocol/client/client_test.go @@ -33,7 +33,6 @@ type MockDoer struct { } func (m *MockDoer) Do(ctx context.Context, req *protocol.Request, resp *protocol.Response) error { - // this is the real logic in (c *HostClient) doNonNilReqResp method if len(req.Header.Host()) == 0 { req.Header.SetHostBytes(req.URI().Host()) diff --git a/pkg/protocol/http1/ext/stream.go b/pkg/protocol/http1/ext/stream.go index ae81b560e..c771cb82b 100644 --- a/pkg/protocol/http1/ext/stream.go +++ b/pkg/protocol/http1/ext/stream.go @@ -272,6 +272,12 @@ func (rs *bodyStream) skipRest() error { if err != nil { return err } + // After Skip, the buffer needs to be released to prevent OOM if there are too much data on conn. + err = rs.reader.Release() + if err != nil { + return err + } + } } // max value of pSize is 8193, it's safe. @@ -300,7 +306,15 @@ func (rs *bodyStream) skipRest() error { if skip > needSkipLen { skip = needSkipLen } - rs.reader.Skip(skip) + err := rs.reader.Skip(skip) + if err != nil { + return err + } + // After Skip, the buffer needs to be released to prevent OOM if there are too much data on conn. + err = rs.reader.Release() + if err != nil { + return err + } needSkipLen -= skip if needSkipLen == 0 { return nil diff --git a/pkg/protocol/http1/req/request_test.go b/pkg/protocol/http1/req/request_test.go index 0411187a5..e9451e776 100644 --- a/pkg/protocol/http1/req/request_test.go +++ b/pkg/protocol/http1/req/request_test.go @@ -1425,6 +1425,7 @@ func TestStreamNotEnoughData(t *testing.T) { err = ext.ReleaseBodyStream(req.BodyStream()) assert.Nil(t, err) assert.DeepEqual(t, 0, len(conn.Data)) + assert.DeepEqual(t, true, conn.HasReleased) } func TestRequestBodyStreamWithTrailer(t *testing.T) {