Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RTX support for v3 #2666

Merged
merged 6 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ const (
generatedCertificateOrigin = "WebRTC"

sdesRepairRTPStreamIDURI = "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id"

// AttributeRtxPayloadType is the interceptor attribute added when Read() returns an RTX packet containing the RTX stream payload type
AttributeRtxPayloadType = "rtx_payload_type"
// AttributeRtxSsrc is the interceptor attribute added when Read() returns an RTX packet containing the RTX stream SSRC
AttributeRtxSsrc = "rtx_ssrc"
// AttributeRtxSequenceNumber is the interceptor attribute added when Read() returns an RTX packet containing the RTX stream sequence number
AttributeRtxSequenceNumber = "rtx_sequence_number"
)

func defaultSrtpProtectionProfiles() []dtls.SRTPProtectionProfile {
Expand Down
11 changes: 10 additions & 1 deletion mediaengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,11 @@ func (m *MediaEngine) matchRemoteCodec(remoteCodec RTPCodecParameters, typ RTPCo
}

aptMatch := codecMatchNone
var aptCodec RTPCodecParameters
for _, codec := range exactMatches {
if codec.PayloadType == PayloadType(payloadType) {
aptMatch = codecMatchExact
aptCodec = codec
break
}
}
Expand All @@ -426,6 +428,7 @@ func (m *MediaEngine) matchRemoteCodec(remoteCodec RTPCodecParameters, typ RTPCo
for _, codec := range partialMatches {
if codec.PayloadType == PayloadType(payloadType) {
aptMatch = codecMatchPartial
aptCodec = codec
break
}
}
Expand All @@ -435,8 +438,14 @@ func (m *MediaEngine) matchRemoteCodec(remoteCodec RTPCodecParameters, typ RTPCo
return codecMatchNone, nil // not an error, we just ignore this codec we don't support
}

// replace the apt value with the original codec's payload type
toMatchCodec := remoteCodec
if aptMatched, mt := codecParametersFuzzySearch(aptCodec, codecs); mt == aptMatch {
toMatchCodec.SDPFmtpLine = strings.Replace(toMatchCodec.SDPFmtpLine, fmt.Sprintf("apt=%d", payloadType), fmt.Sprintf("apt=%d", aptMatched.PayloadType), 1)
}

// if apt's media codec is partial match, then apt codec must be partial match too
_, matchType := codecParametersFuzzySearch(remoteCodec, codecs)
_, matchType := codecParametersFuzzySearch(toMatchCodec, codecs)
if matchType == codecMatchExact && aptMatch == codecMatchPartial {
matchType = codecMatchPartial
}
Expand Down
64 changes: 58 additions & 6 deletions mediaengine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,18 @@ a=rtpmap:96 VP8/90000
o=- 4596489990601351948 2 IN IP4 127.0.0.1
s=-
t=0 0
m=video 60323 UDP/TLS/RTP/SAVPF 94 96 97
m=video 60323 UDP/TLS/RTP/SAVPF 94 95 106 107 108 109 96 97
a=rtpmap:94 VP8/90000
a=rtpmap:95 rtx/90000
a=fmtp:95 apt=94
a=rtpmap:106 H264/90000
a=fmtp:106 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f
a=rtpmap:107 rtx/90000
a=fmtp:107 apt=106
a=rtpmap:108 H264/90000
a=fmtp:108 level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f
a=rtpmap:109 rtx/90000
a=fmtp:109 apt=108
a=rtpmap:96 VP9/90000
a=fmtp:96 profile-id=2
a=rtpmap:97 rtx/90000
Expand All @@ -318,22 +328,64 @@ a=fmtp:97 apt=96
m := MediaEngine{}
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{MimeTypeVP8, 90000, 0, "", nil},
PayloadType: 94,
}, RTPCodecTypeVideo))
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{MimeTypeVP9, 90000, 0, "profile-id=2", nil},
PayloadType: 96,
}, RTPCodecTypeVideo))
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=96", nil},
PayloadType: 97,
}, RTPCodecTypeVideo))
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f", nil},
PayloadType: 102,
}, RTPCodecTypeVideo))
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=102", nil},
PayloadType: 103,
}, RTPCodecTypeVideo))
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f", nil},
PayloadType: 104,
}, RTPCodecTypeVideo))
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=104", nil},
PayloadType: 105,
}, RTPCodecTypeVideo))
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{MimeTypeVP9, 90000, 0, "profile-id=2", nil},
PayloadType: 98,
}, RTPCodecTypeVideo))
assert.NoError(t, m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=98", nil},
PayloadType: 99,
}, RTPCodecTypeVideo))
assert.NoError(t, m.updateFromRemoteDescription(mustParse(profileLevels)))

