Skip to content

Commit

Permalink
Send start and stop events (atlassian#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
ash2k authored Jun 23, 2016
1 parent 18d69eb commit fb08bb3
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 28 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ check:
go install
go install ./tester
gometalinter --concurrency=$(METALINTER_CONCURRENCY) --deadline=600s ./... --vendor --linter='errcheck:errcheck:-ignore=net:Close' --cyclo-over=20 \
--linter='vet:go tool vet -composites=false {paths}:PATH:LINE:MESSAGE' --disable=interfacer --dupl-threshold=200
--linter='vet:go tool vet -composites=false {paths}:PATH:LINE:MESSAGE' --disable=interfacer --disable=golint --dupl-threshold=200

check-all:
go install
Expand Down
2 changes: 1 addition & 1 deletion example/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

func main() {
r := statsd.NewMetricReceiver("stats", nil, handler{})
r := statsd.NewMetricReceiver("stats", handler{})
c, err := net.ListenPacket("udp", ":8125")
if err != nil {
log.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion statsd/cloud_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type cloudHandler struct {

// NewCloudHandler initialises a new cloud handler.
// If cacheOptions is nil default cache configuration is used.
func NewCloudHandler(cloud cloudTypes.Interface, next Handler, limiter *rate.Limiter, cacheOptions *CacheOptions) RunableHandler {
func NewCloudHandler(cloud cloudTypes.Interface, next Handler, limiter *rate.Limiter, cacheOptions *CacheOptions) *cloudHandler {
if cacheOptions == nil {
cacheOptions = &CacheOptions{
CacheRefreshPeriod: DefaultCacheRefreshPeriod,
Expand Down
4 changes: 2 additions & 2 deletions statsd/cloud_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func testExpire(t *testing.T, expectedIps []types.IP, f func(Handler) error) {
if !reflect.DeepEqual(fp.ips, expectedIps) {
t.Errorf("%+v is not equal to the expected ips %+v", fp.ips, expectedIps)
}
if len(ch.(*cloudHandler).cache) > 0 {
t.Errorf("cache should be empty %s", ch.(*cloudHandler).cache)
if len(ch.cache) > 0 {
t.Errorf("cache should be empty %s", ch.cache)
}
}

Expand Down
16 changes: 15 additions & 1 deletion statsd/dispatching_handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package statsd

import (
"sync"

backendTypes "github.com/atlassian/gostatsd/backend/types"
"github.com/atlassian/gostatsd/types"

Expand All @@ -10,35 +12,47 @@ import (

// dispatchingHandler dispatches events to all configured backends and forwards metrics to a Dispatcher.
type dispatchingHandler struct {
wg sync.WaitGroup
dispatcher Dispatcher
backends []backendTypes.Backend
tags types.Tags // Tags to add to all metrics and events
}

// NewDispatchingHandler initialises a new dispatching handler.
func NewDispatchingHandler(dispatcher Dispatcher, backends []backendTypes.Backend) Handler {
func NewDispatchingHandler(dispatcher Dispatcher, backends []backendTypes.Backend, tags types.Tags) *dispatchingHandler {
return &dispatchingHandler{
dispatcher: dispatcher,
backends: backends,
tags: tags,
}
}

func (dh *dispatchingHandler) DispatchMetric(ctx context.Context, m *types.Metric) error {
if m.Hostname == "" {
m.Hostname = string(m.SourceIP)
}
m.Tags = append(m.Tags, dh.tags...)
return dh.dispatcher.DispatchMetric(ctx, m)
}

func (dh *dispatchingHandler) DispatchEvent(ctx context.Context, e *types.Event) error {
if e.Hostname == "" {
e.Hostname = string(e.SourceIP)
}
e.Tags = append(e.Tags, dh.tags...)
dh.wg.Add(len(dh.backends))
for _, backend := range dh.backends {
go func(b backendTypes.Backend) {
defer dh.wg.Done()
if err := b.SendEvent(ctx, e); err != nil {
log.Errorf("Sending event to backend failed: %v", err)
}
}(backend)
}
return nil
}

// Wait waits for all event-dispatching goroutines to finish.
func (dh *dispatchingHandler) WaitForEvents() {
dh.wg.Wait()
}
6 changes: 3 additions & 3 deletions statsd/lexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,11 @@ func BenchmarkParseSet(b *testing.B) {
benchmarkLexer(&metricReceiver{}, "uniq.usr:joe|s", b)
}
func BenchmarkParseCounterWithDefaultTags(b *testing.B) {
benchmarkLexer(&metricReceiver{tags: []string{"env:foo", "foo:bar"}}, "foo.bar.baz:2|c", b)
benchmarkLexer(&metricReceiver{}, "foo.bar.baz:2|c", b)
}
func BenchmarkParseCounterWithDefaultTagsAndTags(b *testing.B) {
benchmarkLexer(&metricReceiver{tags: []string{"env:foo", "foo:bar"}}, "foo.bar.baz:2|c|#foo:bar,baz", b)
benchmarkLexer(&metricReceiver{}, "foo.bar.baz:2|c|#foo:bar,baz", b)
}
func BenchmarkParseCounterWithDefaultTagsAndTagsAndNameSpace(b *testing.B) {
benchmarkLexer(&metricReceiver{namespace: "stats", tags: []string{"env:foo", "foo:bar"}}, "foo.bar.baz:2|c|#foo:bar,baz", b)
benchmarkLexer(&metricReceiver{namespace: "stats"}, "foo.bar.baz:2|c|#foo:bar,baz", b)
}
17 changes: 3 additions & 14 deletions statsd/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,6 @@ type Handler interface {
DispatchEvent(context.Context, *types.Event) error
}

// RunableHandler extends Handler interface to add the Run method that needs to be executed in a separate
// goroutine to make the handler work.
type RunableHandler interface {
Handler
Run(context.Context) error
}

// Receiver receives data on its PacketConn and converts lines into Metrics.
// For each types.Metric it calls Handler.HandleMetric()
type Receiver interface {
Expand All @@ -55,17 +48,15 @@ type metricReceiver struct {
packetsReceived uint64
metricsReceived uint64
eventsReceived uint64
handler Handler // handler to invoke
namespace string // Namespace to prefix all metrics
tags types.Tags // Tags to add to all metrics
handler Handler // handler to invoke
namespace string // Namespace to prefix all metrics
}

// NewMetricReceiver initialises a new Receiver.
func NewMetricReceiver(ns string, tags []string, handler Handler) Receiver {
func NewMetricReceiver(ns string, handler Handler) Receiver {
return &metricReceiver{
handler: handler,
namespace: ns,
tags: tags,
}
}

Expand Down Expand Up @@ -141,12 +132,10 @@ func (mr *metricReceiver) handlePacket(ctx context.Context, addr net.Addr, msg [
}
if metric != nil {
numMetrics++
metric.Tags = append(metric.Tags, mr.tags...)
metric.SourceIP = ip
err = mr.handler.DispatchMetric(ctx, metric)
} else if event != nil {
numEvents++
event.Tags = append(event.Tags, mr.tags...)
event.SourceIP = ip
if event.DateHappened == 0 {
event.DateHappened = time.Now().Unix()
Expand Down
4 changes: 2 additions & 2 deletions statsd/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestReceiveEmptyPacket(t *testing.T) {
}
for _, inp := range input {
ch := &countingHandler{}
mr := NewMetricReceiver("", []string{}, ch).(*metricReceiver)
mr := NewMetricReceiver("", ch).(*metricReceiver)

err := mr.handlePacket(context.Background(), fakesocket.FakeAddr, inp)
if err != nil {
Expand Down Expand Up @@ -75,7 +75,7 @@ func TestReceivePacket(t *testing.T) {
}
for packet, mAndE := range input {
ch := &countingHandler{}
mr := NewMetricReceiver("", []string{}, ch).(*metricReceiver)
mr := NewMetricReceiver("", ch).(*metricReceiver)

err := mr.handlePacket(context.Background(), fakesocket.FakeAddr, []byte(packet))
if err != nil {
Expand Down
52 changes: 49 additions & 3 deletions statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package statsd
import (
"fmt"
"net"
"os"
"runtime"
"strconv"
"strings"
Expand All @@ -11,6 +12,7 @@ import (

backendTypes "github.com/atlassian/gostatsd/backend/types"
cloudTypes "github.com/atlassian/gostatsd/cloudprovider/types"
"github.com/atlassian/gostatsd/types"

log "github.com/Sirupsen/logrus"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -176,7 +178,9 @@ func (s *Server) RunWithCustomSocket(ctx context.Context, sf SocketFactory) erro
}()

// 2. Start handlers
handler := NewDispatchingHandler(dispatcher, s.Backends)
var handler Handler
dispHandler := NewDispatchingHandler(dispatcher, s.Backends, s.DefaultTags)
handler = dispHandler
if s.CloudProvider != nil {
ch := NewCloudHandler(s.CloudProvider, handler, s.Limiter, nil)
handler = ch
Expand Down Expand Up @@ -209,7 +213,7 @@ func (s *Server) RunWithCustomSocket(ctx context.Context, sf SocketFactory) erro
}
}()

receiver := NewMetricReceiver(s.Namespace, s.DefaultTags, handler)
receiver := NewMetricReceiver(s.Namespace, handler)
wgReceiver.Add(s.MaxReaders)
for r := 0; r < s.MaxReaders; r++ {
go func() {
Expand Down Expand Up @@ -242,11 +246,53 @@ func (s *Server) RunWithCustomSocket(ctx context.Context, sf SocketFactory) erro
// go console.ListenAndServe()
//}

// 6. Listen until done
// 6. Send events on start and on stop
defer sendStopEvent(dispHandler)
sendStartEvent(ctx, dispHandler)

// 7. Listen until done
<-ctx.Done()
return ctx.Err()
}

func sendStartEvent(ctx context.Context, dispHandler *dispatchingHandler) {
err := dispHandler.DispatchEvent(ctx, &types.Event{
Title: "Gostatsd started",
Text: "Gostatsd started",
DateHappened: time.Now().Unix(),
Hostname: getHost(),
Priority: types.PriLow,
})
if err != nil {
log.Warnf("Failed to send start event: %v", err)
}
}

func sendStopEvent(dispHandler *dispatchingHandler) {
ctx, cancelFunc := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFunc()
err := dispHandler.DispatchEvent(ctx, &types.Event{
Title: "Gostatsd stopped",
Text: "Gostatsd stopped",
DateHappened: time.Now().Unix(),
Hostname: getHost(),
Priority: types.PriLow,
})
if err != nil {
log.Warnf("Failed to send stop event: %v", err)
}
dispHandler.WaitForEvents()
}

func getHost() string {
host, err := os.Hostname()
if err != nil {
log.Warnf("Cannot get hostname: %v", err)
return ""
}
return host
}

type agrFactory struct {
percentThresholds []float64
flushInterval time.Duration
Expand Down

0 comments on commit fb08bb3

Please sign in to comment.