Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow using Unix Domain Socket instead of UDP #472

Merged
merged 2 commits into from
Mar 28, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ to run `gostatsd` with a graphite backend and a grafana dashboard.
While not generally tested on Windows, it should work. Maximum throughput is likely to be better on
a linux system, however.

The server listens for UDP packets by default. Unix Domain Sockets (UDS) can be used specifying a file path instead
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than abbreviating to UDS (which afaik is not generally used) we could use a shorthand of 'unix sockets' or 'IPC sockets'. I don't want to nitpick but we should use the accepted terminology - if i'm wrong and UDS is a commonly used acronym for this then that's fine.

Copy link
Contributor Author

@rubenruizdegauna rubenruizdegauna Mar 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I've used unix sockets and rephrased it a bit in b17ebf5

of `address:port` with the `metrics-addr` configuration option. This only works on linux and will ignore `conn-per-reader`
configuration option.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Somewhere the docs should mention the socket mode is SOCK_DGRAM not SOCK_STREAM

Copy link
Contributor Author

@rubenruizdegauna rubenruizdegauna Mar 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added it to this paragraph in b17ebf5. I wasn't sure if it was too much info about the socket in this section and I thought about adding another section just for this. Let me know what do you think and I can quickly change it.


Configuring the server mode
---------------------------
The server can currently run in two modes: `standalone` and `forwarder`. It is configured through the top level
Expand Down Expand Up @@ -83,7 +87,8 @@ This configuration mode allows the following configuration options:
so that memory can be pre-allocated and reducing churn. Defaults to `4`. Note: this is only a hint, and it is safe
to send more.
- `log-raw-metric`: logs raw metrics received from the network. Defaults to `false`.
- `metrics-addr`: the address to listen to metrics on. Defaults to `:8125`.
- `metrics-addr`: the address to listen to metrics on. Defaults to `:8125`. Using a file path instead of `host:port`
will create a Unix Domain Socket in the specified path instead of using UDP.
- `namespace`: a namespace to prefix all metrics with. Defaults to ''.
- `statser-type`: configures where internal metrics are sent to. May be `internal` which sends them to the internal
processing pipeline, `logging` which logs them, `null` which drops them. Defaults to `internal`, or `null` if the
Expand All @@ -93,7 +98,8 @@ This configuration mode allows the following configuration options:
Defaults to `false`.
- `receive-batch-size`: the number of datagrams to attempt to read. It is more CPU efficient to read multiple, however
it takes extra memory. See [Memory allocation for read buffers] section below for details. Defaults to 50.
- `conn-per-reader`: attempts to create a connection for every UDP receiver. Not supported by all OS versions.
- `conn-per-reader`: attempts to create a connection for every UDP receiver. Not supported by all OS versions. Will be
ignored if UDS is used instead of UDP.
Defaults to `false`.
- `bad-lines-per-minute`: the number of metrics which fail to parse to log per minute. This is used to prevent a bad
client spamming malformed statsd data, while still logging some information to enable troubleshooting. Defaults to `0`.
Expand Down
16 changes: 15 additions & 1 deletion pkg/statsd/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package statsd
import (
"context"
"net"
"os"
"strings"
"sync/atomic"

Expand Down Expand Up @@ -98,6 +99,13 @@ func (dr *DatagramReceiver) Run(ctx context.Context) {
}
}

// Delete socket file when connection is made using Unix Domain Sockets
if len(connections) > 0 {
if socket, ok := connections[0].LocalAddr().(*net.UnixAddr); ok {
defer os.Remove(socket.String())
}
}

// Wait for everything to stop
wg.Wait()
}
Expand Down Expand Up @@ -142,8 +150,14 @@ func (dr *DatagramReceiver) Receive(ctx context.Context, c net.PacketConn) {
dr.bufPool.Put(retBuf)
}

ip := gostatsd.UnknownSource
// Do not retrieve IP address when connection is made using Unix Domain Sockets
if _, isUnixAddr := c.LocalAddr().(*net.UnixAddr); !isUnixAddr {
ip = getIP(addr)
}

