Skip to content

Commit

Permalink
webrtc: increase maximum message size
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Sep 3, 2024
1 parent 601db42 commit a8b07eb
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 19 deletions.
23 changes: 10 additions & 13 deletions p2p/transport/webrtc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,28 @@ import (
)

const (
// maxMessageSize is the maximum message size of the Protobuf message we send / receive.
maxMessageSize = 16384
// maxMessageSizeRead is the maximum message size of the Protobuf message we send / receive.
maxMessageSizeRead = 256 * 1024
// maxMessageSizeWrite is the maximum message size of the Protobuf message we send / receive.
maxMessageSizeWrite = 64 * 1024
// maxSendBuffer is the maximum data we enqueue on the underlying data channel for writes.
// The underlying SCTP layer has an unbounded buffer for writes. We limit the amount enqueued
// per stream is limited to avoid a single stream monopolizing the entire connection.
maxSendBuffer = 2 * maxMessageSize
maxSendBuffer = 2 * maxMessageSizeWrite
// sendBufferLowThreshold is the threshold below which we write more data on the underlying
// data channel. We want a notification as soon as we can write 1 full sized message.
sendBufferLowThreshold = maxSendBuffer - maxMessageSize
sendBufferLowThreshold = maxSendBuffer - maxMessageSizeWrite
// maxTotalControlMessagesSize is the maximum total size of all control messages we will
// write on this stream.
// 4 control messages of size 10 bytes + 10 bytes buffer. This number doesn't need to be
// exact. In the worst case, we enqueue these many bytes more in the webrtc peer connection
// send queue.
maxTotalControlMessagesSize = 50

// Proto overhead assumption is 5 bytes
// protoOverhead assumption is 5 bytes
protoOverhead = 5
// Varint overhead is assumed to be 2 bytes. This is safe since
// 1. This is only used and when writing message, and
// 2. We only send messages in chunks of `maxMessageSize - varintOverhead`
// which includes the data and the protobuf header. Since `maxMessageSize`
// is less than or equal to 2 ^ 14, the varint will not be more than
// 2 bytes in length.
varintOverhead = 2
// varintOverhead is the value of `maxMessageSizeWrite` in varint format
varintOverhead = 3
// maxFINACKWait is the maximum amount of time a stream will wait to read
// FIN_ACK before closing the data channel
maxFINACKWait = 10 * time.Second
Expand Down Expand Up @@ -106,7 +103,7 @@ func newStream(
onDone func(),
) *stream {
s := &stream{
reader: pbio.NewDelimitedReader(rwc, maxMessageSize),
reader: pbio.NewDelimitedReader(rwc, maxMessageSizeRead),
writer: pbio.NewDelimitedWriter(rwc),
writeStateChanged: make(chan struct{}, 1),
id: *channel.ID(),
Expand Down
4 changes: 2 additions & 2 deletions p2p/transport/webrtc/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ func TestStreamChunking(t *testing.T) {
clientStr := newStream(client.dc, client.rwc, func() {})
serverStr := newStream(server.dc, server.rwc, func() {})

const N = (16 << 10) + 1000
const N = (64 << 10) + 1000
go func() {
data := make([]byte, N)
_, err := clientStr.Write(data)
Expand All @@ -555,7 +555,7 @@ func TestStreamChunking(t *testing.T) {
data := make([]byte, N)
n, err := serverStr.Read(data)
require.NoError(t, err)
require.LessOrEqual(t, n, 16<<10)
require.LessOrEqual(t, n, 64<<10)

nn, err := serverStr.Read(data)
require.NoError(t, err)
Expand Down
5 changes: 4 additions & 1 deletion p2p/transport/webrtc/stream_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package libp2pwebrtc

import (
"errors"
"fmt"
"os"
"time"

Expand Down Expand Up @@ -78,13 +79,14 @@ func (s *stream) Write(b []byte) (int, error) {
select {
case <-writeDeadlineChan:
s.mx.Lock()
fmt.Println("returning deadline exceeded")
return n, os.ErrDeadlineExceeded
case <-s.writeStateChanged:
}
s.mx.Lock()
continue
}
end := maxMessageSize
end := maxMessageSizeWrite
if end > availableSpace {
end = availableSpace
}
Expand All @@ -94,6 +96,7 @@ func (s *stream) Write(b []byte) (int, error) {
}
msg = pb.Message{Message: b[:end]}
if err := s.writer.WriteMsg(&msg); err != nil {
fmt.Println("err", err)
return n, err
}
n += end
Expand Down
2 changes: 1 addition & 1 deletion p2p/transport/webrtc/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ const (
DefaultFailedTimeout = 30 * time.Second
DefaultKeepaliveTimeout = 15 * time.Second

sctpReceiveBufferSize = 100_000
sctpReceiveBufferSize = maxMessageSizeRead * 20
)

type WebRTCTransport struct {
Expand Down
4 changes: 2 additions & 2 deletions p2p/transport/webrtc/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ func TestTransportWebRTC_Deadline(t *testing.T) {
require.NoError(t, err)

stream.SetWriteDeadline(time.Now().Add(100 * time.Millisecond))
largeBuffer := make([]byte, 2*1024*1024)
largeBuffer := make([]byte, 20*1024*1024)
_, err = stream.Write(largeBuffer)
require.ErrorIs(t, err, os.ErrDeadlineExceeded)

Expand Down Expand Up @@ -595,7 +595,7 @@ func TestTransportWebRTC_StreamWriteBufferContention(t *testing.T) {
require.NoError(t, err)

stream.SetWriteDeadline(time.Now().Add(200 * time.Millisecond))
largeBuffer := make([]byte, 2*1024*1024)
largeBuffer := make([]byte, 20*1024*1024)
_, err = stream.Write(largeBuffer)
errC <- err
}()
Expand Down

0 comments on commit a8b07eb

Please sign in to comment.