From cd0573190524675a0a2030cdcc68905a6efeea71 Mon Sep 17 00:00:00 2001 From: "luozhengjie.lzj" Date: Wed, 17 Apr 2024 14:38:26 +0800 Subject: [PATCH 1/5] Optimize memory usage by avoiding intermediate buffer in message serialization This commit replaces the use of an intermediate buffer in the message serialization process with a direct write-to-buffer approach. The original implementation used MustMarshalBinary() which involved an extra memory copy to an intermediate buffer before writing to the final writeBuffer, leading to high memory consumption for large messages. The new WriteTo function writes message data directly to the writeBuffer, significantly reducing memory overhead and CPU time spent on garbage collection. --- peer-conn-msg-writer.go | 27 ++++++- peer_protocol/msg.go | 160 +++++++++++++++++++++++++++++----------- 2 files changed, 141 insertions(+), 46 deletions(-) diff --git a/peer-conn-msg-writer.go b/peer-conn-msg-writer.go index 1bacc59d18..fc5dfa3287 100644 --- a/peer-conn-msg-writer.go +++ b/peer-conn-msg-writer.go @@ -2,6 +2,7 @@ package torrent import ( "bytes" + "encoding/binary" "io" "time" @@ -117,10 +118,34 @@ func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) { } } +func (cn *peerConnMsgWriter) writeToBuffer(msg pp.Message) (err error) { + originalLen := cn.writeBuffer.Len() + defer func() { + if err != nil { + // Since an error occurred during buffer write, revert buffer to its original state before the write. + cn.writeBuffer.Truncate(originalLen) + } + }() + length, err := msg.GetDataLength() + if err != nil { + return err + } + // Pre-calculate buffer capacity to avoid multiple reallocations. + const msgBufLen = 4 // uint32 + cn.writeBuffer.Grow(length + msgBufLen) + // Write message length to buffer. + err = binary.Write(cn.writeBuffer, binary.BigEndian, uint32(length)) + // Write message data to buffer. + if !msg.Keepalive { + _, err = msg.WriteTo(cn.writeBuffer) + } + return err +} + func (cn *peerConnMsgWriter) write(msg pp.Message) bool { cn.mu.Lock() defer cn.mu.Unlock() - cn.writeBuffer.Write(msg.MustMarshalBinary()) + cn.writeToBuffer(msg) cn.writeCond.Broadcast() return !cn.writeBufferFull() } diff --git a/peer_protocol/msg.go b/peer_protocol/msg.go index b08bb5380e..32d8cf34e8 100644 --- a/peer_protocol/msg.go +++ b/peer_protocol/msg.go @@ -6,6 +6,7 @@ import ( "encoding" "encoding/binary" "fmt" + "io" ) // This is a lazy union representing all the possible fields for messages. Go doesn't have ADTs, and @@ -61,69 +62,98 @@ func (msg Message) MustMarshalBinary() []byte { return b } -func (msg Message) MarshalBinary() (data []byte, err error) { - // It might look like you could have a pool of buffers and preallocate the message length - // prefix, but because we have to return []byte, it becomes non-trivial to make this fast. You - // will need a benchmark. - var buf bytes.Buffer - mustWrite := func(data any) { - err := binary.Write(&buf, binary.BigEndian, data) +func (msg Message) WriteTo(w io.Writer) (n int64, err error) { + dw := newDataWriter(w) + defer func() { + n = dw.GetBytesWritten() + }() + + err = dw.WriteByte(byte(msg.Type)) + if err != nil { + return + } + + switch msg.Type { + case Choke, Unchoke, Interested, NotInterested, HaveAll, HaveNone: + case Have, AllowedFast, Suggest: + err = dw.BinaryWrite(binary.BigEndian, msg.Index) + case Request, Cancel, Reject: + for _, i := range []Integer{msg.Index, msg.Begin, msg.Length} { + err = dw.BinaryWrite(binary.BigEndian, i) + if err != nil { + break + } + } + case Bitfield: + _, err = dw.Write(marshalBitfield(msg.Bitfield)) + case Piece: + for _, i := range []Integer{msg.Index, msg.Begin} { + err = dw.BinaryWrite(binary.BigEndian, i) + if err != nil { + return + } + } + written, err := dw.Write(msg.Piece) if err != nil { - panic(err) + break } - } - writeConsecutive := func(data ...any) { - for _, d := range data { - mustWrite(d) + if written != len(msg.Piece) { + panic(written) } - } - if !msg.Keepalive { - err = buf.WriteByte(byte(msg.Type)) + case Extended: + err = dw.WriteByte(byte(msg.ExtendedID)) if err != nil { return } + _, err = dw.Write(msg.ExtendedPayload) + case Port: + err = dw.BinaryWrite(binary.BigEndian, msg.Port) + default: + err = fmt.Errorf("unknown message type: %v", msg.Type) + } + return +} + +const ( + msgTypeLen = 1 // byte + msgIndexLen = 4 // uint32 + msgBeginLen = 4 // uint32 + msgExtendedIDLen = 1 // byte + msgPortLen = 2 // uint16 +) + +func (msg Message) GetDataLength() (length int, err error) { + if !msg.Keepalive { + length += msgTypeLen switch msg.Type { case Choke, Unchoke, Interested, NotInterested, HaveAll, HaveNone: case Have, AllowedFast, Suggest: - err = binary.Write(&buf, binary.BigEndian, msg.Index) + length += msgIndexLen case Request, Cancel, Reject: - for _, i := range []Integer{msg.Index, msg.Begin, msg.Length} { - err = binary.Write(&buf, binary.BigEndian, i) - if err != nil { - break - } - } + length += msgIndexLen + msgBeginLen + msgBeginLen case Bitfield: - _, err = buf.Write(marshalBitfield(msg.Bitfield)) + length += (len(msg.Bitfield) + 7) / 8 case Piece: - for _, i := range []Integer{msg.Index, msg.Begin} { - err = binary.Write(&buf, binary.BigEndian, i) - if err != nil { - return - } - } - n, err := buf.Write(msg.Piece) - if err != nil { - break - } - if n != len(msg.Piece) { - panic(n) - } + length += msgIndexLen + msgBeginLen + len(msg.Piece) case Extended: - err = buf.WriteByte(byte(msg.ExtendedID)) - if err != nil { - return - } - _, err = buf.Write(msg.ExtendedPayload) + length += msgExtendedIDLen + len(msg.ExtendedPayload) case Port: - err = binary.Write(&buf, binary.BigEndian, msg.Port) - case HashRequest: - buf.Write(msg.PiecesRoot[:]) - writeConsecutive(msg.BaseLayer, msg.Index, msg.Length, msg.ProofLayers) + length += msgPortLen default: err = fmt.Errorf("unknown message type: %v", msg.Type) } } + return +} + +func (msg Message) MarshalBinary() (data []byte, err error) { + // It might look like you could have a pool of buffers and preallocate the message length + // prefix, but because we have to return []byte, it becomes non-trivial to make this fast. You + // will need a benchmark. + var buf bytes.Buffer + if !msg.Keepalive { + _, err = msg.WriteTo(&buf) + } data = make([]byte, 4+buf.Len()) binary.BigEndian.PutUint32(data, uint32(buf.Len())) if buf.Len() != copy(data[4:], buf.Bytes()) { @@ -158,3 +188,43 @@ func (me *Message) UnmarshalBinary(b []byte) error { } return nil } + +type dataWriter struct { + writer io.Writer + n int64 +} + +func (d *dataWriter) BinaryWrite(order binary.ByteOrder, data any) error { + err := binary.Write(d.writer, order, data) + if err != nil { + return err + } + d.n += int64(binary.Size(data)) + return nil +} + +func (d *dataWriter) Write(bytes []byte) (int, error) { + n, err := d.writer.Write(bytes) + if err != nil { + return n, err + } + d.n += int64(n) + return n, nil +} + +func (d *dataWriter) WriteByte(b byte) error { + n, err := d.Write([]byte{b}) + if err != nil { + return err + } + d.n += int64(n) + return nil +} + +func (d *dataWriter) GetBytesWritten() int64 { + return d.n +} + +func newDataWriter(writer io.Writer) *dataWriter { + return &dataWriter{writer, 0} +} From 1c14f458f4a07d710b684022b6823d57267d76ca Mon Sep 17 00:00:00 2001 From: "luozhengjie.lzj" Date: Mon, 22 Apr 2024 11:47:16 +0800 Subject: [PATCH 2/5] add benchmark for write --- peer-conn-msg-writer.go | 3 -- peer-conn-msg-writer_test.go | 82 ++++++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 3 deletions(-) create mode 100644 peer-conn-msg-writer_test.go diff --git a/peer-conn-msg-writer.go b/peer-conn-msg-writer.go index fc5dfa3287..5d4120887d 100644 --- a/peer-conn-msg-writer.go +++ b/peer-conn-msg-writer.go @@ -130,9 +130,6 @@ func (cn *peerConnMsgWriter) writeToBuffer(msg pp.Message) (err error) { if err != nil { return err } - // Pre-calculate buffer capacity to avoid multiple reallocations. - const msgBufLen = 4 // uint32 - cn.writeBuffer.Grow(length + msgBufLen) // Write message length to buffer. err = binary.Write(cn.writeBuffer, binary.BigEndian, uint32(length)) // Write message data to buffer. diff --git a/peer-conn-msg-writer_test.go b/peer-conn-msg-writer_test.go new file mode 100644 index 0000000000..c5ee282b18 --- /dev/null +++ b/peer-conn-msg-writer_test.go @@ -0,0 +1,82 @@ +package torrent + +import ( + "bytes" + "testing" + + pp "github.com/anacrolix/torrent/peer_protocol" +) + +func PieceMsg(length int64) pp.Message { + return pp.Message{ + Type: pp.Piece, + Index: pp.Integer(0), + Begin: pp.Integer(0), + Piece: make([]byte, length), + } +} + +const ( + // 4M + MsgLength4M = 4 * 1024 * 1024 + // 1M + MsgLength8M = 1 * 1024 * 1024 + // 512K + MsgLength512K = 512 * 1024 +) + +func runBenchmarkWriteToBuffer(b *testing.B, length int64) { + writer := &peerConnMsgWriter{ + writeBuffer: &bytes.Buffer{}, + } + msg := PieceMsg(MsgLength4M) + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + writer.writeBuffer.Reset() + b.StartTimer() + writer.writeToBuffer(msg) + } +} + +func BenchmarkWriteToBuffer4M(b *testing.B) { + runBenchmarkWriteToBuffer(b, MsgLength4M) +} + +func BenchmarkWriteToBuffer8M(b *testing.B) { + runBenchmarkWriteToBuffer(b, MsgLength8M) +} + +func BenchmarkWriteToBuffer512K(b *testing.B) { + runBenchmarkWriteToBuffer(b, MsgLength512K) +} + +func runBenchmarkMarshalBinaryWrite(b *testing.B, length int64) { + writer := &peerConnMsgWriter{ + writeBuffer: &bytes.Buffer{}, + } + msg := PieceMsg(length) + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + writer.writeBuffer.Reset() + b.StartTimer() + writer.writeBuffer.Write(msg.MustMarshalBinary()) + } +} + +func BenchmarkMarshalBinaryWrite4M(b *testing.B) { + runBenchmarkMarshalBinaryWrite(b, MsgLength4M) +} + +func BenchmarkMarshalBinaryWrite8M(b *testing.B) { + runBenchmarkMarshalBinaryWrite(b, MsgLength8M) +} + +func BenchmarkMarshalBinaryWrite512K(b *testing.B) { + runBenchmarkMarshalBinaryWrite(b, MsgLength512K) +} From 0aea51f40d35f6a94933e82b497acae1b516e9d2 Mon Sep 17 00:00:00 2001 From: "luozhengjie.lzj" Date: Mon, 22 Apr 2024 14:04:02 +0800 Subject: [PATCH 3/5] benchmark for 1M/4M/8M --- peer-conn-msg-writer_test.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/peer-conn-msg-writer_test.go b/peer-conn-msg-writer_test.go index c5ee282b18..e6549c6b1b 100644 --- a/peer-conn-msg-writer_test.go +++ b/peer-conn-msg-writer_test.go @@ -17,12 +17,12 @@ func PieceMsg(length int64) pp.Message { } const ( + // 8M + MsgLength8M = 8 * 1024 * 1024 // 4M MsgLength4M = 4 * 1024 * 1024 // 1M - MsgLength8M = 1 * 1024 * 1024 - // 512K - MsgLength512K = 512 * 1024 + MsgLength1M = 1 * 1024 * 1024 ) func runBenchmarkWriteToBuffer(b *testing.B, length int64) { @@ -41,16 +41,16 @@ func runBenchmarkWriteToBuffer(b *testing.B, length int64) { } } -func BenchmarkWriteToBuffer4M(b *testing.B) { - runBenchmarkWriteToBuffer(b, MsgLength4M) -} - func BenchmarkWriteToBuffer8M(b *testing.B) { runBenchmarkWriteToBuffer(b, MsgLength8M) } -func BenchmarkWriteToBuffer512K(b *testing.B) { - runBenchmarkWriteToBuffer(b, MsgLength512K) +func BenchmarkWriteToBuffer4M(b *testing.B) { + runBenchmarkWriteToBuffer(b, MsgLength4M) +} + +func BenchmarkWriteToBuffer1M(b *testing.B) { + runBenchmarkWriteToBuffer(b, MsgLength1M) } func runBenchmarkMarshalBinaryWrite(b *testing.B, length int64) { @@ -69,14 +69,14 @@ func runBenchmarkMarshalBinaryWrite(b *testing.B, length int64) { } } -func BenchmarkMarshalBinaryWrite4M(b *testing.B) { - runBenchmarkMarshalBinaryWrite(b, MsgLength4M) -} - func BenchmarkMarshalBinaryWrite8M(b *testing.B) { runBenchmarkMarshalBinaryWrite(b, MsgLength8M) } -func BenchmarkMarshalBinaryWrite512K(b *testing.B) { - runBenchmarkMarshalBinaryWrite(b, MsgLength512K) +func BenchmarkMarshalBinaryWrite4M(b *testing.B) { + runBenchmarkMarshalBinaryWrite(b, MsgLength4M) +} + +func BenchmarkMarshalBinaryWrite1M(b *testing.B) { + runBenchmarkMarshalBinaryWrite(b, MsgLength1M) } From 3b1eaa9d20b5199dbab321dc084a6fba2a7bca44 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 25 Apr 2024 14:24:26 +1000 Subject: [PATCH 4/5] Tidy up new benchmarks --- peer-conn-msg-writer_test.go | 56 ++++++++++++++---------------------- 1 file changed, 21 insertions(+), 35 deletions(-) diff --git a/peer-conn-msg-writer_test.go b/peer-conn-msg-writer_test.go index e6549c6b1b..308d18e5af 100644 --- a/peer-conn-msg-writer_test.go +++ b/peer-conn-msg-writer_test.go @@ -4,6 +4,8 @@ import ( "bytes" "testing" + "github.com/dustin/go-humanize" + pp "github.com/anacrolix/torrent/peer_protocol" ) @@ -16,41 +18,37 @@ func PieceMsg(length int64) pp.Message { } } -const ( - // 8M - MsgLength8M = 8 * 1024 * 1024 - // 4M - MsgLength4M = 4 * 1024 * 1024 - // 1M - MsgLength1M = 1 * 1024 * 1024 -) +var benchmarkPieceLengths = []int{defaultChunkSize, 1 << 20, 4 << 20, 8 << 20} func runBenchmarkWriteToBuffer(b *testing.B, length int64) { writer := &peerConnMsgWriter{ writeBuffer: &bytes.Buffer{}, } - msg := PieceMsg(MsgLength4M) + msg := PieceMsg(length) b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - b.StopTimer() + //b.StopTimer() writer.writeBuffer.Reset() - b.StartTimer() + //b.StartTimer() writer.writeToBuffer(msg) } } -func BenchmarkWriteToBuffer8M(b *testing.B) { - runBenchmarkWriteToBuffer(b, MsgLength8M) -} - -func BenchmarkWriteToBuffer4M(b *testing.B) { - runBenchmarkWriteToBuffer(b, MsgLength4M) -} - -func BenchmarkWriteToBuffer1M(b *testing.B) { - runBenchmarkWriteToBuffer(b, MsgLength1M) +func BenchmarkWritePieceMsg(b *testing.B) { + for _, length := range benchmarkPieceLengths { + b.Run(humanize.IBytes(uint64(length)), func(b *testing.B) { + b.Run("ToBuffer", func(b *testing.B) { + b.SetBytes(int64(length)) + runBenchmarkWriteToBuffer(b, int64(length)) + }) + b.Run("MarshalBinary", func(b *testing.B) { + b.SetBytes(int64(length)) + runBenchmarkMarshalBinaryWrite(b, int64(length)) + }) + }) + } } func runBenchmarkMarshalBinaryWrite(b *testing.B, length int64) { @@ -62,21 +60,9 @@ func runBenchmarkMarshalBinaryWrite(b *testing.B, length int64) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - b.StopTimer() + //b.StopTimer() writer.writeBuffer.Reset() - b.StartTimer() + //b.StartTimer() writer.writeBuffer.Write(msg.MustMarshalBinary()) } } - -func BenchmarkMarshalBinaryWrite8M(b *testing.B) { - runBenchmarkMarshalBinaryWrite(b, MsgLength8M) -} - -func BenchmarkMarshalBinaryWrite4M(b *testing.B) { - runBenchmarkMarshalBinaryWrite(b, MsgLength4M) -} - -func BenchmarkMarshalBinaryWrite1M(b *testing.B) { - runBenchmarkMarshalBinaryWrite(b, MsgLength1M) -} From 25b5a4f3dce457095ae8e60946648b1f3a8eec3b Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 25 Apr 2024 15:10:37 +1000 Subject: [PATCH 5/5] Maintain older payload write implementation --- peer-conn-msg-writer.go | 13 +-- peer_protocol/msg.go | 177 +++++++++++++++++----------------------- requesting.go | 3 +- 3 files changed, 76 insertions(+), 117 deletions(-) diff --git a/peer-conn-msg-writer.go b/peer-conn-msg-writer.go index 5d4120887d..4f17e5edec 100644 --- a/peer-conn-msg-writer.go +++ b/peer-conn-msg-writer.go @@ -2,7 +2,6 @@ package torrent import ( "bytes" - "encoding/binary" "io" "time" @@ -126,17 +125,7 @@ func (cn *peerConnMsgWriter) writeToBuffer(msg pp.Message) (err error) { cn.writeBuffer.Truncate(originalLen) } }() - length, err := msg.GetDataLength() - if err != nil { - return err - } - // Write message length to buffer. - err = binary.Write(cn.writeBuffer, binary.BigEndian, uint32(length)) - // Write message data to buffer. - if !msg.Keepalive { - _, err = msg.WriteTo(cn.writeBuffer) - } - return err + return msg.WriteTo(cn.writeBuffer) } func (cn *peerConnMsgWriter) write(msg pp.Message) bool { diff --git a/peer_protocol/msg.go b/peer_protocol/msg.go index 32d8cf34e8..2ce2b9565c 100644 --- a/peer_protocol/msg.go +++ b/peer_protocol/msg.go @@ -62,83 +62,66 @@ func (msg Message) MustMarshalBinary() []byte { return b } -func (msg Message) WriteTo(w io.Writer) (n int64, err error) { - dw := newDataWriter(w) - defer func() { - n = dw.GetBytesWritten() - }() - - err = dw.WriteByte(byte(msg.Type)) - if err != nil { - return - } +type MessageWriter interface { + io.ByteWriter + io.Writer +} - switch msg.Type { - case Choke, Unchoke, Interested, NotInterested, HaveAll, HaveNone: - case Have, AllowedFast, Suggest: - err = dw.BinaryWrite(binary.BigEndian, msg.Index) - case Request, Cancel, Reject: - for _, i := range []Integer{msg.Index, msg.Begin, msg.Length} { - err = dw.BinaryWrite(binary.BigEndian, i) - if err != nil { - break - } - } - case Bitfield: - _, err = dw.Write(marshalBitfield(msg.Bitfield)) - case Piece: - for _, i := range []Integer{msg.Index, msg.Begin} { - err = dw.BinaryWrite(binary.BigEndian, i) - if err != nil { - return - } - } - written, err := dw.Write(msg.Piece) +func (msg *Message) writePayloadTo(buf MessageWriter) (err error) { + mustWrite := func(data any) { + err := binary.Write(buf, binary.BigEndian, data) if err != nil { - break + panic(err) } - if written != len(msg.Piece) { - panic(written) + } + writeConsecutive := func(data ...any) { + for _, d := range data { + mustWrite(d) } - case Extended: - err = dw.WriteByte(byte(msg.ExtendedID)) + } + if !msg.Keepalive { + err = buf.WriteByte(byte(msg.Type)) if err != nil { return } - _, err = dw.Write(msg.ExtendedPayload) - case Port: - err = dw.BinaryWrite(binary.BigEndian, msg.Port) - default: - err = fmt.Errorf("unknown message type: %v", msg.Type) - } - return -} - -const ( - msgTypeLen = 1 // byte - msgIndexLen = 4 // uint32 - msgBeginLen = 4 // uint32 - msgExtendedIDLen = 1 // byte - msgPortLen = 2 // uint16 -) - -func (msg Message) GetDataLength() (length int, err error) { - if !msg.Keepalive { - length += msgTypeLen switch msg.Type { case Choke, Unchoke, Interested, NotInterested, HaveAll, HaveNone: case Have, AllowedFast, Suggest: - length += msgIndexLen + err = binary.Write(buf, binary.BigEndian, msg.Index) case Request, Cancel, Reject: - length += msgIndexLen + msgBeginLen + msgBeginLen + for _, i := range []Integer{msg.Index, msg.Begin, msg.Length} { + err = binary.Write(buf, binary.BigEndian, i) + if err != nil { + break + } + } case Bitfield: - length += (len(msg.Bitfield) + 7) / 8 + _, err = buf.Write(marshalBitfield(msg.Bitfield)) case Piece: - length += msgIndexLen + msgBeginLen + len(msg.Piece) + for _, i := range []Integer{msg.Index, msg.Begin} { + err = binary.Write(buf, binary.BigEndian, i) + if err != nil { + return + } + } + n, err := buf.Write(msg.Piece) + if err != nil { + break + } + if n != len(msg.Piece) { + panic(n) + } case Extended: - length += msgExtendedIDLen + len(msg.ExtendedPayload) + err = buf.WriteByte(byte(msg.ExtendedID)) + if err != nil { + return + } + _, err = buf.Write(msg.ExtendedPayload) case Port: - length += msgPortLen + err = binary.Write(buf, binary.BigEndian, msg.Port) + case HashRequest: + buf.Write(msg.PiecesRoot[:]) + writeConsecutive(msg.BaseLayer, msg.Index, msg.Length, msg.ProofLayers) default: err = fmt.Errorf("unknown message type: %v", msg.Type) } @@ -146,19 +129,32 @@ func (msg Message) GetDataLength() (length int, err error) { return } +func (msg *Message) WriteTo(w MessageWriter) (err error) { + length, err := msg.getPayloadLength() + if err != nil { + return + } + err = binary.Write(w, binary.BigEndian, length) + if err != nil { + return + } + return msg.writePayloadTo(w) +} + +func (msg *Message) getPayloadLength() (length Integer, err error) { + var lw lengthWriter + err = msg.writePayloadTo(&lw) + length = lw.n + return +} + func (msg Message) MarshalBinary() (data []byte, err error) { // It might look like you could have a pool of buffers and preallocate the message length // prefix, but because we have to return []byte, it becomes non-trivial to make this fast. You // will need a benchmark. var buf bytes.Buffer - if !msg.Keepalive { - _, err = msg.WriteTo(&buf) - } - data = make([]byte, 4+buf.Len()) - binary.BigEndian.PutUint32(data, uint32(buf.Len())) - if buf.Len() != copy(data[4:], buf.Bytes()) { - panic("bad copy") - } + err = msg.WriteTo(&buf) + data = buf.Bytes() return } @@ -189,42 +185,17 @@ func (me *Message) UnmarshalBinary(b []byte) error { return nil } -type dataWriter struct { - writer io.Writer - n int64 +type lengthWriter struct { + n Integer } -func (d *dataWriter) BinaryWrite(order binary.ByteOrder, data any) error { - err := binary.Write(d.writer, order, data) - if err != nil { - return err - } - d.n += int64(binary.Size(data)) +func (l *lengthWriter) WriteByte(c byte) error { + l.n++ return nil } -func (d *dataWriter) Write(bytes []byte) (int, error) { - n, err := d.writer.Write(bytes) - if err != nil { - return n, err - } - d.n += int64(n) - return n, nil -} - -func (d *dataWriter) WriteByte(b byte) error { - n, err := d.Write([]byte{b}) - if err != nil { - return err - } - d.n += int64(n) - return nil -} - -func (d *dataWriter) GetBytesWritten() int64 { - return d.n -} - -func newDataWriter(writer io.Writer) *dataWriter { - return &dataWriter{writer, 0} +func (l *lengthWriter) Write(p []byte) (n int, err error) { + n = len(p) + l.n += Integer(n) + return } diff --git a/requesting.go b/requesting.go index 51419a3599..a59250375a 100644 --- a/requesting.go +++ b/requesting.go @@ -9,9 +9,8 @@ import ( "time" "unsafe" - g "github.com/anacrolix/generics" - "github.com/RoaringBitmap/roaring" + g "github.com/anacrolix/generics" "github.com/anacrolix/generics/heap" "github.com/anacrolix/log" "github.com/anacrolix/multiless"