Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support client play handler / etc #60

Draft
wants to merge 15 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ See also [server_demo](https://github.com/yutopp/go-rtmp/tree/master/example/ser
## Documentation

- [GoDoc](https://pkg.go.dev/github.com/yutopp/go-rtmp)
- [REAL-TIME MESSAGING PROTOCOL (RTMP) SPECIFICATION](https://www.adobe.com/devnet/rtmp.html)
- [Adobe RTMP Specification](https://rtmp.veriskope.com/docs/spec/)


## NOTES
Expand Down
69 changes: 50 additions & 19 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,45 +8,76 @@
package rtmp

import (
"context"
"crypto/tls"
"net"

"github.com/pkg/errors"
)

func Dial(protocol, addr string, config *ConnConfig) (*ClientConn, error) {
return DialWithDialer(&net.Dialer{}, protocol, addr, config)
type dialOptions struct {
dialFunc func(ctx context.Context, network, addr string) (net.Conn, error)
}

func TLSDial(protocol, addr string, config *ConnConfig, tlsConfig *tls.Config) (*ClientConn, error) {
return DialWithTLSDialer(&tls.Dialer{
NetDialer: &net.Dialer{},
Config: tlsConfig,
}, protocol, addr, config)
func WithContextDialer(dialFunc func(context.Context, string, string) (net.Conn, error)) DialOption {
return func(o *dialOptions) {
o.dialFunc = dialFunc

Check warning on line 24 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L22-L24

Added lines #L22 - L24 were not covered by tests
}
}

func DialWithDialer(dialer *net.Dialer, protocol, addr string, config *ConnConfig) (*ClientConn, error) {
if protocol != "rtmp" {
return nil, errors.Errorf("Unknown protocol: %s", protocol)
type DialOption func(*dialOptions)

// DialContext dials a connection to the specified address.
// The protocol must be "rtmp" or "rtmps".
func DialContext(ctx context.Context, protocol, addr string, config *ConnConfig, opts ...DialOption) (*ClientConn, error) {
opt := &dialOptions{
// default dialer
dialFunc: func(ctx context.Context, network, addr string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, network, addr)
},
}
for _, o := range opts {
o(opt)

Check warning on line 40 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L40

Added line #L40 was not covered by tests
}

rwc, err := dialer.Dial("tcp", addr)
if protocol != "rtmp" && protocol != "rtmps" {
return nil, errors.Errorf("unknown protocol: %s", protocol)

Check warning on line 44 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L44

Added line #L44 was not covered by tests
}

rwc, err := opt.dialFunc(ctx, "tcp", addr)
if err != nil {
return nil, err
}

return newClientConnWithSetup(rwc, config)
}

func DialWithTLSDialer(dialer *tls.Dialer, protocol, addr string, config *ConnConfig) (*ClientConn, error) {
if protocol != "rtmps" {
return nil, errors.Errorf("Unknown protocol: %s", protocol)
}
// Dial dials a connection to the specified address.
func Dial(protocol, addr string, config *ConnConfig, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), protocol, addr, config, opts...)
}

rwc, err := dialer.Dial("tcp", addr)
if err != nil {
return nil, err
// DialWithDialer dials a connection to the specified address with the specified dialer.
func DialWithDialer(dialer *net.Dialer, protocol, addr string, config *ConnConfig) (*ClientConn, error) {
return Dial(protocol, addr, config, WithContextDialer(dialer.DialContext))

Check warning on line 62 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L61-L62

Added lines #L61 - L62 were not covered by tests
}

// TLSDialContext dials a connection to the specified address with TLS.
func TLSDialContext(ctx context.Context, protocol, addr string, config *ConnConfig, tlsConfig *tls.Config, opts ...DialOption) (*ClientConn, error) {
dialer := &tls.Dialer{
NetDialer: &net.Dialer{},
Config: tlsConfig,

Check warning on line 69 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L66-L69

Added lines #L66 - L69 were not covered by tests
}
opts = append([]DialOption{WithContextDialer(dialer.DialContext)}, opts...)
return DialContext(ctx, protocol, addr, config, opts...)

Check warning on line 72 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L71-L72

Added lines #L71 - L72 were not covered by tests
}

return newClientConnWithSetup(rwc, config)
// TLSDial dials a connection to the specified address with TLS.
func TLSDial(protocol, addr string, config *ConnConfig, tlsConfig *tls.Config, opts ...DialOption) (*ClientConn, error) {
return TLSDialContext(context.Background(), protocol, addr, config, tlsConfig, opts...)

Check warning on line 77 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L76-L77

Added lines #L76 - L77 were not covered by tests
}

// DialWithTLSDialer dials a connection to the specified address with the specified TLS dialer.
func DialWithTLSDialer(dialer *tls.Dialer, protocol, addr string, config *ConnConfig) (*ClientConn, error) {
return Dial(protocol, addr, config, WithContextDialer(dialer.DialContext))

Check warning on line 82 in client.go

View check run for this annotation

Codecov / codecov/patch

client.go#L81-L82

Added lines #L81 - L82 were not covered by tests
}
1 change: 1 addition & 0 deletions client_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (cc *ClientConn) CreateStream(body *message.NetConnectionCreateStream, chun
if err != nil {
return nil, err
}
newStream.handler.ChangeState(streamStateClientConnected)

return newStream, nil
}
Expand Down
59 changes: 59 additions & 0 deletions client_control_connected_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//
// Copyright (c) 2023- yutopp ([email protected])
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at https://www.boost.org/LICENSE_1_0.txt)
//

package rtmp

import (
"github.com/yutopp/go-rtmp/internal"
"github.com/yutopp/go-rtmp/message"
)

// clientControlConnectedHandler Handle control messages from a server in flow of connected.
//
// transitions:
// | _ -> self
type clientControlConnectedHandler struct {
sh *streamHandler
}

var _ stateHandler = (*clientControlConnectedHandler)(nil)

func (h *clientControlConnectedHandler) onMessage(
chunkStreamID int,
timestamp uint32,
msg message.Message,
) error {
return internal.ErrPassThroughMsg

Check warning on line 30 in client_control_connected_handler.go

View check run for this annotation

Codecov / codecov/patch

client_control_connected_handler.go#L29-L30

Added lines #L29 - L30 were not covered by tests
}

func (h *clientControlConnectedHandler) onData(
chunkStreamID int,
timestamp uint32,
dataMsg *message.DataMessage,
body interface{},
) error {
return internal.ErrPassThroughMsg

Check warning on line 39 in client_control_connected_handler.go

View check run for this annotation

Codecov / codecov/patch

client_control_connected_handler.go#L38-L39

Added lines #L38 - L39 were not covered by tests
}

func (h *clientControlConnectedHandler) onCommand(
chunkStreamID int,
timestamp uint32,
cmdMsg *message.CommandMessage,
body interface{},
) error {
switch cmd := body.(type) {
case *message.NetStreamOnStatus:
if cmd.InfoObject.Code == message.NetStreamOnStatusCodePlayStart {
h.sh.ChangeState(streamStateClientPlay)

Check warning on line 51 in client_control_connected_handler.go

View check run for this annotation

Codecov / codecov/patch

client_control_connected_handler.go#L47-L51

Added lines #L47 - L51 were not covered by tests
}

return internal.ErrPassThroughMsg

Check warning on line 54 in client_control_connected_handler.go

View check run for this annotation

Codecov / codecov/patch

client_control_connected_handler.go#L54

Added line #L54 was not covered by tests

default:
return internal.ErrPassThroughMsg

Check warning on line 57 in client_control_connected_handler.go

View check run for this annotation

Codecov / codecov/patch

client_control_connected_handler.go#L56-L57

Added lines #L56 - L57 were not covered by tests
}
}
4 changes: 2 additions & 2 deletions client_control_not_connected_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"github.com/yutopp/go-rtmp/message"
)

var _ stateHandler = (*clientControlNotConnectedHandler)(nil)

// clientControlNotConnectedHandler Handle control messages from a server in flow of connecting.
//
// transitions:
Expand All @@ -23,6 +21,8 @@ type clientControlNotConnectedHandler struct {
sh *streamHandler
}

var _ stateHandler = (*clientControlNotConnectedHandler)(nil)

func (h *clientControlNotConnectedHandler) onMessage(
chunkStreamID int,
timestamp uint32,
Expand Down
64 changes: 64 additions & 0 deletions client_data_play_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
//
// Copyright (c) 2023- yutopp ([email protected])
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at https://www.boost.org/LICENSE_1_0.txt)
//

package rtmp

import (
"github.com/yutopp/go-rtmp/internal"
"github.com/yutopp/go-rtmp/message"
)

// clientDataPlayHandler Handle control messages from a server in flow of connected.
//
// transitions:
// | _ -> self
type clientDataPlayHandler struct {
sh *streamHandler
}

var _ stateHandler = (*clientDataPlayHandler)(nil)

func (h *clientDataPlayHandler) onMessage(
chunkStreamID int,
timestamp uint32,
msg message.Message,
) error {
switch msg := msg.(type) {
case *message.AudioMessage:
return h.sh.stream.userHandler().OnAudio(timestamp, msg.Payload)

Check warning on line 32 in client_data_play_handler.go

View check run for this annotation

Codecov / codecov/patch

client_data_play_handler.go#L29-L32

Added lines #L29 - L32 were not covered by tests

case *message.VideoMessage:
return h.sh.stream.userHandler().OnVideo(timestamp, msg.Payload)

Check warning on line 35 in client_data_play_handler.go

View check run for this annotation

Codecov / codecov/patch

client_data_play_handler.go#L34-L35

Added lines #L34 - L35 were not covered by tests

default:
return internal.ErrPassThroughMsg

Check warning on line 38 in client_data_play_handler.go

View check run for this annotation

Codecov / codecov/patch

client_data_play_handler.go#L37-L38

Added lines #L37 - L38 were not covered by tests
}
}

func (h *clientDataPlayHandler) onData(
chunkStreamID int,
timestamp uint32,
dataMsg *message.DataMessage,
body interface{},
) error {
switch data := body.(type) {
case *message.NetStreamSetDataFrame:
return h.sh.stream.userHandler().OnSetDataFrame(timestamp, data)

Check warning on line 50 in client_data_play_handler.go

View check run for this annotation

Codecov / codecov/patch

client_data_play_handler.go#L47-L50

Added lines #L47 - L50 were not covered by tests

default:
return internal.ErrPassThroughMsg

Check warning on line 53 in client_data_play_handler.go

View check run for this annotation

Codecov / codecov/patch

client_data_play_handler.go#L52-L53

Added lines #L52 - L53 were not covered by tests
}
}

func (h *clientDataPlayHandler) onCommand(
chunkStreamID int,
timestamp uint32,
cmdMsg *message.CommandMessage,
body interface{},
) error {
return internal.ErrPassThroughMsg

Check warning on line 63 in client_data_play_handler.go

View check run for this annotation

Codecov / codecov/patch

client_data_play_handler.go#L62-L63

Added lines #L62 - L63 were not covered by tests
}
2 changes: 1 addition & 1 deletion example/client_demo/Makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
PHONY: all
all:
go build -i -v -o client_demo .
go build -v -o client_demo .
2 changes: 1 addition & 1 deletion example/server_demo/Makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
PHONY: all
all:
go build -i -v -o server_demo .
go build -v -o server_demo .
1 change: 1 addition & 0 deletions example/server_relay_demo/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
/server_relay_demo
/movie.mp4
2 changes: 1 addition & 1 deletion example/server_relay_demo/Makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
PHONY: all
all:
go build -i -v -o server_relay_demo .
go build -v -o server_relay_demo .
20 changes: 20 additions & 0 deletions message/body_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
"getStreamLength": DecodeBodyGetStreamLength,
"ping": DecodeBodyPing,
"closeStream": DecodeBodyCloseStream,
"onStatus": DecodeBodyOnStatus,
}

func CmdBodyDecoderFor(name string, transactionID int64) BodyDecoderFunc {
Expand Down Expand Up @@ -354,3 +355,22 @@

return nil
}

func DecodeBodyOnStatus(_ io.Reader, d AMFDecoder, v *AMFConvertible) error {
var commandObject interface{} // maybe nil
if err := d.Decode(&commandObject); err != nil {
return errors.Wrap(err, "Failed to decode 'OnStatus' args[0]")

Check warning on line 362 in message/body_decoder.go

View check run for this annotation

Codecov / codecov/patch

message/body_decoder.go#L362

Added line #L362 was not covered by tests
}
var infoObject map[string]interface{}
if err := d.Decode(&infoObject); err != nil {
return errors.Wrap(err, "Failed to decode 'OnStatus' args[1]")

Check warning on line 366 in message/body_decoder.go

View check run for this annotation

Codecov / codecov/patch

message/body_decoder.go#L366

Added line #L366 was not covered by tests
}

var cmd NetStreamOnStatus
if err := cmd.FromArgs(commandObject, infoObject); err != nil {
return errors.Wrap(err, "Failed to reconstruct 'OnStatus'")

Check warning on line 371 in message/body_decoder.go

View check run for this annotation

Codecov / codecov/patch

message/body_decoder.go#L371

Added line #L371 was not covered by tests
}

*v = &cmd
return nil
}
39 changes: 39 additions & 0 deletions message/body_decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,45 @@ func TestDecodeCmdMessageCloseStream(t *testing.T) {
require.Equal(t, &NetStreamCloseStream{}, v)
}

func TestDecodeCmdMessageOnStatus(t *testing.T) {
t.Run("OK", func(t *testing.T) {
bin := []byte{
// nil
0x05,
// object start
0x03,
// key[0]: "level"
0x00, 0x05, 0x6c, 0x65, 0x76, 0x65, 0x6c,
// value[0]: string status
0x02, 0x00, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73,
// key[1]: "code",
0x00, 0x04, 0x63, 0x6f, 0x64, 0x65,
// value[1]: string NetStream.Play.Start
0x02, 0x00, 0x14, 0x4e, 0x65, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x50, 0x6c, 0x61, 0x79, 0x2e, 0x53, 0x74, 0x61, 0x72, 0x74,
// key[2]: "description",
0x00, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e,
// value[2]: string abc
0x02, 0x00, 0x03, 0x61, 0x62, 0x63,
// empty key, object end
0x00, 0x00, 0x09,
}
r := bytes.NewReader(bin)
d := amf0.NewDecoder(r)

var v AMFConvertible
err := CmdBodyDecoderFor("onStatus", 42)(r, d, &v)
require.Nil(t, err)
require.Equal(t, &NetStreamOnStatus{
InfoObject: NetStreamOnStatusInfoObject{
Level: NetStreamOnStatusLevelStatus,
Code: NetStreamOnStatusCodePlayStart,
Description: "abc",
ExtraProperties: map[string]interface{}{},
},
}, v)
})
}

func TestDecodeCmdMessageUnknown(t *testing.T) {
bin := []byte{
// nil
Expand Down
Loading
Loading