From a8b07ebb80c0a0ef9a096e94e44f576713735b9b Mon Sep 17 00:00:00 2001 From: sukun Date: Mon, 2 Sep 2024 16:31:25 +0530 Subject: [PATCH] webrtc: increase maximum message size --- p2p/transport/webrtc/stream.go | 23 ++++++++++------------- p2p/transport/webrtc/stream_test.go | 4 ++-- p2p/transport/webrtc/stream_write.go | 5 ++++- p2p/transport/webrtc/transport.go | 2 +- p2p/transport/webrtc/transport_test.go | 4 ++-- 5 files changed, 19 insertions(+), 19 deletions(-) diff --git a/p2p/transport/webrtc/stream.go b/p2p/transport/webrtc/stream.go index 56f869f5e1..032d0476ac 100644 --- a/p2p/transport/webrtc/stream.go +++ b/p2p/transport/webrtc/stream.go @@ -15,15 +15,17 @@ import ( ) const ( - // maxMessageSize is the maximum message size of the Protobuf message we send / receive. - maxMessageSize = 16384 + // maxMessageSizeRead is the maximum message size of the Protobuf message we send / receive. + maxMessageSizeRead = 256 * 1024 + // maxMessageSizeWrite is the maximum message size of the Protobuf message we send / receive. + maxMessageSizeWrite = 64 * 1024 // maxSendBuffer is the maximum data we enqueue on the underlying data channel for writes. // The underlying SCTP layer has an unbounded buffer for writes. We limit the amount enqueued // per stream is limited to avoid a single stream monopolizing the entire connection. - maxSendBuffer = 2 * maxMessageSize + maxSendBuffer = 2 * maxMessageSizeWrite // sendBufferLowThreshold is the threshold below which we write more data on the underlying // data channel. We want a notification as soon as we can write 1 full sized message. - sendBufferLowThreshold = maxSendBuffer - maxMessageSize + sendBufferLowThreshold = maxSendBuffer - maxMessageSizeWrite // maxTotalControlMessagesSize is the maximum total size of all control messages we will // write on this stream. // 4 control messages of size 10 bytes + 10 bytes buffer. This number doesn't need to be @@ -31,15 +33,10 @@ const ( // send queue. maxTotalControlMessagesSize = 50 - // Proto overhead assumption is 5 bytes + // protoOverhead assumption is 5 bytes protoOverhead = 5 - // Varint overhead is assumed to be 2 bytes. This is safe since - // 1. This is only used and when writing message, and - // 2. We only send messages in chunks of `maxMessageSize - varintOverhead` - // which includes the data and the protobuf header. Since `maxMessageSize` - // is less than or equal to 2 ^ 14, the varint will not be more than - // 2 bytes in length. - varintOverhead = 2 + // varintOverhead is the value of `maxMessageSizeWrite` in varint format + varintOverhead = 3 // maxFINACKWait is the maximum amount of time a stream will wait to read // FIN_ACK before closing the data channel maxFINACKWait = 10 * time.Second @@ -106,7 +103,7 @@ func newStream( onDone func(), ) *stream { s := &stream{ - reader: pbio.NewDelimitedReader(rwc, maxMessageSize), + reader: pbio.NewDelimitedReader(rwc, maxMessageSizeRead), writer: pbio.NewDelimitedWriter(rwc), writeStateChanged: make(chan struct{}, 1), id: *channel.ID(), diff --git a/p2p/transport/webrtc/stream_test.go b/p2p/transport/webrtc/stream_test.go index 0f57f44f79..6d36eb9374 100644 --- a/p2p/transport/webrtc/stream_test.go +++ b/p2p/transport/webrtc/stream_test.go @@ -545,7 +545,7 @@ func TestStreamChunking(t *testing.T) { clientStr := newStream(client.dc, client.rwc, func() {}) serverStr := newStream(server.dc, server.rwc, func() {}) - const N = (16 << 10) + 1000 + const N = (64 << 10) + 1000 go func() { data := make([]byte, N) _, err := clientStr.Write(data) @@ -555,7 +555,7 @@ func TestStreamChunking(t *testing.T) { data := make([]byte, N) n, err := serverStr.Read(data) require.NoError(t, err) - require.LessOrEqual(t, n, 16<<10) + require.LessOrEqual(t, n, 64<<10) nn, err := serverStr.Read(data) require.NoError(t, err) diff --git a/p2p/transport/webrtc/stream_write.go b/p2p/transport/webrtc/stream_write.go index 534a8d8e60..7cc9f5f01a 100644 --- a/p2p/transport/webrtc/stream_write.go +++ b/p2p/transport/webrtc/stream_write.go @@ -2,6 +2,7 @@ package libp2pwebrtc import ( "errors" + "fmt" "os" "time" @@ -78,13 +79,14 @@ func (s *stream) Write(b []byte) (int, error) { select { case <-writeDeadlineChan: s.mx.Lock() + fmt.Println("returning deadline exceeded") return n, os.ErrDeadlineExceeded case <-s.writeStateChanged: } s.mx.Lock() continue } - end := maxMessageSize + end := maxMessageSizeWrite if end > availableSpace { end = availableSpace } @@ -94,6 +96,7 @@ func (s *stream) Write(b []byte) (int, error) { } msg = pb.Message{Message: b[:end]} if err := s.writer.WriteMsg(&msg); err != nil { + fmt.Println("err", err) return n, err } n += end diff --git a/p2p/transport/webrtc/transport.go b/p2p/transport/webrtc/transport.go index c4c16fd402..447e3755da 100644 --- a/p2p/transport/webrtc/transport.go +++ b/p2p/transport/webrtc/transport.go @@ -68,7 +68,7 @@ const ( DefaultFailedTimeout = 30 * time.Second DefaultKeepaliveTimeout = 15 * time.Second - sctpReceiveBufferSize = 100_000 + sctpReceiveBufferSize = maxMessageSizeRead * 20 ) type WebRTCTransport struct { diff --git a/p2p/transport/webrtc/transport_test.go b/p2p/transport/webrtc/transport_test.go index b2c06d3374..a1363874af 100644 --- a/p2p/transport/webrtc/transport_test.go +++ b/p2p/transport/webrtc/transport_test.go @@ -546,7 +546,7 @@ func TestTransportWebRTC_Deadline(t *testing.T) { require.NoError(t, err) stream.SetWriteDeadline(time.Now().Add(100 * time.Millisecond)) - largeBuffer := make([]byte, 2*1024*1024) + largeBuffer := make([]byte, 20*1024*1024) _, err = stream.Write(largeBuffer) require.ErrorIs(t, err, os.ErrDeadlineExceeded) @@ -595,7 +595,7 @@ func TestTransportWebRTC_StreamWriteBufferContention(t *testing.T) { require.NoError(t, err) stream.SetWriteDeadline(time.Now().Add(200 * time.Millisecond)) - largeBuffer := make([]byte, 2*1024*1024) + largeBuffer := make([]byte, 20*1024*1024) _, err = stream.Write(largeBuffer) errC <- err }()