diff --git a/streaming/connection.go b/streaming/connection.go index f57112d..fbd0f26 100644 --- a/streaming/connection.go +++ b/streaming/connection.go @@ -86,7 +86,7 @@ func (conn *Connection) Close() error { // Serve starts serving data to a client, continuously feeding packets from the queue. // An optional preamble buffer can be passed that will be sent before streaming the live payload // (but after the HTTP response headers). -func (conn *Connection) Serve(preamble []byte) { +func (conn *Connection) Serve(preamble []byte) bool { // set the content type (important) conn.writer.Header().Set("Content-Type", "video/mpeg") // a stream is always current diff --git a/streaming/streamer.go b/streaming/streamer.go index f825390..918c819 100644 --- a/streaming/streamer.go +++ b/streaming/streamer.go @@ -352,6 +352,9 @@ func (streamer *Streamer) Stream(queue <-chan protocol.MpegTsPacket) error { // close all downstream connections for conn := range pool { conn.Close() + // avoid waiting for the removal round-trip, this will make us less racy + // double deletes are safe, so nothing bad will happen when we do get the remove command later + delete(pool, request.Connection) } // TODO implement inhibit in the check api case StreamerCommandAllow: @@ -441,6 +444,7 @@ func (streamer *Streamer) ServeHTTP(writer http.ResponseWriter, request *http.Re "remote", request.RemoteAddr, ) + // here's where the action happens start := time.Now() conn.Serve(streamer.preamble) duration := time.Since(start)