Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sample Builder: Use Jitter Buffer implementation #2679

Merged
merged 1 commit into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/pion/webrtc/v4

go 1.13

Check failure on line 3 in go.mod

View workflow job for this annotation

GitHub Actions / lint / Metadata

Invalid Go version

Found 1.13. Expected 1.19

require (
github.com/onsi/ginkgo v1.16.5 // indirect
Expand All @@ -8,7 +8,7 @@
github.com/pion/datachannel v1.5.5
github.com/pion/dtls/v2 v2.2.10
github.com/pion/ice/v3 v3.0.3
github.com/pion/interceptor v0.1.25
github.com/pion/interceptor v0.1.27
github.com/pion/logging v0.2.2
github.com/pion/randutil v0.1.0
github.com/pion/rtcp v1.2.14
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,17 @@ github.com/pion/dtls/v2 v2.2.10 h1:u2Axk+FyIR1VFTPurktB+1zoEPGIW3bmyj3LEFrXjAA=
github.com/pion/dtls/v2 v2.2.10/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
github.com/pion/ice/v3 v3.0.3 h1:Mu5QkZ2pYmcjq9JETDcDR7F8UzjP1VHmcZmgU0yqsyk=
github.com/pion/ice/v3 v3.0.3/go.mod h1:YMLU7qbKfVjmEv7EoZPIVEI+kNYxWCdPK3VS0BU+U4Q=
github.com/pion/interceptor v0.1.25 h1:pwY9r7P6ToQ3+IF0bajN0xmk/fNw/suTgaTdlwTDmhc=
github.com/pion/interceptor v0.1.25/go.mod h1:wkbPYAak5zKsfpVDYMtEfWEy8D4zL+rpxCxPImLOg3Y=
github.com/pion/interceptor v0.1.27 h1:mZ01OiGiukwRxezmDGzYjjokCVlDOk4T6BfaL5qrtGo=
github.com/pion/interceptor v0.1.27/go.mod h1:/vVaqLwDjGv4GRbgmChIKZIT5EXFDijwmj4WmIYy9bI=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/mdns v0.0.12 h1:CiMYlY+O0azojWDmxdNr7ADGrnZ+V6Ilfner+6mSVK8=
github.com/pion/mdns v0.0.12/go.mod h1:VExJjv8to/6Wqm1FXK+Ii/Z9tsVk/F5sD/N70cnYFbk=
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.10/go.mod h1:ztfEwXZNLGyF1oQDttz/ZKIBaeeg/oWbRYqzBM9TL1I=
github.com/pion/rtcp v1.2.12/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4=
github.com/pion/rtcp v1.2.14 h1:KCkGV3vJ+4DAJmvP0vaQShsb0xkRfWkO540Gy102KyE=
github.com/pion/rtcp v1.2.14/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4=
github.com/pion/rtp v1.8.2/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
github.com/pion/rtp v1.8.3/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
github.com/pion/rtp v1.8.4 h1:VqNGMNjMDMy9y0d+h+0dfjiWVKUEDQvA963jhJwu200=
github.com/pion/rtp v1.8.4/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
Expand Down
63 changes: 43 additions & 20 deletions pkg/media/samplebuilder/samplebuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"math"
"time"

"github.com/pion/interceptor/pkg/jitterbuffer"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4/pkg/media"
)
Expand All @@ -16,7 +17,7 @@
type SampleBuilder struct {
maxLate uint16 // how many packets to wait until we get a valid Sample
maxLateTimestamp uint32 // max timestamp between old and new timestamps before dropping packets
buffer [math.MaxUint16 + 1]*rtp.Packet
buffer *jitterbuffer.JitterBuffer
preparedSamples [math.MaxUint16 + 1]*media.Sample

// Interface that allows us to take RTP packets to samples
Expand Down Expand Up @@ -60,7 +61,7 @@
// The depacketizer extracts media samples from RTP packets.
// Several depacketizers are available in package github.com/pion/rtp/codecs.
func New(maxLate uint16, depacketizer rtp.Depacketizer, sampleRate uint32, opts ...Option) *SampleBuilder {
s := &SampleBuilder{maxLate: maxLate, depacketizer: depacketizer, sampleRate: sampleRate}
s := &SampleBuilder{maxLate: maxLate, depacketizer: depacketizer, sampleRate: sampleRate, buffer: jitterbuffer.New(jitterbuffer.WithMinimumPacketCount(1))}
for _, o := range opts {
o(s)
}
Expand All @@ -76,7 +77,7 @@
var foundTail *rtp.Packet

for i := location.head; i != location.tail; i++ {
if packet := s.buffer[i]; packet != nil {
if packet, _ := s.buffer.PeekAtSequence(i); packet != nil {
foundHead = packet
break
}
Expand All @@ -87,7 +88,7 @@
}

for i := location.tail - 1; i != location.head; i-- {
if packet := s.buffer[i]; packet != nil {
if packet, _ := s.buffer.PeekAtSequence(i); packet != nil {
foundTail = packet
break
}
Expand All @@ -105,16 +106,16 @@
if location.empty() {
return 0, false
}
packet := s.buffer[location.head]
if packet == nil {
packet, err := s.buffer.PeekAtSequence(location.head)
if packet == nil || err != nil {
return 0, false
}
return packet.Timestamp, true
}

func (s *SampleBuilder) releasePacket(i uint16) {
var p *rtp.Packet
p, s.buffer[i] = s.buffer[i], nil
p, _ = s.buffer.PopAtSequence(i)
if p != nil && s.packetReleaseHandler != nil {
s.packetReleaseHandler(p)
}
Expand Down Expand Up @@ -178,7 +179,7 @@
// Push does not copy the input. If you wish to reuse
// this memory make sure to copy before calling Push
func (s *SampleBuilder) Push(p *rtp.Packet) {
s.buffer[p.SequenceNumber] = p
s.buffer.Push(p)

switch s.filled.compare(p.SequenceNumber) {
case slCompareVoid:
Expand Down Expand Up @@ -220,14 +221,19 @@

var consume sampleSequenceLocation

for i := s.active.head; s.buffer[i] != nil && s.active.compare(i) != slCompareAfter; i++ {
if s.depacketizer.IsPartitionTail(s.buffer[i].Marker, s.buffer[i].Payload) {
for i := s.active.head; s.active.compare(i) != slCompareAfter; i++ {
pkt, err := s.buffer.PeekAtSequence(i)
if pkt == nil || err != nil {
break
}

if s.depacketizer.IsPartitionTail(pkt.Marker, pkt.Payload) {
consume.head = s.active.head
consume.tail = i + 1
break
}
headTimestamp, hasData := s.fetchTimestamp(s.active)
if hasData && s.buffer[i].Timestamp != headTimestamp {
if hasData && pkt.Timestamp != headTimestamp {
consume.head = s.active.head
consume.tail = i
break
Expand All @@ -237,8 +243,8 @@
if consume.empty() {
return nil
}

if !purgingBuffers && s.buffer[consume.tail] == nil {
pkt, _ := s.buffer.PeekAtSequence(consume.tail)
if !purgingBuffers && pkt == nil {
// wait for the next packet after this set of packets to arrive
// to ensure at least one post sample timestamp is known
// (unless we have to release right now)
Expand All @@ -250,8 +256,10 @@

// scan for any packet after the current and use that time stamp as the diff point
for i := consume.tail; i < s.active.tail; i++ {
if s.buffer[i] != nil {
afterTimestamp = s.buffer[i].Timestamp
pkt, _ = s.buffer.PeekAtSequence(i)

if pkt != nil {
afterTimestamp = pkt.Timestamp
break
}
}
Expand All @@ -261,10 +269,11 @@

// prior to decoding all the packets, check if this packet
// would end being disposed anyway
if !s.depacketizer.IsPartitionHead(s.buffer[consume.head].Payload) {
pkt, err := s.buffer.PeekAtSequence(consume.head)
if err == nil && !s.depacketizer.IsPartitionHead(pkt.Payload) {
isPadding := false
for i := consume.head; i != consume.tail; i++ {
if s.lastSampleTimestamp != nil && *s.lastSampleTimestamp == s.buffer[i].Timestamp && len(s.buffer[i].Payload) == 0 {
if s.lastSampleTimestamp != nil && *s.lastSampleTimestamp == pkt.Timestamp && len(pkt.Payload) == 0 {
isPadding = true
}
}
Expand All @@ -282,16 +291,22 @@
var metadata interface{}
var rtpHeaders []*rtp.Header
for i := consume.head; i != consume.tail; i++ {
p, err := s.depacketizer.Unmarshal(s.buffer[i].Payload)
pkt, err := s.buffer.PeekAtSequence(i)
if err != nil {
return nil
}

Check warning on line 297 in pkg/media/samplebuilder/samplebuilder.go

View check run for this annotation

Codecov / codecov/patch

pkg/media/samplebuilder/samplebuilder.go#L296-L297

Added lines #L296 - L297 were not covered by tests
p, err := s.depacketizer.Unmarshal(pkt.Payload)
if err != nil {
return nil
}
if i == consume.head && s.packetHeadHandler != nil {
metadata = s.packetHeadHandler(s.depacketizer)
}
if s.returnRTPHeaders {
h := s.buffer[i].Header.Clone()
rtpHeaders = append(rtpHeaders, &h)
if packet, _ := s.buffer.PeekAtSequence(i); packet != nil {
h := pkt.Header.Clone()
rtpHeaders = append(rtpHeaders, &h)
}
}

data = append(data, p...)
Expand Down Expand Up @@ -389,3 +404,11 @@
o.returnRTPHeaders = enable
}
}

// WithJitterBufferMinimumLength sets the minimum number of packets which must first
// be received before starting any playback
func WithJitterBufferMinimumLength(length uint16) Option {
return func(o *SampleBuilder) {
o.buffer = jitterbuffer.New(jitterbuffer.WithMinimumPacketCount(length))
}

Check warning on line 413 in pkg/media/samplebuilder/samplebuilder.go

View check run for this annotation

Codecov / codecov/patch

pkg/media/samplebuilder/samplebuilder.go#L410-L413

Added lines #L410 - L413 were not covered by tests
}
10 changes: 7 additions & 3 deletions pkg/media/samplebuilder/samplebuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,14 +394,18 @@ func TestSampleBuilderCleanReference(t *testing.T) {
s.Push(pkt5)

for i := 0; i < 3; i++ {
if s.buffer[(i+int(seqStart))%0x10000] != nil {
pkt, err := s.buffer.PeekAtSequence(uint16((i + int(seqStart)) % 0x10000))

if pkt != nil || err == nil {
t.Errorf("Old packet (%d) is not unreferenced (maxLate: 10, pushed: 12)", i)
}
}
if s.buffer[(14+int(seqStart))%0x10000] != pkt4 {
pkt, err := s.buffer.PeekAtSequence(uint16((14 + int(seqStart)) % 0x10000))
if pkt != pkt4 || err != nil {
t.Error("New packet must be referenced after jump")
}
if s.buffer[(12+int(seqStart))%0x10000] != pkt5 {
pkt, err = s.buffer.PeekAtSequence(uint16((12 + int(seqStart)) % 0x10000))
if pkt != pkt5 || err != nil {
t.Error("New packet must be referenced after jump")
}
})
Expand Down
Loading