From 8fa28ef77930fe4996e230434b1388aa1c0db301 Mon Sep 17 00:00:00 2001 From: Rob Elsner Date: Sat, 17 Feb 2024 17:33:16 -0400 Subject: [PATCH] SampleBuilder: Port to use jitter buffer --- pkg/media/samplebuilder/samplebuilder.go | 57 +++++++++++++------ pkg/media/samplebuilder/samplebuilder_test.go | 33 ++++++----- 2 files changed, 58 insertions(+), 32 deletions(-) diff --git a/pkg/media/samplebuilder/samplebuilder.go b/pkg/media/samplebuilder/samplebuilder.go index 74964f30490..8baf9cc677c 100644 --- a/pkg/media/samplebuilder/samplebuilder.go +++ b/pkg/media/samplebuilder/samplebuilder.go @@ -8,6 +8,7 @@ import ( "math" "time" + "github.com/pion/interceptor/pkg/jitterbuffer" "github.com/pion/rtp" "github.com/pion/webrtc/v4/pkg/media" ) @@ -16,7 +17,7 @@ import ( type SampleBuilder struct { maxLate uint16 // how many packets to wait until we get a valid Sample maxLateTimestamp uint32 // max timestamp between old and new timestamps before dropping packets - buffer [math.MaxUint16 + 1]*rtp.Packet + buffer *jitterbuffer.JitterBuffer preparedSamples [math.MaxUint16 + 1]*media.Sample // Interface that allows us to take RTP packets to samples @@ -57,7 +58,7 @@ type SampleBuilder struct { // The depacketizer extracts media samples from RTP packets. // Several depacketizers are available in package github.com/pion/rtp/codecs. func New(maxLate uint16, depacketizer rtp.Depacketizer, sampleRate uint32, opts ...Option) *SampleBuilder { - s := &SampleBuilder{maxLate: maxLate, depacketizer: depacketizer, sampleRate: sampleRate} + s := &SampleBuilder{maxLate: maxLate, depacketizer: depacketizer, sampleRate: sampleRate, buffer: jitterbuffer.New()} for _, o := range opts { o(s) } @@ -73,7 +74,7 @@ func (s *SampleBuilder) tooOld(location sampleSequenceLocation) bool { var foundTail *rtp.Packet for i := location.head; i != location.tail; i++ { - if packet := s.buffer[i]; packet != nil { + if packet, _ := s.buffer.PeekAtSequence(i); packet != nil { foundHead = packet break } @@ -84,7 +85,7 @@ func (s *SampleBuilder) tooOld(location sampleSequenceLocation) bool { } for i := location.tail - 1; i != location.head; i-- { - if packet := s.buffer[i]; packet != nil { + if packet, _ := s.buffer.PeekAtSequence(i); packet != nil { foundTail = packet break } @@ -102,8 +103,8 @@ func (s *SampleBuilder) fetchTimestamp(location sampleSequenceLocation) (timesta if location.empty() { return 0, false } - packet := s.buffer[location.head] - if packet == nil { + packet, err := s.buffer.PeekAtSequence(location.head) + if packet == nil || err != nil { return 0, false } return packet.Timestamp, true @@ -111,7 +112,7 @@ func (s *SampleBuilder) fetchTimestamp(location sampleSequenceLocation) (timesta func (s *SampleBuilder) releasePacket(i uint16) { var p *rtp.Packet - p, s.buffer[i] = s.buffer[i], nil + p, _ = s.buffer.PopAtSequence(i) if p != nil && s.packetReleaseHandler != nil { s.packetReleaseHandler(p) } @@ -175,7 +176,7 @@ func (s *SampleBuilder) purgeBuffers() { // Push does not copy the input. If you wish to reuse // this memory make sure to copy before calling Push func (s *SampleBuilder) Push(p *rtp.Packet) { - s.buffer[p.SequenceNumber] = p + s.buffer.Push(p) switch s.filled.compare(p.SequenceNumber) { case slCompareVoid: @@ -212,14 +213,19 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample { var consume sampleSequenceLocation - for i := s.active.head; s.buffer[i] != nil && s.active.compare(i) != slCompareAfter; i++ { - if s.depacketizer.IsPartitionTail(s.buffer[i].Marker, s.buffer[i].Payload) { + for i := s.active.head; s.active.compare(i) != slCompareAfter; i++ { + pkt, err := s.buffer.PeekAtSequence(i) + if pkt == nil || err != nil { + break + } + + if s.depacketizer.IsPartitionTail(pkt.Marker, pkt.Payload) { consume.head = s.active.head consume.tail = i + 1 break } headTimestamp, hasData := s.fetchTimestamp(s.active) - if hasData && s.buffer[i].Timestamp != headTimestamp { + if hasData && pkt.Timestamp != headTimestamp { consume.head = s.active.head consume.tail = i break @@ -229,8 +235,8 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample { if consume.empty() { return nil } - - if !purgingBuffers && s.buffer[consume.tail] == nil { + pkt, _ := s.buffer.PeekAtSequence(consume.tail) + if !purgingBuffers && pkt == nil { // wait for the next packet after this set of packets to arrive // to ensure at least one post sample timestamp is known // (unless we have to release right now) @@ -242,8 +248,10 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample { // scan for any packet after the current and use that time stamp as the diff point for i := consume.tail; i < s.active.tail; i++ { - if s.buffer[i] != nil { - afterTimestamp = s.buffer[i].Timestamp + pkt, _ := s.buffer.PeekAtSequence(i) + + if pkt != nil { + afterTimestamp = pkt.Timestamp break } } @@ -253,10 +261,11 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample { // prior to decoding all the packets, check if this packet // would end being disposed anyway - if !s.depacketizer.IsPartitionHead(s.buffer[consume.head].Payload) { + pkt, err := s.buffer.PeekAtSequence(consume.head) + if err == nil && !s.depacketizer.IsPartitionHead(pkt.Payload) { isPadding := false for i := consume.head; i != consume.tail; i++ { - if s.lastSampleTimestamp != nil && *s.lastSampleTimestamp == s.buffer[i].Timestamp && len(s.buffer[i].Payload) == 0 { + if s.lastSampleTimestamp != nil && *s.lastSampleTimestamp == pkt.Timestamp && len(pkt.Payload) == 0 { isPadding = true } } @@ -273,7 +282,11 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample { data := []byte{} var metadata interface{} for i := consume.head; i != consume.tail; i++ { - p, err := s.depacketizer.Unmarshal(s.buffer[i].Payload) + pkt, err := s.buffer.PeekAtSequence(i) + if err != nil { + return nil + } + p, err := s.depacketizer.Unmarshal(pkt.Payload) if err != nil { return nil } @@ -384,3 +397,11 @@ func WithMaxTimeDelay(maxLateDuration time.Duration) Option { o.maxLateTimestamp = uint32(int64(o.sampleRate) * totalMillis / 1000) } } + +// WithJitterBufferMinimumLength sets the minimum number of packets which must first +// be received before starting any playback +func WithJitterBufferMinimumLength(length uint16) Option { + return func(o *SampleBuilder) { + o.buffer = jitterbuffer.New(jitterbuffer.WithMinimumPacketCount(length)) + } +} diff --git a/pkg/media/samplebuilder/samplebuilder_test.go b/pkg/media/samplebuilder/samplebuilder_test.go index 276406ddc50..4c707e2d223 100644 --- a/pkg/media/samplebuilder/samplebuilder_test.go +++ b/pkg/media/samplebuilder/samplebuilder_test.go @@ -282,7 +282,7 @@ func TestSampleBuilder(t *testing.T) { time.Millisecond*time.Duration(int64(t.maxLateTimestamp)), )) } - + opts = append(opts, WithJitterBufferMinimumLength(1)) d := &fakeDepacketizer{ headChecker: t.withHeadChecker, headBytes: t.headBytes, @@ -304,7 +304,7 @@ func TestSampleBuilder(t *testing.T) { // SampleBuilder should respect maxLate if we popped successfully but then have a gap larger then maxLate func TestSampleBuilderMaxLate(t *testing.T) { assert := assert.New(t) - s := New(50, &fakeDepacketizer{}, 1) + s := New(50, &fakeDepacketizer{}, 1, WithJitterBufferMinimumLength(1)) s.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 0, Timestamp: 1}, Payload: []byte{0x01}}) s.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1, Timestamp: 2}, Payload: []byte{0x01}}) @@ -353,7 +353,7 @@ func TestSampleBuilderCleanReference(t *testing.T) { } { seqStart := seqStart t.Run(fmt.Sprintf("From%d", seqStart), func(t *testing.T) { - s := New(10, &fakeDepacketizer{}, 1) + s := New(10, &fakeDepacketizer{}, 1, WithJitterBufferMinimumLength(1)) s.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 0 + seqStart, Timestamp: 0}, Payload: []byte{0x01}}) s.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1 + seqStart, Timestamp: 0}, Payload: []byte{0x02}}) @@ -364,14 +364,18 @@ func TestSampleBuilderCleanReference(t *testing.T) { s.Push(pkt5) for i := 0; i < 3; i++ { - if s.buffer[(i+int(seqStart))%0x10000] != nil { + pkt, err := s.buffer.PeekAtSequence(uint16((i + int(seqStart)) % 0x10000)) + + if pkt != nil || err == nil { t.Errorf("Old packet (%d) is not unreferenced (maxLate: 10, pushed: 12)", i) } } - if s.buffer[(14+int(seqStart))%0x10000] != pkt4 { + pkt, err := s.buffer.PeekAtSequence(uint16((14 + int(seqStart)) % 0x10000)) + if pkt != pkt4 || err != nil { t.Error("New packet must be referenced after jump") } - if s.buffer[(12+int(seqStart))%0x10000] != pkt5 { + pkt, err = s.buffer.PeekAtSequence(uint16((12 + int(seqStart)) % 0x10000)) + if pkt != pkt5 || err != nil { t.Error("New packet must be referenced after jump") } }) @@ -388,7 +392,7 @@ func TestSampleBuilderPushMaxZero(t *testing.T) { headBytes: []byte{0x01}, } - s := New(0, d, 1) + s := New(0, d, 1, WithJitterBufferMinimumLength(1)) s.Push(&pkts[0]) if sample := s.Pop(); sample == nil { t.Error("Should expect a popped sample") @@ -409,7 +413,7 @@ func TestSampleBuilderWithPacketReleaseHandler(t *testing.T) { {Header: rtp.Header{SequenceNumber: 13, Timestamp: 122}, Payload: []byte{0x04}}, {Header: rtp.Header{SequenceNumber: 21, Timestamp: 200}, Payload: []byte{0x05}}, } - s := New(10, &fakeDepacketizer{}, 1, WithPacketReleaseHandler(fakePacketReleaseHandler)) + s := New(10, &fakeDepacketizer{}, 1, WithPacketReleaseHandler(fakePacketReleaseHandler), WithJitterBufferMinimumLength(1)) s.Push(&pkts[0]) s.Push(&pkts[1]) if len(released) == 0 { @@ -446,7 +450,7 @@ func TestSampleBuilderWithPacketHeadHandler(t *testing.T) { s := New(10, &fakeDepacketizer{}, 1, WithPacketHeadHandler(func(headPacket interface{}) interface{} { headCount++ return true - })) + }), WithJitterBufferMinimumLength(1)) for _, pkt := range packets { s.Push(pkt) @@ -467,7 +471,7 @@ func TestSampleBuilderWithPacketHeadHandler(t *testing.T) { func TestPopWithTimestamp(t *testing.T) { t.Run("Crash on nil", func(t *testing.T) { - s := New(0, &fakeDepacketizer{}, 1) + s := New(0, &fakeDepacketizer{}, 1, WithJitterBufferMinimumLength(1)) sample, timestamp := s.PopWithTimestamp() assert.Nil(t, sample) assert.Equal(t, uint32(0), timestamp) @@ -483,6 +487,7 @@ func (f *truePartitionHeadChecker) IsPartitionHead([]byte) bool { func TestSampleBuilderData(t *testing.T) { s := New(10, &fakeDepacketizer{}, 1, WithPartitionHeadChecker(&truePartitionHeadChecker{}), + WithJitterBufferMinimumLength(1), ) j := 0 for i := 0; i < 0x20000; i++ { @@ -510,7 +515,7 @@ func TestSampleBuilderData(t *testing.T) { } func BenchmarkSampleBuilderSequential(b *testing.B) { - s := New(100, &fakeDepacketizer{}, 1) + s := New(100, &fakeDepacketizer{}, 1, WithJitterBufferMinimumLength(1)) b.ResetTimer() j := 0 for i := 0; i < b.N; i++ { @@ -536,7 +541,7 @@ func BenchmarkSampleBuilderSequential(b *testing.B) { } func BenchmarkSampleBuilderLoss(b *testing.B) { - s := New(100, &fakeDepacketizer{}, 1) + s := New(100, &fakeDepacketizer{}, 1, WithJitterBufferMinimumLength(1)) b.ResetTimer() j := 0 for i := 0; i < b.N; i++ { @@ -565,7 +570,7 @@ func BenchmarkSampleBuilderLoss(b *testing.B) { } func BenchmarkSampleBuilderReordered(b *testing.B) { - s := New(100, &fakeDepacketizer{}, 1) + s := New(100, &fakeDepacketizer{}, 1, WithJitterBufferMinimumLength(1)) b.ResetTimer() j := 0 for i := 0; i < b.N; i++ { @@ -591,7 +596,7 @@ func BenchmarkSampleBuilderReordered(b *testing.B) { } func BenchmarkSampleBuilderFragmented(b *testing.B) { - s := New(100, &fakeDepacketizer{}, 1) + s := New(100, &fakeDepacketizer{}, 1, WithJitterBufferMinimumLength(1)) b.ResetTimer() j := 0 for i := 0; i < b.N; i++ {