Skip to content

Commit

Permalink
Close unhandled rtcp simulcast streams
Browse files Browse the repository at this point in the history
handleIncomingSSRC will call streamsForSSRC which
opens rtp/rtcp streams that if unhandled can be
leaked resources. Now we will proactively open
them before calling handleIncomingSSRC and close
then later. In the future it would be better to
do this inside handleIncomingSSRC to protect other
callers.
  • Loading branch information
edaniels committed Jul 23, 2024
1 parent c85269b commit 93d9dad
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 10 deletions.
14 changes: 10 additions & 4 deletions dtlstransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type DTLSTransport struct {

srtpSession, srtcpSession atomic.Value
srtpEndpoint, srtcpEndpoint *mux.Endpoint
simulcastStreams []*srtp.ReadStreamSRTP
simulcastStreams []simulcastStreamPair
srtpReady chan struct{}

dtlsMatcher mux.MatchFunc
Expand All @@ -60,6 +60,11 @@ type DTLSTransport struct {
log logging.LeveledLogger
}

type simulcastStreamPair struct {
srtp *srtp.ReadStreamSRTP
srtcp *srtp.ReadStreamSRTCP
}

// NewDTLSTransport creates a new DTLSTransport.
// This constructor is part of the ORTC API. It is not
// meant to be used together with the basic WebRTC API.
Expand Down Expand Up @@ -436,7 +441,8 @@ func (t *DTLSTransport) Stop() error {
}

for i := range t.simulcastStreams {
closeErrs = append(closeErrs, t.simulcastStreams[i].Close())
closeErrs = append(closeErrs, t.simulcastStreams[i].srtp.Close())
closeErrs = append(closeErrs, t.simulcastStreams[i].srtcp.Close())
}

if t.conn != nil {
Expand Down Expand Up @@ -477,11 +483,11 @@ func (t *DTLSTransport) ensureICEConn() error {
return nil
}

func (t *DTLSTransport) storeSimulcastStream(s *srtp.ReadStreamSRTP) {
func (t *DTLSTransport) storeSimulcastStream(srtpReadStream *srtp.ReadStreamSRTP, srtcpReadStream *srtp.ReadStreamSRTCP) {
t.lock.Lock()
defer t.lock.Unlock()

t.simulcastStreams = append(t.simulcastStreams, s)
t.simulcastStreams = append(t.simulcastStreams, simulcastStreamPair{srtpReadStream, srtcpReadStream})
}

func (t *DTLSTransport) streamsForSSRC(ssrc SSRC, streamInfo interceptor.StreamInfo) (*srtp.ReadStreamSRTP, interceptor.RTPReader, *srtp.ReadStreamSRTCP, interceptor.RTCPReader, error) {
Expand Down
29 changes: 23 additions & 6 deletions peerconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -1569,7 +1569,8 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err
}
}

// If the remote SDP was only one media section the ssrc doesn't have to be explicitly declared
// If the remote SDP was only one media (non-datachannel) section the ssrc doesn't
// have to be explicitly declared
if handled, err := pc.handleUndeclaredSSRC(ssrc, remoteDescription); handled || err != nil {
return err
}
Expand Down Expand Up @@ -1670,26 +1671,42 @@ func (pc *PeerConnection) undeclaredRTPMediaProcessor() {
return
}

stream, ssrc, err := srtpSession.AcceptStream()
srtcpSession, err := pc.dtlsTransport.getSRTCPSession()
if err != nil {
pc.log.Warnf("undeclaredMediaProcessor failed to open SrtcpSession: %v", err)
return

Check warning on line 1677 in peerconnection.go

View check run for this annotation

Codecov / codecov/patch

peerconnection.go#L1676-L1677

Added lines #L1676 - L1677 were not covered by tests
}

srtpReadStream, ssrc, err := srtpSession.AcceptStream()
if err != nil {
pc.log.Warnf("Failed to accept RTP %v", err)
return
}

// open accompanying srtcp stream
srtcpReadStream, err := srtcpSession.OpenReadStream(ssrc)
if err != nil {
pc.log.Warnf("Failed to open RTCP stream for %d: %v", ssrc, err)
return

Check warning on line 1690 in peerconnection.go

View check run for this annotation

Codecov / codecov/patch

peerconnection.go#L1689-L1690

Added lines #L1689 - L1690 were not covered by tests
}

if pc.isClosed.get() {
if err = stream.Close(); err != nil {
if err = srtpReadStream.Close(); err != nil {

Check warning on line 1694 in peerconnection.go

View check run for this annotation

Codecov / codecov/patch

peerconnection.go#L1694

Added line #L1694 was not covered by tests
pc.log.Warnf("Failed to close RTP stream %v", err)
}
if err = srtcpReadStream.Close(); err != nil {
pc.log.Warnf("Failed to close RTCP stream %v", err)

Check warning on line 1698 in peerconnection.go

View check run for this annotation

Codecov / codecov/patch

peerconnection.go#L1697-L1698

Added lines #L1697 - L1698 were not covered by tests
}
continue
}

pc.dtlsTransport.storeSimulcastStream(srtpReadStream, srtcpReadStream)

if ssrc == 0 {
go pc.handleNonMediaBandwidthProbe()
continue
}

pc.dtlsTransport.storeSimulcastStream(stream)

if atomic.AddUint64(&simulcastRoutineCount, 1) >= simulcastMaxProbeRoutines {
atomic.AddUint64(&simulcastRoutineCount, ^uint64(0))
pc.log.Warn(ErrSimulcastProbeOverflow.Error())
Expand All @@ -1701,7 +1718,7 @@ func (pc *PeerConnection) undeclaredRTPMediaProcessor() {
pc.log.Errorf(incomingUnhandledRTPSsrc, ssrc, err)
}
atomic.AddUint64(&simulcastRoutineCount, ^uint64(0))
}(stream, SSRC(ssrc))
}(srtpReadStream, SSRC(ssrc))
}
}

Expand Down

0 comments on commit 93d9dad

Please sign in to comment.