dgs[i] = &Datagram{
IP: getIP(addr),
IP: ip,
Msg: buf,
Timestamp: now,
DoneFunc: doneFn,
Expand Down
71 changes: 71 additions & 0 deletions pkg/statsd/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package statsd

import (
"context"
"net"
"os"
"runtime"
"sync"
"testing"
"time"

"github.com/magiconair/properties/assert"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't realize we had other assert libs. Raised #480 to remove others.

tassert "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/atlassian/gostatsd"
Expand Down Expand Up @@ -93,3 +96,71 @@ func TestDatagramReceiver_Receive(t *testing.T) {
assert.Equal(t, string(dg.IP), fakesocket.FakeAddr.IP.String())
assert.Equal(t, dg.Msg, fakesocket.FakeMetric)
}

func TestDatagramReceiver_UDS(t *testing.T) {
ch := make(chan []*Datagram, 1)
message := "abc.def.g:10|c"

// Datagram receiver listening in Unix Domain Socket
socketPath := os.TempDir() + "/gostatsd_receiver_test_receive_uds.sock"
mr := NewDatagramReceiver(ch, socketFactory(socketPath, false), 1, 2)
ctx, cancel := context.WithCancel(context.Background())

var wg sync.WaitGroup
wg.Add(1)
go func() {
mr.Run(ctx)
wg.Done()
}()

// Wait until the socket is created
tassert.Eventually(t, func() bool {
return tassert.FileExists(t, socketPath)
}, time.Second, 10*time.Millisecond)

err := sendDataToSocket(socketPath, message)
tassert.NoError(t, err)

select {
case d := <-ch:
tassert.Len(t, d, 1)
tassert.Equal(t, d[0].Msg, []byte(message))
cancel()
case <-time.After(time.Second):
t.Errorf("Timeout, failed to read datagram")
}
wg.Wait()
}

func TestDatagramReceiver_SocketIsRemoved(t *testing.T) {
ch := make(chan []*Datagram, 1)

socketPath := os.TempDir() + "/gostatsd_receiver_test_receive_uds.sock"
mr := NewDatagramReceiver(ch, socketFactory(socketPath, false), 1, 2)
ctx, cancel := context.WithCancel(context.Background())

var wg sync.WaitGroup
wg.Add(1)
go func() {
mr.Run(ctx)
wg.Done()
}()

tassert.Eventually(t, func() bool {
return tassert.FileExists(t, socketPath)
}, time.Second, 10*time.Millisecond)
cancel()
wg.Wait()
tassert.NoFileExists(t, socketPath)
}

func sendDataToSocket(socketPath string, data string) error {
c, err := net.Dial("unixgram", socketPath)
if err != nil {
return err
}
defer c.Close()

_, err = c.Write([]byte(data))
return err
}
23 changes: 18 additions & 5 deletions pkg/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net"
"strings"
"time"

"github.com/ash2k/stager"
Expand Down Expand Up @@ -81,12 +82,24 @@ func socketFactory(metricsAddr string, connPerReader bool) SocketFactory {
return func() (net.PacketConn, error) {
return reuseport.ListenPacket("udp", metricsAddr)
}
} else {
conn, err := net.ListenPacket("udp", metricsAddr)
return func() (net.PacketConn, error) {
return conn, err
}
}

conn, err := net.ListenPacket(networkFromAddress(metricsAddr), metricsAddr)
return func() (net.PacketConn, error) {
return conn, err
}
}

//networkFromAddress returns the network type based on the provided address
//if the address is empty or [host]:[port] combination it will be UDP otherwise UDS
func networkFromAddress(addr string) string {
if strings.TrimSpace(addr) == "" {
return "udp"
}
if strings.Index(addr, ":") != -1 {
return "udp"
}
return "unixgram"
}

func (s *Server) createStandaloneSink() (gostatsd.PipelineHandler, []gostatsd.Runnable, error) {
Expand Down
31 changes: 31 additions & 0 deletions pkg/statsd/statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package statsd

import (
"context"
"github.com/stretchr/testify/assert"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be a check somewhere for this, not sure why the build didn't hit it. Import ordering is:

  • stdlib
  • 3rd party
  • local

"math/rand"
"runtime"
"strconv"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -145,6 +147,35 @@ func TestStatsdThroughput(t *testing.T) {
memStatsFinish.GCCPUFraction)
}

func TestNetworkFromAddress(t *testing.T) {
t.Parallel()
input := []struct {
address string
expectedNetwork string
}{
{
address: "localhost:8125",
expectedNetwork: "udp",
},
{
address: "",
expectedNetwork: "udp",
},
{
address: "/some/file.sock",
expectedNetwork: "unixgram",
},
}
for pos, inp := range input {
inp := inp
t.Run(strconv.Itoa(pos), func(t *testing.T) {
t.Parallel()
network := networkFromAddress(inp.address)
assert.Equal(t, inp.expectedNetwork, network)
})
}
}

type countingBackend struct {
metrics uint64
events uint64
Expand Down