Skip to content

Commit

Permalink
SampleBuilder: Port to use jitter buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
thatsnotright committed Feb 24, 2024
1 parent c259e89 commit 8fa28ef
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 32 deletions.
57 changes: 39 additions & 18 deletions pkg/media/samplebuilder/samplebuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math"
"time"

"github.com/pion/interceptor/pkg/jitterbuffer"

Check failure on line 11 in pkg/media/samplebuilder/samplebuilder.go

View workflow job for this annotation

GitHub Actions / test (1.20) / Go 1.20

no required module provides package github.com/pion/interceptor/pkg/jitterbuffer; to add it:
"github.com/pion/rtp"
"github.com/pion/webrtc/v4/pkg/media"
)
Expand All @@ -16,7 +17,7 @@ import (
type SampleBuilder struct {
maxLate uint16 // how many packets to wait until we get a valid Sample
maxLateTimestamp uint32 // max timestamp between old and new timestamps before dropping packets
buffer [math.MaxUint16 + 1]*rtp.Packet
buffer *jitterbuffer.JitterBuffer
preparedSamples [math.MaxUint16 + 1]*media.Sample

// Interface that allows us to take RTP packets to samples
Expand Down Expand Up @@ -57,7 +58,7 @@ type SampleBuilder struct {
// The depacketizer extracts media samples from RTP packets.
// Several depacketizers are available in package github.com/pion/rtp/codecs.
func New(maxLate uint16, depacketizer rtp.Depacketizer, sampleRate uint32, opts ...Option) *SampleBuilder {
s := &SampleBuilder{maxLate: maxLate, depacketizer: depacketizer, sampleRate: sampleRate}
s := &SampleBuilder{maxLate: maxLate, depacketizer: depacketizer, sampleRate: sampleRate, buffer: jitterbuffer.New()}
for _, o := range opts {
o(s)
}
Expand All @@ -73,7 +74,7 @@ func (s *SampleBuilder) tooOld(location sampleSequenceLocation) bool {
var foundTail *rtp.Packet

for i := location.head; i != location.tail; i++ {
if packet := s.buffer[i]; packet != nil {
if packet, _ := s.buffer.PeekAtSequence(i); packet != nil {
foundHead = packet
break
}
Expand All @@ -84,7 +85,7 @@ func (s *SampleBuilder) tooOld(location sampleSequenceLocation) bool {
}

for i := location.tail - 1; i != location.head; i-- {
if packet := s.buffer[i]; packet != nil {
if packet, _ := s.buffer.PeekAtSequence(i); packet != nil {
foundTail = packet
break
}
Expand All @@ -102,16 +103,16 @@ func (s *SampleBuilder) fetchTimestamp(location sampleSequenceLocation) (timesta
if location.empty() {
return 0, false
}
packet := s.buffer[location.head]
if packet == nil {
packet, err := s.buffer.PeekAtSequence(location.head)
if packet == nil || err != nil {
return 0, false
}
return packet.Timestamp, true
}

func (s *SampleBuilder) releasePacket(i uint16) {
var p *rtp.Packet
p, s.buffer[i] = s.buffer[i], nil
p, _ = s.buffer.PopAtSequence(i)
if p != nil && s.packetReleaseHandler != nil {
s.packetReleaseHandler(p)
}
Expand Down Expand Up @@ -175,7 +176,7 @@ func (s *SampleBuilder) purgeBuffers() {
// Push does not copy the input. If you wish to reuse
// this memory make sure to copy before calling Push
func (s *SampleBuilder) Push(p *rtp.Packet) {
s.buffer[p.SequenceNumber] = p
s.buffer.Push(p)

switch s.filled.compare(p.SequenceNumber) {
case slCompareVoid:
Expand Down Expand Up @@ -212,14 +213,19 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {

var consume sampleSequenceLocation

for i := s.active.head; s.buffer[i] != nil && s.active.compare(i) != slCompareAfter; i++ {
if s.depacketizer.IsPartitionTail(s.buffer[i].Marker, s.buffer[i].Payload) {
for i := s.active.head; s.active.compare(i) != slCompareAfter; i++ {
pkt, err := s.buffer.PeekAtSequence(i)
if pkt == nil || err != nil {
break
}

if s.depacketizer.IsPartitionTail(pkt.Marker, pkt.Payload) {
consume.head = s.active.head
consume.tail = i + 1
break
}
headTimestamp, hasData := s.fetchTimestamp(s.active)
if hasData && s.buffer[i].Timestamp != headTimestamp {
if hasData && pkt.Timestamp != headTimestamp {
consume.head = s.active.head
consume.tail = i
break
Expand All @@ -229,8 +235,8 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
if consume.empty() {
return nil
}

if !purgingBuffers && s.buffer[consume.tail] == nil {
pkt, _ := s.buffer.PeekAtSequence(consume.tail)
if !purgingBuffers && pkt == nil {
// wait for the next packet after this set of packets to arrive
// to ensure at least one post sample timestamp is known
// (unless we have to release right now)
Expand All @@ -242,8 +248,10 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {

// scan for any packet after the current and use that time stamp as the diff point
for i := consume.tail; i < s.active.tail; i++ {
if s.buffer[i] != nil {
afterTimestamp = s.buffer[i].Timestamp
pkt, _ := s.buffer.PeekAtSequence(i)

if pkt != nil {
afterTimestamp = pkt.Timestamp
break
}
}
Expand All @@ -253,10 +261,11 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {

// prior to decoding all the packets, check if this packet
// would end being disposed anyway
if !s.depacketizer.IsPartitionHead(s.buffer[consume.head].Payload) {
pkt, err := s.buffer.PeekAtSequence(consume.head)
if err == nil && !s.depacketizer.IsPartitionHead(pkt.Payload) {
isPadding := false
for i := consume.head; i != consume.tail; i++ {
if s.lastSampleTimestamp != nil && *s.lastSampleTimestamp == s.buffer[i].Timestamp && len(s.buffer[i].Payload) == 0 {
if s.lastSampleTimestamp != nil && *s.lastSampleTimestamp == pkt.Timestamp && len(pkt.Payload) == 0 {
isPadding = true
}
}
Expand All @@ -273,7 +282,11 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
data := []byte{}
var metadata interface{}
for i := consume.head; i != consume.tail; i++ {
p, err := s.depacketizer.Unmarshal(s.buffer[i].Payload)
pkt, err := s.buffer.PeekAtSequence(i)
if err != nil {
return nil
}
p, err := s.depacketizer.Unmarshal(pkt.Payload)
if err != nil {
return nil
}
Expand Down Expand Up @@ -384,3 +397,11 @@ func WithMaxTimeDelay(maxLateDuration time.Duration) Option {
o.maxLateTimestamp = uint32(int64(o.sampleRate) * totalMillis / 1000)
}
}

// WithJitterBufferMinimumLength sets the minimum number of packets which must first
// be received before starting any playback
func WithJitterBufferMinimumLength(length uint16) Option {
return func(o *SampleBuilder) {
o.buffer = jitterbuffer.New(jitterbuffer.WithMinimumPacketCount(length))
}
}
33 changes: 19 additions & 14 deletions pkg/media/samplebuilder/samplebuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func TestSampleBuilder(t *testing.T) {
time.Millisecond*time.Duration(int64(t.maxLateTimestamp)),
))
}

opts = append(opts, WithJitterBufferMinimumLength(1))
d := &fakeDepacketizer{
headChecker: t.withHeadChecker,
headBytes: t.headBytes,
Expand All @@ -304,7 +304,7 @@ func TestSampleBuilder(t *testing.T) {
// SampleBuilder should respect maxLate if we popped successfully but then have a gap larger then maxLate
func TestSampleBuilderMaxLate(t *testing.T) {
assert := assert.New(t)
s := New(50, &fakeDepacketizer{}, 1)
s := New(50, &fakeDepacketizer{}, 1, WithJitterBufferMinimumLength(1))

s.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 0, Timestamp: 1}, Payload: []byte{0x01}})
s.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1, Timestamp: 2}, Payload: []byte{0x01}})
Expand Down Expand Up @@ -353,7 +353,7 @@ func TestSampleBuilderCleanReference(t *testing.T) {
} {
seqStart := seqStart
t.Run(fmt.Sprintf("From%d", seqStart), func(t *testing.T) {
s := New(10, &fakeDepacketizer{}, 1)
s := New(10, &fakeDepacketizer{}, 1, WithJitterBufferMinimumLength(1))

s.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 0 + seqStart, Timestamp: 0}, Payload: []byte{0x01}})
s.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1 + seqStart, Timestamp: 0}, Payload: []byte{0x02}})
Expand All @@ -364,14 +364,18 @@ func TestSampleBuilderCleanReference(t *testing.T) {
s.Push(pkt5)

for i := 0; i < 3; i++ {
if s.buffer[(i+int(seqStart))%0x10000] != nil {
pkt, err := s.buffer.PeekAtSequence(uint16((i + int(seqStart)) % 0x10000))

if pkt != nil || err == nil {
t.Errorf("Old packet (%d) is not unreferenced (maxLate: 10, pushed: 12)", i)
}
}
if s.buffer[(14+int(seqStart))%0x10000] != pkt4 {
pkt, err := s.buffer.PeekAtSequence(uint16((14 + int(seqStart)) % 0x10000))
if pkt != pkt4 || err != nil {
t.Error("New packet must be referenced after jump")
}
if s.buffer[(12+int(seqStart))%0x10000] != pkt5 {
pkt, err = s.buffer.PeekAtSequence(uint16((12 + int(seqStart)) % 0x10000))
if pkt != pkt5 || err != nil {
t.Error("New packet must be referenced after jump")
}
})
Expand All @@ -388,7 +392,7 @@ func TestSampleBuilderPushMaxZero(t *testing.T) {
headBytes: []byte{0x01},
}

s := New(0, d, 1)
s := New(0, d, 1, WithJitterBufferMinimumLength(1))
s.Push(&pkts[0])
if sample := s.Pop(); sample == nil {
t.Error("Should expect a popped sample")
Expand All @@ -409,7 +413,7 @@ func TestSampleBuilderWithPacketReleaseHandler(t *testing.T) {
{Header: rtp.Header{SequenceNumber: 13, Timestamp: 122}, Payload: []byte{0x04}},
{Header: rtp.Header{SequenceNumber: 21, Timestamp: 200}, Payload: []byte{0x05}},
}
s := New(10, &fakeDepacketizer{}, 1, WithPacketReleaseHandler(fakePacketReleaseHandler))
s := New(10, &fakeDepacketizer{}, 1, WithPacketReleaseHandler(fakePacketReleaseHandler), WithJitterBufferMinimumLength(1))
s.Push(&pkts[0])
s.Push(&pkts[1])
if len(released) == 0 {
Expand Down Expand Up @@ -446,7 +450,7 @@ func TestSampleBuilderWithPacketHeadHandler(t *testing.T) {
s := New(10, &fakeDepacketizer{}, 1, WithPacketHeadHandler(func(headPacket interface{}) interface{} {
headCount++
return true
}))
}), WithJitterBufferMinimumLength(1))

for _, pkt := range packets {
s.Push(pkt)
Expand All @@ -467,7 +471,7 @@ func TestSampleBuilderWithPacketHeadHandler(t *testing.T) {

func TestPopWithTimestamp(t *testing.T) {
t.Run("Crash on nil", func(t *testing.T) {
s := New(0, &fakeDepacketizer{}, 1)
s := New(0, &fakeDepacketizer{}, 1, WithJitterBufferMinimumLength(1))
sample, timestamp := s.PopWithTimestamp()
assert.Nil(t, sample)
assert.Equal(t, uint32(0), timestamp)
Expand All @@ -483,6 +487,7 @@ func (f *truePartitionHeadChecker) IsPartitionHead([]byte) bool {
func TestSampleBuilderData(t *testing.T) {
s := New(10, &fakeDepacketizer{}, 1,
WithPartitionHeadChecker(&truePartitionHeadChecker{}),
WithJitterBufferMinimumLength(1),
)
j := 0
for i := 0; i < 0x20000; i++ {
Expand Down Expand Up @@ -510,7 +515,7 @@ func TestSampleBuilderData(t *testing.T) {
}

func BenchmarkSampleBuilderSequential(b *testing.B) {
s := New(100, &fakeDepacketizer{}, 1)
s := New(100, &fakeDepacketizer{}, 1, WithJitterBufferMinimumLength(1))
b.ResetTimer()
j := 0
for i := 0; i < b.N; i++ {
Expand All @@ -536,7 +541,7 @@ func BenchmarkSampleBuilderSequential(b *testing.B) {
}

func BenchmarkSampleBuilderLoss(b *testing.B) {
s := New(100, &fakeDepacketizer{}, 1)
s := New(100, &fakeDepacketizer{}, 1, WithJitterBufferMinimumLength(1))
b.ResetTimer()
j := 0
for i := 0; i < b.N; i++ {
Expand Down Expand Up @@ -565,7 +570,7 @@ func BenchmarkSampleBuilderLoss(b *testing.B) {
}

func BenchmarkSampleBuilderReordered(b *testing.B) {
s := New(100, &fakeDepacketizer{}, 1)
s := New(100, &fakeDepacketizer{}, 1, WithJitterBufferMinimumLength(1))
b.ResetTimer()
j := 0
for i := 0; i < b.N; i++ {
Expand All @@ -591,7 +596,7 @@ func BenchmarkSampleBuilderReordered(b *testing.B) {
}

func BenchmarkSampleBuilderFragmented(b *testing.B) {
s := New(100, &fakeDepacketizer{}, 1)
s := New(100, &fakeDepacketizer{}, 1, WithJitterBufferMinimumLength(1))
b.ResetTimer()
j := 0
for i := 0; i < b.N; i++ {
Expand Down

0 comments on commit 8fa28ef

Please sign in to comment.