Skip to content

Commit

Permalink
Add tests for DeleteStream. Add internal function to mark streams as …
Browse files Browse the repository at this point in the history
…closed
  • Loading branch information
yutopp committed Jul 2, 2023
1 parent 4d6e948 commit 39f0a77
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 16 deletions.
6 changes: 2 additions & 4 deletions client_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,11 @@ func (cc *ClientConn) DeleteStream(body *message.NetStreamDeleteStream) error {
}

// Check if stream id exists
_, err = cc.conn.streams.At(body.StreamID)
if err != nil {
if _, err := cc.conn.streams.At(body.StreamID); err != nil {
return err
}

err = ctrlStream.DeleteStream(body)
if err != nil {
if err := ctrlStream.DeleteStream(body); err != nil {
return err
}

Expand Down
59 changes: 49 additions & 10 deletions server_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@ const (
chunkSize = 128
)

type serverCanAcceptConnectHandler struct {
DefaultHandler
}

func TestServerCanAcceptConnect(t *testing.T) {
config := &ConnConfig{
Handler: &ServerCanAcceptConnectHandler{},
Handler: &serverCanAcceptConnectHandler{},
Logger: logrus.StandardLogger(),
}

Expand All @@ -36,13 +40,17 @@ func TestServerCanAcceptConnect(t *testing.T) {
})
}

type ServerCanAcceptConnectHandler struct {
type serverCanRejectConnectHandler struct {
DefaultHandler
}

func (h *serverCanRejectConnectHandler) OnConnect(_ uint32, _ *message.NetConnectionConnect) error {
return fmt.Errorf("Reject")
}

func TestServerCanRejectConnect(t *testing.T) {
config := &ConnConfig{
Handler: &ServerCanRejectConnectHandler{},
Handler: &serverCanRejectConnectHandler{},
Logger: logrus.StandardLogger(),
}

Expand All @@ -67,17 +75,13 @@ func TestServerCanRejectConnect(t *testing.T) {
})
}

type ServerCanRejectConnectHandler struct {
type serverCanAcceptCreateStreamHandler struct {
DefaultHandler
}

func (h *ServerCanRejectConnectHandler) OnConnect(_ uint32, _ *message.NetConnectionConnect) error {
return fmt.Errorf("Reject")
}

func TestServerCanAcceptCreateStream(t *testing.T) {
config := &ConnConfig{
Handler: &ServerCanAcceptCreateStreamHandler{},
Handler: &serverCanAcceptCreateStreamHandler{},
Logger: logrus.StandardLogger(),
ControlState: StreamControlStateConfig{
MaxMessageStreams: 2, // Control and another 1 stream
Expand All @@ -104,11 +108,45 @@ func TestServerCanAcceptCreateStream(t *testing.T) {
})
}

type ServerCanAcceptCreateStreamHandler struct {
type serverCanAcceptDeleteStreamHandler struct {
DefaultHandler
}

func TestServerCanAcceptDeleteStream(t *testing.T) {
config := &ConnConfig{
Handler: &serverCanAcceptDeleteStreamHandler{},
Logger: logrus.StandardLogger(),
ControlState: StreamControlStateConfig{
MaxMessageStreams: 2, // Control and another 1 stream
},
}

prepareConnection(t, config, func(c *ClientConn) {
err := c.Connect(nil)
require.Nil(t, err)

s0, err := c.CreateStream(nil, chunkSize)
require.NoError(t, err)
defer s0.Close()

t.Run("Cannot delete a stream which does not exist", func(t *testing.T) {
err = c.DeleteStream(&message.NetStreamDeleteStream{
StreamID: 42,
})
require.Error(t, err)
})

t.Run("Can delete a stream", func(t *testing.T) {
err = c.DeleteStream(&message.NetStreamDeleteStream{
StreamID: s0.streamID,
})
require.NoError(t, err)
})
})
}

func prepareConnection(t *testing.T, config *ConnConfig, f func(c *ClientConn)) {
// prepare server
l, err := net.Listen("tcp", "127.0.0.1:")
require.Nil(t, err)

Expand All @@ -127,6 +165,7 @@ func prepareConnection(t *testing.T, config *ConnConfig, f func(c *ClientConn))
require.Equal(t, ErrClosed, err)
}()

// prepare client
c, err := Dial("rtmp", l.Addr().String(), &ConnConfig{
Logger: logrus.StandardLogger(),
})
Expand Down
5 changes: 5 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,14 @@ func (s *Stream) NotifyStatus(
}

func (s *Stream) Close() error {
s.assumeClosed()
return nil // TODO: implement
}

func (s *Stream) assumeClosed() {
// TODO: implement
}

func (s *Stream) writeCommandMessage(
chunkStreamID int,
timestamp uint32,
Expand Down
6 changes: 4 additions & 2 deletions streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,14 @@ func (ss *streams) Delete(streamID uint32) error {
ss.m.Lock()
defer ss.m.Unlock()

_, ok := ss.streams[streamID]
s, ok := ss.streams[streamID]
if !ok {
return errors.Errorf("Stream not exists: StreamID = %d", streamID)
}

delete(ss.streams, streamID)
delete(ss.streams, s.streamID)

s.assumeClosed()

return nil
}
Expand Down

0 comments on commit 39f0a77

Please sign in to comment.