assert.True(t, m.negotiatedVideo)

_, _, err := m.getCodecByPayload(97)
vp9Codec, _, err := m.getCodecByPayload(96)
assert.NoError(t, err)
assert.Equal(t, vp9Codec.MimeType, MimeTypeVP9)
vp9RTX, _, err := m.getCodecByPayload(97)
assert.NoError(t, err)
assert.Equal(t, vp9RTX.MimeType, "video/rtx")

h264P1Codec, _, err := m.getCodecByPayload(106)
assert.NoError(t, err)
assert.Equal(t, h264P1Codec.MimeType, MimeTypeH264)
assert.Equal(t, h264P1Codec.SDPFmtpLine, "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f")
h264P1RTX, _, err := m.getCodecByPayload(107)
assert.NoError(t, err)
assert.Equal(t, h264P1RTX.MimeType, "video/rtx")
assert.Equal(t, h264P1RTX.SDPFmtpLine, "apt=106")

h264P0Codec, _, err := m.getCodecByPayload(108)
assert.NoError(t, err)
assert.Equal(t, h264P0Codec.MimeType, MimeTypeH264)
assert.Equal(t, h264P0Codec.SDPFmtpLine, "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f")
h264P0RTX, _, err := m.getCodecByPayload(109)
assert.NoError(t, err)
assert.Equal(t, h264P0RTX.MimeType, "video/rtx")
assert.Equal(t, h264P0RTX.SDPFmtpLine, "apt=108")
})

