Skip to content

Commit

Permalink
Merge pull request #511 from kullanici0606/master
Browse files Browse the repository at this point in the history
Performance improvement - process udp packets in separate goroutine in order to minimize packet drops
  • Loading branch information
matthiasr authored Oct 23, 2023
2 parents 5a947bd + 04ed8c6 commit 12d14bb
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 2 deletions.
1 change: 1 addition & 0 deletions bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ func TestHandlePacket(t *testing.T) {
Logger: log.NewNopLogger(),
LineParser: parser,
UDPPackets: udpPackets,
UDPPacketDrops: udpPacketDrops,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors,
Expand Down
2 changes: 2 additions & 0 deletions exporter_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func benchmarkUDPListener(times int, b *testing.B) {

// there are more events than input lines, need bigger buffer
events := make(chan event.Events, len(bytesInput)*times*2)
udpChan := make(chan []byte, len(bytesInput)*times*2)

l := listener.StatsDUDPListener{
EventHandler: &event.UnbufferedEventHandler{C: events},
Expand All @@ -74,6 +75,7 @@ func benchmarkUDPListener(times int, b *testing.B) {
LinesReceived: linesReceived,
SamplesReceived: samplesReceived,
TagsReceived: tagsReceived,
UdpPacketQueue: udpChan,
}

// resume benchmark timer
Expand Down
11 changes: 11 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ var (
Help: "The total number of StatsD packets received over UDP.",
},
)
udpPacketDrops = promauto.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_udp_packet_drops_total",
Help: "The total number of dropped StatsD packets which received over UDP.",
},
)
tcpConnections = promauto.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tcp_connections_total",
Expand Down Expand Up @@ -261,6 +267,7 @@ func main() {
signalFXTagsEnabled = kingpin.Flag("statsd.parse-signalfx-tags", "Parse SignalFX style tags. Enabled by default.").Default("true").Bool()
relayAddr = kingpin.Flag("statsd.relay.address", "The UDP relay target address (host:port)").String()
relayPacketLen = kingpin.Flag("statsd.relay.packet-length", "Maximum relay output packet length to avoid fragmentation").Default("1400").Uint()
udpPacketQueueSize = kingpin.Flag("statsd.udp-packet-queue-size", "Size of internal queue for processing UDP packets.").Default("10000").Int()
)

promlogConfig := &promlog.Config{}
Expand Down Expand Up @@ -368,19 +375,23 @@ func main() {
}
}

udpPacketQueue := make(chan []byte, *udpPacketQueueSize)

ul := &listener.StatsDUDPListener{
Conn: uconn,
EventHandler: eventQueue,
Logger: logger,
LineParser: parser,
UDPPackets: udpPackets,
UDPPacketDrops: udpPacketDrops,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
Relay: relayTarget,
SampleErrors: *sampleErrors,
SamplesReceived: samplesReceived,
TagErrors: tagErrors,
TagsReceived: tagsReceived,
UdpPacketQueue: udpPacketQueue,
}

go ul.Listen()
Expand Down
7 changes: 7 additions & 0 deletions pkg/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ var (
Help: "The total number of StatsD packets received over UDP.",
},
)
udpPacketDrops = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_udp_packet_drops_total",
Help: "The total number of dropped StatsD packets which received over UDP.",
},
)
tcpConnections = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "statsd_exporter_tcp_connections_total",
Expand Down Expand Up @@ -683,6 +689,7 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) {
Logger: log.NewNopLogger(),
LineParser: parser,
UDPPackets: udpPackets,
UDPPacketDrops: udpPacketDrops,
LinesReceived: linesReceived,
EventsFlushed: eventsFlushed,
SampleErrors: *sampleErrors,
Expand Down
26 changes: 24 additions & 2 deletions pkg/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ type StatsDUDPListener struct {
Logger log.Logger
LineParser Parser
UDPPackets prometheus.Counter
UDPPacketDrops prometheus.Counter
LinesReceived prometheus.Counter
EventsFlushed prometheus.Counter
Relay *relay.Relay
SampleErrors prometheus.CounterVec
SamplesReceived prometheus.Counter
TagErrors prometheus.Counter
TagsReceived prometheus.Counter
UdpPacketQueue chan []byte
}

func (l *StatsDUDPListener) SetEventHandler(eh event.EventHandler) {
Expand All @@ -53,6 +55,7 @@ func (l *StatsDUDPListener) SetEventHandler(eh event.EventHandler) {

func (l *StatsDUDPListener) Listen() {
buf := make([]byte, 65535)
go l.ProcessUdpPacketQueue()
for {
n, _, err := l.Conn.ReadFromUDP(buf)
if err != nil {
Expand All @@ -64,12 +67,31 @@ func (l *StatsDUDPListener) Listen() {
level.Error(l.Logger).Log("error", err)
return
}
l.HandlePacket(buf[0:n])

l.EnqueueUdpPacket(buf, n)
}
}

func (l *StatsDUDPListener) HandlePacket(packet []byte) {
func (l *StatsDUDPListener) EnqueueUdpPacket(packet []byte, n int) {
l.UDPPackets.Inc()
packetCopy := make([]byte, n)
copy(packetCopy, packet)
select {
case l.UdpPacketQueue <- packetCopy:
// do nothing
default:
l.UDPPacketDrops.Inc()
}
}

func (l *StatsDUDPListener) ProcessUdpPacketQueue() {
for {
packet := <-l.UdpPacketQueue
l.HandlePacket(packet)
}
}

func (l *StatsDUDPListener) HandlePacket(packet []byte) {
lines := strings.Split(string(packet), "\n")
for _, line := range lines {
level.Debug(l.Logger).Log("msg", "Incoming line", "proto", "udp", "line", line)
Expand Down

0 comments on commit 12d14bb

Please sign in to comment.