Skip to content

Commit

Permalink
Start of mux method
Browse files Browse the repository at this point in the history
  • Loading branch information
djthorpe committed Jun 24, 2024
1 parent dc97548 commit 5db5f2b
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 12 deletions.
5 changes: 5 additions & 0 deletions frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ func (frame *frame) Type() MediaType {
return NONE
}

// Id is unused
func (frame *frame) Id() int {
return 0
}

// Return the timestamp as a duration, or minus one if not set
func (frame *frame) Time() time.Duration {
pts := frame.ctx.Pts()
Expand Down
13 changes: 13 additions & 0 deletions interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,14 @@ type Media interface {
// Return a decoding context for the media stream, and
// map the streams to decoders. If no function is provided
// (ie, the argument is nil) then all streams are demultiplexed.
// Will return an error if called on a writer.
Decoder(DecoderMapFunc) (Decoder, error)

// Multiplex media into packets. Pass a packet to a muxer function.
// Stop when the context is cancelled or the end of the media stream is
// signalled. Will return an error if called on a reader.
Mux(context.Context, MuxFunc) error

// Return INPUT for a demuxer or source, OUTPUT for a muxer or
// sink, DEVICE for a device, FILE for a file or stream.
Type() MediaType
Expand Down Expand Up @@ -191,6 +197,9 @@ type Parameters interface {
// Return the media type (AUDIO, VIDEO, SUBTITLE, DATA)
Type() MediaType

// Return the stream id for encoding, or zero if not set
Id() int

// Return number of planes for a specific PixelFormat
// or SampleFormat and ChannelLayout combination
NumPlanes() int
Expand Down Expand Up @@ -224,6 +233,10 @@ type VideoParameters interface {
// io.EOF if you want to stop processing the packets early.
type DecoderFunc func(Packet) error

// MuxFunc is a function that multiplexes a packet. Return
// io.EOF to stop multiplexing normally.
type MuxFunc func(Packet) error

// FrameFunc is a function that processes a frame of audio
// or video data. Return io.EOF if you want to stop
// processing the frames early.
Expand Down
8 changes: 8 additions & 0 deletions parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type par struct {
type codecpar struct {
Codec ff.AVCodecID `json:"codec"`

// Stream Id
StreamId int `json:"stream_id"`

// For video (in fps)
Framerate float64 `json:"framerate"`
}
Expand Down Expand Up @@ -168,6 +171,11 @@ func (par *par) Type() MediaType {
return par.t
}

// Return stream id
func (par *par) Id() int {
return par.codecpar.StreamId
}

// Return number of planes for a specific PixelFormat
// or SampleFormat and ChannelLayout combination
func (par *par) NumPlanes() int {
Expand Down
8 changes: 8 additions & 0 deletions reader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package media

import (
"context"
"encoding/json"
"errors"
"io"
Expand All @@ -9,6 +10,9 @@ import (

// Packages
ff "github.com/mutablelogic/go-media/sys/ffmpeg61"

// Namespace imports
. "github.com/djthorpe/go-errors"
)

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -229,6 +233,10 @@ func (r *reader) Metadata(keys ...string) []Metadata {
return result
}

func (r *reader) Mux(context.Context, MuxFunc) error {
return ErrOutOfOrder.With("not an output stream")
}

////////////////////////////////////////////////////////////////////////////////
// PRIVATE METHODS

Expand Down
66 changes: 56 additions & 10 deletions writer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package media

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -67,11 +68,18 @@ func createMedia(url string, format Format, metadata []Metadata, params ...Param
// Add encoders and streams
var result error
for i, param := range params {
encoder, err := newEncoder(ctx, i, param)
// Stream Id from codec parameters, or use the index
stream_id := param.Id()
if stream_id <= 0 {
stream_id = i + 1
}
encoder, err := newEncoder(ctx, stream_id, param)
if err != nil {
result = errors.Join(result, err)
} else if _, exists := writer.encoder[stream_id]; exists {

} else {
writer.encoder[i] = encoder
writer.encoder[stream_id] = encoder
}
}

Expand Down Expand Up @@ -143,16 +151,15 @@ func (w *writer) Close() error {
result = errors.Join(result, encoder.Close())
}

// Free resources
if w.metadata != nil {
ff.AVUtil_dict_free(w.metadata)
}
// Free output resources
if w.output != nil {
// This calls avio_close(w.avio)
result = errors.Join(result, ff.AVFormat_close_writer(w.output))
}
if w.avio != nil {
fmt.Println("TODO AVIO")
// result = errors.Join(result, ff.AVFormat_avio_close(w.avio))

// Free resources
if w.metadata != nil {
ff.AVUtil_dict_free(w.metadata)
}

// Release resources
Expand Down Expand Up @@ -183,7 +190,46 @@ func (w *writer) String() string {
// PUBLIC METHODS

func (w *writer) Decoder(DecoderMapFunc) (Decoder, error) {
return nil, ErrNotImplemented
return nil, ErrOutOfOrder.With("not an input stream")
}

func (w *writer) Mux(context.Context, MuxFunc) error {
return ErrNotImplemented

/*
while (1) {
AVStream *in_stream, *out_stream;
ret = av_read_frame(ifmt_ctx, pkt);
if (ret < 0)
break;
in_stream = ifmt_ctx->streams[pkt->stream_index];
if (pkt->stream_index >= stream_mapping_size ||
stream_mapping[pkt->stream_index] < 0) {
av_packet_unref(pkt);
continue;
}
pkt->stream_index = stream_mapping[pkt->stream_index];
out_stream = ofmt_ctx->streams[pkt->stream_index];
log_packet(ifmt_ctx, pkt, "in");
// copy packet
av_packet_rescale_ts(pkt, in_stream->time_base, out_stream->time_base);
pkt->pos = -1;
log_packet(ofmt_ctx, pkt, "out");
ret = av_interleaved_write_frame(ofmt_ctx, pkt);
// pkt is now blank (av_interleaved_write_frame() takes ownership of
// its contents and resets pkt), so that no unreferencing is necessary.
// This would be different if one used av_write_frame().
if (ret < 0) {
fprintf(stderr, "Error muxing packet\n");
break;
}
}
*/
}

// Return OUTPUT and combination of DEVICE and STREAM
Expand Down
4 changes: 2 additions & 2 deletions writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func Test_writer_001(t *testing.T) {
}

// Write audio file
filename := filepath.Join(t.TempDir(), t.Name()+".sw")
filename := filepath.Join(t.TempDir(), t.Name()+".mp3")
stream, err := manager.AudioParameters("mono", "s16", 22050)
if !assert.NoError(err) {
t.SkipNow()
Expand All @@ -33,5 +33,5 @@ func Test_writer_001(t *testing.T) {
t.SkipNow()
}
defer writer.Close()
t.Log(writer)
t.Log(writer, "=>", filename)
}

0 comments on commit 5db5f2b

Please sign in to comment.