Skip to content

Commit

Permalink
client: fix computing absolute timestamp with multiple renditions (#192)
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 authored Oct 7, 2024
1 parent ed88408 commit 2e0d961
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 66 deletions.
36 changes: 15 additions & 21 deletions client_stream_processor_fmp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"fmt"
"time"

"github.com/bluenviron/mediacommon/pkg/formats/fmp4"

Expand Down Expand Up @@ -121,30 +120,24 @@ func (p *clientStreamProcessorFMP4) processSegment(ctx context.Context, seg *seg
return err
}

ntpAvailable := false
var ntpAbsolute time.Time
var ntpRelative int64
var leadingClockRate int

partTrack := findFirstPartTrackOfLeadingTrack(parts, p.leadingTrackID)
if partTrack == nil {
leadingPartTrack := findFirstPartTrackOfLeadingTrack(parts, p.leadingTrackID)
if leadingPartTrack == nil {
return fmt.Errorf("could not find data of leading track")
}

if p.trackProcessors == nil {
err := p.initializeTrackProcessors(ctx, partTrack)
err := p.initializeTrackProcessors(ctx, leadingPartTrack)
if err != nil {
return err
}
}

leadingTrackProc := p.trackProcessors[partTrack.ID]
leadingClockRate = leadingTrackProc.track.track.ClockRate

if seg.dateTime != nil {
ntpAvailable = true
ntpAbsolute = *seg.dateTime
ntpRelative = p.timeConv.convert(int64(partTrack.BaseTime), leadingClockRate)
if p.isLeading {
if seg.dateTime != nil {
leadingPartTrackProc := p.trackProcessors[leadingPartTrack.ID]
dts := p.timeConv.convert(int64(leadingPartTrack.BaseTime), leadingPartTrackProc.track.track.ClockRate)
p.timeConv.setNTP(*seg.dateTime, dts, leadingPartTrackProc.track.track.ClockRate)
}
}

partTrackCount := 0
Expand All @@ -156,11 +149,13 @@ func (p *clientStreamProcessorFMP4) processSegment(ctx context.Context, seg *seg
continue
}

dts := p.timeConv.convert(int64(partTrack.BaseTime), trackProc.track.track.ClockRate)
ntp := p.timeConv.getNTP(dts, trackProc.track.track.ClockRate)

err := trackProc.push(ctx, &procEntryFMP4{
ntpAvailable: ntpAvailable,
ntpAbsolute: ntpAbsolute,
ntpRelative: multiplyAndDivide(ntpRelative, int64(trackProc.track.track.ClockRate), int64(leadingClockRate)),
partTrack: partTrack,
partTrack: partTrack,
dts: dts,
ntp: ntp,
})
if err != nil {
return err
Expand Down Expand Up @@ -223,7 +218,6 @@ func (p *clientStreamProcessorFMP4) initializeTrackProcessors(
for i, track := range p.clientStreamTracks {
trackProc := &clientTrackProcessorFMP4{
track: track,
timeConv: p.timeConv,
onPartTrackProcessed: p.onPartTrackProcessed,
}
err := trackProc.initialize()
Expand Down
20 changes: 3 additions & 17 deletions client_stream_processor_mpegts.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"io"
"time"

"github.com/asticode/go-astits"

Expand Down Expand Up @@ -180,10 +179,6 @@ func (p *clientStreamProcessorMPEGTS) initializeReader(ctx context.Context, firs
return fmt.Errorf("terminated")
}

ntpAvailable := false
var ntpAbsolute time.Time
var ntpRelative int64

for i, mpegtsTrack := range p.reader.Tracks() {
track := p.clientStreamTracks[i]
isLeadingTrack := (i == leadingTrackID)
Expand Down Expand Up @@ -213,24 +208,15 @@ func (p *clientStreamProcessorMPEGTS) initializeReader(ctx context.Context, firs
pts := p.timeConv.convert(rawPTS)
dts := p.timeConv.convert(rawDTS)

if isLeadingTrack && !p.dateTimeProcessed {
if !p.dateTimeProcessed && p.isLeading && isLeadingTrack {
p.dateTimeProcessed = true

if p.curSegment.dateTime != nil {
ntpAvailable = true
ntpAbsolute = *p.curSegment.dateTime
ntpRelative = dts
} else {
ntpAvailable = false
p.timeConv.setNTP(*p.curSegment.dateTime, dts)
}
}

ntp := time.Time{}
if ntpAvailable {
diff := dts - ntpRelative
diffDur := timestampToDuration(diff, 90000)
ntp = ntpAbsolute.Add(diffDur)
}
ntp := p.timeConv.getNTP(dts)

return trackProc.push(ctx, &procEntryMPEGTS{
pts: pts,
Expand Down
13 changes: 7 additions & 6 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,9 @@ func TestClient(t *testing.T) {
require.Equal(t, int64(6000), dts)
require.Equal(t, int64(6000), pts)
require.Equal(t, [][]byte{{4}}, au)
_, ok := c.AbsoluteTime(tracks[0])
require.Equal(t, false, ok)
ntp, ok := c.AbsoluteTime(tracks[0])
require.Equal(t, true, ok)
require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 2, 66666666, time.UTC), ntp)
close(videoRecv)
}
videoCount++
Expand Down Expand Up @@ -539,7 +540,7 @@ func TestClientFMP4MultiRenditions(t *testing.T) {
"#EXT-X-INDEPENDENT-SEGMENTS\n" +
"#EXT-X-TARGETDURATION:2\n" +
"#EXT-X-MAP:URI=\"init_audio.mp4\"\n" +
"#EXT-X-PROGRAM-DATE-TIME:2015-02-05T01:02:02Z\n" +
"#EXT-X-PROGRAM-DATE-TIME:2014-02-05T01:02:02Z\n" +
"#EXTINF:2,\n" +
"segment_audio.mp4\n" +
"#EXT-X-ENDLIST"))
Expand Down Expand Up @@ -601,7 +602,7 @@ func TestClientFMP4MultiRenditions(t *testing.T) {
Tracks: []*fmp4.PartTrack{
{
ID: 1,
BaseTime: 3000,
BaseTime: 44100 / 2, // +0.5 sec
Samples: []*fmp4.PartSample{{
Duration: 44100,
Payload: []byte{1, 2, 3, 4},
Expand Down Expand Up @@ -664,13 +665,13 @@ func TestClientFMP4MultiRenditions(t *testing.T) {
})

c.OnDataMPEG4Audio(tracks[1], func(pts int64, aus [][]byte) {
require.Equal(t, int64(3000), pts)
require.Equal(t, int64(22050), pts)
require.Equal(t, [][]byte{
{1, 2, 3, 4},
}, aus)
ntp, ok := c.AbsoluteTime(tracks[1])
require.Equal(t, true, ok)
require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 2, 34693877, time.UTC), ntp)
require.Equal(t, time.Date(2015, time.February, 5, 1, 2, 2, 500000000, time.UTC), ntp)
packetRecv <- struct{}{}
})

Expand Down
37 changes: 37 additions & 0 deletions client_time_conv_fmp4.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
package gohlslib

import (
"sync"
"time"
)

type clientTimeConvFMP4 struct {
leadingTimeScale int64
leadingBaseTime int64

mutex sync.Mutex
ntpAvailable bool
ntpValue time.Time
ntpTimestamp int64
ntpClockRate int
}

func (ts *clientTimeConvFMP4) initialize() {
Expand All @@ -11,3 +22,29 @@ func (ts *clientTimeConvFMP4) initialize() {
func (ts *clientTimeConvFMP4) convert(v int64, clockRate int) int64 {
return v - multiplyAndDivide(ts.leadingBaseTime, int64(clockRate), ts.leadingTimeScale)
}

func (ts *clientTimeConvFMP4) setNTP(value time.Time, timestamp int64, clockRate int) {
ts.mutex.Lock()
defer ts.mutex.Unlock()

ts.ntpAvailable = true
ts.ntpValue = value
ts.ntpTimestamp = timestamp
ts.ntpClockRate = clockRate
}

func (ts *clientTimeConvFMP4) getNTP(timestamp int64, clockRate int) *time.Time {
ts.mutex.Lock()
defer ts.mutex.Unlock()

if !ts.ntpAvailable {
return nil
}

v := ts.ntpValue.Add(
timestampToDuration(
timestamp-multiplyAndDivide(ts.ntpTimestamp, int64(clockRate), int64(ts.ntpClockRate)),
clockRate))

return &v
}
30 changes: 28 additions & 2 deletions client_time_conv_mpegts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@ package gohlslib

import (
"sync"
"time"

"github.com/bluenviron/mediacommon/pkg/formats/mpegts"
)

type clientTimeConvMPEGTS struct {
startDTS int64

td *mpegts.TimeDecoder2
mutex sync.Mutex
mutex sync.Mutex
td *mpegts.TimeDecoder2
ntpAvailable bool
ntpValue time.Time
ntpTimestamp int64
}

func (ts *clientTimeConvMPEGTS) initialize() {
Expand All @@ -21,5 +25,27 @@ func (ts *clientTimeConvMPEGTS) initialize() {
func (ts *clientTimeConvMPEGTS) convert(v int64) int64 {
ts.mutex.Lock()
defer ts.mutex.Unlock()

return ts.td.Decode(v)
}

func (ts *clientTimeConvMPEGTS) setNTP(value time.Time, timestamp int64) {
ts.mutex.Lock()
defer ts.mutex.Unlock()

ts.ntpAvailable = true
ts.ntpValue = value
ts.ntpTimestamp = timestamp
}

func (ts *clientTimeConvMPEGTS) getNTP(timestamp int64) *time.Time {
ts.mutex.Lock()
defer ts.mutex.Unlock()

if !ts.ntpAvailable {
return nil
}

v := ts.ntpValue.Add(timestampToDuration(timestamp-ts.ntpTimestamp, 90000))
return &v
}
8 changes: 4 additions & 4 deletions client_track.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@ import (
type clientTrack struct {
track *Track
onData clientOnDataFunc
lastAbsoluteTime time.Time
lastAbsoluteTime *time.Time
startRTC time.Time
}

func (t *clientTrack) absoluteTime() (time.Time, bool) {
if t.lastAbsoluteTime == zero {
if t.lastAbsoluteTime == nil {
return zero, false
}
return t.lastAbsoluteTime, true
return *t.lastAbsoluteTime, true
}

func (t *clientTrack) handleData(
ctx context.Context,
pts int64,
dts int64,
ntp time.Time,
ntp *time.Time,
data [][]byte,
) error {
// silently discard packets prior to the first packet of the leading track
Expand Down
26 changes: 11 additions & 15 deletions client_track_processor_fmp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@ import (
)

type procEntryFMP4 struct {
ntpAvailable bool
ntpAbsolute time.Time
ntpRelative int64
partTrack *fmp4.PartTrack
partTrack *fmp4.PartTrack
dts int64
ntp *time.Time
}

type clientTrackProcessorFMP4 struct {
track *clientTrack
timeConv *clientTimeConvFMP4
onPartTrackProcessed func(ctx context.Context)

decodePayload func(sample *fmp4.PartSample) ([][]byte, error)
Expand Down Expand Up @@ -76,30 +74,28 @@ func (t *clientTrackProcessorFMP4) run(ctx context.Context) error {
}

func (t *clientTrackProcessorFMP4) process(ctx context.Context, entry *procEntryFMP4) error {
rawDTS := int64(entry.partTrack.BaseTime)
dts := entry.dts

for _, sample := range entry.partTrack.Samples {
data, err := t.decodePayload(sample)
if err != nil {
return err
}

dts := t.timeConv.convert(rawDTS, t.track.track.ClockRate)
pts := dts + int64(sample.PTSOffset)
rawDTS += int64(sample.Duration)

ntp := time.Time{}
if entry.ntpAvailable {
trackNTPRelative := multiplyAndDivide(entry.ntpRelative, int64(t.track.track.ClockRate), t.timeConv.leadingTimeScale)
diff := dts - trackNTPRelative
diffDur := timestampToDuration(diff, t.track.track.ClockRate)
ntp = entry.ntpAbsolute.Add(diffDur)

var ntp *time.Time
if entry.ntp != nil {
v := entry.ntp.Add(timestampToDuration(dts-entry.dts, t.track.track.ClockRate))
ntp = &v
}

err = t.track.handleData(ctx, pts, dts, ntp, data)
if err != nil {
return err
}

dts += int64(sample.Duration)
}

t.onPartTrackProcessed(ctx)
Expand Down
2 changes: 1 addition & 1 deletion client_track_processor_mpegts.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
type procEntryMPEGTS struct {
pts int64
dts int64
ntp time.Time
ntp *time.Time
data [][]byte
}

Expand Down

0 comments on commit 2e0d961

Please sign in to comment.