Skip to content

jitterbuffer: Use jitterbuffer in SampleBuilder #2959

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 34 additions & 19 deletions pkg/media/samplebuilder/samplebuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"math"
"time"

"github.com/pion/interceptor/pkg/jitterbuffer"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4/pkg/media"
)
Expand All @@ -16,7 +17,7 @@
type SampleBuilder struct {
maxLate uint16 // how many packets to wait until we get a valid Sample
maxLateTimestamp uint32 // max timestamp between old and new timestamps before dropping packets
buffer [math.MaxUint16 + 1]*rtp.Packet
buffer *jitterbuffer.JitterBuffer
preparedSamples [math.MaxUint16 + 1]*media.Sample

// Interface that allows us to take RTP packets to samples
Expand Down Expand Up @@ -60,7 +61,7 @@
// The depacketizer extracts media samples from RTP packets.
// Several depacketizers are available in package github.com/pion/rtp/codecs.
func New(maxLate uint16, depacketizer rtp.Depacketizer, sampleRate uint32, opts ...Option) *SampleBuilder {
s := &SampleBuilder{maxLate: maxLate, depacketizer: depacketizer, sampleRate: sampleRate}
s := &SampleBuilder{maxLate: maxLate, depacketizer: depacketizer, sampleRate: sampleRate, buffer: jitterbuffer.New(jitterbuffer.WithMinimumPacketCount(1))}
for _, o := range opts {
o(s)
}
Expand All @@ -77,7 +78,7 @@
var foundTail *rtp.Packet

for i := location.head; i != location.tail; i++ {
if packet := s.buffer[i]; packet != nil {
if packet, _ := s.buffer.PeekAtSequence(i); packet != nil {
foundHead = packet

break
Expand All @@ -89,7 +90,7 @@
}

for i := location.tail - 1; i != location.head; i-- {
if packet := s.buffer[i]; packet != nil {
if packet, _ := s.buffer.PeekAtSequence(i); packet != nil {
foundTail = packet

break
Expand All @@ -108,7 +109,7 @@
if location.empty() {
return 0, false
}
packet := s.buffer[location.head]
packet, _ := s.buffer.PeekAtSequence(location.head)
if packet == nil {
return 0, false
}
Expand All @@ -118,7 +119,7 @@

func (s *SampleBuilder) releasePacket(i uint16) {
var p *rtp.Packet
p, s.buffer[i] = s.buffer[i], nil
p, _ = s.buffer.PopAtSequence(i)
if p != nil && s.packetReleaseHandler != nil {
s.packetReleaseHandler(p)
}
Expand Down Expand Up @@ -183,7 +184,7 @@
// Push does not copy the input. If you wish to reuse
// this memory make sure to copy before calling Push.
func (s *SampleBuilder) Push(packet *rtp.Packet) {
s.buffer[packet.SequenceNumber] = packet
s.buffer.Push(packet)

switch s.filled.compare(packet.SequenceNumber) {
case slCompareVoid:
Expand Down Expand Up @@ -226,15 +227,19 @@

var consume sampleSequenceLocation

for i := s.active.head; s.buffer[i] != nil && s.active.compare(i) != slCompareAfter; i++ {
if s.depacketizer.IsPartitionTail(s.buffer[i].Marker, s.buffer[i].Payload) {
for i := s.active.head; s.active.compare(i) != slCompareAfter; i++ {
pkt, err := s.buffer.PeekAtSequence(i)
if pkt == nil || err != nil {
break
}
if s.depacketizer.IsPartitionTail(pkt.Marker, pkt.Payload) {
consume.head = s.active.head
consume.tail = i + 1

break
}
headTimestamp, hasData := s.fetchTimestamp(s.active)
if hasData && s.buffer[i].Timestamp != headTimestamp {
if hasData && pkt.Timestamp != headTimestamp {
consume.head = s.active.head
consume.tail = i

Expand All @@ -245,8 +250,8 @@
if consume.empty() {
return nil
}

if !purgingBuffers && s.buffer[consume.tail] == nil {
pkt, _ := s.buffer.PeekAtSequence(consume.tail)
if !purgingBuffers && pkt == nil {
// wait for the next packet after this set of packets to arrive
// to ensure at least one post sample timestamp is known
// (unless we have to release right now)
Expand All @@ -258,9 +263,9 @@

// scan for any packet after the current and use that time stamp as the diff point
for i := consume.tail; i < s.active.tail; i++ {
if s.buffer[i] != nil {
afterTimestamp = s.buffer[i].Timestamp

pkt, _ := s.buffer.PeekAtSequence(i)
if pkt != nil {
afterTimestamp = pkt.Timestamp
break
}
}
Expand All @@ -270,10 +275,12 @@

// prior to decoding all the packets, check if this packet
// would end being disposed anyway
if !s.depacketizer.IsPartitionHead(s.buffer[consume.head].Payload) {
pkt, err := s.buffer.PeekAtSequence(consume.head)
if pkt != nil && err == nil && !s.depacketizer.IsPartitionHead(pkt.Payload) {
isPadding := false
for i := consume.head; i != consume.tail; i++ {
if s.lastSampleTimestamp != nil && *s.lastSampleTimestamp == s.buffer[i].Timestamp && len(s.buffer[i].Payload) == 0 {
pkt, _ := s.buffer.PeekAtSequence(i)
if s.lastSampleTimestamp != nil && *s.lastSampleTimestamp == pkt.Timestamp && len(pkt.Payload) == 0 {
isPadding = true
}
}
Expand All @@ -292,19 +299,27 @@
var metadata interface{}
var rtpHeaders []*rtp.Header
for i := consume.head; i != consume.tail; i++ {
payload, err := s.depacketizer.Unmarshal(s.buffer[i].Payload)
pkt, _ := s.buffer.PeekAtSequence(i)
if pkt == nil {
return nil
}
p, err := s.depacketizer.Unmarshal(pkt.Payload)

Check failure on line 306 in pkg/media/samplebuilder/samplebuilder.go

View workflow job for this annotation

GitHub Actions / lint / Go

declared and not used: p

Check failure on line 306 in pkg/media/samplebuilder/samplebuilder.go

View workflow job for this annotation

GitHub Actions / test (1.23) / Go 1.23

declared and not used: p

Check failure on line 306 in pkg/media/samplebuilder/samplebuilder.go

View workflow job for this annotation

GitHub Actions / test (1.23) / Go 1.23

declared and not used: p

Check failure on line 306 in pkg/media/samplebuilder/samplebuilder.go

View workflow job for this annotation

GitHub Actions / test (1.24) / Go 1.24

declared and not used: p

Check failure on line 306 in pkg/media/samplebuilder/samplebuilder.go

View workflow job for this annotation

GitHub Actions / test (1.24) / Go 1.24

declared and not used: p
if err != nil {
return nil
}
if i == consume.head && s.packetHeadHandler != nil {
metadata = s.packetHeadHandler(s.depacketizer)
}
if s.returnRTPHeaders {
h := s.buffer[i].Header.Clone()
pkt, err := s.buffer.PeekAtSequence(i)
if err != nil {
return nil
}
h := pkt.Header.Clone()
rtpHeaders = append(rtpHeaders, &h)
}

data = append(data, payload...)

Check failure on line 322 in pkg/media/samplebuilder/samplebuilder.go

View workflow job for this annotation

GitHub Actions / lint / Go

undefined: payload

Check failure on line 322 in pkg/media/samplebuilder/samplebuilder.go

View workflow job for this annotation

GitHub Actions / test (1.23) / Go 1.23

undefined: payload

Check failure on line 322 in pkg/media/samplebuilder/samplebuilder.go

View workflow job for this annotation

GitHub Actions / test (1.23) / Go 1.23

undefined: payload

Check failure on line 322 in pkg/media/samplebuilder/samplebuilder.go

View workflow job for this annotation

GitHub Actions / test (1.24) / Go 1.24

undefined: payload

Check failure on line 322 in pkg/media/samplebuilder/samplebuilder.go

View workflow job for this annotation

GitHub Actions / test (1.24) / Go 1.24

undefined: payload
}
samples := afterTimestamp - sampleTimestamp

Expand Down
18 changes: 14 additions & 4 deletions pkg/media/samplebuilder/samplebuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,17 +427,27 @@
fd.Push(pkt5)

for i := 0; i < 3; i++ {
assert.Nilf(
t, fd.buffer[(i+int(seqStart))%0x10000],
"Old packet (%d) is not unreferenced (maxLate: 10, pushed: 12)", i,
)
pkt, err := s.buffer.PeekAtSequence(uint16((i + int(seqStart)) % 0x10000))

Check failure on line 430 in pkg/media/samplebuilder/samplebuilder_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

undefined: s

Check failure on line 430 in pkg/media/samplebuilder/samplebuilder_test.go

View workflow job for this annotation

GitHub Actions / test (1.23) / Go 1.23

undefined: s

Check failure on line 430 in pkg/media/samplebuilder/samplebuilder_test.go

View workflow job for this annotation

GitHub Actions / test (1.24) / Go 1.24

undefined: s

if pkt != nil || err == nil {
t.Errorf("Old packet (%d) is not unreferenced (maxLate: 10, pushed: 12)", i)
}
}
pkt, _ := s.buffer.PeekAtSequence(uint16((14 + int(seqStart)) % 0x10000))

Check failure on line 436 in pkg/media/samplebuilder/samplebuilder_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

undefined: s

Check failure on line 436 in pkg/media/samplebuilder/samplebuilder_test.go

View workflow job for this annotation

GitHub Actions / test (1.23) / Go 1.23

undefined: s

Check failure on line 436 in pkg/media/samplebuilder/samplebuilder_test.go

View workflow job for this annotation

GitHub Actions / test (1.24) / Go 1.24

undefined: s
if pkt != pkt4 {
t.Error("New packet must be referenced after jump")
}
pkt, _ = s.buffer.PeekAtSequence(uint16((12 + int(seqStart)) % 0x10000))

Check failure on line 440 in pkg/media/samplebuilder/samplebuilder_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

undefined: s

Check failure on line 440 in pkg/media/samplebuilder/samplebuilder_test.go

View workflow job for this annotation

GitHub Actions / test (1.23) / Go 1.23

undefined: s

Check failure on line 440 in pkg/media/samplebuilder/samplebuilder_test.go

View workflow job for this annotation

GitHub Actions / test (1.24) / Go 1.24

undefined: s

if pkt != pkt5 {
t.Error("New packet must be referenced after jump")
}
assert.Equal(
t, pkt4, fd.buffer[(14+int(seqStart))%0x10000],

Check failure on line 446 in pkg/media/samplebuilder/samplebuilder_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

invalid operation: cannot index fd.buffer (variable of type *jitterbuffer.JitterBuffer)

Check failure on line 446 in pkg/media/samplebuilder/samplebuilder_test.go

View workflow job for this annotation

GitHub Actions / test (1.23) / Go 1.23

invalid operation: cannot index fd.buffer (variable of type *jitterbuffer.JitterBuffer)

Check failure on line 446 in pkg/media/samplebuilder/samplebuilder_test.go

View workflow job for this annotation

GitHub Actions / test (1.24) / Go 1.24

invalid operation: cannot index fd.buffer (variable of type *jitterbuffer.JitterBuffer)
"New packet must be referenced after jump",
)
assert.Equal(
t, pkt5, fd.buffer[(12+int(seqStart))%0x10000],

Check failure on line 450 in pkg/media/samplebuilder/samplebuilder_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

invalid operation: cannot index fd.buffer (variable of type *jitterbuffer.JitterBuffer) (typecheck)

Check failure on line 450 in pkg/media/samplebuilder/samplebuilder_test.go

View workflow job for this annotation

GitHub Actions / test (1.23) / Go 1.23

invalid operation: cannot index fd.buffer (variable of type *jitterbuffer.JitterBuffer)

Check failure on line 450 in pkg/media/samplebuilder/samplebuilder_test.go

View workflow job for this annotation

GitHub Actions / test (1.24) / Go 1.24

invalid operation: cannot index fd.buffer (variable of type *jitterbuffer.JitterBuffer)
"New packet must be referenced after jump",
)
})
Expand Down
Loading