t.Run("Matches when rtx apt for partial match codec", func(t *testing.T) {
Expand Down
9 changes: 5 additions & 4 deletions peerconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -1576,12 +1576,11 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err
return err
}

var mid, rid, rsid string
payloadType, paddingOnly, err := handleUnknownRTPPacket(b[:i], uint8(midExtensionID), uint8(streamIDExtensionID), uint8(repairStreamIDExtensionID), &mid, &rid, &rsid)
if err != nil {
return err
if i < 4 {
return errRTPTooShort
}

payloadType := PayloadType(b[1] & 0x7f)
params, err := pc.api.mediaEngine.getRTPParametersByPayloadType(payloadType)
if err != nil {
return err
Expand All @@ -1593,6 +1592,8 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err
return err
}

var mid, rid, rsid string
var paddingOnly bool
for readCount := 0; readCount <= simulcastProbeCount; readCount++ {
if mid == "" || (rid == "" && rsid == "") {
// skip padding only packets for probing
Expand Down
98 changes: 92 additions & 6 deletions rtpreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package webrtc

import (
"encoding/binary"
"fmt"
"io"
"sync"
Expand All @@ -31,13 +32,28 @@ type trackStreams struct {
rtcpReadStream *srtp.ReadStreamSRTCP
rtcpInterceptor interceptor.RTCPReader

repairReadStream *srtp.ReadStreamSRTP
repairInterceptor interceptor.RTPReader
repairReadStream *srtp.ReadStreamSRTP
repairInterceptor interceptor.RTPReader
repairStreamChannel chan rtxPacketWithAttributes

repairRtcpReadStream *srtp.ReadStreamSRTCP
repairRtcpInterceptor interceptor.RTCPReader
}

type rtxPacketWithAttributes struct {
pkt []byte
attributes interceptor.Attributes
pool *sync.Pool
}

func (p *rtxPacketWithAttributes) release() {
if p.pkt != nil {
b := p.pkt[:cap(p.pkt)]
p.pool.Put(b) // nolint:staticcheck
p.pkt = nil
}
}

// RTPReceiver allows an application to inspect the receipt of a TrackRemote
type RTPReceiver struct {
kind RTPCodecType
Expand All @@ -52,6 +68,8 @@ type RTPReceiver struct {

// A reference to the associated api object
api *API

rtxPool sync.Pool
}

// NewRTPReceiver constructs a new RTPReceiver
Expand All @@ -67,6 +85,9 @@ func (api *API) NewRTPReceiver(kind RTPCodecType, transport *DTLSTransport) (*RT
closed: make(chan interface{}),
received: make(chan interface{}),
tracks: []trackStreams{},
rtxPool: sync.Pool{New: func() interface{} {
return make([]byte, api.settingEngine.getReceiveMTU())
}},
}

return r, nil
Expand Down Expand Up @@ -145,6 +166,7 @@ func (r *RTPReceiver) configureReceive(parameters RTPReceiveParameters) {
track: newTrackRemote(
r.kind,
parameters.Encodings[i].SSRC,
parameters.Encodings[i].RTX.SSRC,
parameters.Encodings[i].RID,
r,
),
Expand Down Expand Up @@ -379,8 +401,6 @@ func (r *RTPReceiver) receiveForRid(rid string, params RTPParameters, streamInfo
}

// receiveForRtx starts a routine that processes the repair stream
// These packets aren't exposed to the user yet, but we need to process them for
// TWCC
func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *interceptor.StreamInfo, rtpReadStream *srtp.ReadStreamSRTP, rtpInterceptor interceptor.RTPReader, rtcpReadStream *srtp.ReadStreamSRTCP, rtcpInterceptor interceptor.RTCPReader) error {
var track *trackStreams
if ssrc != 0 && len(r.tracks) == 1 {
Expand All @@ -402,12 +422,56 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep
track.repairInterceptor = rtpInterceptor
track.repairRtcpReadStream = rtcpReadStream
track.repairRtcpInterceptor = rtcpInterceptor
track.repairStreamChannel = make(chan rtxPacketWithAttributes)

go func() {
b := make([]byte, r.api.settingEngine.getReceiveMTU())
for {
if _, _, readErr := track.repairInterceptor.Read(b, nil); readErr != nil {
b := r.rtxPool.Get().([]byte) // nolint:forcetypeassert
i, attributes, err := track.repairInterceptor.Read(b, nil)
if err != nil {
r.rtxPool.Put(b) // nolint:staticcheck
return
}

// RTX packets have a different payload format. Move the OSN in the payload to the RTP header and rewrite the
// payload type and SSRC, so that we can return RTX packets to the caller 'transparently' i.e. in the same format
// as non-RTX RTP packets
hasExtension := b[0]&0b10000 > 0
hasPadding := b[0]&0b100000 > 0
csrcCount := b[0] & 0b1111
headerLength := uint16(12 + (4 * csrcCount))
paddingLength := 0
if hasExtension {
headerLength += 4 * (1 + binary.BigEndian.Uint16(b[headerLength+2:headerLength+4]))
}
if hasPadding {
paddingLength = int(b[i-1])
}

if i-int(headerLength)-paddingLength < 2 {
// BWE probe packet, ignore
r.rtxPool.Put(b) // nolint:staticcheck
continue
}

if attributes == nil {
attributes = make(interceptor.Attributes)
}
attributes.Set(AttributeRtxPayloadType, b[1]&0x7F)
attributes.Set(AttributeRtxSequenceNumber, binary.BigEndian.Uint16(b[2:4]))
attributes.Set(AttributeRtxSsrc, binary.BigEndian.Uint32(b[8:12]))

b[1] = (b[1] & 0x80) | uint8(track.track.PayloadType())
b[2] = b[headerLength]
b[3] = b[headerLength+1]
binary.BigEndian.PutUint32(b[8:12], uint32(track.track.SSRC()))
copy(b[headerLength:i-2], b[headerLength+2:i])

select {
case <-r.closed:
r.rtxPool.Put(b) // nolint:staticcheck
return
case track.repairStreamChannel <- rtxPacketWithAttributes{pkt: b[:i-2], attributes: attributes, pool: &r.rtxPool}:
}
}
}()
Expand Down Expand Up @@ -446,3 +510,25 @@ func (r *RTPReceiver) setRTPReadDeadline(deadline time.Time, reader *TrackRemote
}
return fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC())
}

// readRTX returns an RTX packet if one is available on the RTX track, otherwise returns nil
func (r *RTPReceiver) readRTX(reader *TrackRemote) *rtxPacketWithAttributes {
if !reader.HasRTX() {
return nil
}

select {
case <-r.received:
default:
return nil
}

if t := r.streamsForTrack(reader); t != nil {
select {
case rtxPacketReceived := <-t.repairStreamChannel:
return &rtxPacketReceived
default:
}
}
return nil
}
6 changes: 6 additions & 0 deletions sdp.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ func trackDetailsFromSDP(log logging.LeveledLogger, s *sdp.SessionDescription) (
}
rtxRepairFlows[rtxRepairFlow] = baseSsrc
tracksInMediaSection = filterTrackWithSSRC(tracksInMediaSection, SSRC(rtxRepairFlow)) // Remove if rtx was added as track before
for i := range tracksInMediaSection {
if tracksInMediaSection[i].ssrcs[0] == SSRC(baseSsrc) {
repairSsrc := SSRC(rtxRepairFlow)
tracksInMediaSection[i].repairSsrc = &repairSsrc
}
}
}
}

Expand Down
Loading
Loading