From 5ff026eecc2f8122061ef094dd36a2ef426168d4 Mon Sep 17 00:00:00 2001 From: yutopp Date: Thu, 20 Jul 2023 02:45:31 +0900 Subject: [PATCH 01/14] Support DialOption --- client.go | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/client.go b/client.go index 1c79acd..12fff8e 100644 --- a/client.go +++ b/client.go @@ -8,21 +8,40 @@ package rtmp import ( + "context" "net" "github.com/pkg/errors" ) -func Dial(protocol, addr string, config *ConnConfig) (*ClientConn, error) { - return DialWithDialer(&net.Dialer{}, protocol, addr, config) +type dialOptions struct { + dialFunc func(ctx context.Context, network, addr string) (net.Conn, error) } -func DialWithDialer(dialer *net.Dialer, protocol, addr string, config *ConnConfig) (*ClientConn, error) { +func WithContextDialer(dialFunc func(context.Context, string, string) (net.Conn, error)) DialOption { + return func(o *dialOptions) { + o.dialFunc = dialFunc + } +} + +type DialOption func(*dialOptions) + +func Dial(protocol, addr string, config *ConnConfig, opts ...DialOption) (*ClientConn, error) { + opt := &dialOptions{ + dialFunc: func(ctx context.Context, network, addr string) (net.Conn, error) { + return (&net.Dialer{}).DialContext(ctx, network, addr) + }, + } + for _, o := range opts { + o(opt) + } + if protocol != "rtmp" { return nil, errors.Errorf("Unknown protocol: %s", protocol) } - rwc, err := dialer.Dial("tcp", addr) + // TODO: support ctx + rwc, err := opt.dialFunc(context.Background(), "tcp", addr) if err != nil { return nil, err } @@ -30,6 +49,10 @@ func DialWithDialer(dialer *net.Dialer, protocol, addr string, config *ConnConfi return newClientConnWithSetup(rwc, config) } +func DialWithDialer(dialer *net.Dialer, protocol, addr string, config *ConnConfig) (*ClientConn, error) { + return Dial(protocol, addr, config, WithContextDialer(dialer.DialContext)) +} + func makeValidAddr(addr string) (string, error) { host, port, err := net.SplitHostPort(addr) if err != nil { From 799dc9dfb0162e8257b6c9c3a6fe67a87c204481 Mon Sep 17 00:00:00 2001 From: yutopp Date: Thu, 20 Jul 2023 02:56:42 +0900 Subject: [PATCH 02/14] Remove -i build options from Makefile. Ignore movie.mp4 --- example/client_demo/Makefile | 2 +- example/server_demo/Makefile | 2 +- example/server_relay_demo/.gitignore | 1 + example/server_relay_demo/Makefile | 2 +- 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/example/client_demo/Makefile b/example/client_demo/Makefile index 087b10b..eb57c77 100644 --- a/example/client_demo/Makefile +++ b/example/client_demo/Makefile @@ -1,3 +1,3 @@ PHONY: all all: - go build -i -v -o client_demo . + go build -v -o client_demo . diff --git a/example/server_demo/Makefile b/example/server_demo/Makefile index c54e77e..d223e1b 100644 --- a/example/server_demo/Makefile +++ b/example/server_demo/Makefile @@ -1,3 +1,3 @@ PHONY: all all: - go build -i -v -o server_demo . + go build -v -o server_demo . diff --git a/example/server_relay_demo/.gitignore b/example/server_relay_demo/.gitignore index 6160795..d8a43b0 100644 --- a/example/server_relay_demo/.gitignore +++ b/example/server_relay_demo/.gitignore @@ -1 +1,2 @@ /server_relay_demo +/movie.mp4 diff --git a/example/server_relay_demo/Makefile b/example/server_relay_demo/Makefile index 3379b75..0b0b761 100644 --- a/example/server_relay_demo/Makefile +++ b/example/server_relay_demo/Makefile @@ -1,3 +1,3 @@ PHONY: all all: - go build -i -v -o server_relay_demo . + go build -v -o server_relay_demo . From 6d9b806d5aebf541600df693a96c5d871f9081b8 Mon Sep 17 00:00:00 2001 From: yutopp Date: Thu, 20 Jul 2023 03:00:10 +0900 Subject: [PATCH 03/14] Fix link to RTMP spec --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 67312e3..9c21606 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ See also [server_demo](https://github.com/yutopp/go-rtmp/tree/master/example/ser ## Documentation - [GoDoc](https://pkg.go.dev/github.com/yutopp/go-rtmp) -- [REAL-TIME MESSAGING PROTOCOL (RTMP) SPECIFICATION](https://www.adobe.com/devnet/rtmp.html) +- [Adobe RTMP Specification](https://rtmp.veriskope.com/docs/spec/) ## NOTES From d707e8b0a3491647c9f1dba93beb9a65740a8ba5 Mon Sep 17 00:00:00 2001 From: yutopp Date: Fri, 21 Jul 2023 03:06:54 +0900 Subject: [PATCH 04/14] Set default state as not-connected(client) --- client_conn.go | 1 + 1 file changed, 1 insertion(+) diff --git a/client_conn.go b/client_conn.go index 04726a0..7329746 100644 --- a/client_conn.go +++ b/client_conn.go @@ -104,6 +104,7 @@ func (cc *ClientConn) CreateStream(body *message.NetConnectionCreateStream, chun if err != nil { return nil, err } + newStream.handler.ChangeState(streamStateClientNotConnected) return newStream, nil } From 19867f9af84c08816740bf463c8f8d1d7ff3bcd5 Mon Sep 17 00:00:00 2001 From: yutopp Date: Fri, 21 Jul 2023 03:08:55 +0900 Subject: [PATCH 05/14] Support Play command --- message/net_stream.go | 11 +++++++++-- stream.go | 17 +++++++++++++++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/message/net_stream.go b/message/net_stream.go index 5157e0f..6c40eb6 100644 --- a/message/net_stream.go +++ b/message/net_stream.go @@ -44,7 +44,11 @@ func (t *NetStreamPlay) FromArgs(args ...interface{}) error { } func (t *NetStreamPlay) ToArgs(ty EncodingType) ([]interface{}, error) { - panic("Not implemented") + return []interface{}{ + nil, // Always nil + t.StreamName, + t.Start, + }, nil } type NetStreamOnStatusLevel string @@ -80,7 +84,10 @@ type NetStreamOnStatusInfoObject struct { } func (t *NetStreamOnStatus) FromArgs(args ...interface{}) error { - panic("Not implemented") + // args[0] is nil, ignore + t.InfoObject = args[1].(NetStreamOnStatusInfoObject) + + return nil } func (t *NetStreamOnStatus) ToArgs(ty EncodingType) ([]interface{}, error) { diff --git a/stream.go b/stream.go index 5a5c82f..01f0501 100644 --- a/stream.go +++ b/stream.go @@ -227,6 +227,23 @@ func (s *Stream) ReplyCreateStream( ) } +func (s *Stream) Play( + body *message.NetStreamPlay, +) error { + if body == nil { + body = &message.NetStreamPlay{} + } + + chunkStreamID := 3 // TODO: fix + return s.writeCommandMessage( + chunkStreamID, + 0, // TODO: fix, Timestamp is 0 + "play", + int64(0), // Always 0, 7.2.2.1 + body, + ) +} + func (s *Stream) Publish( body *message.NetStreamPublish, ) error { From 65f091e12c70eb5e7557caddacd727fde5104bd4 Mon Sep 17 00:00:00 2001 From: yutopp Date: Fri, 21 Jul 2023 03:09:28 +0900 Subject: [PATCH 06/14] Support decoding OnStatus --- message/body_decoder.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/message/body_decoder.go b/message/body_decoder.go index 9509eda..8f341bb 100644 --- a/message/body_decoder.go +++ b/message/body_decoder.go @@ -71,6 +71,7 @@ var CmdBodyDecoders = map[string]BodyDecoderFunc{ "getStreamLength": DecodeBodyGetStreamLength, "ping": DecodeBodyPing, "closeStream": DecodeBodyCloseStream, + "onStatus": DecodeBodyOnStatus, } func CmdBodyDecoderFor(name string, transactionID int64) BodyDecoderFunc { @@ -354,3 +355,22 @@ func DecodeBodyCloseStream(_ io.Reader, d AMFDecoder, v *AMFConvertible) error { return nil } + +func DecodeBodyOnStatus(_ io.Reader, d AMFDecoder, v *AMFConvertible) error { + var commandObject interface{} // maybe nil + if err := d.Decode(&commandObject); err != nil { + return errors.Wrap(err, "Failed to decode 'OnStatus' args[0]") + } + var infoObject NetStreamOnStatusInfoObject + if err := d.Decode(&infoObject); err != nil { + return errors.Wrap(err, "Failed to decode 'OnStatus' args[1]") + } + + var cmd NetStreamOnStatus + if err := cmd.FromArgs(commandObject, infoObject); err != nil { + return errors.Wrap(err, "Failed to reconstruct 'OnStatus'") + } + + *v = &cmd + return nil +} From 63c583d7166a6f766030719052c3550c7e6e9b38 Mon Sep 17 00:00:00 2001 From: yutopp Date: Fri, 21 Jul 2023 03:13:24 +0900 Subject: [PATCH 07/14] Fix comments. Handling OnStatus in stream_handler is not necessary (handled in per-state-handler) --- server_data_inactive_handler.go | 2 +- stream_handler.go | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/server_data_inactive_handler.go b/server_data_inactive_handler.go index 36d0c07..4dfa3a6 100644 --- a/server_data_inactive_handler.go +++ b/server_data_inactive_handler.go @@ -20,7 +20,7 @@ var _ stateHandler = (*serverDataInactiveHandler)(nil) // // transitions: // | "publish" -> serverDataPublishHandler -// | "play" -> serverDataPlayHandler +// | "play" -> serverDataPlayHandler // | _ -> self type serverDataInactiveHandler struct { sh *streamHandler diff --git a/stream_handler.go b/stream_handler.go index 6e90bdc..93539b4 100644 --- a/stream_handler.go +++ b/stream_handler.go @@ -126,7 +126,7 @@ func (h *streamHandler) ChangeState(state streamState) { h.state = state l := h.Logger() - l.Infof("Change state: From = %s, To = %s", prevState, h.State()) + l.Infof("State changed: From = %s, To = %s", prevState, h.State()) } func (h *streamHandler) State() streamState { @@ -186,8 +186,6 @@ func (h *streamHandler) handleCommand( } return nil - - // TODO: Support onStatus } amfDec := message.NewAMFDecoder(cmdMsg.Body, cmdMsg.Encoding) From 87e9b6745b7a44731c8658b4b579e8b11a5c9761 Mon Sep 17 00:00:00 2001 From: yutopp Date: Fri, 21 Jul 2023 03:13:51 +0900 Subject: [PATCH 08/14] Fix error string, adjust to typename --- message/error.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/message/error.go b/message/error.go index cdc9393..09687e4 100644 --- a/message/error.go +++ b/message/error.go @@ -27,7 +27,7 @@ type UnknownCommandBodyDecodeError struct { } func (e *UnknownCommandBodyDecodeError) Error() string { - return fmt.Sprintf("UnknownCommandMessageDecodeError: Name = %s, TransactionID = %d, Objs = %+v", + return fmt.Sprintf("UnknownCommandBodyDecodeError: Name = %s, TransactionID = %d, Objs = %+v", e.Name, e.TransactionID, e.Objs, From 62930c726fcec212e8f795fcbc2193b9d566e7dd Mon Sep 17 00:00:00 2001 From: yutopp Date: Fri, 21 Jul 2023 03:45:07 +0900 Subject: [PATCH 09/14] Add client state: connect, play --- client_conn.go | 2 +- client_control_connected_handler.go | 59 +++++++++++++++++++++++ client_control_not_connected_handler.go | 4 +- client_data_play_handler.go | 64 +++++++++++++++++++++++++ stream_handler.go | 11 +++-- 5 files changed, 134 insertions(+), 6 deletions(-) create mode 100644 client_control_connected_handler.go create mode 100644 client_data_play_handler.go diff --git a/client_conn.go b/client_conn.go index 7329746..1ed450c 100644 --- a/client_conn.go +++ b/client_conn.go @@ -104,7 +104,7 @@ func (cc *ClientConn) CreateStream(body *message.NetConnectionCreateStream, chun if err != nil { return nil, err } - newStream.handler.ChangeState(streamStateClientNotConnected) + newStream.handler.ChangeState(streamStateClientConnected) return newStream, nil } diff --git a/client_control_connected_handler.go b/client_control_connected_handler.go new file mode 100644 index 0000000..d047f6e --- /dev/null +++ b/client_control_connected_handler.go @@ -0,0 +1,59 @@ +// +// Copyright (c) 2023- yutopp (yutopp@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at https://www.boost.org/LICENSE_1_0.txt) +// + +package rtmp + +import ( + "github.com/yutopp/go-rtmp/internal" + "github.com/yutopp/go-rtmp/message" +) + +// clientControlConnectedHandler Handle control messages from a server in flow of connected. +// +// transitions: +// | _ -> self +type clientControlConnectedHandler struct { + sh *streamHandler +} + +var _ stateHandler = (*clientControlConnectedHandler)(nil) + +func (h *clientControlConnectedHandler) onMessage( + chunkStreamID int, + timestamp uint32, + msg message.Message, +) error { + return internal.ErrPassThroughMsg +} + +func (h *clientControlConnectedHandler) onData( + chunkStreamID int, + timestamp uint32, + dataMsg *message.DataMessage, + body interface{}, +) error { + return internal.ErrPassThroughMsg +} + +func (h *clientControlConnectedHandler) onCommand( + chunkStreamID int, + timestamp uint32, + cmdMsg *message.CommandMessage, + body interface{}, +) error { + switch cmd := body.(type) { + case *message.NetStreamOnStatus: + if cmd.InfoObject.Code == message.NetStreamOnStatusCodePlayStart { + h.sh.ChangeState(streamStateClientPlay) + } + + return internal.ErrPassThroughMsg + + default: + return internal.ErrPassThroughMsg + } +} diff --git a/client_control_not_connected_handler.go b/client_control_not_connected_handler.go index 14e04a4..6ca34c1 100644 --- a/client_control_not_connected_handler.go +++ b/client_control_not_connected_handler.go @@ -12,8 +12,6 @@ import ( "github.com/yutopp/go-rtmp/message" ) -var _ stateHandler = (*clientControlNotConnectedHandler)(nil) - // clientControlNotConnectedHandler Handle control messages from a server in flow of connecting. // // transitions: @@ -23,6 +21,8 @@ type clientControlNotConnectedHandler struct { sh *streamHandler } +var _ stateHandler = (*clientControlNotConnectedHandler)(nil) + func (h *clientControlNotConnectedHandler) onMessage( chunkStreamID int, timestamp uint32, diff --git a/client_data_play_handler.go b/client_data_play_handler.go new file mode 100644 index 0000000..225371c --- /dev/null +++ b/client_data_play_handler.go @@ -0,0 +1,64 @@ +// +// Copyright (c) 2023- yutopp (yutopp@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at https://www.boost.org/LICENSE_1_0.txt) +// + +package rtmp + +import ( + "github.com/yutopp/go-rtmp/internal" + "github.com/yutopp/go-rtmp/message" +) + +// clientDataPlayHandler Handle control messages from a server in flow of connected. +// +// transitions: +// | _ -> self +type clientDataPlayHandler struct { + sh *streamHandler +} + +var _ stateHandler = (*clientDataPlayHandler)(nil) + +func (h *clientDataPlayHandler) onMessage( + chunkStreamID int, + timestamp uint32, + msg message.Message, +) error { + switch msg := msg.(type) { + case *message.AudioMessage: + return h.sh.stream.userHandler().OnAudio(timestamp, msg.Payload) + + case *message.VideoMessage: + return h.sh.stream.userHandler().OnVideo(timestamp, msg.Payload) + + default: + return internal.ErrPassThroughMsg + } +} + +func (h *clientDataPlayHandler) onData( + chunkStreamID int, + timestamp uint32, + dataMsg *message.DataMessage, + body interface{}, +) error { + switch data := body.(type) { + case *message.NetStreamSetDataFrame: + return h.sh.stream.userHandler().OnSetDataFrame(timestamp, data) + + default: + return internal.ErrPassThroughMsg + } +} + +func (h *clientDataPlayHandler) onCommand( + chunkStreamID int, + timestamp uint32, + cmdMsg *message.CommandMessage, + body interface{}, +) error { + return internal.ErrPassThroughMsg +} diff --git a/stream_handler.go b/stream_handler.go index 93539b4..d27f423 100644 --- a/stream_handler.go +++ b/stream_handler.go @@ -28,6 +28,7 @@ const ( streamStateServerPlay streamStateClientNotConnected streamStateClientConnected + streamStateClientPlay ) func (s streamState) String() string { @@ -46,6 +47,8 @@ func (s streamState) String() string { return "NotConnected(Client)" case streamStateClientConnected: return "Connected(Client)" + case streamStateClientPlay: + return "Play(Client)" default: return "" } @@ -118,10 +121,12 @@ func (h *streamHandler) ChangeState(state streamState) { h.handler = &serverDataPlayHandler{sh: h} case streamStateClientNotConnected: h.handler = &clientControlNotConnectedHandler{sh: h} - // case streamStateClientConnected: - // h.handler = &serverControlConnectedHandler{sh: h} + case streamStateClientConnected: + h.handler = &clientControlConnectedHandler{sh: h} + case streamStateClientPlay: + h.handler = &clientDataPlayHandler{sh: h} default: - panic("Unexpected") + panic("Unexpected state") } h.state = state From b40efdef0abe0f6d45e0d2b41429eff69f453593 Mon Sep 17 00:00:00 2001 From: yutopp Date: Fri, 21 Jul 2023 03:45:30 +0900 Subject: [PATCH 10/14] Fix decoding format of OnStatus --- message/body_decoder.go | 2 +- message/net_stream.go | 29 ++++++++++++++++++++++++++++- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/message/body_decoder.go b/message/body_decoder.go index 8f341bb..e94533a 100644 --- a/message/body_decoder.go +++ b/message/body_decoder.go @@ -361,7 +361,7 @@ func DecodeBodyOnStatus(_ io.Reader, d AMFDecoder, v *AMFConvertible) error { if err := d.Decode(&commandObject); err != nil { return errors.Wrap(err, "Failed to decode 'OnStatus' args[0]") } - var infoObject NetStreamOnStatusInfoObject + var infoObject map[string]interface{} if err := d.Decode(&infoObject); err != nil { return errors.Wrap(err, "Failed to decode 'OnStatus' args[1]") } diff --git a/message/net_stream.go b/message/net_stream.go index 6c40eb6..a80cc6e 100644 --- a/message/net_stream.go +++ b/message/net_stream.go @@ -7,6 +7,8 @@ package message +import "errors" + type NetStreamPublish struct { CommandObject interface{} PublishingName string @@ -85,7 +87,32 @@ type NetStreamOnStatusInfoObject struct { func (t *NetStreamOnStatus) FromArgs(args ...interface{}) error { // args[0] is nil, ignore - t.InfoObject = args[1].(NetStreamOnStatusInfoObject) + + info, ok := args[1].(map[string]interface{}) + if !ok { + return errors.New("invalid type") // TODO: fix + } + if v, ok := info["level"]; ok { + level, ok := v.(string) + if !ok { + return errors.New("invalid type") // TODO: fix + } + t.InfoObject.Level = NetStreamOnStatusLevel(level) // TODO: type check + } + if v, ok := info["code"]; ok { + code, ok := v.(string) + if !ok { + return errors.New("invalid type") // TODO: fix + } + t.InfoObject.Code = NetStreamOnStatusCode(code) // TODO: type check + } + if v, ok := info["description"]; ok { + description, ok := v.(string) + if !ok { + return errors.New("invalid type") // TODO: fix + } + t.InfoObject.Description = description + } return nil } From 6ed3890f63548317e7efabb84780aaff0b991d97 Mon Sep 17 00:00:00 2001 From: yutopp Date: Tue, 25 Jul 2023 11:29:39 +0900 Subject: [PATCH 11/14] Add tests for onStatus --- message/body_decoder_test.go | 38 ++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/message/body_decoder_test.go b/message/body_decoder_test.go index 2088a3c..6958dd2 100644 --- a/message/body_decoder_test.go +++ b/message/body_decoder_test.go @@ -252,6 +252,44 @@ func TestDecodeCmdMessageCloseStream(t *testing.T) { require.Equal(t, &NetStreamCloseStream{}, v) } +func TestDecodeCmdMessageOnStatus(t *testing.T) { + t.Run("OK", func(t *testing.T) { + bin := []byte{ + // nil + 0x05, + // object start + 0x03, + // key[0]: "level" + 0x00, 0x05, 0x6c, 0x65, 0x76, 0x65, 0x6c, + // value[0]: string status + 0x02, 0x00, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, + // key[1]: "code", + 0x00, 0x04, 0x63, 0x6f, 0x64, 0x65, + // value[1]: string NetStream.Play.Start + 0x02, 0x00, 0x14, 0x4e, 0x65, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x50, 0x6c, 0x61, 0x79, 0x2e, 0x53, 0x74, 0x61, 0x72, 0x74, + // key[2]: "description", + 0x00, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, + // value[2]: string abc + 0x02, 0x00, 0x03, 0x61, 0x62, 0x63, + // empty key, object end + 0x00, 0x00, 0x09, + } + r := bytes.NewReader(bin) + d := amf0.NewDecoder(r) + + var v AMFConvertible + err := CmdBodyDecoderFor("onStatus", 42)(r, d, &v) + require.Nil(t, err) + require.Equal(t, &NetStreamOnStatus{ + InfoObject: NetStreamOnStatusInfoObject{ + Level: NetStreamOnStatusLevelStatus, + Code: NetStreamOnStatusCodePlayStart, + Description: "abc", + }, + }, v) + }) +} + func TestDecodeCmdMessageUnknown(t *testing.T) { bin := []byte{ // nil From da0adc2af3a3aea6350bcc7e2666c0573f3787f8 Mon Sep 17 00:00:00 2001 From: yutopp Date: Wed, 26 Jul 2023 19:30:43 +0900 Subject: [PATCH 12/14] Fix error messages and support extra properties for onStatus --- message/body_decoder_test.go | 7 ++++--- message/net_stream.go | 39 +++++++++++++++++++++++++++++------- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/message/body_decoder_test.go b/message/body_decoder_test.go index 6958dd2..29c186a 100644 --- a/message/body_decoder_test.go +++ b/message/body_decoder_test.go @@ -282,9 +282,10 @@ func TestDecodeCmdMessageOnStatus(t *testing.T) { require.Nil(t, err) require.Equal(t, &NetStreamOnStatus{ InfoObject: NetStreamOnStatusInfoObject{ - Level: NetStreamOnStatusLevelStatus, - Code: NetStreamOnStatusCodePlayStart, - Description: "abc", + Level: NetStreamOnStatusLevelStatus, + Code: NetStreamOnStatusCodePlayStart, + Description: "abc", + ExtraProperties: map[string]interface{}{}, }, }, v) }) diff --git a/message/net_stream.go b/message/net_stream.go index a80cc6e..617dd86 100644 --- a/message/net_stream.go +++ b/message/net_stream.go @@ -83,6 +83,8 @@ type NetStreamOnStatusInfoObject struct { Level NetStreamOnStatusLevel Code NetStreamOnStatusCode Description string + + ExtraProperties map[string]interface{} } func (t *NetStreamOnStatus) FromArgs(args ...interface{}) error { @@ -90,30 +92,53 @@ func (t *NetStreamOnStatus) FromArgs(args ...interface{}) error { info, ok := args[1].(map[string]interface{}) if !ok { - return errors.New("invalid type") // TODO: fix + return errors.New("expect map type value") } - if v, ok := info["level"]; ok { + + { + v, ok := info["level"] + if !ok { + return errors.New("missing `level` key") + } level, ok := v.(string) if !ok { - return errors.New("invalid type") // TODO: fix + return errors.New("expect string type for value of `level` key") } t.InfoObject.Level = NetStreamOnStatusLevel(level) // TODO: type check + + delete(info, "level") } - if v, ok := info["code"]; ok { + + { + v, ok := info["code"] + if !ok { + return errors.New("missing `code` key") + } code, ok := v.(string) if !ok { - return errors.New("invalid type") // TODO: fix + return errors.New("expect string type for value of `code` key") } t.InfoObject.Code = NetStreamOnStatusCode(code) // TODO: type check + + delete(info, "code") } - if v, ok := info["description"]; ok { + + { + v, ok := info["description"] + if !ok { + return errors.New("missing `description` key") + } description, ok := v.(string) if !ok { - return errors.New("invalid type") // TODO: fix + return errors.New("expect string type for value of `description` key") } t.InfoObject.Description = description + + delete(info, "description") } + t.InfoObject.ExtraProperties = info + return nil } From ce59bc6597bd8330a93f68a692865af55801bdbe Mon Sep 17 00:00:00 2001 From: yutopp Date: Thu, 27 Jul 2023 01:01:15 +0900 Subject: [PATCH 13/14] Add tests. Reduce panics --- message/body_encoder_test.go | 144 +++++++++++++++++++++++++++++++++++ message/net_connection.go | 33 ++++++-- message/net_stream.go | 3 + 3 files changed, 173 insertions(+), 7 deletions(-) create mode 100644 message/body_encoder_test.go diff --git a/message/body_encoder_test.go b/message/body_encoder_test.go new file mode 100644 index 0000000..474d0de --- /dev/null +++ b/message/body_encoder_test.go @@ -0,0 +1,144 @@ +// +// Copyright (c) 2023- yutopp (yutopp@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at https://www.boost.org/LICENSE_1_0.txt) +// + +package message + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestEncodeCmdMessageOnStatus(t *testing.T) { + tests := []struct { + name string + decoder BodyDecoderFunc + body AMFConvertible + }{ + { + name: "NetConnectionConnect", + decoder: DecodeBodyConnect, + body: &NetConnectionConnect{ + Command: NetConnectionConnectCommand{ + App: "app", + Type: "type", + FlashVer: "flashVer", + TCURL: "tcUrl", + Fpad: true, + Capabilities: 1, + AudioCodecs: 2, + VideoCodecs: 3, + VideoFunction: 4, + ObjectEncoding: EncodingTypeAMF3, + }, + }, + }, + { + name: "NetConnectionConnectResult", + decoder: DecodeBodyConnectResult, + body: &NetConnectionConnectResult{ + Properties: NetConnectionConnectResultProperties{ + FMSVer: "FMS/3,0,1,123", + Capabilities: 31, + Mode: 1, + }, + Information: NetConnectionConnectResultInformation{ + Level: "status", + Code: NetConnectionConnectCodeSuccess, + Description: "Connection succeeded", + }, + }, + }, + { + name: "NetConnectionConnectResult with data", + decoder: DecodeBodyConnectResult, + body: &NetConnectionConnectResult{ + Properties: NetConnectionConnectResultProperties{ + FMSVer: "FMS/3,0,1,123", + Capabilities: 31, + Mode: 1, + }, + Information: NetConnectionConnectResultInformation{ + Level: "status", + Code: NetConnectionConnectCodeSuccess, + Description: "Connection succeeded", + Data: map[string]interface{}{ + "test": "test", + }, + }, + }, + }, + { + name: "NetConnectionCreateStream", + decoder: DecodeBodyCreateStream, + body: &NetConnectionCreateStream{}, + }, + { + name: "NetConnectionCreateStreamResult", + decoder: DecodeBodyCreateStreamResult, + body: &NetConnectionCreateStreamResult{ + StreamID: 1, + }, + }, + { + name: "NetConnectionReleaseStream", + decoder: DecodeBodyReleaseStream, + body: &NetConnectionReleaseStream{ + StreamName: "stream", + }, + }, + { + name: "NetStreamOnStatus", + decoder: DecodeBodyOnStatus, + body: &NetStreamOnStatus{ + InfoObject: NetStreamOnStatusInfoObject{ + Level: NetStreamOnStatusLevelStatus, + Code: NetStreamOnStatusCodePlayStart, + Description: "abc", + ExtraProperties: map[string]interface{}{}, + }, + }, + }, + { + name: "NetStreamOnStatus with extra properties", + decoder: DecodeBodyOnStatus, + body: &NetStreamOnStatus{ + InfoObject: NetStreamOnStatusInfoObject{ + Level: NetStreamOnStatusLevelStatus, + Code: NetStreamOnStatusCodePlayStart, + Description: "abc", + ExtraProperties: map[string]interface{}{"foo": "bar"}, + }, + }, + }, + } + + for _, test := range tests { + test := test // capture + + t.Run(test.name, func(t *testing.T) { + t.Parallel() + + amfTy := EncodingTypeAMF0 + buf := new(bytes.Buffer) + + // object to bytes(AMF0) + amfEnc := NewAMFEncoder(buf, amfTy) + err := EncodeBodyAnyValues(amfEnc, test.body) + require.Nil(t, err) + + // bytes(AMF0) to object + amfDec := NewAMFDecoder(buf, amfTy) + var v AMFConvertible + err = test.decoder(buf, amfDec, &v) + require.Nil(t, err) + + require.Equal(t, test.body, v) + }) + } +} diff --git a/message/net_connection.go b/message/net_connection.go index 4b5ff99..4c1cc0b 100644 --- a/message/net_connection.go +++ b/message/net_connection.go @@ -39,9 +39,12 @@ type NetConnectionConnectCommand struct { } func (t *NetConnectionConnect) FromArgs(args ...interface{}) error { - command := args[0].(map[string]interface{}) + command, ok := args[0].(map[string]interface{}) + if !ok { + return errors.Errorf("expect map[string]interface{} at arg[0], but got %T", args[0]) + } if err := mapstructure.Decode(command, &t.Command); err != nil { - return errors.Wrapf(err, "Failed to mapping NetConnectionConnect") + return errors.Wrapf(err, "failed to mapping arg[0] to NetConnectionConnectCommand") } return nil @@ -72,14 +75,20 @@ type NetConnectionConnectResultInformation struct { } func (t *NetConnectionConnectResult) FromArgs(args ...interface{}) error { - properties := args[0].(map[string]interface{}) + properties, ok := args[0].(map[string]interface{}) + if !ok { + return errors.Errorf("expect map[string]interface{} at arg[0], but got %T", args[0]) + } if err := mapstructure.Decode(properties, &t.Properties); err != nil { - return errors.Wrapf(err, "Failed to mapping NetConnectionConnectResultProperties") + return errors.Wrapf(err, "failed to mapping arg[0] to NetConnectionConnectResultProperties") } information := args[1].(map[string]interface{}) + if !ok { + return errors.Errorf("expect map[string]interface{} at arg[1], but got %T", args[1]) + } if err := mapstructure.Decode(information, &t.Information); err != nil { - return errors.Wrapf(err, "Failed to mapping NetConnectionConnectResultInformation") + return errors.Wrapf(err, "failed to mapping arg[1] to NetConnectionConnectResultInformation") } return nil @@ -113,7 +122,12 @@ type NetConnectionCreateStreamResult struct { func (t *NetConnectionCreateStreamResult) FromArgs(args ...interface{}) error { // args[0] is unknown, ignore - t.StreamID = args[1].(uint32) + + streamID, ok := args[1].(uint32) + if !ok { + return errors.Errorf("expect uint32 at arg[1], but got %T", args[1]) + } + t.StreamID = streamID return nil } @@ -131,7 +145,12 @@ type NetConnectionReleaseStream struct { func (t *NetConnectionReleaseStream) FromArgs(args ...interface{}) error { // args[0] is unknown, ignore - t.StreamName = args[1].(string) + + streamName, ok := args[1].(string) + if !ok { + return errors.Errorf("expect string at arg[1], but got %T", args[1]) + } + t.StreamName = streamName return nil } diff --git a/message/net_stream.go b/message/net_stream.go index 617dd86..e0d381a 100644 --- a/message/net_stream.go +++ b/message/net_stream.go @@ -144,6 +144,9 @@ func (t *NetStreamOnStatus) FromArgs(args ...interface{}) error { func (t *NetStreamOnStatus) ToArgs(ty EncodingType) ([]interface{}, error) { info := make(map[string]interface{}) + for k, v := range t.InfoObject.ExtraProperties { + info[k] = v + } info["level"] = t.InfoObject.Level info["code"] = t.InfoObject.Code info["description"] = t.InfoObject.Description From ece6c5c8c1c847ba5b1994f757ba4093c14e5402 Mon Sep 17 00:00:00 2001 From: yutopp Date: Mon, 15 Jul 2024 11:30:05 +0900 Subject: [PATCH 14/14] Fix test. Fix error handling which uses old ok values --- message/body_encoder_test.go | 1 + message/net_connection.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/message/body_encoder_test.go b/message/body_encoder_test.go index 474d0de..d42dcda 100644 --- a/message/body_encoder_test.go +++ b/message/body_encoder_test.go @@ -51,6 +51,7 @@ func TestEncodeCmdMessageOnStatus(t *testing.T) { Level: "status", Code: NetConnectionConnectCodeSuccess, Description: "Connection succeeded", + Data: map[string]interface{}{}, }, }, }, diff --git a/message/net_connection.go b/message/net_connection.go index 4c1cc0b..2f5bc76 100644 --- a/message/net_connection.go +++ b/message/net_connection.go @@ -83,7 +83,7 @@ func (t *NetConnectionConnectResult) FromArgs(args ...interface{}) error { return errors.Wrapf(err, "failed to mapping arg[0] to NetConnectionConnectResultProperties") } - information := args[1].(map[string]interface{}) + information, ok := args[1].(map[string]interface{}) if !ok { return errors.Errorf("expect map[string]interface{} at arg[1], but got %T", args[1]) }