Skip to content

Commit

Permalink
fix races
Browse files Browse the repository at this point in the history
  • Loading branch information
zak-pawel committed Aug 13, 2024
1 parent 1f791d5 commit 83661ef
Showing 1 changed file with 77 additions and 58 deletions.
135 changes: 77 additions & 58 deletions plugins/outputs/zabbix/zabbix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ type result struct {
type zabbixMockServer struct {
listener net.Listener
ignoreAcceptError bool
results []result
}

func newZabbixMockServer(addr string, ignoreAcceptError bool) (*zabbixMockServer, error) {
Expand All @@ -55,11 +54,11 @@ func newZabbixMockServer(addr string, ignoreAcceptError bool) (*zabbixMockServer
return &zabbixMockServer{listener: l, ignoreAcceptError: ignoreAcceptError}, nil
}

func (s *zabbixMockServer) Addr() string {
func (s *zabbixMockServer) addr() string {
return s.listener.Addr().String()
}

func (s *zabbixMockServer) Close() error {
func (s *zabbixMockServer) close() error {
if s.listener != nil {
return s.listener.Close()
}
Expand Down Expand Up @@ -485,10 +484,10 @@ func TestZabbix(t *testing.T) {
// Simulate a Zabbix server to get the data sent. It has a timeout to avoid waiting forever.
server, err := newZabbixMockServer("127.0.0.1:", len(test.zabbixMetrics) == 0)
require.NoError(t, err)
defer server.Close()
defer server.close()

z := &Zabbix{
Address: server.Addr(),
Address: server.addr(),
KeyPrefix: test.KeyPrefix,
HostTag: "host",
SkipMeasurementPrefix: test.SkipMeasurementPrefix,
Expand Down Expand Up @@ -577,11 +576,10 @@ func TestLLD(t *testing.T) {
// Simulate a Zabbix server to get the data sent
server, err := newZabbixMockServer("127.0.0.1:", false)
require.NoError(t, err)
defer server.Close()
server.Start()
defer server.close()

z := &Zabbix{
Address: server.Addr(),
Address: server.addr(),
KeyPrefix: "telegraf.",
HostTag: "host",
LLDSendInterval: config.Duration(10 * time.Minute),
Expand All @@ -590,6 +588,11 @@ func TestLLD(t *testing.T) {
}
require.NoError(t, z.Init())

resCh := make(chan []result, 1)
go func() {
resCh <- server.listenForNRequests(9)
}()

// First packet
require.NoError(t, z.Write([]telegraf.Metric{m}))

Expand Down Expand Up @@ -630,66 +633,70 @@ func TestLLD(t *testing.T) {
// Ninth packet, time has surpassed LLDSendInterval, metrics + LLD.
require.NoError(t, z.Write([]telegraf.Metric{m}))

require.Eventually(t, func() bool {
return len(server.results) == 9
}, 2*time.Second, 50*time.Millisecond, "did not receive 9 results within specific time")
var results []result
select {
case res := <-resCh:
require.Len(t, res, 9)
results = res
case <-time.After(9 * time.Second):
require.Fail(t, "Timeout while waiting for results")
}

// Read first packet with two metrics, then the first auto-register packet and the second auto-register packet.
// First packet with metrics
require.NoError(t, server.results[0].err)
compareData(t, []zabbixRequestData{zabbixMetric}, server.results[0].req.Data)
require.NoError(t, results[0].err)
compareData(t, []zabbixRequestData{zabbixMetric}, results[0].req.Data)

// Second packet, while time has not surpassed LLDSendInterval
require.NoError(t, server.results[1].err)
compareData(t, []zabbixRequestData{zabbixMetric}, server.results[1].req.Data)
require.NoError(t, results[1].err)
compareData(t, []zabbixRequestData{zabbixMetric}, results[1].req.Data)

// Third packet, time has surpassed LLDSendInterval, metrics + LLD
require.NoError(t, server.results[2].err)
require.Len(t, server.results[2].req.Data, 2, "Expected 2 metrics")
server.results[2].req.Data[1].Clock = 0 // Ignore lld request clock
compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, server.results[2].req.Data)
require.NoError(t, results[2].err)
require.Len(t, results[2].req.Data, 2, "Expected 2 metrics")
results[2].req.Data[1].Clock = 0 // Ignore lld request clock
compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, results[2].req.Data)

// Fourth packet with metrics
require.NoError(t, server.results[3].err)
compareData(t, []zabbixRequestData{zabbixMetric}, server.results[3].req.Data)
require.NoError(t, results[3].err)
compareData(t, []zabbixRequestData{zabbixMetric}, results[3].req.Data)

// Fifth packet, time has surpassed LLDSendInterval, metrics. No LLD as there is nothing new.
require.NoError(t, server.results[4].err)
compareData(t, []zabbixRequestData{zabbixMetric}, server.results[4].req.Data)
require.NoError(t, results[4].err)
compareData(t, []zabbixRequestData{zabbixMetric}, results[4].req.Data)

// Sixth packet, new LLD info, but time has not surpassed LLDSendInterval
require.NoError(t, server.results[5].err)
compareData(t, []zabbixRequestData{zabbixMetricNew}, server.results[5].req.Data)
require.NoError(t, results[5].err)
compareData(t, []zabbixRequestData{zabbixMetricNew}, results[5].req.Data)

// Seventh packet, time has surpassed LLDSendInterval, metrics + LLD.
// Also, time has surpassed LLDClearInterval, so LLD is cleared.
require.NoError(t, server.results[6].err)
require.Len(t, server.results[6].req.Data, 2, "Expected 2 metrics")
server.results[6].req.Data[1].Clock = 0 // Ignore lld request clock
compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetricNew}, server.results[6].req.Data)
require.NoError(t, results[6].err)
require.Len(t, results[6].req.Data, 2, "Expected 2 metrics")
results[6].req.Data[1].Clock = 0 // Ignore lld request clock
compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetricNew}, results[6].req.Data)

// Eighth packet, time host not surpassed LLDSendInterval, just metrics.
require.NoError(t, server.results[7].err)
compareData(t, []zabbixRequestData{zabbixMetric}, server.results[7].req.Data)
require.NoError(t, results[7].err)
compareData(t, []zabbixRequestData{zabbixMetric}, results[7].req.Data)

// Ninth packet, time has surpassed LLDSendInterval, metrics + LLD.
// Just the info of the zabbixMetric as zabbixMetricNew has not been seen since LLDClearInterval.
require.NoError(t, server.results[8].err)
require.Len(t, server.results[8].req.Data, 2, "Expected 2 metrics")
server.results[8].req.Data[1].Clock = 0 // Ignore lld request clock
compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, server.results[8].req.Data)
require.NoError(t, results[8].err)
require.Len(t, results[8].req.Data, 2, "Expected 2 metrics")
results[8].req.Data[1].Clock = 0 // Ignore lld request clock
compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, results[8].req.Data)
}

// TestAutoRegister tests that auto-registration requests are sent to zabbix if enabled
func TestAutoRegister(t *testing.T) {
// Simulate a Zabbix server to get the data sent
server, err := newZabbixMockServer("127.0.0.1:", false)
require.NoError(t, err)
defer server.Close()
server.Start()
defer server.close()

z := &Zabbix{
Address: server.Addr(),
Address: server.addr(),
KeyPrefix: "telegraf.",
HostTag: "host",
SkipMeasurementPrefix: false,
Expand All @@ -699,6 +706,12 @@ func TestAutoRegister(t *testing.T) {
Log: testutil.Logger{},
}
require.NoError(t, z.Init())

resCh := make(chan []result, 1)
go func() {
resCh <- server.listenForNRequests(3)
}()

err = z.Write([]telegraf.Metric{
testutil.MustMetric(
"name",
Expand All @@ -715,27 +728,32 @@ func TestAutoRegister(t *testing.T) {
})
require.NoError(t, err)

require.Eventually(t, func() bool {
return len(server.results) == 3
}, 2*time.Second, 50*time.Millisecond, "did not receive 3 results within specific time")
var results []result
select {
case res := <-resCh:
require.Len(t, res, 3)
results = res
case <-time.After(3 * time.Second):
require.Fail(t, "Timeout while waiting for results")
}

// Read first packet with two metrics, then the first auto-register packet and the second auto-register packet.
// Accept packet with the two metrics sent
require.NoError(t, server.results[0].err)
require.NoError(t, results[0].err)

// Read the first auto-register packet
require.NoError(t, server.results[1].err)
require.Equal(t, "active checks", server.results[1].req.Request)
require.Equal(t, "xxx", server.results[1].req.HostMetadata)
require.NoError(t, results[1].err)
require.Equal(t, "active checks", results[1].req.Request)
require.Equal(t, "xxx", results[1].req.HostMetadata)

// Read the second auto-register packet
require.NoError(t, server.results[2].err)
require.Equal(t, "active checks", server.results[2].req.Request)
require.Equal(t, "xxx", server.results[2].req.HostMetadata)
require.NoError(t, results[2].err)
require.Equal(t, "active checks", results[2].req.Request)
require.Equal(t, "xxx", results[2].req.HostMetadata)

// Check we have received auto-registration for both hosts
hostsRegistered := []string{server.results[1].req.Host}
hostsRegistered = append(hostsRegistered, server.results[2].req.Host)
hostsRegistered := []string{results[1].req.Host}
hostsRegistered = append(hostsRegistered, results[2].req.Host)
require.ElementsMatch(t, []string{"hostA", "hostB"}, hostsRegistered)
}

Expand Down Expand Up @@ -799,14 +817,15 @@ func compareData(t *testing.T, expected []zabbixRequestData, data []zabbixReques
}
}

func (s *zabbixMockServer) Start() {
go func() {
defer s.listener.Close()
for {
res := s.listenForSingleRequest()
s.results = append(s.results, res)
}
}()
func (s *zabbixMockServer) listenForNRequests(n int) []result {
results := make([]result, 0, n)
defer s.listener.Close()
for i := 0; i < n; i++ {
res := s.listenForSingleRequest()
results = append(results, res)
}

return results
}

func (s *zabbixMockServer) listenForSingleRequest() result {
Expand Down

0 comments on commit 83661ef

Please sign in to comment.