From 1f791d53027e19d5835bff556ecbfbc785466748 Mon Sep 17 00:00:00 2001 From: Pawel Zak Date: Tue, 13 Aug 2024 01:15:24 +0200 Subject: [PATCH] addressing review comments --- plugins/outputs/zabbix/zabbix_test.go | 461 +++++++++----------------- 1 file changed, 161 insertions(+), 300 deletions(-) diff --git a/plugins/outputs/zabbix/zabbix_test.go b/plugins/outputs/zabbix/zabbix_test.go index 56734bb68f789..09c9235f8dcc7 100644 --- a/plugins/outputs/zabbix/zabbix_test.go +++ b/plugins/outputs/zabbix/zabbix_test.go @@ -3,17 +3,13 @@ package zabbix import ( "encoding/binary" "encoding/json" - "errors" - "fmt" "net" "os" "sort" "strings" - "sync" "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" @@ -40,6 +36,36 @@ type zabbixLLDValue struct { Data []map[string]string `json:"data"` } +type result struct { + req zabbixRequest + err error +} + +type zabbixMockServer struct { + listener net.Listener + ignoreAcceptError bool + results []result +} + +func newZabbixMockServer(addr string, ignoreAcceptError bool) (*zabbixMockServer, error) { + l, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + return &zabbixMockServer{listener: l, ignoreAcceptError: ignoreAcceptError}, nil +} + +func (s *zabbixMockServer) Addr() string { + return s.listener.Addr().String() +} + +func (s *zabbixMockServer) Close() error { + if s.listener != nil { + return s.listener.Close() + } + return nil +} + func TestZabbix(t *testing.T) { hostname, err := os.Hostname() require.NoError(t, err) @@ -197,7 +223,7 @@ func TestZabbix(t *testing.T) { }, }, }, - "send one metric with two extra tags, zabbix parameters should be alfabetically orderer": { + "send one metric with two extra tags, zabbix parameters should be alphabetically ordered": { telegrafMetrics: []telegraf.Metric{ testutil.MustMetric("name", map[string]string{ @@ -457,12 +483,12 @@ func TestZabbix(t *testing.T) { for desc, test := range tests { t.Run(desc, func(t *testing.T) { // Simulate a Zabbix server to get the data sent. It has a timeout to avoid waiting forever. - listener, err := net.Listen("tcp", "127.0.0.1:") + server, err := newZabbixMockServer("127.0.0.1:", len(test.zabbixMetrics) == 0) require.NoError(t, err) - defer listener.Close() + defer server.Close() z := &Zabbix{ - Address: listener.Addr().String(), + Address: server.Addr(), KeyPrefix: test.KeyPrefix, HostTag: "host", SkipMeasurementPrefix: test.SkipMeasurementPrefix, @@ -472,58 +498,26 @@ func TestZabbix(t *testing.T) { } require.NoError(t, z.Init()) - outerErrs := make(chan error, 1) - wg := sync.WaitGroup{} - wg.Add(1) + resCh := make(chan result, 1) go func() { - defer wg.Done() - - success := make(chan zabbixRequest, 1) - innerErrs := make(chan error, 1) - - go func() { - req, err := listenForZabbixMetric(t, listener, len(test.zabbixMetrics) == 0) - if err != nil { - innerErrs <- err - } else { - success <- req - } - }() - - // By default, we use trappers - requestType := "sender data" - if test.AgentActive { - requestType = "agent data" - } - - select { - case request := <-success: - if !assert.Equal(t, requestType, request.Request) { - outerErrs <- fmt.Errorf("%q is not equal to %q", request.Request, requestType) - return - } - err = compareData(t, test.zabbixMetrics, request.Data) - if err != nil { - outerErrs <- err - } - case err := <-innerErrs: - outerErrs <- err - case <-time.After(1 * time.Second): - if !assert.Empty(t, test.zabbixMetrics, "no metrics should be expected if the connection times out") { - outerErrs <- errors.New("no metrics should be expected if the connection times out") - } - } + resCh <- server.listenForSingleRequest() }() require.NoError(t, z.Write(test.telegrafMetrics)) - // Wait for zabbix server emulator to finish - wg.Wait() - close(outerErrs) + // By default, we use trappers + requestType := "sender data" + if test.AgentActive { + requestType = "agent data" + } - err = <-outerErrs - if err != nil { - t.Fatal(err) + select { + case res := <-resCh: + require.NoError(t, res.err) + require.Equal(t, requestType, res.req.Request) + compareData(t, test.zabbixMetrics, res.req.Data) + case <-time.After(1 * time.Second): + require.Empty(t, test.zabbixMetrics, "no metrics should be expected if the connection times out") } }) } @@ -581,12 +575,13 @@ func TestLLD(t *testing.T) { } // Simulate a Zabbix server to get the data sent - listener, err := net.Listen("tcp", "127.0.0.1:") + server, err := newZabbixMockServer("127.0.0.1:", false) require.NoError(t, err) - defer listener.Close() + defer server.Close() + server.Start() z := &Zabbix{ - Address: listener.Addr().String(), + Address: server.Addr(), KeyPrefix: "telegraf.", HostTag: "host", LLDSendInterval: config.Duration(10 * time.Minute), @@ -595,140 +590,6 @@ func TestLLD(t *testing.T) { } require.NoError(t, z.Init()) - errs := make(chan error, 1) - wg := sync.WaitGroup{} - wg.Add(1) - - // Read first packet with two metrics, then the first autoregister packet and the second autoregister packet. - go func() { - defer wg.Done() - - // First packet with metrics - request, err := listenForZabbixMetric(t, listener, false) - if err != nil { - errs <- err - return - } - err = compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) - if err != nil { - errs <- err - return - } - - // Second packet, while time has not surpassed LLDSendInterval - request, err = listenForZabbixMetric(t, listener, false) - if err != nil { - errs <- err - return - } - err = compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) - if err != nil { - errs <- err - return - } - - // Third packet, time has surpassed LLDSendInterval, metrics + LLD - request, err = listenForZabbixMetric(t, listener, false) - if err != nil { - errs <- err - return - } - if !assert.Len(t, request.Data, 2, "Expected 2 metrics") { - errs <- errors.New("expected 2 metrics") - return - } - request.Data[1].Clock = 0 // Ignore lld request clock - err = compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, request.Data) - if err != nil { - errs <- err - return - } - - // Fourth packet with metrics - request, err = listenForZabbixMetric(t, listener, false) - if err != nil { - errs <- err - return - } - err = compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) - if err != nil { - errs <- err - return - } - - // Fifth packet, time has surpassed LLDSendInterval, metrics. No LLD as there is nothing new. - request, err = listenForZabbixMetric(t, listener, false) - if err != nil { - errs <- err - return - } - err = compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) - if err != nil { - errs <- err - return - } - - // Sixth packet, new LLD info, but time has not surpassed LLDSendInterval - request, err = listenForZabbixMetric(t, listener, false) - if err != nil { - errs <- err - return - } - err = compareData(t, []zabbixRequestData{zabbixMetricNew}, request.Data) - if err != nil { - errs <- err - return - } - - // Seventh packet, time has surpassed LLDSendInterval, metrics + LLD. - // Also, time has surpassed LLDClearInterval, so LLD is cleared. - request, err = listenForZabbixMetric(t, listener, false) - if err != nil { - errs <- err - return - } - if !assert.Len(t, request.Data, 2, "Expected 2 metrics") { - errs <- errors.New("expected 2 metrics") - return - } - request.Data[1].Clock = 0 // Ignore lld request clock - err = compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetricNew}, request.Data) - if err != nil { - errs <- err - return - } - - // Eighth packet, time host not surpassed LLDSendInterval, just metrics. - request, err = listenForZabbixMetric(t, listener, false) - if err != nil { - errs <- err - return - } - err = compareData(t, []zabbixRequestData{zabbixMetric}, request.Data) - if err != nil { - errs <- err - return - } - - // Ninth packet, time has surpassed LLDSendInterval, metrics + LLD. - // Just the info of the zabbixMetric as zabbixMetricNew has not been seen since LLDClearInterval. - request, err = listenForZabbixMetric(t, listener, false) - if err != nil { - errs <- err - return - } - if !assert.Len(t, request.Data, 2, "Expected 2 metrics") { - errs <- errors.New("expected 2 metrics") - return - } - request.Data[1].Clock = 0 // Ignore lld request clock - err = compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, request.Data) - if err != nil { - errs <- err - return - } - }() - // First packet require.NoError(t, z.Write([]telegraf.Metric{m})) @@ -769,25 +630,66 @@ func TestLLD(t *testing.T) { // Ninth packet, time has surpassed LLDSendInterval, metrics + LLD. require.NoError(t, z.Write([]telegraf.Metric{m})) - // Wait for zabbix server emulator to finish - wg.Wait() - close(errs) + require.Eventually(t, func() bool { + return len(server.results) == 9 + }, 2*time.Second, 50*time.Millisecond, "did not receive 9 results within specific time") - err = <-errs - if err != nil { - t.Fatal(err) - } + // Read first packet with two metrics, then the first auto-register packet and the second auto-register packet. + // First packet with metrics + require.NoError(t, server.results[0].err) + compareData(t, []zabbixRequestData{zabbixMetric}, server.results[0].req.Data) + + // Second packet, while time has not surpassed LLDSendInterval + require.NoError(t, server.results[1].err) + compareData(t, []zabbixRequestData{zabbixMetric}, server.results[1].req.Data) + + // Third packet, time has surpassed LLDSendInterval, metrics + LLD + require.NoError(t, server.results[2].err) + require.Len(t, server.results[2].req.Data, 2, "Expected 2 metrics") + server.results[2].req.Data[1].Clock = 0 // Ignore lld request clock + compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, server.results[2].req.Data) + + // Fourth packet with metrics + require.NoError(t, server.results[3].err) + compareData(t, []zabbixRequestData{zabbixMetric}, server.results[3].req.Data) + + // Fifth packet, time has surpassed LLDSendInterval, metrics. No LLD as there is nothing new. + require.NoError(t, server.results[4].err) + compareData(t, []zabbixRequestData{zabbixMetric}, server.results[4].req.Data) + + // Sixth packet, new LLD info, but time has not surpassed LLDSendInterval + require.NoError(t, server.results[5].err) + compareData(t, []zabbixRequestData{zabbixMetricNew}, server.results[5].req.Data) + + // Seventh packet, time has surpassed LLDSendInterval, metrics + LLD. + // Also, time has surpassed LLDClearInterval, so LLD is cleared. + require.NoError(t, server.results[6].err) + require.Len(t, server.results[6].req.Data, 2, "Expected 2 metrics") + server.results[6].req.Data[1].Clock = 0 // Ignore lld request clock + compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetricNew}, server.results[6].req.Data) + + // Eighth packet, time host not surpassed LLDSendInterval, just metrics. + require.NoError(t, server.results[7].err) + compareData(t, []zabbixRequestData{zabbixMetric}, server.results[7].req.Data) + + // Ninth packet, time has surpassed LLDSendInterval, metrics + LLD. + // Just the info of the zabbixMetric as zabbixMetricNew has not been seen since LLDClearInterval. + require.NoError(t, server.results[8].err) + require.Len(t, server.results[8].req.Data, 2, "Expected 2 metrics") + server.results[8].req.Data[1].Clock = 0 // Ignore lld request clock + compareData(t, []zabbixRequestData{zabbixMetric, zabbixLLDMetric}, server.results[8].req.Data) } -// TestAutoregister tests that autoregistration requests are sent to zabbix if enabled -func TestAutoregister(t *testing.T) { +// TestAutoRegister tests that auto-registration requests are sent to zabbix if enabled +func TestAutoRegister(t *testing.T) { // Simulate a Zabbix server to get the data sent - listener, err := net.Listen("tcp", "127.0.0.1:") + server, err := newZabbixMockServer("127.0.0.1:", false) require.NoError(t, err) - defer listener.Close() + defer server.Close() + server.Start() z := &Zabbix{ - Address: listener.Addr().String(), + Address: server.Addr(), KeyPrefix: "telegraf.", HostTag: "host", SkipMeasurementPrefix: false, @@ -797,47 +699,6 @@ func TestAutoregister(t *testing.T) { Log: testutil.Logger{}, } require.NoError(t, z.Init()) - - errs := make(chan error, 1) - wg := sync.WaitGroup{} - wg.Add(1) - - // Read first packet with two metrics, then the first autoregister packet and the second autoregister packet. - go func() { - defer wg.Done() - - // Accept packet with the two metrics sent - _, err = listenForZabbixMetric(t, listener, false) - if err != nil { - errs <- err - return - } - - // Read the first autoregister packet - request, err := listenForZabbixMetric(t, listener, false) - if err != nil { - errs <- err - return - } - assert.Equal(t, "active checks", request.Request) - assert.Equal(t, "xxx", request.HostMetadata) - - hostsRegistered := []string{request.Host} - - // Read the second autoregister packet - request, err = listenForZabbixMetric(t, listener, false) - if err != nil { - errs <- err - return - } - assert.Equal(t, "active checks", request.Request) - assert.Equal(t, "xxx", request.HostMetadata) - - // Check we have received autoregistration for both hosts - hostsRegistered = append(hostsRegistered, request.Host) - assert.ElementsMatch(t, []string{"hostA", "hostB"}, hostsRegistered) - }() - err = z.Write([]telegraf.Metric{ testutil.MustMetric( "name", @@ -854,21 +715,34 @@ func TestAutoregister(t *testing.T) { }) require.NoError(t, err) - // Wait for zabbix server emulator to finish - wg.Wait() - close(errs) + require.Eventually(t, func() bool { + return len(server.results) == 3 + }, 2*time.Second, 50*time.Millisecond, "did not receive 3 results within specific time") - err = <-errs - if err != nil { - t.Fatal(err) - } + // Read first packet with two metrics, then the first auto-register packet and the second auto-register packet. + // Accept packet with the two metrics sent + require.NoError(t, server.results[0].err) + + // Read the first auto-register packet + require.NoError(t, server.results[1].err) + require.Equal(t, "active checks", server.results[1].req.Request) + require.Equal(t, "xxx", server.results[1].req.HostMetadata) + + // Read the second auto-register packet + require.NoError(t, server.results[2].err) + require.Equal(t, "active checks", server.results[2].req.Request) + require.Equal(t, "xxx", server.results[2].req.HostMetadata) + + // Check we have received auto-registration for both hosts + hostsRegistered := []string{server.results[1].req.Host} + hostsRegistered = append(hostsRegistered, server.results[2].req.Host) + require.ElementsMatch(t, []string{"hostA", "hostB"}, hostsRegistered) } -// compareData compares generated data with expected data ignoring slice order if all Clocks are -// the same. +// compareData compares generated data with expected data ignoring slice order if all Clocks are the same. // This is useful for metrics with several fields that should produce several Zabbix values that // could not be sorted by clock -func compareData(t *testing.T, expected []zabbixRequestData, data []zabbixRequestData) error { +func compareData(t *testing.T, expected []zabbixRequestData, data []zabbixRequestData) { t.Helper() var clock int64 @@ -912,62 +786,56 @@ func compareData(t *testing.T, expected []zabbixRequestData, data []zabbixReques return strings.Join(keysValuesI, "") < strings.Join(keysValuesJ, "") }) sortedValue, err := json.Marshal(lldValue) - //nolint:testifylint // require-error ignores assertions in the if condition - //but has problems with negation: https://github.com/Antonboom/testifylint/issues/125 - if !assert.NoError(t, err) { - return err - } + require.NoError(t, err) data[i].Value = string(sortedValue) } } if sameClock { - if !assert.ElementsMatch(t, expected, data) { - return errors.New("elements did not match") - } + require.ElementsMatch(t, expected, data) } else { - if !assert.Equal(t, expected, data) { - return errors.New("elements were not equal") - } + require.Equal(t, expected, data) } - - return nil } -// listenForZabbixMetric starts a TCP server listening for one Zabbix metric. -// ignoreAcceptError is used to ignore the error when the server is closed. -func listenForZabbixMetric(t *testing.T, listener net.Listener, ignoreAcceptError bool) (zabbixRequest, error) { - t.Helper() +func (s *zabbixMockServer) Start() { + go func() { + defer s.listener.Close() + for { + res := s.listenForSingleRequest() + s.results = append(s.results, res) + } + }() +} - conn, err := listener.Accept() - if err != nil && ignoreAcceptError { - return zabbixRequest{}, nil +func (s *zabbixMockServer) listenForSingleRequest() result { + conn, err := s.listener.Accept() + if err != nil { + if s.ignoreAcceptError { + return result{req: zabbixRequest{}, err: nil} + } + return result{req: zabbixRequest{}, err: err} } + defer conn.Close() - //nolint:testifylint // require-error ignores assertions in the if condition - //but has problems with negation: https://github.com/Antonboom/testifylint/issues/125 - if !assert.NoError(t, err) { - return zabbixRequest{}, err + if err = conn.SetDeadline(time.Now().Add(time.Second)); err != nil { + return result{req: zabbixRequest{}, err: err} } // Obtain request from the mock zabbix server // Read protocol header and version header := make([]byte, 5) _, err = conn.Read(header) - //nolint:testifylint // require-error ignores assertions in the if condition - //but has problems with negation: https://github.com/Antonboom/testifylint/issues/125 - if !assert.NoError(t, err) { - return zabbixRequest{}, err + if err != nil { + return result{req: zabbixRequest{}, err: err} } // Read data length dataLengthRaw := make([]byte, 8) _, err = conn.Read(dataLengthRaw) - //nolint:testifylint // require-error ignores assertions in the if condition - //but has problems with negation: https://github.com/Antonboom/testifylint/issues/125 - if !assert.NoError(t, err) { - return zabbixRequest{}, err + if err != nil { + return result{req: zabbixRequest{}, err: err} } dataLength := binary.LittleEndian.Uint64(dataLengthRaw) @@ -975,33 +843,26 @@ func listenForZabbixMetric(t *testing.T, listener net.Listener, ignoreAcceptErro // Read data content content := make([]byte, dataLength) _, err = conn.Read(content) - //nolint:testifylint // require-error ignores assertions in the if condition - //but has problems with negation: https://github.com/Antonboom/testifylint/issues/125 - if !assert.NoError(t, err) { - return zabbixRequest{}, err + if err != nil { + return result{req: zabbixRequest{}, err: err} } // The zabbix output checks that there are not errors // Simulated response from the server resp := []byte("ZBXD\x01\x00\x00\x00\x00\x00\x00\x00\x00{\"response\": \"success\", \"info\": \"\"}\n") _, err = conn.Write(resp) - //nolint:testifylint // require-error ignores assertions in the if condition - //but has problems with negation: https://github.com/Antonboom/testifylint/issues/125 - if !assert.NoError(t, err) { - return zabbixRequest{}, err + if err != nil { + return result{req: zabbixRequest{}, err: err} } - // Close connection after reading the client data - conn.Close() - // Strip zabbix header and get JSON request var request zabbixRequest err = json.Unmarshal(content, &request) - if !assert.NoError(t, err) { - return zabbixRequest{}, err + if err != nil { + return result{req: zabbixRequest{}, err: err} } - return request, nil + return result{req: request, err: nil} } func TestBuildZabbixMetric(t *testing.T) {