Skip to content

Commit

Permalink
Implement Prometheus metrics
Browse files Browse the repository at this point in the history
* Moved metrics into separate package

* Implemented a typed metric data holder

* Partial implementation of an async stats collector

* Fixed some more metrics package refs in unit test

* Implemented multi-value collector and return

* Added the other datum convenience constructors

* Added a few more unit tests

* Cleaned up unit test

* Made Metric.MakeKey public

* go fmt

* Added dummy metrics collector and hid real collector behind interface

* Started integrating new collector

* Test implementation of new metrics lib

* Added missing package prefix

* Refactored collector structure (fetch is WIP)

* Added go module stuff to makefile

* Added prometheus deps

* Implemented prometheus metrics for client

* Implemented prometheus endpoint

* Registered client prom metrics on default registry

* Removed obsolete metrics framework

* Implemented prometheus metrics for streamer

* Documented new prometheus api resource

* Removed Go 1.6 and 1.9 from tested versions, as Promtheus needs at least 1.9

* Prevent default promtheus metrics from being exported

* Added network stats in bytes

* Added default process metrics and enabled Go metrics if profiling is enabled

* Updated readme with new info about prom metrics

* Updated copyright

* Updated metrics doc formatting
  • Loading branch information
srgoni authored Jul 19, 2019
1 parent 75b5b3b commit 0d388f4
Show file tree
Hide file tree
Showing 161 changed files with 40,185 additions and 39 deletions.
3 changes: 1 addition & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ language: go

go:
- "1.x"
- "1.6"
- "1.8"
- "1.9"
- "1.10.x"
- "1.11.x"
- master
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
#export GODEBUG = gctrace=1
# use go netcode instead of libc
export CGO_ENABLED = 0
# enforce using gomod
export GO111MODULE = on

PACKAGE:=github.com/onitake/restreamer

# always force a rebuild of the main binary
.PHONY: all clean test fmt docker restreamer
.PHONY: all clean test fmt vendor docker restreamer

all: restreamer

Expand All @@ -19,6 +21,10 @@ test:
fmt:
go fmt $(PACKAGE)/...

vendor:
go mod tidy
go mod vendor

docker: restreamer
docker build -t restreamer .

Expand Down
37 changes: 35 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

HTTP transport stream proxy

Copyright © 2016-2018 Gregor Riepl;
Copyright © 2016-2019 Gregor Riepl;
All rights reserved.

Please see the LICENSE file for details on permitted use of this software.
Expand Down Expand Up @@ -41,10 +41,11 @@ These are the key components:
* streaming/connection - HTTP server that feeds data to clients
* streaming/streamer - connection broker and data queue
* api/api - web API for service monitoring
* api/stats - stat collector and tracker
* streaming/proxy - static web server and proxy
* protocol - network protocol library
* configuration - abstraction of the configuration file
* metrics - a small wrapper around the Promethus client library
* metrics/stats - the old, deprecated metrics collector; use Prometheus if possible
* cmd/restreamer - core program that glues the components together


Expand Down Expand Up @@ -130,6 +131,38 @@ It is highly recommended to log to stdout and collect logs using journald
or a similar logging engine.


## Metrics

Metrics are exported through the Prometheus client library. Enable a Prometheus
API endpoint and expose it on /metrics to expose them.

Supported metrics are:

* _streaming_packets_sent_
Total number of MPEG-TS packets sent from the output queue.
* _streaming_bytes_sent_
Total number of bytes sent from the output queue.
* _streaming_packets_dropped_
Total number of MPEG-TS packets dropped from the output queue.
* _streaming_bytes_dropped_
Total number of bytes dropped from the output queue.
* _streaming_connections_
Number of active client connections.
* _streaming_duration_
Total time spent streaming, summed over all client connections. In nanoseconds.
* _streaming_source_connected_
Connection status, 0=disconnected 1=connected.
* _streaming_packets_received_
Total number of MPEG-TS packets received.
* _streaming_bytes_received_
Total number of bytes received.

Additionally, the standard process metrics supported by the Prometheus client
library are exported. Go runtime statistics are disabled, as they can have a
considerable effect on realtime operation. To enable them, you need to turn on
profiling.


## Optimisation

### Test Stream
Expand Down
37 changes: 33 additions & 4 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package api
import (
"encoding/json"
"github.com/onitake/restreamer/auth"
"github.com/onitake/restreamer/metrics"
"net/http"
)

