Skip to content

Commit

Permalink
Provide SCTP Association OnClose callback
Browse files Browse the repository at this point in the history
The only way to detect a remote connection close is to check the state
of the SCTP transport.
See:
w3c/webrtc-pc#1799 (comment)

>In case it wasn't clear, I'm arguing for an exception for data
channels, saying pc1.close() should fire close on both ends of a data
channel, because A) that is useful, and the only way to detect remote
closing of a peer connection, and B) it's a workaround for the fact
that closing of peer connections isn't advertised to the other end.

Depending on datachannel close to notify us requires out of band
protocol agreement as datachannels can be closed by the remote for
other reasons. Having access to the SCTP layer we can provide an
accurate SCTP Assocation closed event.
  • Loading branch information
sukunrt committed Aug 8, 2024
1 parent c4d56d4 commit 529b9f2
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 4 deletions.
30 changes: 27 additions & 3 deletions sctptransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type SCTPTransport struct {
// OnStateChange func()

onErrorHandler func(error)
onCloseHandler func(error)

sctpAssociation *sctp.Association
onDataChannelHandler func(*DataChannel)
Expand Down Expand Up @@ -176,6 +177,7 @@ func (r *SCTPTransport) acceptDataChannels(a *sctp.Association) {
dataChannels = append(dataChannels, dc.dataChannel)
}
r.lock.RUnlock()

ACCEPT:
for {
dc, err := datachannel.Accept(a, &datachannel.Config{
Expand All @@ -185,6 +187,9 @@ ACCEPT:
if !errors.Is(err, io.EOF) {
r.log.Errorf("Failed to accept data channel: %v", err)
r.onError(err)
r.onClose(err)

Check warning on line 190 in sctptransport.go

View check run for this annotation

Codecov / codecov/patch

sctptransport.go#L190

Added line #L190 was not covered by tests
} else {
r.onClose(nil)
}
return
}
Expand Down Expand Up @@ -232,9 +237,12 @@ ACCEPT:
MaxRetransmits: maxRetransmits,
}, r, r.api.settingEngine.LoggerFactory.NewLogger("ortc"))
if err != nil {
// This data channel is invalid. Close it an log an error.
dc.Close()

Check failure on line 241 in sctptransport.go

View workflow job for this annotation

GitHub Actions / lint / Go

Error return value of `dc.Close` is not checked (errcheck)

Check warning on line 241 in sctptransport.go

View check run for this annotation

Codecov / codecov/patch

sctptransport.go#L241

Added line #L241 was not covered by tests
r.log.Errorf("Failed to accept data channel: %v", err)
r.onError(err)
return
// We've received a datachannel with invalid configuration. We can still receive other datachannels.
continue ACCEPT

Check warning on line 245 in sctptransport.go

View check run for this annotation

Codecov / codecov/patch

sctptransport.go#L245

Added line #L245 was not covered by tests
}

<-r.onDataChannel(rtcDC)
Expand All @@ -251,8 +259,7 @@ ACCEPT:
}
}

// OnError sets an event handler which is invoked when
// the SCTP connection error occurs.
// OnError sets an event handler which is invoked when the SCTP Association errors.
func (r *SCTPTransport) OnError(f func(err error)) {
r.lock.Lock()
defer r.lock.Unlock()
Expand All @@ -269,6 +276,23 @@ func (r *SCTPTransport) onError(err error) {
}
}

// OnClose sets an event handler which is invoked when the SCTP Association closes.
func (r *SCTPTransport) OnClose(f func(err error)) {
r.lock.Lock()
defer r.lock.Unlock()
r.onCloseHandler = f
}

func (r *SCTPTransport) onClose(err error) {
r.lock.RLock()
handler := r.onCloseHandler
r.lock.RUnlock()

if handler != nil {
go handler(err)
}
}

// OnDataChannel sets an event handler which is invoked when a data
// channel message arrives from a remote peer.
func (r *SCTPTransport) OnDataChannel(f func(*DataChannel)) {
Expand Down
66 changes: 65 additions & 1 deletion sctptransport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@

package webrtc

import "testing"
import (
"bytes"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestGenerateDataChannelID(t *testing.T) {
sctpTransportWithChannels := func(ids []uint16) *SCTPTransport {
Expand Down Expand Up @@ -55,3 +61,61 @@ func TestGenerateDataChannelID(t *testing.T) {
}
}
}

func TestSCTPTransportOnClose(t *testing.T) {
offerPC, answerPC, err := newPair()
require.NoError(t, err)

answerPC.OnDataChannel(func(dc *DataChannel) {
dc.OnMessage(func(msg DataChannelMessage) {

Check warning on line 70 in sctptransport_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

unused-parameter: parameter 'msg' seems to be unused, consider removing or renaming it as _ (revive)
dc.Send([]byte("hello"))

Check failure on line 71 in sctptransport_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

Error return value of `dc.Send` is not checked (errcheck)
})
})

recvMsg := make(chan struct{}, 1)
offerPC.OnConnectionStateChange(func(state PeerConnectionState) {
if state == PeerConnectionStateConnected {
defer func() {
offerPC.OnConnectionStateChange(nil)
}()

dc, createErr := offerPC.CreateDataChannel(expectedLabel, nil)
if createErr != nil {
t.Errorf("Failed to create a PC pair for testing")
return
}
dc.OnMessage(func(msg DataChannelMessage) {
if !bytes.Equal(msg.Data, []byte("hello")) {
t.Error("invalid msg received")
}
recvMsg <- struct{}{}
})
dc.OnOpen(func() {
dc.Send([]byte("hello"))

Check failure on line 94 in sctptransport_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

Error return value of `dc.Send` is not checked (errcheck)
})
}
})

err = signalPair(offerPC, answerPC)
require.NoError(t, err)

select {
case <-recvMsg:
case <-time.After(5 * time.Second):
t.Fatal("timed out")
}

// setup SCTP OnClose callback
ch := make(chan error, 1)
answerPC.SCTP().OnClose(func(err error) {
ch <- err
})

offerPC.Close() // This will trigger sctp onclose callback on remote

Check failure on line 114 in sctptransport_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

Error return value of `offerPC.Close` is not checked (errcheck)

select {
case <-ch:
case <-time.After(5 * time.Second):
t.Fatal("timed out")
}
}

0 comments on commit 529b9f2

Please sign in to comment.