From 6268d8359cc7a38edbf0e2223e740c752b7a7589 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 29 Sep 2024 22:52:20 +0200 Subject: [PATCH] muxer: prevent creating empty parts before switching segments --- muxer.go | 62 ++---------- muxer_part.go | 15 ++- muxer_segment_fmp4.go | 46 +++------ muxer_segmenter.go | 228 +++++++++++++++++++++--------------------- muxer_stream.go | 31 ++---- 5 files changed, 155 insertions(+), 227 deletions(-) diff --git a/muxer.go b/muxer.go index 6775fa4..7a75d6f 100644 --- a/muxer.go +++ b/muxer.go @@ -72,42 +72,6 @@ func fmp4TimeScale(c codecs.Codec) uint32 { return 90000 } -func partDurationIsCompatible(partDuration time.Duration, sampleDuration time.Duration) bool { - if sampleDuration > partDuration { - return false - } - - f := (partDuration / sampleDuration) - if (partDuration % sampleDuration) != 0 { - f++ - } - f *= sampleDuration - - return partDuration > ((f * 85) / 100) -} - -func partDurationIsCompatibleWithAll(partDuration time.Duration, sampleDurations map[time.Duration]struct{}) bool { - for sd := range sampleDurations { - if !partDurationIsCompatible(partDuration, sd) { - return false - } - } - return true -} - -func findCompatiblePartDuration( - minPartDuration time.Duration, - sampleDurations map[time.Duration]struct{}, -) time.Duration { - i := minPartDuration - for ; i < 5*time.Second; i += 5 * time.Millisecond { - if partDurationIsCompatibleWithAll(i, sampleDurations) { - break - } - } - return i -} - type fmp4AugmentedSample struct { fmp4.PartSample dts time.Duration @@ -168,7 +132,6 @@ type Muxer struct { nextSegmentID uint64 nextPartID uint64 // low-latency only segmentDeleteCount int - nextPartHasSamples bool } // Start initializes the muxer. @@ -406,19 +369,12 @@ func (m *Muxer) rotateParts(nextDTS time.Duration) error { } func (m *Muxer) rotatePartsInner(nextDTS time.Duration, createNew bool) error { - if !m.nextPartHasSamples { - for _, stream := range m.streams { - stream.nextPart = nil - } - } else { - m.nextPartID++ - m.nextPartHasSamples = false + m.nextPartID++ - for _, stream := range m.streams { - err := stream.rotateParts(nextDTS, createNew) - if err != nil { - return err - } + for _, stream := range m.streams { + err := stream.rotateParts(nextDTS, createNew) + if err != nil { + return err } } @@ -448,9 +404,11 @@ func (m *Muxer) rotateSegmentsInner( nextNTP time.Time, force bool, ) error { - err := m.rotatePartsInner(nextDTS, false) - if err != nil { - return err + if m.Variant != MuxerVariantMPEGTS { + err := m.rotatePartsInner(nextDTS, false) + if err != nil { + return err + } } m.nextSegmentID++ diff --git a/muxer_part.go b/muxer_part.go index b983091..132a1ef 100644 --- a/muxer_part.go +++ b/muxer_part.go @@ -9,13 +9,12 @@ import ( ) type muxerPart struct { - stream *muxerStream - segment *muxerSegmentFMP4 - startDTS time.Duration - prefix string - id uint64 - storage storage.Part - setNextPartHasSamples func() + stream *muxerStream + segment *muxerSegmentFMP4 + startDTS time.Duration + prefix string + id uint64 + storage storage.Part path string isIndependent bool @@ -73,6 +72,4 @@ func (p *muxerPart) writeSample(track *muxerTrack, sample *fmp4AugmentedSample) } track.fmp4Samples = append(track.fmp4Samples, &sample.PartSample) - - p.setNextPartHasSamples() } diff --git a/muxer_segment_fmp4.go b/muxer_segment_fmp4.go index 837081f..cde5ca9 100644 --- a/muxer_segment_fmp4.go +++ b/muxer_segment_fmp4.go @@ -9,18 +9,16 @@ import ( ) type muxerSegmentFMP4 struct { - variant MuxerVariant - segmentMaxSize uint64 - prefix string - nextPartID uint64 - storageFactory storage.Factory - rotateParts func(time.Duration) error - setNextPartHasSamples func() - stream *muxerStream - id uint64 - startNTP time.Time - startDTS time.Duration - fromForcedRotation bool + variant MuxerVariant + segmentMaxSize uint64 + prefix string + nextPartID uint64 + storageFactory storage.Factory + stream *muxerStream + id uint64 + startNTP time.Time + startDTS time.Duration + fromForcedRotation bool path string storage storage.File @@ -39,13 +37,12 @@ func (s *muxerSegmentFMP4) initialize() error { } s.stream.nextPart = &muxerPart{ - stream: s.stream, - segment: s, - startDTS: s.startDTS, - prefix: s.prefix, - id: s.nextPartID, - storage: s.storage.NewPart(), - setNextPartHasSamples: s.setNextPartHasSamples, + stream: s.stream, + segment: s, + startDTS: s.startDTS, + prefix: s.prefix, + id: s.nextPartID, + storage: s.storage.NewPart(), } s.stream.nextPart.initialize() @@ -87,8 +84,6 @@ func (s *muxerSegmentFMP4) finalize(nextDTS time.Duration) error { func (s *muxerSegmentFMP4) writeSample( track *muxerTrack, sample *fmp4AugmentedSample, - nextDTS time.Duration, - adjustedPartDuration time.Duration, ) error { size := uint64(len(sample.Payload)) if (s.size + size) > s.segmentMaxSize { @@ -98,14 +93,5 @@ func (s *muxerSegmentFMP4) writeSample( s.stream.nextPart.writeSample(track, sample) - // switch part - if (s.variant == MuxerVariantLowLatency) && track.isLeading && - s.stream.nextPart.computeDuration(nextDTS) >= adjustedPartDuration { - err := s.rotateParts(nextDTS) - if err != nil { - return err - } - } - return nil } diff --git a/muxer_segmenter.go b/muxer_segmenter.go index 8e339d3..fd55b70 100644 --- a/muxer_segmenter.go +++ b/muxer_segmenter.go @@ -15,10 +15,46 @@ import ( "github.com/bluenviron/mediacommon/pkg/formats/fmp4" ) +func partDurationIsCompatible(partDuration time.Duration, sampleDuration time.Duration) bool { + if sampleDuration > partDuration { + return false + } + + f := (partDuration / sampleDuration) + if (partDuration % sampleDuration) != 0 { + f++ + } + f *= sampleDuration + + return partDuration > ((f * 85) / 100) +} + +func partDurationIsCompatibleWithAll(partDuration time.Duration, sampleDurations map[time.Duration]struct{}) bool { + for sd := range sampleDurations { + if !partDurationIsCompatible(partDuration, sd) { + return false + } + } + return true +} + +func findCompatiblePartDuration( + minPartDuration time.Duration, + sampleDurations map[time.Duration]struct{}, +) time.Duration { + i := minPartDuration + for ; i < 5*time.Second; i += 5 * time.Millisecond { + if partDurationIsCompatibleWithAll(i, sampleDurations) { + break + } + } + return i +} + type muxerSegmenter struct { muxer *Muxer // TODO: remove - pendingForceRotation bool + pendingParamsChange bool fmp4SampleDurations map[time.Duration]struct{} // low-latency only fmp4AdjustedPartDuration time.Duration // low-latency only fmp4FreezeAdjustedPartDuration bool // low-latency only @@ -51,16 +87,16 @@ func (s *muxerSegmenter) writeAV1( randomAccess = true if !bytes.Equal(codec.SequenceHeader, obu) { - s.pendingForceRotation = true + s.pendingParamsChange = true codec.SequenceHeader = obu } } } - forceRotation := false - if randomAccess && s.pendingForceRotation { - s.pendingForceRotation = false - forceRotation = true + paramsChanged := false + if randomAccess && s.pendingParamsChange { + s.pendingParamsChange = false + paramsChanged = true } if s.muxer.Variant == MuxerVariantMPEGTS { @@ -73,10 +109,10 @@ func (s *muxerSegmenter) writeAV1( return err } - return s.fmp4WriteVideo( + return s.fmp4WriteSample( track, randomAccess, - forceRotation, + paramsChanged, &fmp4AugmentedSample{ PartSample: *ps, dts: pts, @@ -105,35 +141,35 @@ func (s *muxerSegmenter) writeVP9( randomAccess = true if v := h.Width(); v != codec.Width { - s.pendingForceRotation = true + s.pendingParamsChange = true codec.Width = v } if v := h.Height(); v != codec.Height { - s.pendingForceRotation = true + s.pendingParamsChange = true codec.Height = v } if h.Profile != codec.Profile { - s.pendingForceRotation = true + s.pendingParamsChange = true codec.Profile = h.Profile } if h.ColorConfig.BitDepth != codec.BitDepth { - s.pendingForceRotation = true + s.pendingParamsChange = true codec.BitDepth = h.ColorConfig.BitDepth } if v := h.ChromaSubsampling(); v != codec.ChromaSubsampling { - s.pendingForceRotation = true + s.pendingParamsChange = true codec.ChromaSubsampling = v } if h.ColorConfig.ColorRange != codec.ColorRange { - s.pendingForceRotation = true + s.pendingParamsChange = true codec.ColorRange = h.ColorConfig.ColorRange } } - forceRotation := false - if randomAccess && s.pendingForceRotation { - s.pendingForceRotation = false - forceRotation = true + paramsChanged := false + if randomAccess && s.pendingParamsChange { + s.pendingParamsChange = false + paramsChanged = true } // skip samples silently until we find a random access one @@ -147,10 +183,10 @@ func (s *muxerSegmenter) writeVP9( if s.muxer.Variant == MuxerVariantMPEGTS { return fmt.Errorf("unimplemented") } else { - return s.fmp4WriteVideo( + return s.fmp4WriteSample( track, randomAccess, - forceRotation, + paramsChanged, &fmp4AugmentedSample{ PartSample: fmp4.PartSample{ IsNonSyncSample: !randomAccess, @@ -181,28 +217,28 @@ func (s *muxerSegmenter) writeH265( case h265.NALUType_VPS_NUT: if !bytes.Equal(codec.VPS, nalu) { - s.pendingForceRotation = true + s.pendingParamsChange = true codec.VPS = nalu } case h265.NALUType_SPS_NUT: if !bytes.Equal(codec.SPS, nalu) { - s.pendingForceRotation = true + s.pendingParamsChange = true codec.SPS = nalu } case h265.NALUType_PPS_NUT: if !bytes.Equal(codec.PPS, nalu) { - s.pendingForceRotation = true + s.pendingParamsChange = true codec.PPS = nalu } } } - forceRotation := false - if randomAccess && s.pendingForceRotation { - s.pendingForceRotation = false - forceRotation = true + paramsChanged := false + if randomAccess && s.pendingParamsChange { + s.pendingParamsChange = false + paramsChanged = true } // skip samples silently until we find a random access one @@ -231,10 +267,10 @@ func (s *muxerSegmenter) writeH265( return err } - return s.fmp4WriteVideo( + return s.fmp4WriteSample( track, randomAccess, - forceRotation, + paramsChanged, &fmp4AugmentedSample{ PartSample: *ps, dts: dts, @@ -267,13 +303,13 @@ func (s *muxerSegmenter) writeH264( case h264.NALUTypeSPS: if !bytes.Equal(codec.SPS, nalu) { - s.pendingForceRotation = true + s.pendingParamsChange = true codec.SPS = nalu } case h264.NALUTypePPS: if !bytes.Equal(codec.PPS, nalu) { - s.pendingForceRotation = true + s.pendingParamsChange = true codec.PPS = nalu } } @@ -283,10 +319,10 @@ func (s *muxerSegmenter) writeH264( return nil } - forceRotation := false - if randomAccess && s.pendingForceRotation { - s.pendingForceRotation = false - forceRotation = true + paramsChanged := false + if randomAccess && s.pendingParamsChange { + s.pendingParamsChange = false + paramsChanged = true } // skip samples silently until we find a random access one @@ -314,7 +350,7 @@ func (s *muxerSegmenter) writeH264( // switch segment if randomAccess && ((dts-track.stream.nextSegment.(*muxerSegmentMPEGTS).startDTS) >= s.muxer.SegmentMinDuration || - forceRotation) { + paramsChanged) { err := s.muxer.rotateSegments(dts, ntp, false) if err != nil { return err @@ -343,10 +379,10 @@ func (s *muxerSegmenter) writeH264( return err } - return s.fmp4WriteVideo( + return s.fmp4WriteSample( track, randomAccess, - forceRotation, + paramsChanged, &fmp4AugmentedSample{ PartSample: *ps, dts: dts, @@ -366,8 +402,10 @@ func (s *muxerSegmenter) writeOpus( return fmt.Errorf("unimplemented") } else { for _, packet := range packets { - err := s.fmp4WriteAudio( + err := s.fmp4WriteSample( track, + true, + false, &fmp4AugmentedSample{ PartSample: fmp4.PartSample{ Payload: packet, @@ -428,8 +466,10 @@ func (s *muxerSegmenter) writeMPEG4Audio(ntp time.Time, pts time.Duration, aus [ auPTS := pts + time.Duration(i)*mpeg4audio.SamplesPerAccessUnit* time.Second/sampleRate - err := s.fmp4WriteAudio( + err := s.fmp4WriteSample( track, + true, + false, &fmp4AugmentedSample{ PartSample: fmp4.PartSample{ Payload: au, @@ -468,71 +508,10 @@ func (s *muxerSegmenter) fmp4AdjustPartDuration(sampleDuration time.Duration) { } } -func (s *muxerSegmenter) fmp4WriteVideo( +func (s *muxerSegmenter) fmp4WriteSample( track *muxerTrack, randomAccess bool, - forceRotation bool, - sample *fmp4AugmentedSample, -) error { - // add a starting DTS to avoid a negative BaseTime - sample.dts += fmp4StartDTS - - // BaseTime is still negative, this is not supported by fMP4. Reject the sample silently. - if sample.dts < 0 { - return nil - } - - // put samples into a queue in order to - // - compute sample duration - // - check if next sample is IDR - sample, track.fmp4NextSample = track.fmp4NextSample, sample - if sample == nil { - return nil - } - duration := track.fmp4NextSample.dts - sample.dts - sample.Duration = uint32(durationGoToMp4(duration, 90000)) - - if track.stream.nextSegment == nil { - err := s.muxer.createFirstSegment(sample.dts, sample.ntp) - if err != nil { - return err - } - } - - s.fmp4AdjustPartDuration(duration) - - err := track.stream.nextSegment.(*muxerSegmentFMP4).writeSample( - track, - sample, - track.fmp4NextSample.dts, - s.fmp4AdjustedPartDuration) - if err != nil { - return err - } - - // switch segment - if randomAccess && - ((track.fmp4NextSample.dts-track.stream.nextSegment.(*muxerSegmentFMP4).startDTS) >= s.muxer.SegmentMinDuration || - forceRotation) { - err = s.muxer.rotateSegments(track.fmp4NextSample.dts, track.fmp4NextSample.ntp, forceRotation) - if err != nil { - return err - } - - if forceRotation { - // reset adjusted part duration - s.fmp4FreezeAdjustedPartDuration = false - s.fmp4SampleDurations = make(map[time.Duration]struct{}) - } else { - s.fmp4FreezeAdjustedPartDuration = true - } - } - - return nil -} - -func (s *muxerSegmenter) fmp4WriteAudio( - track *muxerTrack, + paramsChanged bool, sample *fmp4AugmentedSample, ) error { // add a starting DTS to avoid a negative BaseTime @@ -551,7 +530,7 @@ func (s *muxerSegmenter) fmp4WriteAudio( duration := track.fmp4NextSample.dts - sample.dts sample.Duration = uint32(durationGoToMp4(duration, track.fmp4TimeScale)) - if s.muxer.VideoTrack == nil { + if track.isLeading { // create first segment if track.stream.nextSegment == nil { err := s.muxer.createFirstSegment(sample.dts, sample.ntp) @@ -560,30 +539,49 @@ func (s *muxerSegmenter) fmp4WriteAudio( } } } else { - // wait for the video track + // wait for the leading track if track.stream.nextSegment == nil { return nil } } + if track.isLeading { + s.fmp4AdjustPartDuration(duration) + } + err := track.stream.nextSegment.(*muxerSegmentFMP4).writeSample( track, sample, - track.fmp4NextSample.dts, - s.muxer.PartMinDuration) + ) if err != nil { return err } - // switch segment - if s.muxer.VideoTrack == nil && - (track.fmp4NextSample.dts-track.stream.nextSegment.(*muxerSegmentFMP4).startDTS) >= s.muxer.SegmentMinDuration { - err = s.muxer.rotateSegments(track.fmp4NextSample.dts, track.fmp4NextSample.ntp, false) - if err != nil { - return err - } + if track.isLeading { + // switch segment + if randomAccess && (paramsChanged || + (track.fmp4NextSample.dts-track.stream.nextSegment.(*muxerSegmentFMP4).startDTS) >= s.muxer.SegmentMinDuration) { + err = s.muxer.rotateSegments(track.fmp4NextSample.dts, track.fmp4NextSample.ntp, paramsChanged) + if err != nil { + return err + } + + // reset or freeze adjusted part duration + if paramsChanged { + s.fmp4FreezeAdjustedPartDuration = false + s.fmp4SampleDurations = make(map[time.Duration]struct{}) + } else { + s.fmp4FreezeAdjustedPartDuration = true + } - s.fmp4FreezeAdjustedPartDuration = true + // switch part + } else if (s.muxer.Variant == MuxerVariantLowLatency) && + track.stream.nextPart.computeDuration(track.fmp4NextSample.dts) >= s.fmp4AdjustedPartDuration { + err := s.muxer.rotateParts(track.fmp4NextSample.dts) + if err != nil { + return err + } + } } return nil diff --git a/muxer_stream.go b/muxer_stream.go index c9f2472..aed4b16 100644 --- a/muxer_stream.go +++ b/muxer_stream.go @@ -524,15 +524,11 @@ func (s *muxerStream) createFirstSegment( s.nextSegment = seg } else { seg := &muxerSegmentFMP4{ - variant: s.muxer.Variant, - segmentMaxSize: s.muxer.SegmentMaxSize, - prefix: s.muxer.prefix, - nextPartID: s.muxer.nextPartID, - storageFactory: s.muxer.storageFactory, - rotateParts: s.muxer.rotateParts, - setNextPartHasSamples: func() { - s.muxer.nextPartHasSamples = true - }, + variant: s.muxer.Variant, + segmentMaxSize: s.muxer.SegmentMaxSize, + prefix: s.muxer.prefix, + nextPartID: s.muxer.nextPartID, + storageFactory: s.muxer.storageFactory, stream: s, id: s.muxer.nextSegmentID, startNTP: nextNTP, @@ -603,9 +599,6 @@ func (s *muxerStream) rotateParts(nextDTS time.Duration, createNew bool) error { prefix: s.muxer.prefix, id: s.muxer.nextPartID, storage: part.segment.storage.NewPart(), - setNextPartHasSamples: func() { - s.muxer.nextPartHasSamples = true - }, } nextPart.initialize() s.nextPart = nextPart @@ -703,15 +696,11 @@ func (s *muxerStream) rotateSegments( } } else { nextSegment = &muxerSegmentFMP4{ - variant: s.muxer.Variant, - segmentMaxSize: s.muxer.SegmentMaxSize, - prefix: s.muxer.prefix, - nextPartID: s.muxer.nextPartID, - storageFactory: s.muxer.storageFactory, - rotateParts: s.muxer.rotateParts, - setNextPartHasSamples: func() { - s.muxer.nextPartHasSamples = true - }, + variant: s.muxer.Variant, + segmentMaxSize: s.muxer.SegmentMaxSize, + prefix: s.muxer.prefix, + nextPartID: s.muxer.nextPartID, + storageFactory: s.muxer.storageFactory, stream: s, id: s.muxer.nextSegmentID, startNTP: nextNTP,