Expand All @@ -30,14 +31,14 @@ type connectChecker interface {
// healthApi encapsulates a system status object and
// provides an HTTP/JSON handler for reporting system health.
type healthApi struct {
stats Statistics
stats metrics.Statistics
// auth is an authentication verifier for client requests
auth auth.Authenticator
}

// NewHealthApi creates a new health API object,
// serving data from a system Statistics object.
func NewHealthApi(stats Statistics, auth auth.Authenticator) http.Handler {
func NewHealthApi(stats metrics.Statistics, auth auth.Authenticator) http.Handler {
return &healthApi{
stats: stats,
auth: auth,
Expand Down Expand Up @@ -94,14 +95,14 @@ func (api *healthApi) ServeHTTP(writer http.ResponseWriter, request *http.Reques
// statisticsApi encapsulates a system status object and
// provides an HTTP/JSON handler for reporting total system statistics.
type statisticsApi struct {
stats Statistics
stats metrics.Statistics
// auth is an authentication verifier for client requests
auth auth.Authenticator
}

// NewStatisticsApi creates a new statistics API object,
// serving data from a system Statistics object.
func NewStatisticsApi(stats Statistics, auth auth.Authenticator) http.Handler {
func NewStatisticsApi(stats metrics.Statistics, auth auth.Authenticator) http.Handler {
return &statisticsApi{
stats: stats,
auth: auth,
Expand Down Expand Up @@ -270,3 +271,31 @@ func (api *streamControlApi) ServeHTTP(writer http.ResponseWriter, request *http
writer.Write([]byte("400 bad request"))
}
}

// prometheusApi implements a handler for scraping Prometheus metrics.
type prometheusApi struct {
// auth is an authentication verifier for client requests
auth auth.Authenticator
// handler is the delegate HTTP handler
handler http.Handler
}

// NewPrometheusApi creates a new Prometheus metrics API object,
// serving metrics to a Prometheus instance.
func NewPrometheusApi(auth auth.Authenticator) http.Handler {
return &prometheusApi{
auth: auth,
handler: metrics.PromHandler(),
}
}

// ServeHTTP is the http handler method.
func (api *prometheusApi) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
// fail-fast: verify that this user can access this resource first
if !auth.HandleHttpAuthentication(api.auth, request, writer) {
return
}

// authentication successful, forward the request to the promhttp handler
api.handler.ServeHTTP(writer, request)
}
17 changes: 9 additions & 8 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/json"
"github.com/onitake/restreamer/auth"
"github.com/onitake/restreamer/configuration"
"github.com/onitake/restreamer/metrics"
"net/http"
"net/url"
"testing"
Expand Down Expand Up @@ -57,29 +58,29 @@ func (writer *mockWriter) WriteHeader(status int) {
}

type mockStatistics struct {
Streams map[string]*StreamStatistics
Global StreamStatistics
Streams map[string]*metrics.StreamStatistics
Global metrics.StreamStatistics
}

func (*mockStatistics) Start() {}
func (*mockStatistics) Stop() {}
func (*mockStatistics) RegisterStream(name string) Collector {
func (*mockStatistics) RegisterStream(name string) metrics.Collector {
return nil
}
func (*mockStatistics) RemoveStream(name string) {}
func (stats *mockStatistics) GetStreamStatistics(name string) *StreamStatistics {
func (stats *mockStatistics) GetStreamStatistics(name string) *metrics.StreamStatistics {
return stats.Streams[name]
}
func (stats *mockStatistics) GetAllStreamStatistics() map[string]*StreamStatistics {
func (stats *mockStatistics) GetAllStreamStatistics() map[string]*metrics.StreamStatistics {
return stats.Streams
}
func (stats *mockStatistics) GetGlobalStatistics() *StreamStatistics {
func (stats *mockStatistics) GetGlobalStatistics() *metrics.StreamStatistics {
return &stats.Global
}

func testStatisticsConnections(t *testing.T, connections, full, max int64, status string) {
stats := &mockStatistics{
Global: StreamStatistics{
Global: metrics.StreamStatistics{
Connections: connections,
MaxConnections: max,
FullConnections: full,
Expand Down Expand Up @@ -108,7 +109,7 @@ func testStatisticsConnections(t *testing.T, connections, full, max int64, statu

func testHealthConnections(t *testing.T, connections, full, max int64, status string) {
stats := &mockStatistics{
Global: StreamStatistics{
Global: metrics.StreamStatistics{
Connections: connections,
MaxConnections: max,
FullConnections: full,
Expand Down
22 changes: 17 additions & 5 deletions cmd/restreamer/restreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
"github.com/onitake/restreamer/auth"
"github.com/onitake/restreamer/configuration"
"github.com/onitake/restreamer/event"
"github.com/onitake/restreamer/metrics"
"github.com/onitake/restreamer/streaming"
"github.com/onitake/restreamer/util"
"github.com/prometheus/client_golang/prometheus"
"log"
"math/rand"
"net/http"
Expand Down Expand Up @@ -61,6 +63,8 @@ func main() {

if config.Profile {
EnableProfiling()
// If profiling is enabled, we also want the Go runtime metrics collector
metrics.DefaultRegisterer.Register(prometheus.NewGoCollector())
}

if config.Log != "" {
Expand All @@ -73,11 +77,11 @@ func main() {

clients := make(map[string]*streaming.Client)

var stats api.Statistics
var stats metrics.Statistics
if config.NoStats {
stats = &api.DummyStatistics{}
stats = &metrics.DummyStatistics{}
} else {
stats = api.NewStatistics(config.MaxConnections, config.FullConnections)
stats = metrics.NewStatistics(config.MaxConnections, config.FullConnections)
}

controller := streaming.NewAccessController(config.MaxConnections)
Expand Down Expand Up @@ -155,15 +159,15 @@ func main() {

auth := auth.NewAuthenticator(streamdef.Authentication, config.UserList)

streamer := streaming.NewStreamer(config.OutputBuffer, controller, auth)
streamer := streaming.NewStreamer(streamdef.Serve, config.OutputBuffer, controller, auth)
streamer.SetCollector(reg)
streamer.SetNotifier(queue)

// shuffle the list here, not later
// should give a bit more randomness
remotes := util.ShuffleStrings(rnd, streamdef.Remotes)

client, err := streaming.NewClient(remotes, streamer, config.Timeout, config.Reconnect, config.ReadTimeout, config.InputBuffer, streamdef.ClientInterface, config.InputBuffer, streamdef.Mru)
client, err := streaming.NewClient(streamdef.Serve, remotes, streamer, config.Timeout, config.Reconnect, config.ReadTimeout, config.InputBuffer, streamdef.ClientInterface, config.InputBuffer, streamdef.Mru)
if err == nil {
client.SetCollector(reg)
client.Connect()
Expand Down Expand Up @@ -255,6 +259,14 @@ func main() {
"message", fmt.Sprintf("Error, stream not found: %s", streamdef.Remote),
)
}
case "prometheus":
logger.Logkv(
"event", eventMainConfigApi,
"api", "prometheus",
"serve", streamdef.Serve,
"message", fmt.Sprintf("Registering Prometheus API on %s", streamdef.Serve),
)
mux.Handle(streamdef.Serve, api.NewPrometheusApi(auth))
default:
logger.Logkv(
"event", eventMainError,
Expand Down
8 changes: 7 additions & 1 deletion examples/documented/restreamer.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@
"type": "stream",
"": "API endpoint, only used if type is api.",
"": "health = reports system health.",
"": "statistics = reports detailed system statistics.",
"": "statistics = reports detailed system statistics. [deprecated, use prometheus]",
"": "prometheus = reports detailed system statistics as a standard Prometheus scrape endpoint.",
"": "check = reports the status of a stream. remote contains the serve path of the stream.",
"": "control = allows setting a stream offline or online. The state is controlled by the presence of the query parameters 'offline' or 'online', respectively.",
"api": "",
Expand Down Expand Up @@ -114,6 +115,11 @@
"api": "health",
"serve": "/health"
},
{
"type": "api",
"api": "prometheus",
"serve": "/metrics"
},
{
"type": "static",
"serve": "/test",
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
module github.com/onitake/restreamer

require github.com/prometheus/client_golang v1.0.0
51 changes: 51 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0 h1:vrDKnkGzuGvhNAL56c7DBz29ZL+KxnoR0x7enabFceM=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.4.1 h1:K0MGApIoQvMw27RTdJkPbr3JZ7DNbtxQNyi5STVM6Kw=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNGfs=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5 h1:mzjBh+S5frKOsOBobWIMAbXavqjmgO17k/2puhcFR94=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Loading

0 comments on commit 0d388f4

Please sign in to comment.