Skip to content

Commit

Permalink
Added downstream connection duration logging
Browse files Browse the repository at this point in the history
  • Loading branch information
onitake committed Nov 8, 2017
1 parent e2f6a63 commit c5a7519
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 2 deletions.
2 changes: 2 additions & 0 deletions src/restreamer/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (api *statisticsApi) ServeHTTP(writer http.ResponseWriter, request *http.Re
TotalBytesReceived uint64 `json:"total_bytes_received"`
TotalBytesSent uint64 `json:"total_bytes_sent"`
TotalBytesDropped uint64 `json:"total_bytes_dropped"`
TotalStreamTime int64 `json:"total_stream_time_ns"`
PacketsPerSecondReceived uint64 `json:"packets_per_second_received"`
PacketsPerSecondSent uint64 `json:"packets_per_second_sent"`
PacketsPerSecondDropped uint64 `json:"packets_per_second_dropped"`
Expand All @@ -116,6 +117,7 @@ func (api *statisticsApi) ServeHTTP(writer http.ResponseWriter, request *http.Re
stats.TotalBytesReceived = global.TotalBytesReceived
stats.TotalBytesSent = global.TotalBytesSent
stats.TotalBytesDropped = global.TotalBytesDropped
stats.TotalStreamTime = global.TotalStreamTime
stats.PacketsPerSecondReceived = global.PacketsPerSecondReceived
stats.PacketsPerSecondSent = global.PacketsPerSecondSent
stats.PacketsPerSecondDropped = global.PacketsPerSecondDropped
Expand Down
21 changes: 19 additions & 2 deletions src/restreamer/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type Collector interface {
SourceDisconnected()
// IsUpstreamConnected tells you if upstream is connected.
IsUpstreamConnected() bool
// StreamDuration reports how long a downstream connection was up
StreamDuration(duration time.Duration)
}

// realCollector represents per-stream state information
Expand All @@ -60,6 +62,8 @@ type realCollector struct {
packetsDropped uint64
// upstream connection state, 0 = offline, !0 = connected
connected int32
// total streaming duration
duration int64
}

func (stats *realCollector) ConnectionAdded() {
Expand Down Expand Up @@ -94,6 +98,10 @@ func (stats *realCollector) IsUpstreamConnected() bool {
return atomic.LoadInt32(&stats.connected) != 0
}

func (stats *realCollector) StreamDuration(duration time.Duration) {
atomic.AddInt64(&stats.duration, int64(duration))
}

// clone creates a copy of the stats object - useful for
// storing state temporarily.
func (stats *realCollector) clone() *realCollector {
Expand All @@ -103,6 +111,7 @@ func (stats *realCollector) clone() *realCollector {
packetsSent: atomic.LoadUint64(&stats.packetsSent),
packetsDropped: atomic.LoadUint64(&stats.packetsDropped),
connected: atomic.LoadInt32(&stats.connected),
duration: atomic.LoadInt64(&stats.duration),
}
}

Expand All @@ -122,9 +131,10 @@ func (stats *realCollector) clone() *realCollector {
func (from *realCollector) invsub(to *realCollector) {
from.connections = to.connections - from.connections
from.packetsReceived = to.packetsReceived - from.packetsReceived
from.packetsSent= to.packetsSent - from.packetsSent
from.packetsDropped= to.packetsDropped - from.packetsDropped
from.packetsSent = to.packetsSent - from.packetsSent
from.packetsDropped = to.packetsDropped - from.packetsDropped
from.connected = to.connected
from.duration = to.duration - from.duration
}

// StreamStatistics is the current state of a single stream
Expand All @@ -138,6 +148,7 @@ type StreamStatistics struct {
TotalBytesReceived uint64
TotalBytesSent uint64
TotalBytesDropped uint64
TotalStreamTime int64
PacketsPerSecondReceived uint64
PacketsPerSecondSent uint64
PacketsPerSecondDropped uint64
Expand Down Expand Up @@ -212,6 +223,7 @@ func (stats *realStatistics) update(delta time.Duration, change map[string]*real
stats.global.TotalBytesReceived = 0
stats.global.TotalBytesSent = 0
stats.global.TotalBytesDropped = 0
stats.global.TotalStreamTime = 0
stats.global.PacketsPerSecondReceived = 0
stats.global.PacketsPerSecondSent = 0
stats.global.PacketsPerSecondDropped = 0
Expand All @@ -232,6 +244,7 @@ func (stats *realStatistics) update(delta time.Duration, change map[string]*real
stream.TotalBytesReceived = stream.TotalPacketsReceived * PacketSize
stream.TotalBytesSent = stream.TotalPacketsSent * PacketSize
stream.TotalBytesDropped = stream.TotalPacketsDropped * PacketSize
stream.TotalStreamTime += diff.duration
stream.PacketsPerSecondReceived = uint64(float64(diff.packetsReceived) / delta.Seconds())
stream.PacketsPerSecondSent = uint64(float64(diff.packetsSent) / delta.Seconds())
stream.PacketsPerSecondDropped = uint64(float64(diff.packetsDropped) / delta.Seconds())
Expand All @@ -248,6 +261,7 @@ func (stats *realStatistics) update(delta time.Duration, change map[string]*real
stats.global.TotalBytesReceived += stream.TotalBytesReceived
stats.global.TotalBytesSent += stream.TotalBytesSent
stats.global.TotalBytesDropped += stream.TotalBytesDropped
stats.global.TotalStreamTime += stream.TotalStreamTime
stats.global.PacketsPerSecondReceived += stream.PacketsPerSecondReceived
stats.global.PacketsPerSecondSent += stream.PacketsPerSecondSent
stats.global.PacketsPerSecondDropped += stream.PacketsPerSecondDropped
Expand Down Expand Up @@ -436,3 +450,6 @@ func (stats *DummyCollector) SourceDisconnected() {
func (stats *DummyCollector) IsUpstreamConnected() bool {
return false
}

func (stats *DummyCollector) StreamDuration(duration time.Duration) {
}
6 changes: 6 additions & 0 deletions src/restreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package restreamer
import (
"fmt"
"sync"
"time"
"errors"
"net/http"
)
Expand Down Expand Up @@ -321,7 +322,10 @@ func (streamer *Streamer) ServeHTTP(writer http.ResponseWriter, request *http.Re
"message": fmt.Sprintf("Streaming to %s", request.RemoteAddr),
"remote": request.RemoteAddr,
})

start := time.Now()
conn.Serve()
duration := time.Since(start)

// done, remove the stale connection
streamer.request<- ConnectionRequest{
Expand All @@ -337,10 +341,12 @@ func (streamer *Streamer) ServeHTTP(writer http.ResponseWriter, request *http.Re
"event": eventStreamerClosed,
"message": fmt.Sprintf("Connection from %s closed", request.RemoteAddr),
"remote": request.RemoteAddr,
"duration": duration,
})

// and report
streamer.stats.ConnectionRemoved()
streamer.stats.StreamDuration(duration)

// also notify the broker
streamer.broker.Release(streamer)
Expand Down

0 comments on commit c5a7519

Please sign in to comment.