diff --git a/README.md b/README.md index 67312e3..9c21606 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ See also [server_demo](https://github.com/yutopp/go-rtmp/tree/master/example/ser ## Documentation - [GoDoc](https://pkg.go.dev/github.com/yutopp/go-rtmp) -- [REAL-TIME MESSAGING PROTOCOL (RTMP) SPECIFICATION](https://www.adobe.com/devnet/rtmp.html) +- [Adobe RTMP Specification](https://rtmp.veriskope.com/docs/spec/) ## NOTES diff --git a/client.go b/client.go index ceb563d..6e215eb 100644 --- a/client.go +++ b/client.go @@ -8,29 +8,43 @@ package rtmp import ( + "context" "crypto/tls" "net" "github.com/pkg/errors" ) -func Dial(protocol, addr string, config *ConnConfig) (*ClientConn, error) { - return DialWithDialer(&net.Dialer{}, protocol, addr, config) +type dialOptions struct { + dialFunc func(ctx context.Context, network, addr string) (net.Conn, error) } -func TLSDial(protocol, addr string, config *ConnConfig, tlsConfig *tls.Config) (*ClientConn, error) { - return DialWithTLSDialer(&tls.Dialer{ - NetDialer: &net.Dialer{}, - Config: tlsConfig, - }, protocol, addr, config) +func WithContextDialer(dialFunc func(context.Context, string, string) (net.Conn, error)) DialOption { + return func(o *dialOptions) { + o.dialFunc = dialFunc + } } -func DialWithDialer(dialer *net.Dialer, protocol, addr string, config *ConnConfig) (*ClientConn, error) { - if protocol != "rtmp" { - return nil, errors.Errorf("Unknown protocol: %s", protocol) +type DialOption func(*dialOptions) + +// DialContext dials a connection to the specified address. +// The protocol must be "rtmp" or "rtmps". +func DialContext(ctx context.Context, protocol, addr string, config *ConnConfig, opts ...DialOption) (*ClientConn, error) { + opt := &dialOptions{ + // default dialer + dialFunc: func(ctx context.Context, network, addr string) (net.Conn, error) { + return (&net.Dialer{}).DialContext(ctx, network, addr) + }, + } + for _, o := range opts { + o(opt) } - rwc, err := dialer.Dial("tcp", addr) + if protocol != "rtmp" && protocol != "rtmps" { + return nil, errors.Errorf("unknown protocol: %s", protocol) + } + + rwc, err := opt.dialFunc(ctx, "tcp", addr) if err != nil { return nil, err } @@ -38,15 +52,32 @@ func DialWithDialer(dialer *net.Dialer, protocol, addr string, config *ConnConfi return newClientConnWithSetup(rwc, config) } -func DialWithTLSDialer(dialer *tls.Dialer, protocol, addr string, config *ConnConfig) (*ClientConn, error) { - if protocol != "rtmps" { - return nil, errors.Errorf("Unknown protocol: %s", protocol) - } +// Dial dials a connection to the specified address. +func Dial(protocol, addr string, config *ConnConfig, opts ...DialOption) (*ClientConn, error) { + return DialContext(context.Background(), protocol, addr, config, opts...) +} - rwc, err := dialer.Dial("tcp", addr) - if err != nil { - return nil, err +// DialWithDialer dials a connection to the specified address with the specified dialer. +func DialWithDialer(dialer *net.Dialer, protocol, addr string, config *ConnConfig) (*ClientConn, error) { + return Dial(protocol, addr, config, WithContextDialer(dialer.DialContext)) +} + +// TLSDialContext dials a connection to the specified address with TLS. +func TLSDialContext(ctx context.Context, protocol, addr string, config *ConnConfig, tlsConfig *tls.Config, opts ...DialOption) (*ClientConn, error) { + dialer := &tls.Dialer{ + NetDialer: &net.Dialer{}, + Config: tlsConfig, } + opts = append([]DialOption{WithContextDialer(dialer.DialContext)}, opts...) + return DialContext(ctx, protocol, addr, config, opts...) +} - return newClientConnWithSetup(rwc, config) +// TLSDial dials a connection to the specified address with TLS. +func TLSDial(protocol, addr string, config *ConnConfig, tlsConfig *tls.Config, opts ...DialOption) (*ClientConn, error) { + return TLSDialContext(context.Background(), protocol, addr, config, tlsConfig, opts...) +} + +// DialWithTLSDialer dials a connection to the specified address with the specified TLS dialer. +func DialWithTLSDialer(dialer *tls.Dialer, protocol, addr string, config *ConnConfig) (*ClientConn, error) { + return Dial(protocol, addr, config, WithContextDialer(dialer.DialContext)) } diff --git a/client_conn.go b/client_conn.go index 04726a0..1ed450c 100644 --- a/client_conn.go +++ b/client_conn.go @@ -104,6 +104,7 @@ func (cc *ClientConn) CreateStream(body *message.NetConnectionCreateStream, chun if err != nil { return nil, err } + newStream.handler.ChangeState(streamStateClientConnected) return newStream, nil } diff --git a/client_control_connected_handler.go b/client_control_connected_handler.go new file mode 100644 index 0000000..d047f6e --- /dev/null +++ b/client_control_connected_handler.go @@ -0,0 +1,59 @@ +// +// Copyright (c) 2023- yutopp (yutopp@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at https://www.boost.org/LICENSE_1_0.txt) +// + +package rtmp + +import ( + "github.com/yutopp/go-rtmp/internal" + "github.com/yutopp/go-rtmp/message" +) + +// clientControlConnectedHandler Handle control messages from a server in flow of connected. +// +// transitions: +// | _ -> self +type clientControlConnectedHandler struct { + sh *streamHandler +} + +var _ stateHandler = (*clientControlConnectedHandler)(nil) + +func (h *clientControlConnectedHandler) onMessage( + chunkStreamID int, + timestamp uint32, + msg message.Message, +) error { + return internal.ErrPassThroughMsg +} + +func (h *clientControlConnectedHandler) onData( + chunkStreamID int, + timestamp uint32, + dataMsg *message.DataMessage, + body interface{}, +) error { + return internal.ErrPassThroughMsg +} + +func (h *clientControlConnectedHandler) onCommand( + chunkStreamID int, + timestamp uint32, + cmdMsg *message.CommandMessage, + body interface{}, +) error { + switch cmd := body.(type) { + case *message.NetStreamOnStatus: + if cmd.InfoObject.Code == message.NetStreamOnStatusCodePlayStart { + h.sh.ChangeState(streamStateClientPlay) + } + + return internal.ErrPassThroughMsg + + default: + return internal.ErrPassThroughMsg + } +} diff --git a/client_control_not_connected_handler.go b/client_control_not_connected_handler.go index 14e04a4..6ca34c1 100644 --- a/client_control_not_connected_handler.go +++ b/client_control_not_connected_handler.go @@ -12,8 +12,6 @@ import ( "github.com/yutopp/go-rtmp/message" ) -var _ stateHandler = (*clientControlNotConnectedHandler)(nil) - // clientControlNotConnectedHandler Handle control messages from a server in flow of connecting. // // transitions: @@ -23,6 +21,8 @@ type clientControlNotConnectedHandler struct { sh *streamHandler } +var _ stateHandler = (*clientControlNotConnectedHandler)(nil) + func (h *clientControlNotConnectedHandler) onMessage( chunkStreamID int, timestamp uint32, diff --git a/client_data_play_handler.go b/client_data_play_handler.go new file mode 100644 index 0000000..225371c --- /dev/null +++ b/client_data_play_handler.go @@ -0,0 +1,64 @@ +// +// Copyright (c) 2023- yutopp (yutopp@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at https://www.boost.org/LICENSE_1_0.txt) +// + +package rtmp + +import ( + "github.com/yutopp/go-rtmp/internal" + "github.com/yutopp/go-rtmp/message" +) + +// clientDataPlayHandler Handle control messages from a server in flow of connected. +// +// transitions: +// | _ -> self +type clientDataPlayHandler struct { + sh *streamHandler +} + +var _ stateHandler = (*clientDataPlayHandler)(nil) + +func (h *clientDataPlayHandler) onMessage( + chunkStreamID int, + timestamp uint32, + msg message.Message, +) error { + switch msg := msg.(type) { + case *message.AudioMessage: + return h.sh.stream.userHandler().OnAudio(timestamp, msg.Payload) + + case *message.VideoMessage: + return h.sh.stream.userHandler().OnVideo(timestamp, msg.Payload) + + default: + return internal.ErrPassThroughMsg + } +} + +func (h *clientDataPlayHandler) onData( + chunkStreamID int, + timestamp uint32, + dataMsg *message.DataMessage, + body interface{}, +) error { + switch data := body.(type) { + case *message.NetStreamSetDataFrame: + return h.sh.stream.userHandler().OnSetDataFrame(timestamp, data) + + default: + return internal.ErrPassThroughMsg + } +} + +func (h *clientDataPlayHandler) onCommand( + chunkStreamID int, + timestamp uint32, + cmdMsg *message.CommandMessage, + body interface{}, +) error { + return internal.ErrPassThroughMsg +} diff --git a/example/client_demo/Makefile b/example/client_demo/Makefile index 087b10b..eb57c77 100644 --- a/example/client_demo/Makefile +++ b/example/client_demo/Makefile @@ -1,3 +1,3 @@ PHONY: all all: - go build -i -v -o client_demo . + go build -v -o client_demo . diff --git a/example/server_demo/Makefile b/example/server_demo/Makefile index c54e77e..d223e1b 100644 --- a/example/server_demo/Makefile +++ b/example/server_demo/Makefile @@ -1,3 +1,3 @@ PHONY: all all: - go build -i -v -o server_demo . + go build -v -o server_demo . diff --git a/example/server_relay_demo/.gitignore b/example/server_relay_demo/.gitignore index 6160795..d8a43b0 100644 --- a/example/server_relay_demo/.gitignore +++ b/example/server_relay_demo/.gitignore @@ -1 +1,2 @@ /server_relay_demo +/movie.mp4 diff --git a/example/server_relay_demo/Makefile b/example/server_relay_demo/Makefile index 3379b75..0b0b761 100644 --- a/example/server_relay_demo/Makefile +++ b/example/server_relay_demo/Makefile @@ -1,3 +1,3 @@ PHONY: all all: - go build -i -v -o server_relay_demo . + go build -v -o server_relay_demo . diff --git a/message/body_decoder.go b/message/body_decoder.go index 9509eda..e94533a 100644 --- a/message/body_decoder.go +++ b/message/body_decoder.go @@ -71,6 +71,7 @@ var CmdBodyDecoders = map[string]BodyDecoderFunc{ "getStreamLength": DecodeBodyGetStreamLength, "ping": DecodeBodyPing, "closeStream": DecodeBodyCloseStream, + "onStatus": DecodeBodyOnStatus, } func CmdBodyDecoderFor(name string, transactionID int64) BodyDecoderFunc { @@ -354,3 +355,22 @@ func DecodeBodyCloseStream(_ io.Reader, d AMFDecoder, v *AMFConvertible) error { return nil } + +func DecodeBodyOnStatus(_ io.Reader, d AMFDecoder, v *AMFConvertible) error { + var commandObject interface{} // maybe nil + if err := d.Decode(&commandObject); err != nil { + return errors.Wrap(err, "Failed to decode 'OnStatus' args[0]") + } + var infoObject map[string]interface{} + if err := d.Decode(&infoObject); err != nil { + return errors.Wrap(err, "Failed to decode 'OnStatus' args[1]") + } + + var cmd NetStreamOnStatus + if err := cmd.FromArgs(commandObject, infoObject); err != nil { + return errors.Wrap(err, "Failed to reconstruct 'OnStatus'") + } + + *v = &cmd + return nil +} diff --git a/message/body_decoder_test.go b/message/body_decoder_test.go index 2088a3c..29c186a 100644 --- a/message/body_decoder_test.go +++ b/message/body_decoder_test.go @@ -252,6 +252,45 @@ func TestDecodeCmdMessageCloseStream(t *testing.T) { require.Equal(t, &NetStreamCloseStream{}, v) } +func TestDecodeCmdMessageOnStatus(t *testing.T) { + t.Run("OK", func(t *testing.T) { + bin := []byte{ + // nil + 0x05, + // object start + 0x03, + // key[0]: "level" + 0x00, 0x05, 0x6c, 0x65, 0x76, 0x65, 0x6c, + // value[0]: string status + 0x02, 0x00, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, + // key[1]: "code", + 0x00, 0x04, 0x63, 0x6f, 0x64, 0x65, + // value[1]: string NetStream.Play.Start + 0x02, 0x00, 0x14, 0x4e, 0x65, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x50, 0x6c, 0x61, 0x79, 0x2e, 0x53, 0x74, 0x61, 0x72, 0x74, + // key[2]: "description", + 0x00, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, + // value[2]: string abc + 0x02, 0x00, 0x03, 0x61, 0x62, 0x63, + // empty key, object end + 0x00, 0x00, 0x09, + } + r := bytes.NewReader(bin) + d := amf0.NewDecoder(r) + + var v AMFConvertible + err := CmdBodyDecoderFor("onStatus", 42)(r, d, &v) + require.Nil(t, err) + require.Equal(t, &NetStreamOnStatus{ + InfoObject: NetStreamOnStatusInfoObject{ + Level: NetStreamOnStatusLevelStatus, + Code: NetStreamOnStatusCodePlayStart, + Description: "abc", + ExtraProperties: map[string]interface{}{}, + }, + }, v) + }) +} + func TestDecodeCmdMessageUnknown(t *testing.T) { bin := []byte{ // nil diff --git a/message/body_encoder_test.go b/message/body_encoder_test.go new file mode 100644 index 0000000..d42dcda --- /dev/null +++ b/message/body_encoder_test.go @@ -0,0 +1,145 @@ +// +// Copyright (c) 2023- yutopp (yutopp@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at https://www.boost.org/LICENSE_1_0.txt) +// + +package message + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestEncodeCmdMessageOnStatus(t *testing.T) { + tests := []struct { + name string + decoder BodyDecoderFunc + body AMFConvertible + }{ + { + name: "NetConnectionConnect", + decoder: DecodeBodyConnect, + body: &NetConnectionConnect{ + Command: NetConnectionConnectCommand{ + App: "app", + Type: "type", + FlashVer: "flashVer", + TCURL: "tcUrl", + Fpad: true, + Capabilities: 1, + AudioCodecs: 2, + VideoCodecs: 3, + VideoFunction: 4, + ObjectEncoding: EncodingTypeAMF3, + }, + }, + }, + { + name: "NetConnectionConnectResult", + decoder: DecodeBodyConnectResult, + body: &NetConnectionConnectResult{ + Properties: NetConnectionConnectResultProperties{ + FMSVer: "FMS/3,0,1,123", + Capabilities: 31, + Mode: 1, + }, + Information: NetConnectionConnectResultInformation{ + Level: "status", + Code: NetConnectionConnectCodeSuccess, + Description: "Connection succeeded", + Data: map[string]interface{}{}, + }, + }, + }, + { + name: "NetConnectionConnectResult with data", + decoder: DecodeBodyConnectResult, + body: &NetConnectionConnectResult{ + Properties: NetConnectionConnectResultProperties{ + FMSVer: "FMS/3,0,1,123", + Capabilities: 31, + Mode: 1, + }, + Information: NetConnectionConnectResultInformation{ + Level: "status", + Code: NetConnectionConnectCodeSuccess, + Description: "Connection succeeded", + Data: map[string]interface{}{ + "test": "test", + }, + }, + }, + }, + { + name: "NetConnectionCreateStream", + decoder: DecodeBodyCreateStream, + body: &NetConnectionCreateStream{}, + }, + { + name: "NetConnectionCreateStreamResult", + decoder: DecodeBodyCreateStreamResult, + body: &NetConnectionCreateStreamResult{ + StreamID: 1, + }, + }, + { + name: "NetConnectionReleaseStream", + decoder: DecodeBodyReleaseStream, + body: &NetConnectionReleaseStream{ + StreamName: "stream", + }, + }, + { + name: "NetStreamOnStatus", + decoder: DecodeBodyOnStatus, + body: &NetStreamOnStatus{ + InfoObject: NetStreamOnStatusInfoObject{ + Level: NetStreamOnStatusLevelStatus, + Code: NetStreamOnStatusCodePlayStart, + Description: "abc", + ExtraProperties: map[string]interface{}{}, + }, + }, + }, + { + name: "NetStreamOnStatus with extra properties", + decoder: DecodeBodyOnStatus, + body: &NetStreamOnStatus{ + InfoObject: NetStreamOnStatusInfoObject{ + Level: NetStreamOnStatusLevelStatus, + Code: NetStreamOnStatusCodePlayStart, + Description: "abc", + ExtraProperties: map[string]interface{}{"foo": "bar"}, + }, + }, + }, + } + + for _, test := range tests { + test := test // capture + + t.Run(test.name, func(t *testing.T) { + t.Parallel() + + amfTy := EncodingTypeAMF0 + buf := new(bytes.Buffer) + + // object to bytes(AMF0) + amfEnc := NewAMFEncoder(buf, amfTy) + err := EncodeBodyAnyValues(amfEnc, test.body) + require.Nil(t, err) + + // bytes(AMF0) to object + amfDec := NewAMFDecoder(buf, amfTy) + var v AMFConvertible + err = test.decoder(buf, amfDec, &v) + require.Nil(t, err) + + require.Equal(t, test.body, v) + }) + } +} diff --git a/message/error.go b/message/error.go index cdc9393..09687e4 100644 --- a/message/error.go +++ b/message/error.go @@ -27,7 +27,7 @@ type UnknownCommandBodyDecodeError struct { } func (e *UnknownCommandBodyDecodeError) Error() string { - return fmt.Sprintf("UnknownCommandMessageDecodeError: Name = %s, TransactionID = %d, Objs = %+v", + return fmt.Sprintf("UnknownCommandBodyDecodeError: Name = %s, TransactionID = %d, Objs = %+v", e.Name, e.TransactionID, e.Objs, diff --git a/message/net_connection.go b/message/net_connection.go index 4b5ff99..2f5bc76 100644 --- a/message/net_connection.go +++ b/message/net_connection.go @@ -39,9 +39,12 @@ type NetConnectionConnectCommand struct { } func (t *NetConnectionConnect) FromArgs(args ...interface{}) error { - command := args[0].(map[string]interface{}) + command, ok := args[0].(map[string]interface{}) + if !ok { + return errors.Errorf("expect map[string]interface{} at arg[0], but got %T", args[0]) + } if err := mapstructure.Decode(command, &t.Command); err != nil { - return errors.Wrapf(err, "Failed to mapping NetConnectionConnect") + return errors.Wrapf(err, "failed to mapping arg[0] to NetConnectionConnectCommand") } return nil @@ -72,14 +75,20 @@ type NetConnectionConnectResultInformation struct { } func (t *NetConnectionConnectResult) FromArgs(args ...interface{}) error { - properties := args[0].(map[string]interface{}) + properties, ok := args[0].(map[string]interface{}) + if !ok { + return errors.Errorf("expect map[string]interface{} at arg[0], but got %T", args[0]) + } if err := mapstructure.Decode(properties, &t.Properties); err != nil { - return errors.Wrapf(err, "Failed to mapping NetConnectionConnectResultProperties") + return errors.Wrapf(err, "failed to mapping arg[0] to NetConnectionConnectResultProperties") } - information := args[1].(map[string]interface{}) + information, ok := args[1].(map[string]interface{}) + if !ok { + return errors.Errorf("expect map[string]interface{} at arg[1], but got %T", args[1]) + } if err := mapstructure.Decode(information, &t.Information); err != nil { - return errors.Wrapf(err, "Failed to mapping NetConnectionConnectResultInformation") + return errors.Wrapf(err, "failed to mapping arg[1] to NetConnectionConnectResultInformation") } return nil @@ -113,7 +122,12 @@ type NetConnectionCreateStreamResult struct { func (t *NetConnectionCreateStreamResult) FromArgs(args ...interface{}) error { // args[0] is unknown, ignore - t.StreamID = args[1].(uint32) + + streamID, ok := args[1].(uint32) + if !ok { + return errors.Errorf("expect uint32 at arg[1], but got %T", args[1]) + } + t.StreamID = streamID return nil } @@ -131,7 +145,12 @@ type NetConnectionReleaseStream struct { func (t *NetConnectionReleaseStream) FromArgs(args ...interface{}) error { // args[0] is unknown, ignore - t.StreamName = args[1].(string) + + streamName, ok := args[1].(string) + if !ok { + return errors.Errorf("expect string at arg[1], but got %T", args[1]) + } + t.StreamName = streamName return nil } diff --git a/message/net_stream.go b/message/net_stream.go index 5157e0f..e0d381a 100644 --- a/message/net_stream.go +++ b/message/net_stream.go @@ -7,6 +7,8 @@ package message +import "errors" + type NetStreamPublish struct { CommandObject interface{} PublishingName string @@ -44,7 +46,11 @@ func (t *NetStreamPlay) FromArgs(args ...interface{}) error { } func (t *NetStreamPlay) ToArgs(ty EncodingType) ([]interface{}, error) { - panic("Not implemented") + return []interface{}{ + nil, // Always nil + t.StreamName, + t.Start, + }, nil } type NetStreamOnStatusLevel string @@ -77,14 +83,70 @@ type NetStreamOnStatusInfoObject struct { Level NetStreamOnStatusLevel Code NetStreamOnStatusCode Description string + + ExtraProperties map[string]interface{} } func (t *NetStreamOnStatus) FromArgs(args ...interface{}) error { - panic("Not implemented") + // args[0] is nil, ignore + + info, ok := args[1].(map[string]interface{}) + if !ok { + return errors.New("expect map type value") + } + + { + v, ok := info["level"] + if !ok { + return errors.New("missing `level` key") + } + level, ok := v.(string) + if !ok { + return errors.New("expect string type for value of `level` key") + } + t.InfoObject.Level = NetStreamOnStatusLevel(level) // TODO: type check + + delete(info, "level") + } + + { + v, ok := info["code"] + if !ok { + return errors.New("missing `code` key") + } + code, ok := v.(string) + if !ok { + return errors.New("expect string type for value of `code` key") + } + t.InfoObject.Code = NetStreamOnStatusCode(code) // TODO: type check + + delete(info, "code") + } + + { + v, ok := info["description"] + if !ok { + return errors.New("missing `description` key") + } + description, ok := v.(string) + if !ok { + return errors.New("expect string type for value of `description` key") + } + t.InfoObject.Description = description + + delete(info, "description") + } + + t.InfoObject.ExtraProperties = info + + return nil } func (t *NetStreamOnStatus) ToArgs(ty EncodingType) ([]interface{}, error) { info := make(map[string]interface{}) + for k, v := range t.InfoObject.ExtraProperties { + info[k] = v + } info["level"] = t.InfoObject.Level info["code"] = t.InfoObject.Code info["description"] = t.InfoObject.Description diff --git a/server_data_inactive_handler.go b/server_data_inactive_handler.go index 36d0c07..4dfa3a6 100644 --- a/server_data_inactive_handler.go +++ b/server_data_inactive_handler.go @@ -20,7 +20,7 @@ var _ stateHandler = (*serverDataInactiveHandler)(nil) // // transitions: // | "publish" -> serverDataPublishHandler -// | "play" -> serverDataPlayHandler +// | "play" -> serverDataPlayHandler // | _ -> self type serverDataInactiveHandler struct { sh *streamHandler diff --git a/stream.go b/stream.go index 5a5c82f..01f0501 100644 --- a/stream.go +++ b/stream.go @@ -227,6 +227,23 @@ func (s *Stream) ReplyCreateStream( ) } +func (s *Stream) Play( + body *message.NetStreamPlay, +) error { + if body == nil { + body = &message.NetStreamPlay{} + } + + chunkStreamID := 3 // TODO: fix + return s.writeCommandMessage( + chunkStreamID, + 0, // TODO: fix, Timestamp is 0 + "play", + int64(0), // Always 0, 7.2.2.1 + body, + ) +} + func (s *Stream) Publish( body *message.NetStreamPublish, ) error { diff --git a/stream_handler.go b/stream_handler.go index 6e90bdc..d27f423 100644 --- a/stream_handler.go +++ b/stream_handler.go @@ -28,6 +28,7 @@ const ( streamStateServerPlay streamStateClientNotConnected streamStateClientConnected + streamStateClientPlay ) func (s streamState) String() string { @@ -46,6 +47,8 @@ func (s streamState) String() string { return "NotConnected(Client)" case streamStateClientConnected: return "Connected(Client)" + case streamStateClientPlay: + return "Play(Client)" default: return "" } @@ -118,15 +121,17 @@ func (h *streamHandler) ChangeState(state streamState) { h.handler = &serverDataPlayHandler{sh: h} case streamStateClientNotConnected: h.handler = &clientControlNotConnectedHandler{sh: h} - // case streamStateClientConnected: - // h.handler = &serverControlConnectedHandler{sh: h} + case streamStateClientConnected: + h.handler = &clientControlConnectedHandler{sh: h} + case streamStateClientPlay: + h.handler = &clientDataPlayHandler{sh: h} default: - panic("Unexpected") + panic("Unexpected state") } h.state = state l := h.Logger() - l.Infof("Change state: From = %s, To = %s", prevState, h.State()) + l.Infof("State changed: From = %s, To = %s", prevState, h.State()) } func (h *streamHandler) State() streamState { @@ -186,8 +191,6 @@ func (h *streamHandler) handleCommand( } return nil - - // TODO: Support onStatus } amfDec := message.NewAMFDecoder(cmdMsg.Body, cmdMsg.Encoding)