From b91a07c658d8fc9ed440e8a40e5443c321c3d4fb Mon Sep 17 00:00:00 2001 From: Pawel Zak Date: Thu, 14 Nov 2024 22:21:31 +0100 Subject: [PATCH] chore: Fix linter findings for `revive:exported` in `plugins/inputs/m*` --- plugins/inputs/mailchimp/chimp_api.go | 34 +-- plugins/inputs/mailchimp/mailchimp.go | 19 +- plugins/inputs/marklogic/marklogic.go | 72 +++---- plugins/inputs/marklogic/marklogic_test.go | 2 +- plugins/inputs/mcrouter/mcrouter.go | 162 +++++++------- plugins/inputs/mcrouter/mcrouter_test.go | 4 +- plugins/inputs/mdstat/mdstat.go | 16 +- plugins/inputs/mdstat/mdstat_notlinux.go | 8 +- plugins/inputs/mdstat/mdstat_test.go | 10 +- plugins/inputs/mem/mem.go | 10 +- plugins/inputs/mem/mem_test.go | 7 +- plugins/inputs/memcached/memcached.go | 158 +++++++------- plugins/inputs/mesos/mesos.go | 137 ++++++------ plugins/inputs/mesos/mesos_test.go | 12 +- plugins/inputs/minecraft/client.go | 50 ++--- plugins/inputs/minecraft/client_test.go | 48 ++--- plugins/inputs/minecraft/minecraft.go | 33 ++- plugins/inputs/minecraft/minecraft_test.go | 46 ++-- plugins/inputs/mock/mock.go | 8 +- plugins/inputs/modbus/configuration.go | 8 +- plugins/inputs/modbus/configuration_metric.go | 46 ++-- .../modbus/configuration_metric_test.go | 2 +- .../inputs/modbus/configuration_register.go | 32 +-- .../inputs/modbus/configuration_request.go | 50 ++--- .../modbus/configuration_request_test.go | 8 +- plugins/inputs/modbus/modbus.go | 125 ++++++----- plugins/inputs/modbus/modbus_test.go | 10 +- plugins/inputs/modbus/request.go | 48 ++--- plugins/inputs/mongodb/mongodb.go | 100 +++++---- plugins/inputs/mongodb/mongodb_data.go | 42 ++-- plugins/inputs/mongodb/mongodb_data_test.go | 48 ++--- plugins/inputs/mongodb/mongodb_server.go | 86 ++++---- plugins/inputs/mongodb/mongodb_server_test.go | 10 +- plugins/inputs/mongodb/mongostat.go | 63 +----- plugins/inputs/mongodb/mongostat_test.go | 42 ++-- plugins/inputs/monit/monit.go | 20 +- plugins/inputs/mqtt_consumer/mqtt_consumer.go | 93 ++++---- .../mqtt_consumer/mqtt_consumer_test.go | 204 +++++++++--------- plugins/inputs/mqtt_consumer/mqtt_logger.go | 3 + plugins/inputs/mqtt_consumer/topic_parser.go | 12 +- plugins/inputs/multifile/multifile.go | 12 +- plugins/inputs/multifile/multifile_test.go | 10 +- plugins/inputs/mysql/mysql.go | 17 +- plugins/inputs/mysql/v2/convert.go | 10 +- 44 files changed, 923 insertions(+), 1014 deletions(-) diff --git a/plugins/inputs/mailchimp/chimp_api.go b/plugins/inputs/mailchimp/chimp_api.go index ec01b28882b08..8e6c65ed1ef97 100644 --- a/plugins/inputs/mailchimp/chimp_api.go +++ b/plugins/inputs/mailchimp/chimp_api.go @@ -22,7 +22,7 @@ const ( var mailchimpDatacenter = regexp.MustCompile("[a-z]+[0-9]+$") type chimpAPI struct { - Transport http.RoundTripper + transport http.RoundTripper debug bool sync.Mutex @@ -32,30 +32,30 @@ type chimpAPI struct { } type reportsParams struct { - Count string - Offset string - SinceSendTime string - BeforeSendTime string + count string + offset string + sinceSendTime string + beforeSendTime string } func (p *reportsParams) String() string { v := url.Values{} - if p.Count != "" { - v.Set("count", p.Count) + if p.count != "" { + v.Set("count", p.count) } - if p.Offset != "" { - v.Set("offset", p.Offset) + if p.offset != "" { + v.Set("offset", p.offset) } - if p.BeforeSendTime != "" { - v.Set("before_send_time", p.BeforeSendTime) + if p.beforeSendTime != "" { + v.Set("before_send_time", p.beforeSendTime) } - if p.SinceSendTime != "" { - v.Set("since_send_time", p.SinceSendTime) + if p.sinceSendTime != "" { + v.Set("since_send_time", p.sinceSendTime) } return v.Encode() } -func NewChimpAPI(apiKey string, log telegraf.Logger) *chimpAPI { +func newChimpAPI(apiKey string, log telegraf.Logger) *chimpAPI { u := &url.URL{} u.Scheme = "https" u.Host = mailchimpDatacenter.FindString(apiKey) + ".api.mailchimp.com" @@ -86,7 +86,7 @@ func chimpErrorCheck(body []byte) error { return nil } -func (a *chimpAPI) GetReports(params reportsParams) (reportsResponse, error) { +func (a *chimpAPI) getReports(params reportsParams) (reportsResponse, error) { a.Lock() defer a.Unlock() a.url.Path = reportsEndpoint @@ -105,7 +105,7 @@ func (a *chimpAPI) GetReports(params reportsParams) (reportsResponse, error) { return response, nil } -func (a *chimpAPI) GetReport(campaignID string) (report, error) { +func (a *chimpAPI) getReport(campaignID string) (report, error) { a.Lock() defer a.Unlock() a.url.Path = fmt.Sprintf(reportsEndpointCampaign, campaignID) @@ -126,7 +126,7 @@ func (a *chimpAPI) GetReport(campaignID string) (report, error) { func (a *chimpAPI) runChimp(params reportsParams) ([]byte, error) { client := &http.Client{ - Transport: a.Transport, + Transport: a.transport, Timeout: 4 * time.Second, } diff --git a/plugins/inputs/mailchimp/mailchimp.go b/plugins/inputs/mailchimp/mailchimp.go index 0035eee6bbd02..f19e7ee0b9fdf 100644 --- a/plugins/inputs/mailchimp/mailchimp.go +++ b/plugins/inputs/mailchimp/mailchimp.go @@ -14,13 +14,12 @@ import ( var sampleConfig string type MailChimp struct { - api *chimpAPI - - APIKey string `toml:"api_key"` - DaysOld int `toml:"days_old"` - CampaignID string `toml:"campaign_id"` + APIKey string `toml:"api_key"` + DaysOld int `toml:"days_old"` + CampaignID string `toml:"campaign_id"` + Log telegraf.Logger `toml:"-"` - Log telegraf.Logger `toml:"-"` + api *chimpAPI } func (*MailChimp) SampleConfig() string { @@ -28,7 +27,7 @@ func (*MailChimp) SampleConfig() string { } func (m *MailChimp) Init() error { - m.api = NewChimpAPI(m.APIKey, m.Log) + m.api = newChimpAPI(m.APIKey, m.Log) return nil } @@ -45,8 +44,8 @@ func (m *MailChimp) Gather(acc telegraf.Accumulator) error { since = now.Add(-d).Format(time.RFC3339) } - reports, err := m.api.GetReports(reportsParams{ - SinceSendTime: since, + reports, err := m.api.getReports(reportsParams{ + sinceSendTime: since, }) if err != nil { return err @@ -57,7 +56,7 @@ func (m *MailChimp) Gather(acc telegraf.Accumulator) error { gatherReport(acc, report, now) } } else { - report, err := m.api.GetReport(m.CampaignID) + report, err := m.api.getReport(m.CampaignID) if err != nil { return err } diff --git a/plugins/inputs/marklogic/marklogic.go b/plugins/inputs/marklogic/marklogic.go index c9546568e5aa2..bbb8def29774b 100644 --- a/plugins/inputs/marklogic/marklogic.go +++ b/plugins/inputs/marklogic/marklogic.go @@ -19,69 +19,69 @@ import ( //go:embed sample.conf var sampleConfig string -// Marklogic configuration toml +const ( + // MarkLogic v2 management api endpoints for hosts status + statsPath = "/manage/v2/hosts/" + viewFormat = "view=status&format=json" +) + type Marklogic struct { URL string `toml:"url"` Hosts []string `toml:"hosts"` Username string `toml:"username"` Password string `toml:"password"` - Sources []string - tls.ClientConfig - client *http.Client + client *http.Client + sources []string } -type MlPointInt struct { +type mlPointInt struct { Value int `json:"value"` } -type MlPointFloat struct { +type mlPointFloat struct { Value float64 `json:"value"` } -type MlPointBool struct { +type mlPointBool struct { Value bool `json:"value"` } -// MarkLogic v2 management api endpoints for hosts status -const statsPath = "/manage/v2/hosts/" -const viewFormat = "view=status&format=json" - -type MlHost struct { +type mlHost struct { HostStatus struct { ID string `json:"id"` Name string `json:"name"` StatusProperties struct { - Online MlPointBool `json:"online"` + Online mlPointBool `json:"online"` LoadProperties struct { - TotalLoad MlPointFloat `json:"total-load"` + TotalLoad mlPointFloat `json:"total-load"` } `json:"load-properties"` RateProperties struct { - TotalRate MlPointFloat `json:"total-rate"` + TotalRate mlPointFloat `json:"total-rate"` } `json:"rate-properties"` StatusDetail struct { - Cpus MlPointInt `json:"cpus"` - Cores MlPointInt `json:"cores"` + Cpus mlPointInt `json:"cpus"` + Cores mlPointInt `json:"cores"` TotalCPUStatUser float64 `json:"total-cpu-stat-user"` TotalCPUStatSystem float64 `json:"total-cpu-stat-system"` TotalCPUStatIdle float64 `json:"total-cpu-stat-idle"` TotalCPUStatIowait float64 `json:"total-cpu-stat-iowait"` - MemoryProcessSize MlPointInt `json:"memory-process-size"` - MemoryProcessRss MlPointInt `json:"memory-process-rss"` - MemorySystemTotal MlPointInt `json:"memory-system-total"` - MemorySystemFree MlPointInt `json:"memory-system-free"` - MemoryProcessSwapSize MlPointInt `json:"memory-process-swap-size"` - MemorySize MlPointInt `json:"memory-size"` - HostSize MlPointInt `json:"host-size"` - LogDeviceSpace MlPointInt `json:"log-device-space"` - DataDirSpace MlPointInt `json:"data-dir-space"` - QueryReadBytes MlPointInt `json:"query-read-bytes"` - QueryReadLoad MlPointInt `json:"query-read-load"` - MergeReadLoad MlPointInt `json:"merge-read-load"` - MergeWriteLoad MlPointInt `json:"merge-write-load"` - HTTPServerReceiveBytes MlPointInt `json:"http-server-receive-bytes"` - HTTPServerSendBytes MlPointInt `json:"http-server-send-bytes"` + MemoryProcessSize mlPointInt `json:"memory-process-size"` + MemoryProcessRss mlPointInt `json:"memory-process-rss"` + MemorySystemTotal mlPointInt `json:"memory-system-total"` + MemorySystemFree mlPointInt `json:"memory-system-free"` + MemoryProcessSwapSize mlPointInt `json:"memory-process-swap-size"` + MemorySize mlPointInt `json:"memory-size"` + HostSize mlPointInt `json:"host-size"` + LogDeviceSpace mlPointInt `json:"log-device-space"` + DataDirSpace mlPointInt `json:"data-dir-space"` + QueryReadBytes mlPointInt `json:"query-read-bytes"` + QueryReadLoad mlPointInt `json:"query-read-load"` + MergeReadLoad mlPointInt `json:"merge-read-load"` + MergeWriteLoad mlPointInt `json:"merge-write-load"` + HTTPServerReceiveBytes mlPointInt `json:"http-server-receive-bytes"` + HTTPServerSendBytes mlPointInt `json:"http-server-send-bytes"` } `json:"status-detail"` } `json:"status-properties"` } `json:"host-status"` @@ -91,7 +91,6 @@ func (*Marklogic) SampleConfig() string { return sampleConfig } -// Init parse all source URLs and place on the Marklogic struct func (c *Marklogic) Init() error { if len(c.URL) == 0 { c.URL = "http://localhost:8002/" @@ -108,12 +107,11 @@ func (c *Marklogic) Init() error { addr.RawQuery = viewFormat u := addr.String() - c.Sources = append(c.Sources, u) + c.sources = append(c.sources, u) } return nil } -// Gather metrics from HTTP Server. func (c *Marklogic) Gather(accumulator telegraf.Accumulator) error { var wg sync.WaitGroup @@ -127,7 +125,7 @@ func (c *Marklogic) Gather(accumulator telegraf.Accumulator) error { } // Range over all source URL's appended to the struct - for _, serv := range c.Sources { + for _, serv := range c.sources { wg.Add(1) go func(serv string) { defer wg.Done() @@ -143,7 +141,7 @@ func (c *Marklogic) Gather(accumulator telegraf.Accumulator) error { } func (c *Marklogic) fetchAndInsertData(acc telegraf.Accumulator, address string) error { - ml := &MlHost{} + ml := &mlHost{} if err := c.gatherJSONData(address, ml); err != nil { return err } diff --git a/plugins/inputs/marklogic/marklogic_test.go b/plugins/inputs/marklogic/marklogic_test.go index 52641741d0dcd..a20cde2814def 100644 --- a/plugins/inputs/marklogic/marklogic_test.go +++ b/plugins/inputs/marklogic/marklogic_test.go @@ -33,7 +33,7 @@ func TestMarklogic(t *testing.T) { ml := &Marklogic{ Hosts: []string{"example1"}, URL: ts.URL, - // Sources: []string{"http://localhost:8002/manage/v2/hosts/hostname1?view=status&format=json"}, + // sources: []string{"http://localhost:8002/manage/v2/hosts/hostname1?view=status&format=json"}, } // Create a test accumulator diff --git a/plugins/inputs/mcrouter/mcrouter.go b/plugins/inputs/mcrouter/mcrouter.go index 6506e104858b7..37202fa300db0 100644 --- a/plugins/inputs/mcrouter/mcrouter.go +++ b/plugins/inputs/mcrouter/mcrouter.go @@ -21,12 +21,6 @@ import ( //go:embed sample.conf var sampleConfig string -// Mcrouter is a mcrouter plugin -type Mcrouter struct { - Servers []string - Timeout config.Duration -} - // enum for statType type statType int @@ -35,86 +29,90 @@ const ( typeFloat statType = iota ) -var defaultTimeout = 5 * time.Second - -var defaultServerURL = url.URL{ - Scheme: "tcp", - Host: "localhost:11211", -} +var ( + defaultTimeout = 5 * time.Second + defaultServerURL = url.URL{ + Scheme: "tcp", + Host: "localhost:11211", + } + // The list of metrics that should be sent + sendMetrics = map[string]statType{ + "uptime": typeInt, + "num_servers": typeInt, + "num_servers_new": typeInt, + "num_servers_up": typeInt, + "num_servers_down": typeInt, + "num_servers_closed": typeInt, + "num_clients": typeInt, + "num_suspect_servers": typeInt, + "destination_batches_sum": typeInt, + "destination_requests_sum": typeInt, + "outstanding_route_get_reqs_queued": typeInt, + "outstanding_route_update_reqs_queued": typeInt, + "outstanding_route_get_avg_queue_size": typeInt, + "outstanding_route_update_avg_queue_size": typeInt, + "outstanding_route_get_avg_wait_time_sec": typeInt, + "outstanding_route_update_avg_wait_time_sec": typeInt, + "retrans_closed_connections": typeInt, + "destination_pending_reqs": typeInt, + "destination_inflight_reqs": typeInt, + "destination_batch_size": typeInt, + "asynclog_requests": typeInt, + "proxy_reqs_processing": typeInt, + "proxy_reqs_waiting": typeInt, + "client_queue_notify_period": typeInt, + "rusage_system": typeFloat, + "rusage_user": typeFloat, + "ps_num_minor_faults": typeInt, + "ps_num_major_faults": typeInt, + "ps_user_time_sec": typeFloat, + "ps_system_time_sec": typeFloat, + "ps_vsize": typeInt, + "ps_rss": typeInt, + "fibers_allocated": typeInt, + "fibers_pool_size": typeInt, + "fibers_stack_high_watermark": typeInt, + "successful_client_connections": typeInt, + "duration_us": typeInt, + "destination_max_pending_reqs": typeInt, + "destination_max_inflight_reqs": typeInt, + "retrans_per_kbyte_max": typeInt, + "cmd_get_count": typeInt, + "cmd_delete_out": typeInt, + "cmd_lease_get": typeInt, + "cmd_set": typeInt, + "cmd_get_out_all": typeInt, + "cmd_get_out": typeInt, + "cmd_lease_set_count": typeInt, + "cmd_other_out_all": typeInt, + "cmd_lease_get_out": typeInt, + "cmd_set_count": typeInt, + "cmd_lease_set_out": typeInt, + "cmd_delete_count": typeInt, + "cmd_other": typeInt, + "cmd_delete": typeInt, + "cmd_get": typeInt, + "cmd_lease_set": typeInt, + "cmd_set_out": typeInt, + "cmd_lease_get_count": typeInt, + "cmd_other_out": typeInt, + "cmd_lease_get_out_all": typeInt, + "cmd_set_out_all": typeInt, + "cmd_other_count": typeInt, + "cmd_delete_out_all": typeInt, + "cmd_lease_set_out_all": typeInt, + } +) -// The list of metrics that should be sent -var sendMetrics = map[string]statType{ - "uptime": typeInt, - "num_servers": typeInt, - "num_servers_new": typeInt, - "num_servers_up": typeInt, - "num_servers_down": typeInt, - "num_servers_closed": typeInt, - "num_clients": typeInt, - "num_suspect_servers": typeInt, - "destination_batches_sum": typeInt, - "destination_requests_sum": typeInt, - "outstanding_route_get_reqs_queued": typeInt, - "outstanding_route_update_reqs_queued": typeInt, - "outstanding_route_get_avg_queue_size": typeInt, - "outstanding_route_update_avg_queue_size": typeInt, - "outstanding_route_get_avg_wait_time_sec": typeInt, - "outstanding_route_update_avg_wait_time_sec": typeInt, - "retrans_closed_connections": typeInt, - "destination_pending_reqs": typeInt, - "destination_inflight_reqs": typeInt, - "destination_batch_size": typeInt, - "asynclog_requests": typeInt, - "proxy_reqs_processing": typeInt, - "proxy_reqs_waiting": typeInt, - "client_queue_notify_period": typeInt, - "rusage_system": typeFloat, - "rusage_user": typeFloat, - "ps_num_minor_faults": typeInt, - "ps_num_major_faults": typeInt, - "ps_user_time_sec": typeFloat, - "ps_system_time_sec": typeFloat, - "ps_vsize": typeInt, - "ps_rss": typeInt, - "fibers_allocated": typeInt, - "fibers_pool_size": typeInt, - "fibers_stack_high_watermark": typeInt, - "successful_client_connections": typeInt, - "duration_us": typeInt, - "destination_max_pending_reqs": typeInt, - "destination_max_inflight_reqs": typeInt, - "retrans_per_kbyte_max": typeInt, - "cmd_get_count": typeInt, - "cmd_delete_out": typeInt, - "cmd_lease_get": typeInt, - "cmd_set": typeInt, - "cmd_get_out_all": typeInt, - "cmd_get_out": typeInt, - "cmd_lease_set_count": typeInt, - "cmd_other_out_all": typeInt, - "cmd_lease_get_out": typeInt, - "cmd_set_count": typeInt, - "cmd_lease_set_out": typeInt, - "cmd_delete_count": typeInt, - "cmd_other": typeInt, - "cmd_delete": typeInt, - "cmd_get": typeInt, - "cmd_lease_set": typeInt, - "cmd_set_out": typeInt, - "cmd_lease_get_count": typeInt, - "cmd_other_out": typeInt, - "cmd_lease_get_out_all": typeInt, - "cmd_set_out_all": typeInt, - "cmd_other_count": typeInt, - "cmd_delete_out_all": typeInt, - "cmd_lease_set_out_all": typeInt, +type Mcrouter struct { + Servers []string `toml:"servers"` + Timeout config.Duration `toml:"timeout"` } func (*Mcrouter) SampleConfig() string { return sampleConfig } -// Gather reads stats from all configured servers accumulates stats func (m *Mcrouter) Gather(acc telegraf.Accumulator) error { ctx := context.Background() @@ -136,8 +134,8 @@ func (m *Mcrouter) Gather(acc telegraf.Accumulator) error { return nil } -// ParseAddress parses an address string into 'host:port' and 'protocol' parts -func (m *Mcrouter) ParseAddress(address string) (parsedAddress, protocol string, err error) { +// parseAddress parses an address string into 'host:port' and 'protocol' parts +func (m *Mcrouter) parseAddress(address string) (parsedAddress, protocol string, err error) { var host string var port string @@ -189,7 +187,7 @@ func (m *Mcrouter) gatherServer(ctx context.Context, address string, acc telegra var protocol string var dialer net.Dialer - address, protocol, err = m.ParseAddress(address) + address, protocol, err = m.parseAddress(address) if err != nil { return err } diff --git a/plugins/inputs/mcrouter/mcrouter_test.go b/plugins/inputs/mcrouter/mcrouter_test.go index a8b01eb278ebb..47f658d256afa 100644 --- a/plugins/inputs/mcrouter/mcrouter_test.go +++ b/plugins/inputs/mcrouter/mcrouter_test.go @@ -32,7 +32,7 @@ func TestAddressParsing(t *testing.T) { } for _, args := range acceptTests { - address, protocol, err := m.ParseAddress(args[0]) + address, protocol, err := m.parseAddress(args[0]) require.NoError(t, err, args[0]) require.Equal(t, args[1], address, args[0]) @@ -40,7 +40,7 @@ func TestAddressParsing(t *testing.T) { } for _, addr := range rejectTests { - address, protocol, err := m.ParseAddress(addr) + address, protocol, err := m.parseAddress(addr) require.Error(t, err, addr) require.Empty(t, address, addr) diff --git a/plugins/inputs/mdstat/mdstat.go b/plugins/inputs/mdstat/mdstat.go index f1edf3f0e753e..b00095349eacf 100644 --- a/plugins/inputs/mdstat/mdstat.go +++ b/plugins/inputs/mdstat/mdstat.go @@ -44,6 +44,10 @@ var ( componentDeviceRE = regexp.MustCompile(`(.*)\[\d+\]`) ) +type Mdstat struct { + FileName string `toml:"file_name"` +} + type statusLine struct { active int64 total int64 @@ -58,10 +62,6 @@ type recoveryLine struct { speed float64 } -type MdstatConf struct { - FileName string `toml:"file_name"` -} - func evalStatusLine(deviceLine, statusLineStr string) (statusLine, error) { sizeFields := strings.Fields(statusLineStr) if len(sizeFields) < 1 { @@ -173,11 +173,11 @@ func evalComponentDevices(deviceFields []string) string { return strings.Join(mdComponentDevices, ",") } -func (*MdstatConf) SampleConfig() string { +func (*Mdstat) SampleConfig() string { return sampleConfig } -func (k *MdstatConf) Gather(acc telegraf.Accumulator) error { +func (k *Mdstat) Gather(acc telegraf.Accumulator) error { data, err := k.getProcMdstat() if err != nil { return err @@ -267,7 +267,7 @@ func (k *MdstatConf) Gather(acc telegraf.Accumulator) error { return nil } -func (k *MdstatConf) getProcMdstat() ([]byte, error) { +func (k *Mdstat) getProcMdstat() ([]byte, error) { var mdStatFile string if k.FileName == "" { mdStatFile = internal.GetProcPath() + "/mdstat" @@ -289,5 +289,5 @@ func (k *MdstatConf) getProcMdstat() ([]byte, error) { } func init() { - inputs.Add("mdstat", func() telegraf.Input { return &MdstatConf{} }) + inputs.Add("mdstat", func() telegraf.Input { return &Mdstat{} }) } diff --git a/plugins/inputs/mdstat/mdstat_notlinux.go b/plugins/inputs/mdstat/mdstat_notlinux.go index 40430aa6e90c3..578a5b5aabb7d 100644 --- a/plugins/inputs/mdstat/mdstat_notlinux.go +++ b/plugins/inputs/mdstat/mdstat_notlinux.go @@ -17,12 +17,14 @@ type Mdstat struct { Log telegraf.Logger `toml:"-"` } +func (*Mdstat) SampleConfig() string { return sampleConfig } + func (m *Mdstat) Init() error { - m.Log.Warn("current platform is not supported") + m.Log.Warn("Current platform is not supported") return nil } -func (*Mdstat) SampleConfig() string { return sampleConfig } -func (*Mdstat) Gather(_ telegraf.Accumulator) error { return nil } + +func (*Mdstat) Gather(telegraf.Accumulator) error { return nil } func init() { inputs.Add("mdstat", func() telegraf.Input { diff --git a/plugins/inputs/mdstat/mdstat_test.go b/plugins/inputs/mdstat/mdstat_test.go index 94b73ad756a99..8d9c51c5698b3 100644 --- a/plugins/inputs/mdstat/mdstat_test.go +++ b/plugins/inputs/mdstat/mdstat_test.go @@ -14,7 +14,7 @@ import ( func TestFullMdstatProcFile(t *testing.T) { filename := makeFakeMDStatFile([]byte(mdStatFileFull)) defer os.Remove(filename) - k := MdstatConf{ + k := Mdstat{ FileName: filename, } acc := testutil.Accumulator{} @@ -39,7 +39,7 @@ func TestFullMdstatProcFile(t *testing.T) { func TestMdstatSyncStart(t *testing.T) { filename := makeFakeMDStatFile([]byte(mdStatSyncStart)) defer os.Remove(filename) - k := MdstatConf{ + k := Mdstat{ FileName: filename, } acc := testutil.Accumulator{} @@ -65,7 +65,7 @@ func TestFailedDiskMdStatProcFile1(t *testing.T) { filename := makeFakeMDStatFile([]byte(mdStatFileFailedDisk)) defer os.Remove(filename) - k := MdstatConf{ + k := Mdstat{ FileName: filename, } @@ -92,7 +92,7 @@ func TestEmptyMdStatProcFile1(t *testing.T) { filename := makeFakeMDStatFile([]byte(mdStatFileEmpty)) defer os.Remove(filename) - k := MdstatConf{ + k := Mdstat{ FileName: filename, } @@ -105,7 +105,7 @@ func TestInvalidMdStatProcFile1(t *testing.T) { filename := makeFakeMDStatFile([]byte(mdStatFileInvalid)) defer os.Remove(filename) - k := MdstatConf{ + k := Mdstat{ FileName: filename, } diff --git a/plugins/inputs/mem/mem.go b/plugins/inputs/mem/mem.go index 3faa1beea3988..bd4d9ef74d65e 100644 --- a/plugins/inputs/mem/mem.go +++ b/plugins/inputs/mem/mem.go @@ -14,21 +14,21 @@ import ( //go:embed sample.conf var sampleConfig string -type MemStats struct { +type Mem struct { ps system.PS platform string } -func (*MemStats) SampleConfig() string { +func (*Mem) SampleConfig() string { return sampleConfig } -func (ms *MemStats) Init() error { +func (ms *Mem) Init() error { ms.platform = runtime.GOOS return nil } -func (ms *MemStats) Gather(acc telegraf.Accumulator) error { +func (ms *Mem) Gather(acc telegraf.Accumulator) error { vm, err := ms.ps.VMStat() if err != nil { return fmt.Errorf("error getting virtual memory info: %w", err) @@ -102,6 +102,6 @@ func (ms *MemStats) Gather(acc telegraf.Accumulator) error { func init() { ps := system.NewSystemPS() inputs.Add("mem", func() telegraf.Input { - return &MemStats{ps: ps} + return &Mem{ps: ps} }) } diff --git a/plugins/inputs/mem/mem_test.go b/plugins/inputs/mem/mem_test.go index ecf3a8cebc81c..ffc03cad0f73d 100644 --- a/plugins/inputs/mem/mem_test.go +++ b/plugins/inputs/mem/mem_test.go @@ -4,11 +4,12 @@ import ( "testing" "time" + "github.com/shirou/gopsutil/v4/mem" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs/system" "github.com/influxdata/telegraf/testutil" - "github.com/shirou/gopsutil/v4/mem" - "github.com/stretchr/testify/require" ) func TestMemStats(t *testing.T) { @@ -55,7 +56,7 @@ func TestMemStats(t *testing.T) { } mps.On("VMStat").Return(vms, nil) - plugin := &MemStats{ps: &mps} + plugin := &Mem{ps: &mps} err = plugin.Init() require.NoError(t, err) diff --git a/plugins/inputs/memcached/memcached.go b/plugins/inputs/memcached/memcached.go index 6f050b27874a2..f09510f1b88b7 100644 --- a/plugins/inputs/memcached/memcached.go +++ b/plugins/inputs/memcached/memcached.go @@ -22,7 +22,82 @@ import ( //go:embed sample.conf var sampleConfig string -// Memcached is a memcached plugin +var ( + defaultTimeout = 5 * time.Second + + // The list of metrics that should be sent + sendMetrics = []string{ + "accepting_conns", + "auth_cmds", + "auth_errors", + "bytes", + "bytes_read", + "bytes_written", + "cas_badval", + "cas_hits", + "cas_misses", + "cmd_flush", + "cmd_get", + "cmd_set", + "cmd_touch", + "conn_yields", + "connection_structures", + "curr_connections", + "curr_items", + "decr_hits", + "decr_misses", + "delete_hits", + "delete_misses", + "evicted_active", + "evicted_unfetched", + "evictions", + "expired_unfetched", + "extstore_compact_lost", + "extstore_compact_rescues", + "extstore_compact_resc_cold", + "extstore_compact_resc_old", + "extstore_compact_skipped", + "extstore_page_allocs", + "extstore_page_evictions", + "extstore_page_reclaims", + "extstore_pages_free", + "extstore_pages_used", + "extstore_objects_evicted", + "extstore_objects_read", + "extstore_objects_written", + "extstore_objects_used", + "extstore_bytes_evicted", + "extstore_bytes_written", + "extstore_bytes_read", + "extstore_bytes_used", + "extstore_bytes_fragmented", + "extstore_limit_maxbytes", + "extstore_io_queue", + "get_expired", + "get_flushed", + "get_hits", + "get_misses", + "hash_bytes", + "hash_is_expanding", + "hash_power_level", + "incr_hits", + "incr_misses", + "limit_maxbytes", + "listen_disabled_num", + "max_connections", + "reclaimed", + "rejected_connections", + "store_no_memory", + "store_too_large", + "threads", + "total_connections", + "total_items", + "touch_hits", + "touch_misses", + "uptime", + } +) + type Memcached struct { Servers []string `toml:"servers"` UnixSockets []string `toml:"unix_sockets"` @@ -30,85 +105,10 @@ type Memcached struct { common_tls.ClientConfig } -var defaultTimeout = 5 * time.Second - -// The list of metrics that should be sent -var sendMetrics = []string{ - "accepting_conns", - "auth_cmds", - "auth_errors", - "bytes", - "bytes_read", - "bytes_written", - "cas_badval", - "cas_hits", - "cas_misses", - "cmd_flush", - "cmd_get", - "cmd_set", - "cmd_touch", - "conn_yields", - "connection_structures", - "curr_connections", - "curr_items", - "decr_hits", - "decr_misses", - "delete_hits", - "delete_misses", - "evicted_active", - "evicted_unfetched", - "evictions", - "expired_unfetched", - "extstore_compact_lost", - "extstore_compact_rescues", - "extstore_compact_resc_cold", - "extstore_compact_resc_old", - "extstore_compact_skipped", - "extstore_page_allocs", - "extstore_page_evictions", - "extstore_page_reclaims", - "extstore_pages_free", - "extstore_pages_used", - "extstore_objects_evicted", - "extstore_objects_read", - "extstore_objects_written", - "extstore_objects_used", - "extstore_bytes_evicted", - "extstore_bytes_written", - "extstore_bytes_read", - "extstore_bytes_used", - "extstore_bytes_fragmented", - "extstore_limit_maxbytes", - "extstore_io_queue", - "get_expired", - "get_flushed", - "get_hits", - "get_misses", - "hash_bytes", - "hash_is_expanding", - "hash_power_level", - "incr_hits", - "incr_misses", - "limit_maxbytes", - "listen_disabled_num", - "max_connections", - "reclaimed", - "rejected_connections", - "store_no_memory", - "store_too_large", - "threads", - "total_connections", - "total_items", - "touch_hits", - "touch_misses", - "uptime", -} - func (*Memcached) SampleConfig() string { return sampleConfig } -// Gather reads stats from all configured servers accumulates stats func (m *Memcached) Gather(acc telegraf.Accumulator) error { if len(m.Servers) == 0 && len(m.UnixSockets) == 0 { return m.gatherServer(":11211", false, acc) @@ -125,11 +125,7 @@ func (m *Memcached) Gather(acc telegraf.Accumulator) error { return nil } -func (m *Memcached) gatherServer( - address string, - unix bool, - acc telegraf.Accumulator, -) error { +func (m *Memcached) gatherServer(address string, unix bool, acc telegraf.Accumulator) error { var conn net.Conn var err error var dialer proxy.Dialer diff --git a/plugins/inputs/mesos/mesos.go b/plugins/inputs/mesos/mesos.go index c65f9bb3c6eb5..be9da94c080cd 100644 --- a/plugins/inputs/mesos/mesos.go +++ b/plugins/inputs/mesos/mesos.go @@ -23,22 +23,27 @@ import ( //go:embed sample.conf var sampleConfig string -type Role string +type role string const ( - MASTER Role = "master" - SLAVE Role = "slave" + master role = "master" + slave role = "slave" ) +var allMetrics = map[role][]string{ + master: {"resources", "master", "system", "agents", "frameworks", "framework_offers", "tasks", "messages", "evqueue", "registrar", "allocator"}, + slave: {"resources", "agent", "system", "executors", "tasks", "messages"}, +} + type Mesos struct { - Timeout int - Masters []string + Timeout int `toml:"timeout"` + Masters []string `toml:"masters"` MasterCols []string `toml:"master_collections"` - Slaves []string + Slaves []string `toml:"slaves"` SlaveCols []string `toml:"slave_collections"` tls.ClientConfig - Log telegraf.Logger + Log telegraf.Logger `toml:"-"` initialized bool client *http.Client @@ -46,21 +51,52 @@ type Mesos struct { slaveURLs []*url.URL } -var allMetrics = map[Role][]string{ - MASTER: {"resources", "master", "system", "agents", "frameworks", "framework_offers", "tasks", "messages", "evqueue", "registrar", "allocator"}, - SLAVE: {"resources", "agent", "system", "executors", "tasks", "messages"}, +func (*Mesos) SampleConfig() string { + return sampleConfig +} + +func (m *Mesos) Gather(acc telegraf.Accumulator) error { + if !m.initialized { + err := m.initialize() + if err != nil { + return err + } + m.initialized = true + } + + var wg sync.WaitGroup + + for _, mstr := range m.masterURLs { + wg.Add(1) + go func(mstr *url.URL) { + acc.AddError(m.gatherMainMetrics(mstr, master, acc)) + wg.Done() + }(mstr) + } + + for _, slv := range m.slaveURLs { + wg.Add(1) + go func(slv *url.URL) { + acc.AddError(m.gatherMainMetrics(slv, slave, acc)) + wg.Done() + }(slv) + } + + wg.Wait() + + return nil } -func (m *Mesos) parseURL(s string, role Role) (*url.URL, error) { +func (m *Mesos) parseURL(s string, role role) (*url.URL, error) { if !strings.HasPrefix(s, "http://") && !strings.HasPrefix(s, "https://") { host, port, err := net.SplitHostPort(s) // no port specified if err != nil { host = s switch role { - case MASTER: + case master: port = "5050" - case SLAVE: + case slave: port = "5051" } } @@ -74,11 +110,11 @@ func (m *Mesos) parseURL(s string, role Role) (*url.URL, error) { func (m *Mesos) initialize() error { if len(m.MasterCols) == 0 { - m.MasterCols = allMetrics[MASTER] + m.MasterCols = allMetrics[master] } if len(m.SlaveCols) == 0 { - m.SlaveCols = allMetrics[SLAVE] + m.SlaveCols = allMetrics[slave] } if m.Timeout == 0 { @@ -89,8 +125,8 @@ func (m *Mesos) initialize() error { rawQuery := "timeout=" + strconv.Itoa(m.Timeout) + "ms" m.masterURLs = make([]*url.URL, 0, len(m.Masters)) - for _, master := range m.Masters { - u, err := m.parseURL(master, MASTER) + for _, mstr := range m.Masters { + u, err := m.parseURL(mstr, master) if err != nil { return err } @@ -100,8 +136,8 @@ func (m *Mesos) initialize() error { } m.slaveURLs = make([]*url.URL, 0, len(m.Slaves)) - for _, slave := range m.Slaves { - u, err := m.parseURL(slave, SLAVE) + for _, slv := range m.Slaves { + u, err := m.parseURL(slv, slave) if err != nil { return err } @@ -119,43 +155,6 @@ func (m *Mesos) initialize() error { return nil } -func (*Mesos) SampleConfig() string { - return sampleConfig -} - -// Gather() metrics from given list of Mesos Masters -func (m *Mesos) Gather(acc telegraf.Accumulator) error { - if !m.initialized { - err := m.initialize() - if err != nil { - return err - } - m.initialized = true - } - - var wg sync.WaitGroup - - for _, master := range m.masterURLs { - wg.Add(1) - go func(master *url.URL) { - acc.AddError(m.gatherMainMetrics(master, MASTER, acc)) - wg.Done() - }(master) - } - - for _, slave := range m.slaveURLs { - wg.Add(1) - go func(slave *url.URL) { - acc.AddError(m.gatherMainMetrics(slave, SLAVE, acc)) - wg.Done() - }(slave) - } - - wg.Wait() - - return nil -} - func (m *Mesos) createHTTPClient() (*http.Client, error) { tlsCfg, err := m.ClientConfig.TLSConfig() if err != nil { @@ -174,7 +173,7 @@ func (m *Mesos) createHTTPClient() (*http.Client, error) { } // metricsDiff() returns set names for removal -func metricsDiff(role Role, w []string) []string { +func metricsDiff(role role, w []string) []string { b := make([]string, 0, len(allMetrics[role])) s := make(map[string]bool) @@ -196,10 +195,10 @@ func metricsDiff(role Role, w []string) []string { } // masterBlocks serves as kind of metrics registry grouping them in sets -func (m *Mesos) getMetrics(role Role, group string) []string { +func (m *Mesos) getMetrics(role role, group string) []string { metrics := make(map[string][]string) - if role == MASTER { + if role == master { metrics["resources"] = []string{ "master/cpus_percent", "master/cpus_used", @@ -356,7 +355,7 @@ func (m *Mesos) getMetrics(role Role, group string) []string { "registrar/registry_size_bytes", "registrar/state_store_ms/count", } - } else if role == SLAVE { + } else if role == slave { metrics["resources"] = []string{ "slave/cpus_percent", "slave/cpus_used", @@ -430,7 +429,6 @@ func (m *Mesos) getMetrics(role Role, group string) []string { } ret, ok := metrics[group] - if !ok { m.Log.Infof("Unknown role %q metrics group: %s", role, group) return nil @@ -439,13 +437,13 @@ func (m *Mesos) getMetrics(role Role, group string) []string { return ret } -func (m *Mesos) filterMetrics(role Role, metrics *map[string]interface{}) { +func (m *Mesos) filterMetrics(role role, metrics *map[string]interface{}) { var ok bool var selectedMetrics []string - if role == MASTER { + if role == master { selectedMetrics = m.MasterCols - } else if role == SLAVE { + } else if role == slave { selectedMetrics = m.SlaveCols } @@ -476,13 +474,6 @@ func (m *Mesos) filterMetrics(role Role, metrics *map[string]interface{}) { } } -// TaskStats struct for JSON API output /monitor/statistics -type TaskStats struct { - ExecutorID string `json:"executor_id"` - FrameworkID string `json:"framework_id"` - Statistics map[string]interface{} `json:"statistics"` -} - func withPath(u *url.URL, path string) *url.URL { c := *u c.Path = path @@ -498,7 +489,7 @@ func urlTag(u *url.URL) string { } // This should not belong to the object -func (m *Mesos) gatherMainMetrics(u *url.URL, role Role, acc telegraf.Accumulator) error { +func (m *Mesos) gatherMainMetrics(u *url.URL, role role, acc telegraf.Accumulator) error { var jsonOut map[string]interface{} tags := map[string]string{ @@ -533,7 +524,7 @@ func (m *Mesos) gatherMainMetrics(u *url.URL, role Role, acc telegraf.Accumulato return err } - if role == MASTER { + if role == master { if jf.Fields["master/elected"] != 0.0 { tags["state"] = "leader" } else { diff --git a/plugins/inputs/mesos/mesos_test.go b/plugins/inputs/mesos/mesos_test.go index c88412095b168..2a0981441cd30 100644 --- a/plugins/inputs/mesos/mesos_test.go +++ b/plugins/inputs/mesos/mesos_test.go @@ -333,11 +333,11 @@ func TestMasterFilter(t *testing.T) { "messages", "evqueue", "tasks", } - m.filterMetrics(MASTER, &masterMetrics) + m.filterMetrics(master, &masterMetrics) // Assert expected metrics are present. for _, v := range m.MasterCols { - for _, x := range m.getMetrics(MASTER, v) { + for _, x := range m.getMetrics(master, v) { _, ok := masterMetrics[x] require.Truef(t, ok, "Didn't find key %s, it should present.", x) } @@ -354,7 +354,7 @@ func TestMasterFilter(t *testing.T) { // Assert unexpected metrics are not present. for _, v := range b { - for _, x := range m.getMetrics(MASTER, v) { + for _, x := range m.getMetrics(master, v) { _, ok := masterMetrics[x] require.Falsef(t, ok, "Found key %s, it should be gone.", x) } @@ -395,16 +395,16 @@ func TestSlaveFilter(t *testing.T) { "system", "executors", "messages", } - m.filterMetrics(SLAVE, &slaveMetrics) + m.filterMetrics(slave, &slaveMetrics) for _, v := range b { - for _, x := range m.getMetrics(SLAVE, v) { + for _, x := range m.getMetrics(slave, v) { _, ok := slaveMetrics[x] require.Falsef(t, ok, "Found key %s, it should be gone.", x) } } for _, v := range m.MasterCols { - for _, x := range m.getMetrics(SLAVE, v) { + for _, x := range m.getMetrics(slave, v) { _, ok := slaveMetrics[x] require.Truef(t, ok, "Didn't find key %s, it should present.", x) } diff --git a/plugins/inputs/minecraft/client.go b/plugins/inputs/minecraft/client.go index 3ad1337b1781f..1a7783f9a8d67 100644 --- a/plugins/inputs/minecraft/client.go +++ b/plugins/inputs/minecraft/client.go @@ -13,16 +13,16 @@ var ( scoreboardRegex = regexp.MustCompile(`\[(?P[^\]]+)\]: (?P\d+)`) ) -// Connection is an established connection to the Minecraft server. -type Connection interface { +// connection is an established connection to the Minecraft server. +type connection interface { // Execute runs a command. Execute(command string) (string, error) } -// Connector is used to create connections to the Minecraft server. -type Connector interface { - // Connect establishes a connection to the server. - Connect() (Connection, error) +// conn is used to create connections to the Minecraft server. +type conn interface { + // connect establishes a connection to the server. + connect() (connection, error) } func newConnector(hostname, port, password string) *connector { @@ -39,7 +39,7 @@ type connector struct { password string } -func (c *connector) Connect() (Connection, error) { +func (c *connector) connect() (connection, error) { client, err := rcon.Dial(c.hostname+":"+c.port, c.password) if err != nil { return nil, err @@ -48,17 +48,17 @@ func (c *connector) Connect() (Connection, error) { return client, nil } -func newClient(connector Connector) *client { +func newClient(connector conn) *client { return &client{connector: connector} } type client struct { - connector Connector - conn Connection + connector conn + conn connection } -func (c *client) Connect() error { - conn, err := c.connector.Connect() +func (c *client) connect() error { + conn, err := c.connector.connect() if err != nil { return err } @@ -66,9 +66,9 @@ func (c *client) Connect() error { return nil } -func (c *client) Players() ([]string, error) { +func (c *client) players() ([]string, error) { if c.conn == nil { - err := c.Connect() + err := c.connect() if err != nil { return nil, err } @@ -83,9 +83,9 @@ func (c *client) Players() ([]string, error) { return parsePlayers(resp), nil } -func (c *client) Scores(player string) ([]Score, error) { +func (c *client) scores(player string) ([]score, error) { if c.conn == nil { - err := c.Connect() + err := c.connect() if err != nil { return nil, err } @@ -127,13 +127,13 @@ func parsePlayers(input string) []string { return players } -// Score is an individual tracked scoreboard stat. -type Score struct { - Name string - Value int64 +// score is an individual tracked scoreboard stat. +type score struct { + name string + value int64 } -func parseScores(input string) []Score { +func parseScores(input string) []score { if strings.Contains(input, "has no scores") { return nil } @@ -147,19 +147,19 @@ func parseScores(input string) []Score { } matches := re.FindAllStringSubmatch(input, -1) - scores := make([]Score, 0, len(matches)) + scores := make([]score, 0, len(matches)) for _, match := range matches { - score := Score{} + score := score{} for i, subexp := range re.SubexpNames() { switch subexp { case "name": - score.Name = match[i] + score.name = match[i] case "value": value, err := strconv.ParseInt(match[i], 10, 64) if err != nil { continue } - score.Value = value + score.value = value default: continue } diff --git a/plugins/inputs/minecraft/client_test.go b/plugins/inputs/minecraft/client_test.go index 9d71b7472e81e..02a5332d49409 100644 --- a/plugins/inputs/minecraft/client_test.go +++ b/plugins/inputs/minecraft/client_test.go @@ -6,19 +6,19 @@ import ( "github.com/stretchr/testify/require" ) -type MockConnection struct { +type mockConnection struct { commands map[string]string } -func (c *MockConnection) Execute(command string) (string, error) { +func (c *mockConnection) Execute(command string) (string, error) { return c.commands[command], nil } -type MockConnector struct { - conn *MockConnection +type mockConnector struct { + conn *mockConnection } -func (c *MockConnector) Connect() (Connection, error) { +func (c *mockConnector) connect() (connection, error) { return c.conn, nil } @@ -92,12 +92,12 @@ func TestClient_Player(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - connector := &MockConnector{ - conn: &MockConnection{commands: tt.commands}, + connector := &mockConnector{ + conn: &mockConnection{commands: tt.commands}, } client := newClient(connector) - actual, err := client.Players() + actual, err := client.players() require.NoError(t, err) require.Equal(t, tt.expected, actual) @@ -110,7 +110,7 @@ func TestClient_Scores(t *testing.T) { name string player string commands map[string]string - expected []Score + expected []score }{ { name: "minecraft 1.12 player with no scores", @@ -125,8 +125,8 @@ func TestClient_Scores(t *testing.T) { commands: map[string]string{ "scoreboard players list Etho": "Showing 1 tracked objective(s) for Etho:- jump: 2 (jump)", }, - expected: []Score{ - {Name: "jump", Value: 2}, + expected: []score{ + {name: "jump", value: 2}, }, }, { @@ -135,10 +135,10 @@ func TestClient_Scores(t *testing.T) { commands: map[string]string{ "scoreboard players list Etho": "Showing 3 tracked objective(s) for Etho:- hopper: 2 (hopper)- dropper: 2 (dropper)- redstone: 1 (redstone)", }, - expected: []Score{ - {Name: "hopper", Value: 2}, - {Name: "dropper", Value: 2}, - {Name: "redstone", Value: 1}, + expected: []score{ + {name: "hopper", value: 2}, + {name: "dropper", value: 2}, + {name: "redstone", value: 1}, }, }, { @@ -154,8 +154,8 @@ func TestClient_Scores(t *testing.T) { commands: map[string]string{ "scoreboard players list Etho": "Etho has 1 scores:[jumps]: 1", }, - expected: []Score{ - {Name: "jumps", Value: 1}, + expected: []score{ + {name: "jumps", value: 1}, }, }, { @@ -164,21 +164,21 @@ func TestClient_Scores(t *testing.T) { commands: map[string]string{ "scoreboard players list Etho": "Etho has 3 scores:[hopper]: 2[dropper]: 2[redstone]: 1", }, - expected: []Score{ - {Name: "hopper", Value: 2}, - {Name: "dropper", Value: 2}, - {Name: "redstone", Value: 1}, + expected: []score{ + {name: "hopper", value: 2}, + {name: "dropper", value: 2}, + {name: "redstone", value: 1}, }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - connector := &MockConnector{ - conn: &MockConnection{commands: tt.commands}, + connector := &mockConnector{ + conn: &mockConnection{commands: tt.commands}, } client := newClient(connector) - actual, err := client.Scores(tt.player) + actual, err := client.scores(tt.player) require.NoError(t, err) require.Equal(t, tt.expected, actual) diff --git a/plugins/inputs/minecraft/minecraft.go b/plugins/inputs/minecraft/minecraft.go index c10b44a54b670..979b0317a4c79 100644 --- a/plugins/inputs/minecraft/minecraft.go +++ b/plugins/inputs/minecraft/minecraft.go @@ -11,25 +11,24 @@ import ( //go:embed sample.conf var sampleConfig string -// Client is a client for the Minecraft server. -type Client interface { - // Connect establishes a connection to the server. - Connect() error - - // Players returns the players on the scoreboard. - Players() ([]string, error) - - // Scores return the objective scores for a player. - Scores(player string) ([]Score, error) -} - -// Minecraft is the plugin type. type Minecraft struct { Server string `toml:"server"` Port string `toml:"port"` Password string `toml:"password"` - client Client + client cli +} + +// cli is a client for the Minecraft server. +type cli interface { + // connect establishes a connection to the server. + connect() error + + // players returns the players on the scoreboard. + players() ([]string, error) + + // scores returns the objective scores for a player. + scores(player string) ([]score, error) } func (*Minecraft) SampleConfig() string { @@ -42,13 +41,13 @@ func (s *Minecraft) Gather(acc telegraf.Accumulator) error { s.client = newClient(connector) } - players, err := s.client.Players() + players, err := s.client.players() if err != nil { return err } for _, player := range players { - scores, err := s.client.Scores(player) + scores, err := s.client.scores(player) if err != nil { return err } @@ -62,7 +61,7 @@ func (s *Minecraft) Gather(acc telegraf.Accumulator) error { var fields = make(map[string]interface{}, len(scores)) for _, score := range scores { - fields[score.Name] = score.Value + fields[score.name] = score.value } acc.AddFields("minecraft", fields, tags) diff --git a/plugins/inputs/minecraft/minecraft_test.go b/plugins/inputs/minecraft/minecraft_test.go index fcf4a363b0758..eed0c5b6b297c 100644 --- a/plugins/inputs/minecraft/minecraft_test.go +++ b/plugins/inputs/minecraft/minecraft_test.go @@ -9,22 +9,22 @@ import ( "github.com/stretchr/testify/require" ) -type MockClient struct { - ConnectF func() error - PlayersF func() ([]string, error) - ScoresF func(player string) ([]Score, error) +type mockClient struct { + connectF func() error + playersF func() ([]string, error) + scoresF func(player string) ([]score, error) } -func (c *MockClient) Connect() error { - return c.ConnectF() +func (c *mockClient) connect() error { + return c.connectF() } -func (c *MockClient) Players() ([]string, error) { - return c.PlayersF() +func (c *mockClient) players() ([]string, error) { + return c.playersF() } -func (c *MockClient) Scores(player string) ([]Score, error) { - return c.ScoresF(player) +func (c *mockClient) scores(player string) ([]score, error) { + return c.scoresF(player) } func TestGather(t *testing.T) { @@ -32,31 +32,31 @@ func TestGather(t *testing.T) { tests := []struct { name string - client *MockClient + client *mockClient metrics []telegraf.Metric err error }{ { name: "no players", - client: &MockClient{ - ConnectF: func() error { + client: &mockClient{ + connectF: func() error { return nil }, - PlayersF: func() ([]string, error) { + playersF: func() ([]string, error) { return nil, nil }, }, }, { name: "one player without scores", - client: &MockClient{ - ConnectF: func() error { + client: &mockClient{ + connectF: func() error { return nil }, - PlayersF: func() ([]string, error) { + playersF: func() ([]string, error) { return []string{"Etho"}, nil }, - ScoresF: func(player string) ([]Score, error) { + scoresF: func(player string) ([]score, error) { switch player { case "Etho": return nil, nil @@ -68,17 +68,17 @@ func TestGather(t *testing.T) { }, { name: "one player with scores", - client: &MockClient{ - ConnectF: func() error { + client: &mockClient{ + connectF: func() error { return nil }, - PlayersF: func() ([]string, error) { + playersF: func() ([]string, error) { return []string{"Etho"}, nil }, - ScoresF: func(player string) ([]Score, error) { + scoresF: func(player string) ([]score, error) { switch player { case "Etho": - return []Score{{Name: "jumps", Value: 42}}, nil + return []score{{name: "jumps", value: 42}}, nil default: panic("unknown player") } diff --git a/plugins/inputs/mock/mock.go b/plugins/inputs/mock/mock.go index 94e94175272ad..6b9adaa73f9da 100644 --- a/plugins/inputs/mock/mock.go +++ b/plugins/inputs/mock/mock.go @@ -48,22 +48,22 @@ type sineWave struct { } type step struct { - latest float64 - Name string `toml:"name"` Start float64 `toml:"start"` Step float64 `toml:"step"` Min float64 `toml:"min" deprecated:"1.28.2;1.35.0;use 'start' instead"` Max float64 `toml:"max" deprecated:"1.28.2;1.35.0;use 'step' instead"` -} -type stock struct { latest float64 +} +type stock struct { Name string `toml:"name"` Price float64 `toml:"price"` Volatility float64 `toml:"volatility"` + + latest float64 } func (*Mock) SampleConfig() string { diff --git a/plugins/inputs/modbus/configuration.go b/plugins/inputs/modbus/configuration.go index ef2acce74f665..13bcc58de41b6 100644 --- a/plugins/inputs/modbus/configuration.go +++ b/plugins/inputs/modbus/configuration.go @@ -9,10 +9,10 @@ const ( maxQuantityHoldingRegisters = uint16(125) ) -type Configuration interface { - Check() error - Process() (map[byte]requestSet, error) - SampleConfigPart() string +type configuration interface { + check() error + process() (map[byte]requestSet, error) + sampleConfigPart() string } func removeDuplicates(elements []uint16) []uint16 { diff --git a/plugins/inputs/modbus/configuration_metric.go b/plugins/inputs/modbus/configuration_metric.go index 70323dda8f173..c0301728e0e39 100644 --- a/plugins/inputs/modbus/configuration_metric.go +++ b/plugins/inputs/modbus/configuration_metric.go @@ -32,21 +32,21 @@ type metricDefinition struct { Tags map[string]string `toml:"tags"` } -type ConfigurationPerMetric struct { +type configurationPerMetric struct { Optimization string `toml:"optimization"` MaxExtraRegisters uint16 `toml:"optimization_max_register_fill"` Metrics []metricDefinition `toml:"metric"` - workarounds ModbusWorkarounds + workarounds workarounds excludeRegisterType bool logger telegraf.Logger } -func (c *ConfigurationPerMetric) SampleConfigPart() string { +func (c *configurationPerMetric) sampleConfigPart() string { return sampleConfigPartPerMetric } -func (c *ConfigurationPerMetric) Check() error { +func (c *configurationPerMetric) check() error { switch c.workarounds.StringRegisterLocation { case "", "both", "lower", "upper": // Do nothing as those are valid @@ -178,7 +178,7 @@ func (c *ConfigurationPerMetric) Check() error { return nil } -func (c *ConfigurationPerMetric) Process() (map[byte]requestSet, error) { +func (c *configurationPerMetric) process() (map[byte]requestSet, error) { collection := make(map[byte]map[string][]field) // Collect the requested registers across metrics and transform them into @@ -206,40 +206,40 @@ func (c *ConfigurationPerMetric) Process() (map[byte]requestSet, error) { result := make(map[byte]requestSet) params := groupingParams{ - Optimization: c.Optimization, - MaxExtraRegisters: c.MaxExtraRegisters, - Log: c.logger, + optimization: c.Optimization, + maxExtraRegisters: c.MaxExtraRegisters, + log: c.logger, } for sid, scollection := range collection { var set requestSet for registerType, fields := range scollection { switch registerType { case "coil": - params.MaxBatchSize = maxQuantityCoils + params.maxBatchSize = maxQuantityCoils if c.workarounds.OnRequestPerField { - params.MaxBatchSize = 1 + params.maxBatchSize = 1 } - params.EnforceFromZero = c.workarounds.ReadCoilsStartingAtZero + params.enforceFromZero = c.workarounds.ReadCoilsStartingAtZero requests := groupFieldsToRequests(fields, params) set.coil = append(set.coil, requests...) case "discrete": - params.MaxBatchSize = maxQuantityDiscreteInput + params.maxBatchSize = maxQuantityDiscreteInput if c.workarounds.OnRequestPerField { - params.MaxBatchSize = 1 + params.maxBatchSize = 1 } requests := groupFieldsToRequests(fields, params) set.discrete = append(set.discrete, requests...) case "holding": - params.MaxBatchSize = maxQuantityHoldingRegisters + params.maxBatchSize = maxQuantityHoldingRegisters if c.workarounds.OnRequestPerField { - params.MaxBatchSize = 1 + params.maxBatchSize = 1 } requests := groupFieldsToRequests(fields, params) set.holding = append(set.holding, requests...) case "input": - params.MaxBatchSize = maxQuantityInputRegisters + params.maxBatchSize = maxQuantityInputRegisters if c.workarounds.OnRequestPerField { - params.MaxBatchSize = 1 + params.maxBatchSize = 1 } requests := groupFieldsToRequests(fields, params) set.input = append(set.input, requests...) @@ -247,7 +247,7 @@ func (c *ConfigurationPerMetric) Process() (map[byte]requestSet, error) { return nil, fmt.Errorf("unknown register type %q", registerType) } } - if !set.Empty() { + if !set.empty() { result[sid] = set } } @@ -255,7 +255,7 @@ func (c *ConfigurationPerMetric) Process() (map[byte]requestSet, error) { return result, nil } -func (c *ConfigurationPerMetric) newField(def metricFieldDefinition, mdef metricDefinition) (field, error) { +func (c *configurationPerMetric) newField(def metricFieldDefinition, mdef metricDefinition) (field, error) { typed := def.RegisterType == "holding" || def.RegisterType == "input" fieldLength := uint16(1) @@ -339,7 +339,7 @@ func (c *ConfigurationPerMetric) newField(def metricFieldDefinition, mdef metric return f, nil } -func (c *ConfigurationPerMetric) fieldID(seed maphash.Seed, def metricDefinition, field metricFieldDefinition) uint64 { +func (c *configurationPerMetric) fieldID(seed maphash.Seed, def metricDefinition, field metricFieldDefinition) uint64 { var mh maphash.Hash mh.SetSeed(seed) @@ -354,7 +354,7 @@ func (c *ConfigurationPerMetric) fieldID(seed maphash.Seed, def metricDefinition mh.WriteString(field.Name) mh.WriteByte(0) - // Tags + // tags for k, v := range def.Tags { mh.WriteString(k) mh.WriteByte('=') @@ -366,7 +366,7 @@ func (c *ConfigurationPerMetric) fieldID(seed maphash.Seed, def metricDefinition return mh.Sum64() } -func (c *ConfigurationPerMetric) determineOutputDatatype(input string) (string, error) { +func (c *configurationPerMetric) determineOutputDatatype(input string) (string, error) { // Handle our special types switch input { case "INT8L", "INT8H", "INT16", "INT32", "INT64": @@ -381,7 +381,7 @@ func (c *ConfigurationPerMetric) determineOutputDatatype(input string) (string, return "unknown", fmt.Errorf("invalid input datatype %q for determining output", input) } -func (c *ConfigurationPerMetric) determineFieldLength(input string, length uint16) (uint16, error) { +func (c *configurationPerMetric) determineFieldLength(input string, length uint16) (uint16, error) { // Handle our special types switch input { case "BIT", "INT8L", "INT8H", "UINT8L", "UINT8H": diff --git a/plugins/inputs/modbus/configuration_metric_test.go b/plugins/inputs/modbus/configuration_metric_test.go index 8829c89cfabb3..541098cb45346 100644 --- a/plugins/inputs/modbus/configuration_metric_test.go +++ b/plugins/inputs/modbus/configuration_metric_test.go @@ -371,7 +371,7 @@ func TestMetricAddressOverflow(t *testing.T) { Controller: "tcp://localhost:1502", ConfigurationType: "metric", Log: logger, - Workarounds: ModbusWorkarounds{ReadCoilsStartingAtZero: true}, + Workarounds: workarounds{ReadCoilsStartingAtZero: true}, } plugin.Metrics = []metricDefinition{ { diff --git a/plugins/inputs/modbus/configuration_register.go b/plugins/inputs/modbus/configuration_register.go index 91eb9e2e7c6cc..9bd70caca6caa 100644 --- a/plugins/inputs/modbus/configuration_register.go +++ b/plugins/inputs/modbus/configuration_register.go @@ -21,21 +21,21 @@ type fieldDefinition struct { Bit uint8 `toml:"bit"` } -type ConfigurationOriginal struct { +type configurationOriginal struct { SlaveID byte `toml:"slave_id"` DiscreteInputs []fieldDefinition `toml:"discrete_inputs"` Coils []fieldDefinition `toml:"coils"` HoldingRegisters []fieldDefinition `toml:"holding_registers"` InputRegisters []fieldDefinition `toml:"input_registers"` - workarounds ModbusWorkarounds + workarounds workarounds logger telegraf.Logger } -func (c *ConfigurationOriginal) SampleConfigPart() string { +func (c *configurationOriginal) sampleConfigPart() string { return sampleConfigPartPerRegister } -func (c *ConfigurationOriginal) Check() error { +func (c *configurationOriginal) check() error { switch c.workarounds.StringRegisterLocation { case "", "both", "lower", "upper": // Do nothing as those are valid @@ -58,7 +58,7 @@ func (c *ConfigurationOriginal) Check() error { return c.validateFieldDefinitions(c.InputRegisters, cInputRegisters) } -func (c *ConfigurationOriginal) Process() (map[byte]requestSet, error) { +func (c *configurationOriginal) process() (map[byte]requestSet, error) { maxQuantity := uint16(1) if !c.workarounds.OnRequestPerField { maxQuantity = maxQuantityCoils @@ -102,22 +102,22 @@ func (c *ConfigurationOriginal) Process() (map[byte]requestSet, error) { }, nil } -func (c *ConfigurationOriginal) initRequests(fieldDefs []fieldDefinition, maxQuantity uint16, typed bool) ([]request, error) { +func (c *configurationOriginal) initRequests(fieldDefs []fieldDefinition, maxQuantity uint16, typed bool) ([]request, error) { fields, err := c.initFields(fieldDefs, typed) if err != nil { return nil, err } params := groupingParams{ - MaxBatchSize: maxQuantity, - Optimization: "none", - EnforceFromZero: c.workarounds.ReadCoilsStartingAtZero, - Log: c.logger, + maxBatchSize: maxQuantity, + optimization: "none", + enforceFromZero: c.workarounds.ReadCoilsStartingAtZero, + log: c.logger, } return groupFieldsToRequests(fields, params), nil } -func (c *ConfigurationOriginal) initFields(fieldDefs []fieldDefinition, typed bool) ([]field, error) { +func (c *configurationOriginal) initFields(fieldDefs []fieldDefinition, typed bool) ([]field, error) { // Construct the fields from the field definitions fields := make([]field, 0, len(fieldDefs)) for _, def := range fieldDefs { @@ -131,7 +131,7 @@ func (c *ConfigurationOriginal) initFields(fieldDefs []fieldDefinition, typed bo return fields, nil } -func (c *ConfigurationOriginal) newFieldFromDefinition(def fieldDefinition, typed bool) (field, error) { +func (c *configurationOriginal) newFieldFromDefinition(def fieldDefinition, typed bool) (field, error) { // Check if the addresses are consecutive expected := def.Address[0] for _, current := range def.Address[1:] { @@ -182,7 +182,7 @@ func (c *ConfigurationOriginal) newFieldFromDefinition(def fieldDefinition, type return f, nil } -func (c *ConfigurationOriginal) validateFieldDefinitions(fieldDefs []fieldDefinition, registerType string) error { +func (c *configurationOriginal) validateFieldDefinitions(fieldDefs []fieldDefinition, registerType string) error { nameEncountered := make(map[string]bool, len(fieldDefs)) for _, item := range fieldDefs { // check empty name @@ -276,7 +276,7 @@ func (c *ConfigurationOriginal) validateFieldDefinitions(fieldDefs []fieldDefini return nil } -func (c *ConfigurationOriginal) normalizeInputDatatype(dataType string, words int) (string, error) { +func (c *configurationOriginal) normalizeInputDatatype(dataType string, words int) (string, error) { if dataType == "FLOAT32" { config.PrintOptionValueDeprecationNotice("input.modbus", "data_type", "FLOAT32", telegraf.DeprecationInfo{ Since: "1.16.0", @@ -323,7 +323,7 @@ func (c *ConfigurationOriginal) normalizeInputDatatype(dataType string, words in return normalizeInputDatatype(dataType) } -func (c *ConfigurationOriginal) normalizeOutputDatatype(dataType string) (string, error) { +func (c *configurationOriginal) normalizeOutputDatatype(dataType string) (string, error) { // Handle our special types switch dataType { case "FIXED", "FLOAT32", "UFIXED": @@ -332,7 +332,7 @@ func (c *ConfigurationOriginal) normalizeOutputDatatype(dataType string) (string return normalizeOutputDatatype("native") } -func (c *ConfigurationOriginal) normalizeByteOrder(byteOrder string) (string, error) { +func (c *configurationOriginal) normalizeByteOrder(byteOrder string) (string, error) { // Handle our special types switch byteOrder { case "AB", "ABCDEFGH": diff --git a/plugins/inputs/modbus/configuration_request.go b/plugins/inputs/modbus/configuration_request.go index 02c12d9badfc1..6288b0c1b5f99 100644 --- a/plugins/inputs/modbus/configuration_request.go +++ b/plugins/inputs/modbus/configuration_request.go @@ -37,19 +37,19 @@ type requestDefinition struct { Tags map[string]string `toml:"tags"` } -type ConfigurationPerRequest struct { +type configurationPerRequest struct { Requests []requestDefinition `toml:"request"` - workarounds ModbusWorkarounds + workarounds workarounds excludeRegisterType bool logger telegraf.Logger } -func (c *ConfigurationPerRequest) SampleConfigPart() string { +func (c *configurationPerRequest) sampleConfigPart() string { return sampleConfigPartPerRequest } -func (c *ConfigurationPerRequest) Check() error { +func (c *configurationPerRequest) check() error { switch c.workarounds.StringRegisterLocation { case "", "both", "lower", "upper": // Do nothing as those are valid @@ -213,7 +213,7 @@ func (c *ConfigurationPerRequest) Check() error { return nil } -func (c *ConfigurationPerRequest) Process() (map[byte]requestSet, error) { +func (c *configurationPerRequest) process() (map[byte]requestSet, error) { result := make(map[byte]requestSet, len(c.Requests)) for _, def := range c.Requests { // Set default @@ -235,45 +235,45 @@ func (c *ConfigurationPerRequest) Process() (map[byte]requestSet, error) { } params := groupingParams{ - MaxExtraRegisters: def.MaxExtraRegisters, - Optimization: def.Optimization, - Tags: def.Tags, - Log: c.logger, + maxExtraRegisters: def.MaxExtraRegisters, + optimization: def.Optimization, + tags: def.Tags, + log: c.logger, } switch def.RegisterType { case "coil": - params.MaxBatchSize = maxQuantityCoils + params.maxBatchSize = maxQuantityCoils if c.workarounds.OnRequestPerField { - params.MaxBatchSize = 1 + params.maxBatchSize = 1 } - params.EnforceFromZero = c.workarounds.ReadCoilsStartingAtZero + params.enforceFromZero = c.workarounds.ReadCoilsStartingAtZero requests := groupFieldsToRequests(fields, params) set.coil = append(set.coil, requests...) case "discrete": - params.MaxBatchSize = maxQuantityDiscreteInput + params.maxBatchSize = maxQuantityDiscreteInput if c.workarounds.OnRequestPerField { - params.MaxBatchSize = 1 + params.maxBatchSize = 1 } requests := groupFieldsToRequests(fields, params) set.discrete = append(set.discrete, requests...) case "holding": - params.MaxBatchSize = maxQuantityHoldingRegisters + params.maxBatchSize = maxQuantityHoldingRegisters if c.workarounds.OnRequestPerField { - params.MaxBatchSize = 1 + params.maxBatchSize = 1 } requests := groupFieldsToRequests(fields, params) set.holding = append(set.holding, requests...) case "input": - params.MaxBatchSize = maxQuantityInputRegisters + params.maxBatchSize = maxQuantityInputRegisters if c.workarounds.OnRequestPerField { - params.MaxBatchSize = 1 + params.maxBatchSize = 1 } requests := groupFieldsToRequests(fields, params) set.input = append(set.input, requests...) default: return nil, fmt.Errorf("unknown register type %q", def.RegisterType) } - if !set.Empty() { + if !set.empty() { result[def.SlaveID] = set } } @@ -281,7 +281,7 @@ func (c *ConfigurationPerRequest) Process() (map[byte]requestSet, error) { return result, nil } -func (c *ConfigurationPerRequest) initFields(fieldDefs []requestFieldDefinition, typed bool, byteOrder string) ([]field, error) { +func (c *configurationPerRequest) initFields(fieldDefs []requestFieldDefinition, typed bool, byteOrder string) ([]field, error) { // Construct the fields from the field definitions fields := make([]field, 0, len(fieldDefs)) for _, def := range fieldDefs { @@ -295,7 +295,7 @@ func (c *ConfigurationPerRequest) initFields(fieldDefs []requestFieldDefinition, return fields, nil } -func (c *ConfigurationPerRequest) newFieldFromDefinition(def requestFieldDefinition, typed bool, byteOrder string) (field, error) { +func (c *configurationPerRequest) newFieldFromDefinition(def requestFieldDefinition, typed bool, byteOrder string) (field, error) { var err error fieldLength := uint16(1) @@ -379,7 +379,7 @@ func (c *ConfigurationPerRequest) newFieldFromDefinition(def requestFieldDefinit return f, nil } -func (c *ConfigurationPerRequest) fieldID(seed maphash.Seed, def requestDefinition, field requestFieldDefinition) uint64 { +func (c *configurationPerRequest) fieldID(seed maphash.Seed, def requestDefinition, field requestFieldDefinition) uint64 { var mh maphash.Hash mh.SetSeed(seed) @@ -394,7 +394,7 @@ func (c *ConfigurationPerRequest) fieldID(seed maphash.Seed, def requestDefiniti mh.WriteString(field.Name) mh.WriteByte(0) - // Tags + // tags for k, v := range def.Tags { mh.WriteString(k) mh.WriteByte('=') @@ -406,7 +406,7 @@ func (c *ConfigurationPerRequest) fieldID(seed maphash.Seed, def requestDefiniti return mh.Sum64() } -func (c *ConfigurationPerRequest) determineOutputDatatype(input string) (string, error) { +func (c *configurationPerRequest) determineOutputDatatype(input string) (string, error) { // Handle our special types switch input { case "INT8L", "INT8H", "INT16", "INT32", "INT64": @@ -421,7 +421,7 @@ func (c *ConfigurationPerRequest) determineOutputDatatype(input string) (string, return "unknown", fmt.Errorf("invalid input datatype %q for determining output", input) } -func (c *ConfigurationPerRequest) determineFieldLength(input string, length uint16) (uint16, error) { +func (c *configurationPerRequest) determineFieldLength(input string, length uint16) (uint16, error) { // Handle our special types switch input { case "BIT", "INT8L", "INT8H", "UINT8L", "UINT8H": diff --git a/plugins/inputs/modbus/configuration_request_test.go b/plugins/inputs/modbus/configuration_request_test.go index e298c95decc34..45f971ac3b715 100644 --- a/plugins/inputs/modbus/configuration_request_test.go +++ b/plugins/inputs/modbus/configuration_request_test.go @@ -3177,7 +3177,7 @@ func TestRequestWorkaroundsOneRequestPerField(t *testing.T) { Controller: "tcp://localhost:1502", ConfigurationType: "request", Log: testutil.Logger{}, - Workarounds: ModbusWorkarounds{OnRequestPerField: true}, + Workarounds: workarounds{OnRequestPerField: true}, } plugin.Requests = []requestDefinition{ { @@ -3223,7 +3223,7 @@ func TestRequestWorkaroundsReadCoilsStartingAtZeroRequest(t *testing.T) { Controller: "tcp://localhost:1502", ConfigurationType: "request", Log: testutil.Logger{}, - Workarounds: ModbusWorkarounds{ReadCoilsStartingAtZero: true}, + Workarounds: workarounds{ReadCoilsStartingAtZero: true}, } plugin.SlaveID = 1 plugin.Requests = []requestDefinition{ @@ -3262,7 +3262,7 @@ func TestRequestOverlap(t *testing.T) { Controller: "tcp://localhost:1502", ConfigurationType: "request", Log: logger, - Workarounds: ModbusWorkarounds{ReadCoilsStartingAtZero: true}, + Workarounds: workarounds{ReadCoilsStartingAtZero: true}, } plugin.Requests = []requestDefinition{ { @@ -3320,7 +3320,7 @@ func TestRequestAddressOverflow(t *testing.T) { Controller: "tcp://localhost:1502", ConfigurationType: "request", Log: logger, - Workarounds: ModbusWorkarounds{ReadCoilsStartingAtZero: true}, + Workarounds: workarounds{ReadCoilsStartingAtZero: true}, } plugin.Requests = []requestDefinition{ { diff --git a/plugins/inputs/modbus/modbus.go b/plugins/inputs/modbus/modbus.go index b21f52c662973..0d95d3987ced6 100644 --- a/plugins/inputs/modbus/modbus.go +++ b/plugins/inputs/modbus/modbus.go @@ -27,7 +27,45 @@ var sampleConfigEnd string var errAddressOverflow = errors.New("address overflow") -type ModbusWorkarounds struct { +const ( + cDiscreteInputs = "discrete_input" + cCoils = "coil" + cHoldingRegisters = "holding_register" + cInputRegisters = "input_register" +) + +type Modbus struct { + Name string `toml:"name"` + Controller string `toml:"controller"` + TransmissionMode string `toml:"transmission_mode"` + BaudRate int `toml:"baud_rate"` + DataBits int `toml:"data_bits"` + Parity string `toml:"parity"` + StopBits int `toml:"stop_bits"` + RS485 *rs485Config `toml:"rs485"` + Timeout config.Duration `toml:"timeout"` + Retries int `toml:"busy_retries"` + RetriesWaitTime config.Duration `toml:"busy_retries_wait"` + DebugConnection bool `toml:"debug_connection" deprecated:"1.35.0;use 'log_level' 'trace' instead"` + Workarounds workarounds `toml:"workarounds"` + ConfigurationType string `toml:"configuration_type"` + ExcludeRegisterTypeTag bool `toml:"exclude_register_type_tag"` + Log telegraf.Logger `toml:"-"` + + // configuration type specific settings + configurationOriginal + configurationPerRequest + configurationPerMetric + + // Connection handling + client mb.Client + handler mb.ClientHandler + isConnected bool + // Request handling + requests map[byte]requestSet +} + +type workarounds struct { AfterConnectPause config.Duration `toml:"pause_after_connect"` PollPause config.Duration `toml:"pause_between_requests"` CloseAfterGather bool `toml:"close_connection_after_gather"` @@ -37,7 +75,7 @@ type ModbusWorkarounds struct { } // According to github.com/grid-x/serial -type RS485Config struct { +type rs485Config struct { DelayRtsBeforeSend config.Duration `toml:"delay_rts_before_send"` DelayRtsAfterSend config.Duration `toml:"delay_rts_after_send"` RtsHighDuringSend bool `toml:"rts_high_during_send"` @@ -45,38 +83,6 @@ type RS485Config struct { RxDuringTx bool `toml:"rx_during_tx"` } -// Modbus holds all data relevant to the plugin -type Modbus struct { - Name string `toml:"name"` - Controller string `toml:"controller"` - TransmissionMode string `toml:"transmission_mode"` - BaudRate int `toml:"baud_rate"` - DataBits int `toml:"data_bits"` - Parity string `toml:"parity"` - StopBits int `toml:"stop_bits"` - RS485 *RS485Config `toml:"rs485"` - Timeout config.Duration `toml:"timeout"` - Retries int `toml:"busy_retries"` - RetriesWaitTime config.Duration `toml:"busy_retries_wait"` - DebugConnection bool `toml:"debug_connection" deprecated:"1.35.0;use 'log_level' 'trace' instead"` - Workarounds ModbusWorkarounds `toml:"workarounds"` - ConfigurationType string `toml:"configuration_type"` - ExcludeRegisterTypeTag bool `toml:"exclude_register_type_tag"` - Log telegraf.Logger `toml:"-"` - - // Configuration type specific settings - ConfigurationOriginal - ConfigurationPerRequest - ConfigurationPerMetric - - // Connection handling - client mb.Client - handler mb.ClientHandler - isConnected bool - // Request handling - requests map[byte]requestSet -} - type fieldConverterFunc func(bytes []byte) interface{} type requestSet struct { @@ -86,7 +92,7 @@ type requestSet struct { input []request } -func (r requestSet) Empty() bool { +func (r requestSet) empty() bool { l := len(r.coil) l += len(r.discrete) l += len(r.holding) @@ -105,24 +111,16 @@ type field struct { tags map[string]string } -const ( - cDiscreteInputs = "discrete_input" - cCoils = "coil" - cHoldingRegisters = "holding_register" - cInputRegisters = "input_register" -) - -// SampleConfig returns a basic configuration for the plugin func (m *Modbus) SampleConfig() string { - configs := []Configuration{ - &m.ConfigurationOriginal, - &m.ConfigurationPerRequest, - &m.ConfigurationPerMetric, + configs := []configuration{ + &m.configurationOriginal, + &m.configurationPerRequest, + &m.configurationPerMetric, } totalConfig := sampleConfigStart for _, c := range configs { - totalConfig += c.SampleConfigPart() + "\n" + totalConfig += c.sampleConfigPart() + "\n" } totalConfig += "\n" totalConfig += sampleConfigEnd @@ -140,32 +138,32 @@ func (m *Modbus) Init() error { } // Determine the configuration style - var cfg Configuration + var cfg configuration switch m.ConfigurationType { case "", "register": - m.ConfigurationOriginal.workarounds = m.Workarounds - m.ConfigurationOriginal.logger = m.Log - cfg = &m.ConfigurationOriginal + m.configurationOriginal.workarounds = m.Workarounds + m.configurationOriginal.logger = m.Log + cfg = &m.configurationOriginal case "request": - m.ConfigurationPerRequest.workarounds = m.Workarounds - m.ConfigurationPerRequest.excludeRegisterType = m.ExcludeRegisterTypeTag - m.ConfigurationPerRequest.logger = m.Log - cfg = &m.ConfigurationPerRequest + m.configurationPerRequest.workarounds = m.Workarounds + m.configurationPerRequest.excludeRegisterType = m.ExcludeRegisterTypeTag + m.configurationPerRequest.logger = m.Log + cfg = &m.configurationPerRequest case "metric": - m.ConfigurationPerMetric.workarounds = m.Workarounds - m.ConfigurationPerMetric.excludeRegisterType = m.ExcludeRegisterTypeTag - m.ConfigurationPerMetric.logger = m.Log - cfg = &m.ConfigurationPerMetric + m.configurationPerMetric.workarounds = m.Workarounds + m.configurationPerMetric.excludeRegisterType = m.ExcludeRegisterTypeTag + m.configurationPerMetric.logger = m.Log + cfg = &m.configurationPerMetric default: return fmt.Errorf("unknown configuration type %q in device %q", m.ConfigurationType, m.Name) } // Check and process the configuration - if err := cfg.Check(); err != nil { + if err := cfg.check(); err != nil { return fmt.Errorf("configuration invalid for device %q: %w", m.Name, err) } - r, err := cfg.Process() + r, err := cfg.process() if err != nil { return fmt.Errorf("cannot process configuration for device %q: %w", m.Name, err) } @@ -219,7 +217,6 @@ func (m *Modbus) Init() error { return nil } -// Gather implements the telegraf plugin interface method for data accumulation func (m *Modbus) Gather(acc telegraf.Accumulator) error { if !m.isConnected { if err := m.connect(); err != nil { @@ -558,7 +555,7 @@ func (m *Modbus) collectFields(grouper *metric.SeriesGrouper, timestamp time.Tim } } -// Implement the logger interface of the modbus client +// Printf implements the logger interface of the modbus client func (m *Modbus) Printf(format string, v ...interface{}) { m.Log.Tracef(format, v...) } diff --git a/plugins/inputs/modbus/modbus_test.go b/plugins/inputs/modbus/modbus_test.go index 68a1a5e34df60..4b2d76269b4ca 100644 --- a/plugins/inputs/modbus/modbus_test.go +++ b/plugins/inputs/modbus/modbus_test.go @@ -491,7 +491,7 @@ func TestRegisterWorkaroundsOneRequestPerField(t *testing.T) { Controller: "tcp://localhost:1502", ConfigurationType: "register", Log: testutil.Logger{Quiet: true}, - Workarounds: ModbusWorkarounds{OnRequestPerField: true}, + Workarounds: workarounds{OnRequestPerField: true}, } plugin.SlaveID = 1 plugin.HoldingRegisters = []fieldDefinition{ @@ -541,7 +541,7 @@ func TestRequestsWorkaroundsReadCoilsStartingAtZeroRegister(t *testing.T) { Controller: "tcp://localhost:1502", ConfigurationType: "register", Log: testutil.Logger{Quiet: true}, - Workarounds: ModbusWorkarounds{ReadCoilsStartingAtZero: true}, + Workarounds: workarounds{ReadCoilsStartingAtZero: true}, } plugin.SlaveID = 1 plugin.Coils = []fieldDefinition{ @@ -688,8 +688,8 @@ func TestWorkaroundsStringRegisterLocation(t *testing.T) { Controller: "tcp://localhost:1502", ConfigurationType: "request", Log: testutil.Logger{Quiet: true}, - Workarounds: ModbusWorkarounds{StringRegisterLocation: tt.location}, - ConfigurationPerRequest: ConfigurationPerRequest{ + Workarounds: workarounds{StringRegisterLocation: tt.location}, + configurationPerRequest: configurationPerRequest{ Requests: []requestDefinition{ { SlaveID: 1, @@ -738,7 +738,7 @@ func TestWorkaroundsStringRegisterLocationInvalid(t *testing.T) { Controller: "tcp://localhost:1502", ConfigurationType: "request", Log: testutil.Logger{Quiet: true}, - Workarounds: ModbusWorkarounds{StringRegisterLocation: "foo"}, + Workarounds: workarounds{StringRegisterLocation: "foo"}, } require.ErrorContains(t, plugin.Init(), `invalid 'string_register_location'`) } diff --git a/plugins/inputs/modbus/request.go b/plugins/inputs/modbus/request.go index 5b98426b4f3a6..dca8b4d74ac31 100644 --- a/plugins/inputs/modbus/request.go +++ b/plugins/inputs/modbus/request.go @@ -138,7 +138,7 @@ func optimizeGroup(g request, maxBatchSize uint16) []request { return requests } -func optimitzeGroupWithinLimits(g request, params groupingParams) []request { +func optimizeGroupWithinLimits(g request, params groupingParams) []request { if len(g.fields) == 0 { return nil } @@ -153,14 +153,14 @@ func optimitzeGroupWithinLimits(g request, params groupingParams) []request { // Check if we need to interrupt the current chunk and require a new one holeSize := g.fields[i].address - (g.fields[i-1].address + g.fields[i-1].length) if g.fields[i].address < g.fields[i-1].address+g.fields[i-1].length { - params.Log.Warnf( + params.log.Warnf( "Request at %d with length %d overlaps with next request at %d", g.fields[i-1].address, g.fields[i-1].length, g.fields[i].address, ) holeSize = 0 } - needInterrupt := holeSize > params.MaxExtraRegisters // too far apart - needInterrupt = needInterrupt || currentRequest.length+holeSize+g.fields[i].length > params.MaxBatchSize // too large + needInterrupt := holeSize > params.maxExtraRegisters // too far apart + needInterrupt = needInterrupt || currentRequest.length+holeSize+g.fields[i].length > params.maxBatchSize // too large if !needInterrupt { // Still safe to add the field to the current request currentRequest.length = g.fields[i].address + g.fields[i].length - currentRequest.address @@ -181,18 +181,16 @@ func optimitzeGroupWithinLimits(g request, params groupingParams) []request { type groupingParams struct { // Maximum size of a request in registers - MaxBatchSize uint16 - // Optimization to use for grouping register groups to requests. - // Also put potential optimization parameters here - Optimization string - MaxExtraRegisters uint16 - // Will force reads to start at zero (if possible) while respecting - // the max-batch size. - EnforceFromZero bool - // Tags to add for the requests - Tags map[string]string - // Log facility to inform the user - Log telegraf.Logger + maxBatchSize uint16 + // optimization to use for grouping register groups to requests, Also put potential optimization parameters here + optimization string + maxExtraRegisters uint16 + // Will force reads to start at zero (if possible) while respecting the max-batch size. + enforceFromZero bool + // tags to add for the requests + tags map[string]string + // log facility to inform the user + log telegraf.Logger } func groupFieldsToRequests(fields []field, params groupingParams) []request { @@ -216,9 +214,9 @@ func groupFieldsToRequests(fields []field, params groupingParams) []request { for _, f := range fields { // Add tags from higher up if f.tags == nil { - f.tags = make(map[string]string, len(params.Tags)) + f.tags = make(map[string]string, len(params.tags)) } - for k, v := range params.Tags { + for k, v := range params.tags { f.tags[k] = v } @@ -253,18 +251,18 @@ func groupFieldsToRequests(fields []field, params groupingParams) []request { } // Enforce the first read to start at zero if the option is set - if params.EnforceFromZero { + if params.enforceFromZero { groups[0].length += groups[0].address groups[0].address = 0 } var requests []request - switch params.Optimization { + switch params.optimization { case "shrink": // Shrink request by striping leading and trailing fields with an omit flag set for _, g := range groups { if len(g.fields) > 0 { - requests = append(requests, shrinkGroup(g, params.MaxBatchSize)...) + requests = append(requests, shrinkGroup(g, params.maxBatchSize)...) } } case "rearrange": @@ -272,7 +270,7 @@ func groupFieldsToRequests(fields []field, params groupingParams) []request { // registers while keeping the number of requests for _, g := range groups { if len(g.fields) > 0 { - requests = append(requests, optimizeGroup(g, params.MaxBatchSize)...) + requests = append(requests, optimizeGroup(g, params.maxBatchSize)...) } } case "aggressive": @@ -284,7 +282,7 @@ func groupFieldsToRequests(fields []field, params groupingParams) []request { total.fields = append(total.fields, g.fields...) } } - requests = optimizeGroup(total, params.MaxBatchSize) + requests = optimizeGroup(total, params.maxBatchSize) case "max_insert": // Similar to aggressive but keeps the number of touched registers below a threshold var total request @@ -293,12 +291,12 @@ func groupFieldsToRequests(fields []field, params groupingParams) []request { total.fields = append(total.fields, g.fields...) } } - requests = optimitzeGroupWithinLimits(total, params) + requests = optimizeGroupWithinLimits(total, params) default: // no optimization for _, g := range groups { if len(g.fields) > 0 { - requests = append(requests, splitMaxBatchSize(g, params.MaxBatchSize)...) + requests = append(requests, splitMaxBatchSize(g, params.maxBatchSize)...) } } } diff --git a/plugins/inputs/mongodb/mongodb.go b/plugins/inputs/mongodb/mongodb.go index 65c323c18e3ee..621a0500cea37 100644 --- a/plugins/inputs/mongodb/mongodb.go +++ b/plugins/inputs/mongodb/mongodb.go @@ -26,26 +26,26 @@ import ( //go:embed sample.conf var sampleConfig string -var DisconnectedServersBehaviors = []string{"error", "skip"} +var disconnectedServersBehaviors = []string{"error", "skip"} type MongoDB struct { - Servers []string - Ssl Ssl - GatherClusterStatus bool - GatherPerdbStats bool - GatherColStats bool - GatherTopStat bool - DisconnectedServersBehavior string - ColStatsDbs []string + Servers []string `toml:"servers"` + GatherClusterStatus bool `toml:"gather_cluster_status"` + GatherPerdbStats bool `toml:"gather_perdb_stats"` + GatherColStats bool `toml:"gather_col_stats"` + GatherTopStat bool `toml:"gather_top_stat"` + DisconnectedServersBehavior string `toml:"disconnected_servers_behavior"` + ColStatsDbs []string `toml:"col_stats_dbs"` common_tls.ClientConfig + Ssl ssl Log telegraf.Logger `toml:"-"` - clients []*Server + clients []*server tlsConfig *tls.Config } -type Ssl struct { +type ssl struct { Enabled bool `toml:"ssl_enabled" deprecated:"1.3.0;1.35.0;use 'tls_*' options instead"` CaCerts []string `toml:"cacerts" deprecated:"1.3.0;1.35.0;use 'tls_ca' instead"` } @@ -59,7 +59,7 @@ func (m *MongoDB) Init() error { m.DisconnectedServersBehavior = "error" } - if err := choice.Check(m.DisconnectedServersBehavior, DisconnectedServersBehaviors); err != nil { + if err := choice.Check(m.DisconnectedServersBehavior, disconnectedServersBehaviors); err != nil { return fmt.Errorf("disconnected_servers_behavior: %w", err) } @@ -105,6 +105,41 @@ func (m *MongoDB) Start(telegraf.Accumulator) error { return nil } +func (m *MongoDB) Gather(acc telegraf.Accumulator) error { + var wg sync.WaitGroup + for _, client := range m.clients { + wg.Add(1) + go func(srv *server) { + defer wg.Done() + if m.DisconnectedServersBehavior == "skip" { + if err := srv.ping(); err != nil { + m.Log.Debugf("Failed to ping server: %s", err) + return + } + } + + err := srv.gatherData(acc, m.GatherClusterStatus, m.GatherPerdbStats, m.GatherColStats, m.GatherTopStat, m.ColStatsDbs) + if err != nil { + m.Log.Errorf("Failed to gather data: %s", err) + } + }(client) + } + + wg.Wait() + return nil +} + +// Stop disconnects mongo connections when stop or reload +func (m *MongoDB) Stop() { + for _, server := range m.clients { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + if err := server.client.Disconnect(ctx); err != nil { + m.Log.Errorf("Disconnecting from %q failed: %v", server.hostname, err) + } + cancel() + } +} + func (m *MongoDB) setupConnection(connURL string) error { if !strings.HasPrefix(connURL, "mongodb://") && !strings.HasPrefix(connURL, "mongodb+srv://") { // Preserve backwards compatibility for hostnames without a @@ -143,52 +178,15 @@ func (m *MongoDB) setupConnection(connURL string) error { m.Log.Errorf("Unable to ping MongoDB: %s", err) } - server := &Server{ + server := &server{ client: client, hostname: u.Host, - Log: m.Log, + log: m.Log, } m.clients = append(m.clients, server) return nil } -// Stop disconnect mongo connections when stop or reload -func (m *MongoDB) Stop() { - for _, server := range m.clients { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - if err := server.client.Disconnect(ctx); err != nil { - m.Log.Errorf("Disconnecting from %q failed: %v", server.hostname, err) - } - cancel() - } -} - -// Reads stats from all configured servers accumulates stats. -// Returns one of the errors encountered while gather stats (if any). -func (m *MongoDB) Gather(acc telegraf.Accumulator) error { - var wg sync.WaitGroup - for _, client := range m.clients { - wg.Add(1) - go func(srv *Server) { - defer wg.Done() - if m.DisconnectedServersBehavior == "skip" { - if err := srv.ping(); err != nil { - m.Log.Debugf("Failed to ping server: %s", err) - return - } - } - - err := srv.gatherData(acc, m.GatherClusterStatus, m.GatherPerdbStats, m.GatherColStats, m.GatherTopStat, m.ColStatsDbs) - if err != nil { - m.Log.Errorf("Failed to gather data: %s", err) - } - }(client) - } - - wg.Wait() - return nil -} - func init() { inputs.Add("mongodb", func() telegraf.Input { return &MongoDB{ diff --git a/plugins/inputs/mongodb/mongodb_data.go b/plugins/inputs/mongodb/mongodb_data.go index 1ab501306fa8b..b85806dd5045b 100644 --- a/plugins/inputs/mongodb/mongodb_data.go +++ b/plugins/inputs/mongodb/mongodb_data.go @@ -8,29 +8,29 @@ import ( "github.com/influxdata/telegraf" ) -type MongodbData struct { +type mongodbData struct { StatLine *statLine Fields map[string]interface{} Tags map[string]string - DbData []DbData - ColData []ColData - ShardHostData []DbData - TopStatsData []DbData + DbData []bbData + ColData []colData + ShardHostData []bbData + TopStatsData []bbData } -type DbData struct { +type bbData struct { Name string Fields map[string]interface{} } -type ColData struct { +type colData struct { Name string DbName string Fields map[string]interface{} } -func NewMongodbData(statLine *statLine, tags map[string]string) *MongodbData { - return &MongodbData{ +func newMongodbData(statLine *statLine, tags map[string]string) *mongodbData { + return &mongodbData{ StatLine: statLine, Tags: tags, Fields: make(map[string]interface{}), @@ -297,11 +297,11 @@ var topDataStats = map[string]string{ "commands_count": "CommandsCount", } -func (d *MongodbData) AddDbStats() { +func (d *mongodbData) addDbStats() { for i := range d.StatLine.DbStatsLines { dbstat := d.StatLine.DbStatsLines[i] dbStatLine := reflect.ValueOf(&dbstat).Elem() - newDbData := &DbData{ + newDbData := &bbData{ Name: dbstat.Name, Fields: make(map[string]interface{}), } @@ -314,11 +314,11 @@ func (d *MongodbData) AddDbStats() { } } -func (d *MongodbData) AddColStats() { +func (d *mongodbData) addColStats() { for i := range d.StatLine.ColStatsLines { colstat := d.StatLine.ColStatsLines[i] colStatLine := reflect.ValueOf(&colstat).Elem() - newColData := &ColData{ + newColData := &colData{ Name: colstat.Name, DbName: colstat.DbName, Fields: make(map[string]interface{}), @@ -332,11 +332,11 @@ func (d *MongodbData) AddColStats() { } } -func (d *MongodbData) AddShardHostStats() { +func (d *mongodbData) addShardHostStats() { for host := range d.StatLine.ShardHostStatsLines { hostStat := d.StatLine.ShardHostStatsLines[host] hostStatLine := reflect.ValueOf(&hostStat).Elem() - newDbData := &DbData{ + newDbData := &bbData{ Name: host, Fields: make(map[string]interface{}), } @@ -349,11 +349,11 @@ func (d *MongodbData) AddShardHostStats() { } } -func (d *MongodbData) AddTopStats() { +func (d *mongodbData) addTopStats() { for i := range d.StatLine.TopStatLines { topStat := d.StatLine.TopStatLines[i] topStatLine := reflect.ValueOf(&topStat).Elem() - newTopStatData := &DbData{ + newTopStatData := &bbData{ Name: topStat.CollectionName, Fields: make(map[string]interface{}), } @@ -366,7 +366,7 @@ func (d *MongodbData) AddTopStats() { } } -func (d *MongodbData) AddDefaultStats() { +func (d *mongodbData) addDefaultStats() { statLine := reflect.ValueOf(d.StatLine).Elem() d.addStat(statLine, defaultStats) if d.StatLine.NodeType != "" { @@ -414,18 +414,18 @@ func (d *MongodbData) AddDefaultStats() { } } -func (d *MongodbData) addStat(statLine reflect.Value, stats map[string]string) { +func (d *mongodbData) addStat(statLine reflect.Value, stats map[string]string) { for key, value := range stats { val := statLine.FieldByName(value).Interface() d.add(key, val) } } -func (d *MongodbData) add(key string, val interface{}) { +func (d *mongodbData) add(key string, val interface{}) { d.Fields[key] = val } -func (d *MongodbData) flush(acc telegraf.Accumulator) { +func (d *mongodbData) flush(acc telegraf.Accumulator) { acc.AddFields( "mongodb", d.Fields, diff --git a/plugins/inputs/mongodb/mongodb_data_test.go b/plugins/inputs/mongodb/mongodb_data_test.go index 3d660dd99f74b..a83001c416aaf 100644 --- a/plugins/inputs/mongodb/mongodb_data_test.go +++ b/plugins/inputs/mongodb/mongodb_data_test.go @@ -13,7 +13,7 @@ import ( var tags = make(map[string]string) func TestAddNonReplStats(t *testing.T) { - d := NewMongodbData( + d := newMongodbData( &statLine{ StorageEngine: "", Time: time.Now(), @@ -62,7 +62,7 @@ func TestAddNonReplStats(t *testing.T) { ) var acc testutil.Accumulator - d.AddDefaultStats() + d.addDefaultStats() d.flush(&acc) for key := range defaultStats { @@ -71,7 +71,7 @@ func TestAddNonReplStats(t *testing.T) { } func TestAddReplStats(t *testing.T) { - d := NewMongodbData( + d := newMongodbData( &statLine{ StorageEngine: "mmapv1", Mapped: 0, @@ -83,7 +83,7 @@ func TestAddReplStats(t *testing.T) { var acc testutil.Accumulator - d.AddDefaultStats() + d.addDefaultStats() d.flush(&acc) for key := range mmapStats { @@ -92,7 +92,7 @@ func TestAddReplStats(t *testing.T) { } func TestAddWiredTigerStats(t *testing.T) { - d := NewMongodbData( + d := newMongodbData( &statLine{ StorageEngine: "wiredTiger", CacheDirtyPercent: 0, @@ -124,7 +124,7 @@ func TestAddWiredTigerStats(t *testing.T) { var acc testutil.Accumulator - d.AddDefaultStats() + d.addDefaultStats() d.flush(&acc) for key := range wiredTigerStats { @@ -139,7 +139,7 @@ func TestAddWiredTigerStats(t *testing.T) { } func TestAddShardStats(t *testing.T) { - d := NewMongodbData( + d := newMongodbData( &statLine{ TotalInUse: 0, TotalAvailable: 0, @@ -151,7 +151,7 @@ func TestAddShardStats(t *testing.T) { var acc testutil.Accumulator - d.AddDefaultStats() + d.addDefaultStats() d.flush(&acc) for key := range defaultShardStats { @@ -160,7 +160,7 @@ func TestAddShardStats(t *testing.T) { } func TestAddLatencyStats(t *testing.T) { - d := NewMongodbData( + d := newMongodbData( &statLine{ CommandOpsCnt: 73, CommandLatency: 364, @@ -174,7 +174,7 @@ func TestAddLatencyStats(t *testing.T) { var acc testutil.Accumulator - d.AddDefaultStats() + d.addDefaultStats() d.flush(&acc) for key := range defaultLatencyStats { @@ -183,7 +183,7 @@ func TestAddLatencyStats(t *testing.T) { } func TestAddAssertsStats(t *testing.T) { - d := NewMongodbData( + d := newMongodbData( &statLine{ Regular: 3, Warning: 9, @@ -196,7 +196,7 @@ func TestAddAssertsStats(t *testing.T) { var acc testutil.Accumulator - d.AddDefaultStats() + d.addDefaultStats() d.flush(&acc) for key := range defaultAssertsStats { @@ -205,7 +205,7 @@ func TestAddAssertsStats(t *testing.T) { } func TestAddCommandsStats(t *testing.T) { - d := NewMongodbData( + d := newMongodbData( &statLine{ AggregateCommandTotal: 12, AggregateCommandFailed: 2, @@ -231,7 +231,7 @@ func TestAddCommandsStats(t *testing.T) { var acc testutil.Accumulator - d.AddDefaultStats() + d.addDefaultStats() d.flush(&acc) for key := range defaultCommandsStats { @@ -240,7 +240,7 @@ func TestAddCommandsStats(t *testing.T) { } func TestAddTCMallocStats(t *testing.T) { - d := NewMongodbData( + d := newMongodbData( &statLine{ TCMallocCurrentAllocatedBytes: 5877253096, TCMallocHeapSize: 8067108864, @@ -267,7 +267,7 @@ func TestAddTCMallocStats(t *testing.T) { var acc testutil.Accumulator - d.AddDefaultStats() + d.addDefaultStats() d.flush(&acc) for key := range defaultTCMallocStats { @@ -276,7 +276,7 @@ func TestAddTCMallocStats(t *testing.T) { } func TestAddStorageStats(t *testing.T) { - d := NewMongodbData( + d := newMongodbData( &statLine{ StorageFreelistSearchBucketExhausted: 0, StorageFreelistSearchRequests: 0, @@ -287,7 +287,7 @@ func TestAddStorageStats(t *testing.T) { var acc testutil.Accumulator - d.AddDefaultStats() + d.addDefaultStats() d.flush(&acc) for key := range defaultStorageStats { @@ -307,7 +307,7 @@ func TestAddShardHostStats(t *testing.T) { } } - d := NewMongodbData( + d := newMongodbData( &statLine{ ShardHostStatsLines: hostStatLines, }, @@ -315,7 +315,7 @@ func TestAddShardHostStats(t *testing.T) { ) var acc testutil.Accumulator - d.AddShardHostStats() + d.addShardHostStats() d.flush(&acc) hostsFound := make([]string, 0, len(hostStatLines)) @@ -333,7 +333,7 @@ func TestAddShardHostStats(t *testing.T) { } func TestStateTag(t *testing.T) { - d := NewMongodbData( + d := newMongodbData( &statLine{ StorageEngine: "", Time: time.Now(), @@ -353,7 +353,7 @@ func TestStateTag(t *testing.T) { var acc testutil.Accumulator - d.AddDefaultStats() + d.addDefaultStats() d.flush(&acc) fields := map[string]interface{}{ "active_reads": int64(0), @@ -524,7 +524,7 @@ func TestAddTopStats(t *testing.T) { topStatLines = append(topStatLines, topStatLine) } - d := NewMongodbData( + d := newMongodbData( &statLine{ TopStatLines: topStatLines, }, @@ -532,7 +532,7 @@ func TestAddTopStats(t *testing.T) { ) var acc testutil.Accumulator - d.AddTopStats() + d.addTopStats() d.flush(&acc) for range topStatLines { diff --git a/plugins/inputs/mongodb/mongodb_server.go b/plugins/inputs/mongodb/mongodb_server.go index 641a0714b486c..b0ea0bb35ad29 100644 --- a/plugins/inputs/mongodb/mongodb_server.go +++ b/plugins/inputs/mongodb/mongodb_server.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "slices" "strconv" "strings" "time" @@ -16,44 +17,44 @@ import ( "github.com/influxdata/telegraf" ) -type Server struct { +type server struct { client *mongo.Client hostname string lastResult *mongoStatus - Log telegraf.Logger -} - -func (s *Server) getDefaultTags() map[string]string { - tags := make(map[string]string) - tags["hostname"] = s.hostname - return tags + log telegraf.Logger } type oplogEntry struct { Timestamp primitive.Timestamp `bson:"ts"` } -func IsAuthorization(err error) bool { +func isAuthorization(err error) bool { return strings.Contains(err.Error(), "not authorized") } -func (s *Server) ping() error { +func (s *server) getDefaultTags() map[string]string { + tags := make(map[string]string) + tags["hostname"] = s.hostname + return tags +} + +func (s *server) ping() error { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() return s.client.Ping(ctx, nil) } -func (s *Server) authLog(err error) { - if IsAuthorization(err) { - s.Log.Debug(err.Error()) +func (s *server) authLog(err error) { + if isAuthorization(err) { + s.log.Debug(err.Error()) } else { - s.Log.Error(err.Error()) + s.log.Error(err.Error()) } } -func (s *Server) runCommand(database string, cmd, result interface{}) error { +func (s *server) runCommand(database string, cmd, result interface{}) error { r := s.client.Database(database).RunCommand(context.Background(), cmd) if r.Err() != nil { return r.Err() @@ -61,7 +62,7 @@ func (s *Server) runCommand(database string, cmd, result interface{}) error { return r.Decode(result) } -func (s *Server) gatherServerStatus() (*serverStatus, error) { +func (s *server) gatherServerStatus() (*serverStatus, error) { serverStatus := &serverStatus{} err := s.runCommand("admin", bson.D{ { @@ -79,7 +80,7 @@ func (s *Server) gatherServerStatus() (*serverStatus, error) { return serverStatus, nil } -func (s *Server) gatherReplSetStatus() (*replSetStatus, error) { +func (s *server) gatherReplSetStatus() (*replSetStatus, error) { replSetStatus := &replSetStatus{} err := s.runCommand("admin", bson.D{ { @@ -93,7 +94,7 @@ func (s *Server) gatherReplSetStatus() (*replSetStatus, error) { return replSetStatus, nil } -func (s *Server) gatherTopStatData() (*topStats, error) { +func (s *server) gatherTopStatData() (*topStats, error) { var dest map[string]interface{} err := s.runCommand("admin", bson.D{ { @@ -124,7 +125,7 @@ func (s *Server) gatherTopStatData() (*topStats, error) { return &topStats{Totals: topInfo}, nil } -func (s *Server) gatherClusterStatus() (*clusterStatus, error) { +func (s *server) gatherClusterStatus() (*clusterStatus, error) { chunkCount, err := s.client.Database("config").Collection("chunks").CountDocuments(context.Background(), bson.M{"jumbo": true}) if err != nil { return nil, err @@ -148,7 +149,7 @@ func poolStatsCommand(version string) (string, error) { return "shardConnPoolStats", nil } -func (s *Server) gatherShardConnPoolStats(version string) (*shardStats, error) { +func (s *server) gatherShardConnPoolStats(version string) (*shardStats, error) { command, err := poolStatsCommand(version) if err != nil { return nil, err @@ -167,7 +168,7 @@ func (s *Server) gatherShardConnPoolStats(version string) (*shardStats, error) { return shardStats, nil } -func (s *Server) gatherDBStats(name string) (*db, error) { +func (s *server) gatherDBStats(name string) (*db, error) { stats := &dbStatsData{} err := s.runCommand(name, bson.D{ { @@ -185,7 +186,7 @@ func (s *Server) gatherDBStats(name string) (*db, error) { }, nil } -func (s *Server) getOplogReplLag(collection string) (*oplogStats, error) { +func (s *server) getOplogReplLag(collection string) (*oplogStats, error) { query := bson.M{"ts": bson.M{"$exists": true}} var first oplogEntry @@ -219,7 +220,7 @@ func (s *Server) getOplogReplLag(collection string) (*oplogStats, error) { // The "oplog.$main" collection is created on the master node of a // master-slave replicated deployment. As of MongoDB 3.2, master-slave // replication has been deprecated. -func (s *Server) gatherOplogStats() (*oplogStats, error) { +func (s *server) gatherOplogStats() (*oplogStats, error) { stats, err := s.getOplogReplLag("oplog.rs") if err == nil { return stats, nil @@ -228,7 +229,7 @@ func (s *Server) gatherOplogStats() (*oplogStats, error) { return s.getOplogReplLag("oplog.$main") } -func (s *Server) gatherCollectionStats(colStatsDbs []string) (*colStats, error) { +func (s *server) gatherCollectionStats(colStatsDbs []string) (*colStats, error) { names, err := s.client.ListDatabaseNames(context.Background(), bson.D{}) if err != nil { return nil, err @@ -236,14 +237,14 @@ func (s *Server) gatherCollectionStats(colStatsDbs []string) (*colStats, error) results := &colStats{} for _, dbName := range names { - if stringInSlice(dbName, colStatsDbs) || len(colStatsDbs) == 0 { + if slices.Contains(colStatsDbs, dbName) || len(colStatsDbs) == 0 { // skip views as they fail on collStats below filter := bson.M{"type": bson.M{"$in": bson.A{"collection", "timeseries"}}} var colls []string colls, err = s.client.Database(dbName).ListCollectionNames(context.Background(), filter) if err != nil { - s.Log.Errorf("Error getting collection names: %s", err.Error()) + s.log.Errorf("Error getting collection names: %s", err.Error()) continue } for _, colName := range colls { @@ -270,7 +271,7 @@ func (s *Server) gatherCollectionStats(colStatsDbs []string) (*colStats, error) return results, nil } -func (s *Server) gatherData(acc telegraf.Accumulator, gatherClusterStatus, gatherDbStats, gatherColStats, gatherTopStat bool, colStatsDbs []string) error { +func (s *server) gatherData(acc telegraf.Accumulator, gatherClusterStatus, gatherDbStats, gatherColStats, gatherTopStat bool, colStatsDbs []string) error { serverStatus, err := s.gatherServerStatus() if err != nil { return err @@ -280,7 +281,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherClusterStatus, gathe // member of a replica set. replSetStatus, err := s.gatherReplSetStatus() if err != nil { - s.Log.Debugf("Unable to gather replica set status: %s", err.Error()) + s.log.Debugf("Unable to gather replica set status: %s", err.Error()) } // Gather the oplog if we are a member of a replica set. Non-replica set @@ -297,7 +298,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherClusterStatus, gathe if gatherClusterStatus { status, err := s.gatherClusterStatus() if err != nil { - s.Log.Debugf("Unable to gather cluster status: %s", err.Error()) + s.log.Debugf("Unable to gather cluster status: %s", err.Error()) } clusterStatus = status } @@ -326,7 +327,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherClusterStatus, gathe for _, name := range names { db, err := s.gatherDBStats(name) if err != nil { - s.Log.Debugf("Error getting db stats from %q: %s", name, err.Error()) + s.log.Debugf("Error getting db stats from %q: %s", name, err.Error()) } dbStats.Dbs = append(dbStats.Dbs, *db) } @@ -336,7 +337,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherClusterStatus, gathe if gatherTopStat { topStats, err := s.gatherTopStatData() if err != nil { - s.Log.Debugf("Unable to gather top stat data: %s", err.Error()) + s.log.Debugf("Unable to gather top stat data: %s", err.Error()) return err } topStatData = topStats @@ -360,27 +361,18 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherClusterStatus, gathe if durationInSeconds == 0 { durationInSeconds = 1 } - data := NewMongodbData( - NewStatLine(*s.lastResult, *result, s.hostname, true, durationInSeconds), + data := newMongodbData( + newStatLine(*s.lastResult, *result, s.hostname, durationInSeconds), s.getDefaultTags(), ) - data.AddDefaultStats() - data.AddDbStats() - data.AddColStats() - data.AddShardHostStats() - data.AddTopStats() + data.addDefaultStats() + data.addDbStats() + data.addColStats() + data.addShardHostStats() + data.addTopStats() data.flush(acc) } s.lastResult = result return nil } - -func stringInSlice(a string, list []string) bool { - for _, b := range list { - if b == a { - return true - } - } - return false -} diff --git a/plugins/inputs/mongodb/mongodb_server_test.go b/plugins/inputs/mongodb/mongodb_server_test.go index 05887f56beb95..2c131830352f8 100644 --- a/plugins/inputs/mongodb/mongodb_server_test.go +++ b/plugins/inputs/mongodb/mongodb_server_test.go @@ -11,15 +11,15 @@ import ( "github.com/influxdata/telegraf/testutil" ) -var ServicePort = "27017" +var servicePort = "27017" var unreachableMongoEndpoint = "mongodb://user:pass@127.0.0.1:27017/nop" func createTestServer(t *testing.T) *testutil.Container { container := testutil.Container{ Image: "mongo", - ExposedPorts: []string{ServicePort}, + ExposedPorts: []string{servicePort}, WaitingFor: wait.ForAll( - wait.NewHTTPStrategy("/").WithPort(nat.Port(ServicePort)), + wait.NewHTTPStrategy("/").WithPort(nat.Port(servicePort)), wait.ForLog("Waiting for connections"), ), } @@ -40,7 +40,7 @@ func TestGetDefaultTagsIntegration(t *testing.T) { m := &MongoDB{ Log: testutil.Logger{}, Servers: []string{ - fmt.Sprintf("mongodb://%s:%s", container.Address, container.Ports[ServicePort]), + fmt.Sprintf("mongodb://%s:%s", container.Address, container.Ports[servicePort]), }, } err := m.Init() @@ -76,7 +76,7 @@ func TestAddDefaultStatsIntegration(t *testing.T) { m := &MongoDB{ Log: testutil.Logger{}, Servers: []string{ - fmt.Sprintf("mongodb://%s:%s", container.Address, container.Ports[ServicePort]), + fmt.Sprintf("mongodb://%s:%s", container.Address, container.Ports[servicePort]), }, } err := m.Init() diff --git a/plugins/inputs/mongodb/mongostat.go b/plugins/inputs/mongodb/mongostat.go index f3bd526690680..980f717bc5ef4 100644 --- a/plugins/inputs/mongodb/mongostat.go +++ b/plugins/inputs/mongodb/mongostat.go @@ -12,18 +12,7 @@ import ( ) const ( - MongosProcess = "mongos" -) - -// Flags to determine cases when to activate/deactivate columns for output. -const ( - Always = 1 << iota // always activate the column - Discover // only active when mongostat is in discover mode - Repl // only active if one of the nodes being monitored is in a replset - Locks // only active if node is capable of calculating lock info - AllOnly // only active if mongostat was run with --all option - MMAPOnly // only active if node has mmap-specific fields - WTOnly // only active if node has wiredtiger-specific fields + mongosProcess = "mongos" ) type mongoStatus struct { @@ -557,48 +546,6 @@ type storageStats struct { FreelistSearchScanned int64 `bson:"freelist.search.scanned"` } -// statHeader describes a single column for mongostat's terminal output, its formatting, and in which modes it should be displayed. -type statHeader struct { - // The text to appear in the column's header cell - HeaderText string - - // Bitmask containing flags to determine if this header is active or not - ActivateFlags int -} - -// StatHeaders are the complete set of data metrics supported by mongostat. -var StatHeaders = []statHeader{ - {"", Always}, // placeholder for hostname column (blank header text) - {"insert", Always}, - {"query", Always}, - {"update", Always}, - {"delete", Always}, - {"getmore", Always}, - {"command", Always}, - {"% dirty", WTOnly}, - {"% used", WTOnly}, - {"flushes", Always}, - {"mapped", MMAPOnly}, - {"vsize", Always}, - {"res", Always}, - {"non-mapped", MMAPOnly | AllOnly}, - {"faults", MMAPOnly}, - {"lr|lw %", MMAPOnly | AllOnly}, - {"lrt|lwt", MMAPOnly | AllOnly}, - {" locked db", Locks}, - {"qr|qw", Always}, - {"ar|aw", Always}, - {"netIn", Always}, - {"netOut", Always}, - {"conn", Always}, - {"set", Repl}, - {"repl", Repl}, - {"time", Always}, -} - -// NamespacedLocks stores information on the lockStatus of namespaces. -type NamespacedLocks map[string]lockStatus - // lockUsage stores information related to a namespace's lock usage. type lockUsage struct { Namespace string @@ -931,8 +878,8 @@ func diff(newVal, oldVal, sampleTime int64) (avg, newValue int64) { return d / sampleTime, newVal } -// NewStatLine constructs a statLine object from two mongoStatus objects. -func NewStatLine(oldMongo, newMongo mongoStatus, key string, all bool, sampleSecs int64) *statLine { +// newStatLine constructs a statLine object from two mongoStatus objects. +func newStatLine(oldMongo, newMongo mongoStatus, key string, sampleSecs int64) *statLine { oldStat := *oldMongo.ServerStatus newStat := *newMongo.ServerStatus @@ -1179,7 +1126,7 @@ func NewStatLine(oldMongo, newMongo mongoStatus, key string, all bool, sampleSec returnVal.Time = newMongo.SampleTime returnVal.IsMongos = - newStat.ShardCursorType != nil || strings.HasPrefix(newStat.Process, MongosProcess) + newStat.ShardCursorType != nil || strings.HasPrefix(newStat.Process, mongosProcess) // BEGIN code modification if oldStat.Mem.Supported.(bool) { @@ -1190,7 +1137,7 @@ func NewStatLine(oldMongo, newMongo mongoStatus, key string, all bool, sampleSec returnVal.Virtual = newStat.Mem.Virtual returnVal.Resident = newStat.Mem.Resident - if !returnVal.IsMongos && all { + if !returnVal.IsMongos { returnVal.NonMapped = newStat.Mem.Virtual - newStat.Mem.Mapped } } diff --git a/plugins/inputs/mongodb/mongostat_test.go b/plugins/inputs/mongodb/mongostat_test.go index 7d59d96d6b2aa..57cd0c00ad26d 100644 --- a/plugins/inputs/mongodb/mongostat_test.go +++ b/plugins/inputs/mongodb/mongostat_test.go @@ -7,7 +7,7 @@ import ( ) func TestLatencyStats(t *testing.T) { - sl := NewStatLine( + sl := newStatLine( mongoStatus{ ServerStatus: &serverStatus{ Connections: &connectionStats{}, @@ -49,7 +49,6 @@ func TestLatencyStats(t *testing.T) { }, }, "foo", - true, 60, ) @@ -62,7 +61,7 @@ func TestLatencyStats(t *testing.T) { } func TestLatencyStatsDiffZero(t *testing.T) { - sl := NewStatLine( + sl := newStatLine( mongoStatus{ ServerStatus: &serverStatus{ Connections: &connectionStats{}, @@ -118,7 +117,6 @@ func TestLatencyStatsDiffZero(t *testing.T) { }, }, "foo", - true, 60, ) @@ -131,7 +129,7 @@ func TestLatencyStatsDiffZero(t *testing.T) { } func TestLatencyStatsDiff(t *testing.T) { - sl := NewStatLine( + sl := newStatLine( mongoStatus{ ServerStatus: &serverStatus{ Connections: &connectionStats{}, @@ -187,7 +185,6 @@ func TestLatencyStatsDiff(t *testing.T) { }, }, "foo", - true, 60, ) @@ -200,7 +197,7 @@ func TestLatencyStatsDiff(t *testing.T) { } func TestLocksStatsNilWhenLocksMissingInOldStat(t *testing.T) { - sl := NewStatLine( + sl := newStatLine( mongoStatus{ ServerStatus: &serverStatus{ Connections: &connectionStats{}, @@ -223,7 +220,6 @@ func TestLocksStatsNilWhenLocksMissingInOldStat(t *testing.T) { }, }, "foo", - true, 60, ) @@ -231,7 +227,7 @@ func TestLocksStatsNilWhenLocksMissingInOldStat(t *testing.T) { } func TestLocksStatsNilWhenGlobalLockStatsMissingInOldStat(t *testing.T) { - sl := NewStatLine( + sl := newStatLine( mongoStatus{ ServerStatus: &serverStatus{ Connections: &connectionStats{}, @@ -255,7 +251,6 @@ func TestLocksStatsNilWhenGlobalLockStatsMissingInOldStat(t *testing.T) { }, }, "foo", - true, 60, ) @@ -263,7 +258,7 @@ func TestLocksStatsNilWhenGlobalLockStatsMissingInOldStat(t *testing.T) { } func TestLocksStatsNilWhenGlobalLockStatsEmptyInOldStat(t *testing.T) { - sl := NewStatLine( + sl := newStatLine( mongoStatus{ ServerStatus: &serverStatus{ Connections: &connectionStats{}, @@ -289,7 +284,6 @@ func TestLocksStatsNilWhenGlobalLockStatsEmptyInOldStat(t *testing.T) { }, }, "foo", - true, 60, ) @@ -297,7 +291,7 @@ func TestLocksStatsNilWhenGlobalLockStatsEmptyInOldStat(t *testing.T) { } func TestLocksStatsNilWhenCollectionLockStatsMissingInOldStat(t *testing.T) { - sl := NewStatLine( + sl := newStatLine( mongoStatus{ ServerStatus: &serverStatus{ Connections: &connectionStats{}, @@ -325,7 +319,6 @@ func TestLocksStatsNilWhenCollectionLockStatsMissingInOldStat(t *testing.T) { }, }, "foo", - true, 60, ) @@ -333,7 +326,7 @@ func TestLocksStatsNilWhenCollectionLockStatsMissingInOldStat(t *testing.T) { } func TestLocksStatsNilWhenCollectionLockStatsEmptyInOldStat(t *testing.T) { - sl := NewStatLine( + sl := newStatLine( mongoStatus{ ServerStatus: &serverStatus{ Connections: &connectionStats{}, @@ -362,7 +355,6 @@ func TestLocksStatsNilWhenCollectionLockStatsEmptyInOldStat(t *testing.T) { }, }, "foo", - true, 60, ) @@ -370,7 +362,7 @@ func TestLocksStatsNilWhenCollectionLockStatsEmptyInOldStat(t *testing.T) { } func TestLocksStatsNilWhenLocksMissingInNewStat(t *testing.T) { - sl := NewStatLine( + sl := newStatLine( mongoStatus{ ServerStatus: &serverStatus{ Connections: &connectionStats{}, @@ -393,7 +385,6 @@ func TestLocksStatsNilWhenLocksMissingInNewStat(t *testing.T) { }, }, "foo", - true, 60, ) @@ -401,7 +392,7 @@ func TestLocksStatsNilWhenLocksMissingInNewStat(t *testing.T) { } func TestLocksStatsNilWhenGlobalLockStatsMissingInNewStat(t *testing.T) { - sl := NewStatLine( + sl := newStatLine( mongoStatus{ ServerStatus: &serverStatus{ Connections: &connectionStats{}, @@ -425,7 +416,6 @@ func TestLocksStatsNilWhenGlobalLockStatsMissingInNewStat(t *testing.T) { }, }, "foo", - true, 60, ) @@ -433,7 +423,7 @@ func TestLocksStatsNilWhenGlobalLockStatsMissingInNewStat(t *testing.T) { } func TestLocksStatsNilWhenGlobalLockStatsEmptyInNewStat(t *testing.T) { - sl := NewStatLine( + sl := newStatLine( mongoStatus{ ServerStatus: &serverStatus{ Connections: &connectionStats{}, @@ -459,7 +449,6 @@ func TestLocksStatsNilWhenGlobalLockStatsEmptyInNewStat(t *testing.T) { }, }, "foo", - true, 60, ) @@ -467,7 +456,7 @@ func TestLocksStatsNilWhenGlobalLockStatsEmptyInNewStat(t *testing.T) { } func TestLocksStatsNilWhenCollectionLockStatsMissingInNewStat(t *testing.T) { - sl := NewStatLine( + sl := newStatLine( mongoStatus{ ServerStatus: &serverStatus{ Connections: &connectionStats{}, @@ -495,7 +484,6 @@ func TestLocksStatsNilWhenCollectionLockStatsMissingInNewStat(t *testing.T) { }, }, "foo", - true, 60, ) @@ -503,7 +491,7 @@ func TestLocksStatsNilWhenCollectionLockStatsMissingInNewStat(t *testing.T) { } func TestLocksStatsNilWhenCollectionLockStatsEmptyInNewStat(t *testing.T) { - sl := NewStatLine( + sl := newStatLine( mongoStatus{ ServerStatus: &serverStatus{ Connections: &connectionStats{}, @@ -532,7 +520,6 @@ func TestLocksStatsNilWhenCollectionLockStatsEmptyInNewStat(t *testing.T) { }, }, "foo", - true, 60, ) @@ -540,7 +527,7 @@ func TestLocksStatsNilWhenCollectionLockStatsEmptyInNewStat(t *testing.T) { } func TestLocksStatsPopulated(t *testing.T) { - sl := NewStatLine( + sl := newStatLine( mongoStatus{ ServerStatus: &serverStatus{ Connections: &connectionStats{}, @@ -596,7 +583,6 @@ func TestLocksStatsPopulated(t *testing.T) { }, }, "foo", - true, 60, ) diff --git a/plugins/inputs/monit/monit.go b/plugins/inputs/monit/monit.go index 8ed865ea2d754..f2af8f15367fb 100644 --- a/plugins/inputs/monit/monit.go +++ b/plugins/inputs/monit/monit.go @@ -19,6 +19,8 @@ import ( //go:embed sample.conf var sampleConfig string +var pendingActions = []string{"ignore", "alert", "restart", "stop", "exec", "unmonitor", "start", "monitor"} + const ( fileSystem = "0" directory = "1" @@ -31,7 +33,14 @@ const ( network = "8" ) -var pendingActions = []string{"ignore", "alert", "restart", "stop", "exec", "unmonitor", "start", "monitor"} +type Monit struct { + Address string `toml:"address"` + Username string `toml:"username"` + Password string `toml:"password"` + Timeout config.Duration `toml:"timeout"` + client http.Client + tls.ClientConfig +} type status struct { Server server `xml:"server"` @@ -179,15 +188,6 @@ type system struct { } `xml:"swap"` } -type Monit struct { - Address string `toml:"address"` - Username string `toml:"username"` - Password string `toml:"password"` - client http.Client - tls.ClientConfig - Timeout config.Duration `toml:"timeout"` -} - func (*Monit) SampleConfig() string { return sampleConfig } diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index f1163be42b268..bbf826d81ab39 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -24,32 +24,18 @@ import ( //go:embed sample.conf var sampleConfig string -var once sync.Once - var ( + once sync.Once // 30 Seconds is the default used by paho.mqtt.golang defaultConnectionTimeout = config.Duration(30 * time.Second) defaultMaxUndeliveredMessages = 1000 ) -type empty struct{} -type semaphore chan empty - -type Client interface { - Connect() mqtt.Token - SubscribeMultiple(filters map[string]byte, callback mqtt.MessageHandler) mqtt.Token - AddRoute(topic string, callback mqtt.MessageHandler) - Disconnect(quiesce uint) - IsConnected() bool -} - -type ClientFactory func(o *mqtt.ClientOptions) Client - type MQTTConsumer struct { Servers []string `toml:"servers"` Topics []string `toml:"topics"` TopicTag *string `toml:"topic_tag"` - TopicParserConfig []TopicParsingConfig `toml:"topic_parsing"` + TopicParserConfig []topicParsingConfig `toml:"topic_parsing"` Username config.Secret `toml:"username"` Password config.Secret `toml:"password"` QoS int `toml:"qos"` @@ -64,15 +50,15 @@ type MQTTConsumer struct { tls.ClientConfig parser telegraf.Parser - clientFactory ClientFactory - client Client + clientFactory clientFactory + client client opts *mqtt.ClientOptions acc telegraf.TrackingAccumulator sem semaphore messages map[telegraf.TrackingID]mqtt.Message messagesMutex sync.Mutex topicTagParse string - topicParsers []*TopicParser + topicParsers []*topicParser ctx context.Context cancel context.CancelFunc payloadSize selfstat.Stat @@ -80,13 +66,22 @@ type MQTTConsumer struct { wg sync.WaitGroup } +type client interface { + Connect() mqtt.Token + SubscribeMultiple(filters map[string]byte, callback mqtt.MessageHandler) mqtt.Token + AddRoute(topic string, callback mqtt.MessageHandler) + Disconnect(quiesce uint) + IsConnected() bool +} + +type empty struct{} +type semaphore chan empty +type clientFactory func(o *mqtt.ClientOptions) client + func (*MQTTConsumer) SampleConfig() string { return sampleConfig } -func (m *MQTTConsumer) SetParser(parser telegraf.Parser) { - m.parser = parser -} func (m *MQTTConsumer) Init() error { if m.ClientTrace { log := &mqttLogger{m.Log} @@ -116,9 +111,9 @@ func (m *MQTTConsumer) Init() error { m.opts = opts m.messages = make(map[telegraf.TrackingID]mqtt.Message) - m.topicParsers = make([]*TopicParser, 0, len(m.TopicParserConfig)) + m.topicParsers = make([]*topicParser, 0, len(m.TopicParserConfig)) for _, cfg := range m.TopicParserConfig { - p, err := cfg.NewParser() + p, err := cfg.newParser() if err != nil { return fmt.Errorf("config error topic parsing: %w", err) } @@ -129,6 +124,11 @@ func (m *MQTTConsumer) Init() error { m.messagesRecv = selfstat.Register("mqtt_consumer", "messages_received", make(map[string]string)) return nil } + +func (m *MQTTConsumer) SetParser(parser telegraf.Parser) { + m.parser = parser +} + func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { m.acc = acc.WithTracking(m.MaxUndeliveredMessages) m.sem = make(semaphore, m.MaxUndeliveredMessages) @@ -149,6 +149,26 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { return m.connect() } + +func (m *MQTTConsumer) Gather(_ telegraf.Accumulator) error { + if !m.client.IsConnected() { + m.Log.Debugf("Connecting %v", m.Servers) + return m.connect() + } + return nil +} + +func (m *MQTTConsumer) Stop() { + if m.client.IsConnected() { + m.Log.Debugf("Disconnecting %v", m.Servers) + m.client.Disconnect(200) + m.Log.Debugf("Disconnected %v", m.Servers) + } + if m.cancel != nil { + m.cancel() + } +} + func (m *MQTTConsumer) connect() error { m.client = m.clientFactory(m.opts) // AddRoute sets up the function for handling messages. These need to be @@ -196,6 +216,7 @@ func (m *MQTTConsumer) connect() error { } return nil } + func (m *MQTTConsumer) onConnectionLost(_ mqtt.Client, err error) { // Should already be disconnected, but make doubly sure m.client.Disconnect(5) @@ -250,7 +271,7 @@ func (m *MQTTConsumer) onMessage(_ mqtt.Client, msg mqtt.Message) { metric.AddTag(m.topicTagParse, msg.Topic()) } for _, p := range m.topicParsers { - if err := p.Parse(metric, msg.Topic()); err != nil { + if err := p.parse(metric, msg.Topic()); err != nil { if m.PersistentSession { msg.Ack() } @@ -265,23 +286,7 @@ func (m *MQTTConsumer) onMessage(_ mqtt.Client, msg mqtt.Message) { m.messages[id] = msg m.messagesMutex.Unlock() } -func (m *MQTTConsumer) Stop() { - if m.client.IsConnected() { - m.Log.Debugf("Disconnecting %v", m.Servers) - m.client.Disconnect(200) - m.Log.Debugf("Disconnected %v", m.Servers) - } - if m.cancel != nil { - m.cancel() - } -} -func (m *MQTTConsumer) Gather(_ telegraf.Accumulator) error { - if !m.client.IsConnected() { - m.Log.Debugf("Connecting %v", m.Servers) - return m.connect() - } - return nil -} + func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { opts := mqtt.NewClientOptions() opts.ConnectTimeout = time.Duration(m.ConnectionTimeout) @@ -342,7 +347,7 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { return opts, nil } -func New(factory ClientFactory) *MQTTConsumer { +func newMQTTConsumer(factory clientFactory) *MQTTConsumer { return &MQTTConsumer{ Servers: []string{"tcp://127.0.0.1:1883"}, MaxUndeliveredMessages: defaultMaxUndeliveredMessages, @@ -354,7 +359,7 @@ func New(factory ClientFactory) *MQTTConsumer { } func init() { inputs.Add("mqtt_consumer", func() telegraf.Input { - return New(func(o *mqtt.ClientOptions) Client { + return newMQTTConsumer(func(o *mqtt.ClientOptions) client { return mqtt.NewClient(o) }) }) diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go index cb15e2ac03536..a1ec7dd272eb1 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go @@ -18,11 +18,11 @@ import ( "github.com/influxdata/telegraf/testutil" ) -type FakeClient struct { - ConnectF func() mqtt.Token - SubscribeMultipleF func() mqtt.Token - AddRouteF func(callback mqtt.MessageHandler) - DisconnectF func() +type fakeClient struct { + connectF func() mqtt.Token + subscribeMultipleF func() mqtt.Token + addRouteF func(callback mqtt.MessageHandler) + disconnectF func() connectCallCount int subscribeCallCount int @@ -32,75 +32,75 @@ type FakeClient struct { connected bool } -func (c *FakeClient) Connect() mqtt.Token { +func (c *fakeClient) Connect() mqtt.Token { c.connectCallCount++ - token := c.ConnectF() + token := c.connectF() c.connected = token.Error() == nil return token } -func (c *FakeClient) SubscribeMultiple(map[string]byte, mqtt.MessageHandler) mqtt.Token { +func (c *fakeClient) SubscribeMultiple(map[string]byte, mqtt.MessageHandler) mqtt.Token { c.subscribeCallCount++ - return c.SubscribeMultipleF() + return c.subscribeMultipleF() } -func (c *FakeClient) AddRoute(_ string, callback mqtt.MessageHandler) { +func (c *fakeClient) AddRoute(_ string, callback mqtt.MessageHandler) { c.addRouteCallCount++ - c.AddRouteF(callback) + c.addRouteF(callback) } -func (c *FakeClient) Disconnect(uint) { +func (c *fakeClient) Disconnect(uint) { c.disconnectCallCount++ - c.DisconnectF() + c.disconnectF() c.connected = false } -func (c *FakeClient) IsConnected() bool { +func (c *fakeClient) IsConnected() bool { return c.connected } -type FakeParser struct{} +type fakeParser struct{} -// FakeParser satisfies telegraf.Parser -var _ telegraf.Parser = &FakeParser{} +// fakeParser satisfies telegraf.Parser +var _ telegraf.Parser = &fakeParser{} -func (p *FakeParser) Parse(_ []byte) ([]telegraf.Metric, error) { +func (p *fakeParser) Parse(_ []byte) ([]telegraf.Metric, error) { panic("not implemented") } -func (p *FakeParser) ParseLine(_ string) (telegraf.Metric, error) { +func (p *fakeParser) ParseLine(_ string) (telegraf.Metric, error) { panic("not implemented") } -func (p *FakeParser) SetDefaultTags(_ map[string]string) { +func (p *fakeParser) SetDefaultTags(_ map[string]string) { panic("not implemented") } -type FakeToken struct { +type fakeToken struct { sessionPresent bool complete chan struct{} } -// FakeToken satisfies mqtt.Token -var _ mqtt.Token = &FakeToken{} +// fakeToken satisfies mqtt.Token +var _ mqtt.Token = &fakeToken{} -func (t *FakeToken) Wait() bool { +func (t *fakeToken) Wait() bool { return true } -func (t *FakeToken) WaitTimeout(time.Duration) bool { +func (t *fakeToken) WaitTimeout(time.Duration) bool { return true } -func (t *FakeToken) Error() error { +func (t *fakeToken) Error() error { return nil } -func (t *FakeToken) SessionPresent() bool { +func (t *fakeToken) SessionPresent() bool { return t.sessionPresent } -func (t *FakeToken) Done() <-chan struct{} { +func (t *fakeToken) Done() <-chan struct{} { return t.complete } @@ -108,24 +108,24 @@ func (t *FakeToken) Done() <-chan struct{} { func TestLifecycleSanity(t *testing.T) { var acc testutil.Accumulator - plugin := New(func(*mqtt.ClientOptions) Client { - return &FakeClient{ - ConnectF: func() mqtt.Token { - return &FakeToken{} + plugin := newMQTTConsumer(func(*mqtt.ClientOptions) client { + return &fakeClient{ + connectF: func() mqtt.Token { + return &fakeToken{} }, - AddRouteF: func(mqtt.MessageHandler) { + addRouteF: func(mqtt.MessageHandler) { }, - SubscribeMultipleF: func() mqtt.Token { - return &FakeToken{} + subscribeMultipleF: func() mqtt.Token { + return &fakeToken{} }, - DisconnectF: func() { + disconnectF: func() { }, } }) plugin.Log = testutil.Logger{} plugin.Servers = []string{"tcp://127.0.0.1"} - parser := &FakeParser{} + parser := &fakeParser{} plugin.SetParser(parser) require.NoError(t, plugin.Init()) @@ -138,12 +138,12 @@ func TestLifecycleSanity(t *testing.T) { func TestRandomClientID(t *testing.T) { var err error - m1 := New(nil) + m1 := newMQTTConsumer(nil) m1.Log = testutil.Logger{} err = m1.Init() require.NoError(t, err) - m2 := New(nil) + m2 := newMQTTConsumer(nil) m2.Log = testutil.Logger{} err = m2.Init() require.NoError(t, err) @@ -153,7 +153,7 @@ func TestRandomClientID(t *testing.T) { // PersistentSession requires ClientID func TestPersistentClientIDFail(t *testing.T) { - plugin := New(nil) + plugin := newMQTTConsumer(nil) plugin.Log = testutil.Logger{} plugin.PersistentSession = true @@ -161,36 +161,36 @@ func TestPersistentClientIDFail(t *testing.T) { require.Error(t, err) } -type Message struct { +type message struct { topic string qos byte } -func (m *Message) Duplicate() bool { +func (m *message) Duplicate() bool { panic("not implemented") } -func (m *Message) Qos() byte { +func (m *message) Qos() byte { return m.qos } -func (m *Message) Retained() bool { +func (m *message) Retained() bool { panic("not implemented") } -func (m *Message) Topic() string { +func (m *message) Topic() string { return m.topic } -func (m *Message) MessageID() uint16 { +func (m *message) MessageID() uint16 { panic("not implemented") } -func (m *Message) Payload() []byte { +func (m *message) Payload() []byte { return []byte("cpu time_idle=42i") } -func (m *Message) Ack() { +func (m *message) Ack() { panic("not implemented") } @@ -200,7 +200,7 @@ func TestTopicTag(t *testing.T) { topic string topicTag func() *string expectedError string - topicParsing []TopicParsingConfig + topicParsing []topicParsingConfig expected []telegraf.Metric }{ { @@ -267,7 +267,7 @@ func TestTopicTag(t *testing.T) { tag := "" return &tag }, - topicParsing: []TopicParsingConfig{ + topicParsing: []topicParsingConfig{ { Topic: "telegraf/123/test", Measurement: "_/_/measurement", @@ -299,7 +299,7 @@ func TestTopicTag(t *testing.T) { tag := "" return &tag }, - topicParsing: []TopicParsingConfig{ + topicParsing: []topicParsingConfig{ { Topic: "telegraf/+/test/hello", Measurement: "_/_/measurement/_", @@ -333,7 +333,7 @@ func TestTopicTag(t *testing.T) { return &tag }, expectedError: "config error topic parsing: fields length does not equal topic length", - topicParsing: []TopicParsingConfig{ + topicParsing: []topicParsingConfig{ { Topic: "telegraf/+/test/hello", Measurement: "_/_/measurement/_", @@ -366,7 +366,7 @@ func TestTopicTag(t *testing.T) { tag := "" return &tag }, - topicParsing: []TopicParsingConfig{ + topicParsing: []topicParsingConfig{ { Topic: "telegraf/+/test/hello", Measurement: "_/_/measurement/_", @@ -396,7 +396,7 @@ func TestTopicTag(t *testing.T) { tag := "" return &tag }, - topicParsing: []TopicParsingConfig{ + topicParsing: []topicParsingConfig{ { Topic: "telegraf/+/test/hello", Tags: "testTag/_/_/_", @@ -428,7 +428,7 @@ func TestTopicTag(t *testing.T) { tag := "" return &tag }, - topicParsing: []TopicParsingConfig{ + topicParsing: []topicParsingConfig{ { Topic: "/telegraf/+/test/hello", Measurement: "/_/_/measurement/_", @@ -461,7 +461,7 @@ func TestTopicTag(t *testing.T) { tag := "" return &tag }, - topicParsing: []TopicParsingConfig{ + topicParsing: []topicParsingConfig{ { Topic: "/telegraf/#/test/hello", Measurement: "/#/measurement/_", @@ -495,7 +495,7 @@ func TestTopicTag(t *testing.T) { tag := "" return &tag }, - topicParsing: []TopicParsingConfig{ + topicParsing: []topicParsingConfig{ { Topic: "/telegraf/#", Measurement: "/#/measurement/_", @@ -521,22 +521,22 @@ func TestTopicTag(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { var handler mqtt.MessageHandler - client := &FakeClient{ - ConnectF: func() mqtt.Token { - return &FakeToken{} + fClient := &fakeClient{ + connectF: func() mqtt.Token { + return &fakeToken{} }, - AddRouteF: func(callback mqtt.MessageHandler) { + addRouteF: func(callback mqtt.MessageHandler) { handler = callback }, - SubscribeMultipleF: func() mqtt.Token { - return &FakeToken{} + subscribeMultipleF: func() mqtt.Token { + return &fakeToken{} }, - DisconnectF: func() { + disconnectF: func() { }, } - plugin := New(func(*mqtt.ClientOptions) Client { - return client + plugin := newMQTTConsumer(func(*mqtt.ClientOptions) client { + return fClient }) plugin.Log = testutil.Logger{} plugin.Topics = []string{tt.topic} @@ -557,7 +557,7 @@ func TestTopicTag(t *testing.T) { var acc testutil.Accumulator require.NoError(t, plugin.Start(&acc)) - var m Message + var m message m.topic = tt.topic handler(nil, &m) @@ -570,20 +570,20 @@ func TestTopicTag(t *testing.T) { } func TestAddRouteCalledForEachTopic(t *testing.T) { - client := &FakeClient{ - ConnectF: func() mqtt.Token { - return &FakeToken{} + fClient := &fakeClient{ + connectF: func() mqtt.Token { + return &fakeToken{} }, - AddRouteF: func(mqtt.MessageHandler) { + addRouteF: func(mqtt.MessageHandler) { }, - SubscribeMultipleF: func() mqtt.Token { - return &FakeToken{} + subscribeMultipleF: func() mqtt.Token { + return &fakeToken{} }, - DisconnectF: func() { + disconnectF: func() { }, } - plugin := New(func(*mqtt.ClientOptions) Client { - return client + plugin := newMQTTConsumer(func(*mqtt.ClientOptions) client { + return fClient }) plugin.Log = testutil.Logger{} plugin.Topics = []string{"a", "b"} @@ -595,24 +595,24 @@ func TestAddRouteCalledForEachTopic(t *testing.T) { plugin.Stop() - require.Equal(t, 2, client.addRouteCallCount) + require.Equal(t, 2, fClient.addRouteCallCount) } func TestSubscribeCalledIfNoSession(t *testing.T) { - client := &FakeClient{ - ConnectF: func() mqtt.Token { - return &FakeToken{} + fClient := &fakeClient{ + connectF: func() mqtt.Token { + return &fakeToken{} }, - AddRouteF: func(mqtt.MessageHandler) { + addRouteF: func(mqtt.MessageHandler) { }, - SubscribeMultipleF: func() mqtt.Token { - return &FakeToken{} + subscribeMultipleF: func() mqtt.Token { + return &fakeToken{} }, - DisconnectF: func() { + disconnectF: func() { }, } - plugin := New(func(*mqtt.ClientOptions) Client { - return client + plugin := newMQTTConsumer(func(*mqtt.ClientOptions) client { + return fClient }) plugin.Log = testutil.Logger{} plugin.Topics = []string{"b"} @@ -624,24 +624,24 @@ func TestSubscribeCalledIfNoSession(t *testing.T) { plugin.Stop() - require.Equal(t, 1, client.subscribeCallCount) + require.Equal(t, 1, fClient.subscribeCallCount) } func TestSubscribeNotCalledIfSession(t *testing.T) { - client := &FakeClient{ - ConnectF: func() mqtt.Token { - return &FakeToken{sessionPresent: true} + fClient := &fakeClient{ + connectF: func() mqtt.Token { + return &fakeToken{sessionPresent: true} }, - AddRouteF: func(mqtt.MessageHandler) { + addRouteF: func(mqtt.MessageHandler) { }, - SubscribeMultipleF: func() mqtt.Token { - return &FakeToken{} + subscribeMultipleF: func() mqtt.Token { + return &fakeToken{} }, - DisconnectF: func() { + disconnectF: func() { }, } - plugin := New(func(*mqtt.ClientOptions) Client { - return client + plugin := newMQTTConsumer(func(*mqtt.ClientOptions) client { + return fClient }) plugin.Log = testutil.Logger{} plugin.Topics = []string{"b"} @@ -652,7 +652,7 @@ func TestSubscribeNotCalledIfSession(t *testing.T) { require.NoError(t, plugin.Start(&acc)) plugin.Stop() - require.Equal(t, 0, client.subscribeCallCount) + require.Equal(t, 0, fClient.subscribeCallCount) } func TestIntegration(t *testing.T) { @@ -679,7 +679,7 @@ func TestIntegration(t *testing.T) { // Setup the plugin and connect to the broker url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort]) topic := "/telegraf/test" - factory := func(o *mqtt.ClientOptions) Client { return mqtt.NewClient(o) } + factory := func(o *mqtt.ClientOptions) client { return mqtt.NewClient(o) } plugin := &MQTTConsumer{ Servers: []string{url}, Topics: []string{topic}, @@ -768,7 +768,7 @@ func TestStartupErrorBehaviorErrorIntegration(t *testing.T) { // Setup the plugin and connect to the broker url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort]) topic := "/telegraf/test" - factory := func(o *mqtt.ClientOptions) Client { return mqtt.NewClient(o) } + factory := func(o *mqtt.ClientOptions) client { return mqtt.NewClient(o) } plugin := &MQTTConsumer{ Servers: []string{url}, Topics: []string{topic}, @@ -827,7 +827,7 @@ func TestStartupErrorBehaviorIgnoreIntegration(t *testing.T) { // Setup the plugin and connect to the broker url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort]) topic := "/telegraf/test" - factory := func(o *mqtt.ClientOptions) Client { return mqtt.NewClient(o) } + factory := func(o *mqtt.ClientOptions) client { return mqtt.NewClient(o) } plugin := &MQTTConsumer{ Servers: []string{url}, Topics: []string{topic}, @@ -892,7 +892,7 @@ func TestStartupErrorBehaviorRetryIntegration(t *testing.T) { // Setup the plugin and connect to the broker url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort]) topic := "/telegraf/test" - factory := func(o *mqtt.ClientOptions) Client { return mqtt.NewClient(o) } + factory := func(o *mqtt.ClientOptions) client { return mqtt.NewClient(o) } plugin := &MQTTConsumer{ Servers: []string{url}, Topics: []string{topic}, @@ -997,7 +997,7 @@ func TestReconnectIntegration(t *testing.T) { // Setup the plugin and connect to the broker url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort]) topic := "/telegraf/test" - factory := func(o *mqtt.ClientOptions) Client { return mqtt.NewClient(o) } + factory := func(o *mqtt.ClientOptions) client { return mqtt.NewClient(o) } plugin := &MQTTConsumer{ Servers: []string{url}, Topics: []string{topic}, diff --git a/plugins/inputs/mqtt_consumer/mqtt_logger.go b/plugins/inputs/mqtt_consumer/mqtt_logger.go index d3490043d73b8..9e34e52b938b5 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_logger.go +++ b/plugins/inputs/mqtt_consumer/mqtt_logger.go @@ -8,9 +8,12 @@ type mqttLogger struct { telegraf.Logger } +// Printf implements mqtt.Logger func (l mqttLogger) Printf(fmt string, args ...interface{}) { l.Logger.Debugf(fmt, args...) } + +// Println implements mqtt.Logger func (l mqttLogger) Println(args ...interface{}) { l.Logger.Debug(args...) } diff --git a/plugins/inputs/mqtt_consumer/topic_parser.go b/plugins/inputs/mqtt_consumer/topic_parser.go index 12cbc10f2efd4..5a290cd4741c0 100644 --- a/plugins/inputs/mqtt_consumer/topic_parser.go +++ b/plugins/inputs/mqtt_consumer/topic_parser.go @@ -9,7 +9,7 @@ import ( "github.com/influxdata/telegraf" ) -type TopicParsingConfig struct { +type topicParsingConfig struct { Topic string `toml:"topic"` Measurement string `toml:"measurement"` Tags string `toml:"tags"` @@ -17,7 +17,7 @@ type TopicParsingConfig struct { FieldTypes map[string]string `toml:"types"` } -type TopicParser struct { +type topicParser struct { topicIndices map[string]int topicVarLength bool topicMinLength int @@ -29,8 +29,8 @@ type TopicParser struct { fieldTypes map[string]string } -func (cfg *TopicParsingConfig) NewParser() (*TopicParser, error) { - p := &TopicParser{ +func (cfg *topicParsingConfig) newParser() (*topicParser, error) { + p := &topicParser{ fieldTypes: cfg.FieldTypes, } @@ -150,7 +150,7 @@ func (cfg *TopicParsingConfig) NewParser() (*TopicParser, error) { return p, nil } -func (p *TopicParser) Parse(metric telegraf.Metric, topic string) error { +func (p *topicParser) parse(metric telegraf.Metric, topic string) error { // Split the actual topic into its elements and check for a match topicParts := strings.Split(topic, "/") if p.topicVarLength && len(topicParts) < p.topicMinLength || !p.topicVarLength && len(topicParts) != p.topicMinLength { @@ -200,7 +200,7 @@ func (p *TopicParser) Parse(metric telegraf.Metric, topic string) error { return nil } -func (p *TopicParser) convertToFieldType(value, key string) (interface{}, error) { +func (p *topicParser) convertToFieldType(value, key string) (interface{}, error) { // If the user configured inputs.mqtt_consumer.topic.types, check for the desired type desiredType, ok := p.fieldTypes[key] if !ok { diff --git a/plugins/inputs/multifile/multifile.go b/plugins/inputs/multifile/multifile.go index 94b12569811e7..a98a512b14888 100644 --- a/plugins/inputs/multifile/multifile.go +++ b/plugins/inputs/multifile/multifile.go @@ -19,15 +19,15 @@ import ( var sampleConfig string type MultiFile struct { - BaseDir string - FailEarly bool - Files []File `toml:"file"` + BaseDir string `toml:"base_dir"` + FailEarly bool `toml:"fail_early"` + Files []file `toml:"file"` } -type File struct { +type file struct { Name string `toml:"file"` - Dest string - Conversion string + Dest string `toml:"dest"` + Conversion string `toml:"conversion"` } func (*MultiFile) SampleConfig() string { diff --git a/plugins/inputs/multifile/multifile_test.go b/plugins/inputs/multifile/multifile_test.go index 441df66f384be..83bfbf704c7dd 100644 --- a/plugins/inputs/multifile/multifile_test.go +++ b/plugins/inputs/multifile/multifile_test.go @@ -17,7 +17,7 @@ func TestFileTypes(t *testing.T) { m := MultiFile{ BaseDir: path.Join(wd, `testdata`), FailEarly: true, - Files: []File{ + Files: []file{ {Name: `bool.txt`, Dest: `examplebool`, Conversion: `bool`}, {Name: `float.txt`, Dest: `examplefloat`, Conversion: `float`}, {Name: `int.txt`, Dest: `examplefloatX`, Conversion: `float(3)`}, @@ -43,14 +43,14 @@ func TestFileTypes(t *testing.T) { }, acc.Metrics[0].Fields) } -func FailEarly(failEarly bool, t *testing.T) error { +func failEarly(failEarly bool, t *testing.T) error { wd, err := os.Getwd() require.NoError(t, err) m := MultiFile{ BaseDir: path.Join(wd, `testdata`), FailEarly: failEarly, - Files: []File{ + Files: []file{ {Name: `int.txt`, Dest: `exampleint`, Conversion: `int`}, {Name: `int.txt`, Dest: `exampleerror`, Conversion: `bool`}, }, @@ -71,8 +71,8 @@ func FailEarly(failEarly bool, t *testing.T) error { } func TestFailEarly(t *testing.T) { - err := FailEarly(false, t) + err := failEarly(false, t) require.NoError(t, err) - err = FailEarly(true, t) + err = failEarly(true, t) require.Error(t, err) } diff --git a/plugins/inputs/mysql/mysql.go b/plugins/inputs/mysql/mysql.go index 3f332f50eb700..fb37dfba571cf 100644 --- a/plugins/inputs/mysql/mysql.go +++ b/plugins/inputs/mysql/mysql.go @@ -29,6 +29,14 @@ var sampleConfig string var tlsRe = regexp.MustCompile(`([\?&])(?:tls=custom)($|&)`) +const ( + defaultPerfEventsStatementsDigestTextLimit = 120 + defaultPerfEventsStatementsLimit = 250 + defaultPerfEventsStatementsTimeLimit = 86400 + defaultGatherGlobalVars = true + localhost = "" +) + type Mysql struct { Servers []*config.Secret `toml:"servers"` PerfEventsStatementsDigestTextLimit int64 `toml:"perf_events_statements_digest_text_limit"` @@ -64,15 +72,6 @@ type Mysql struct { loggedConvertFields map[string]bool } -const ( - defaultPerfEventsStatementsDigestTextLimit = 120 - defaultPerfEventsStatementsLimit = 250 - defaultPerfEventsStatementsTimeLimit = 86400 - defaultGatherGlobalVars = true -) - -const localhost = "" - func (*Mysql) SampleConfig() string { return sampleConfig } diff --git a/plugins/inputs/mysql/v2/convert.go b/plugins/inputs/mysql/v2/convert.go index b1e5b5880ea72..d5da52ed462ba 100644 --- a/plugins/inputs/mysql/v2/convert.go +++ b/plugins/inputs/mysql/v2/convert.go @@ -8,7 +8,7 @@ import ( "strconv" ) -type ConversionFunc func(value sql.RawBytes) (interface{}, error) +type conversionFunc func(value sql.RawBytes) (interface{}, error) func ParseInt(value sql.RawBytes) (interface{}, error) { v, err := strconv.ParseInt(string(value), 10, 64) @@ -86,7 +86,7 @@ func ParseValue(value sql.RawBytes) (interface{}, error) { return nil, fmt.Errorf("unconvertible value: %q", string(value)) } -var GlobalStatusConversions = map[string]ConversionFunc{ +var globalStatusConversions = map[string]conversionFunc{ "innodb_available_undo_logs": ParseUint, "innodb_buffer_pool_pages_misc": ParseUint, "innodb_data_pending_fsyncs": ParseUint, @@ -108,7 +108,7 @@ var GlobalStatusConversions = map[string]ConversionFunc{ "wsrep_local_send_queue_avg": ParseFloat, } -var GlobalVariableConversions = map[string]ConversionFunc{ +var globalVariableConversions = map[string]conversionFunc{ // see https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html // see https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html "delay_key_write": ParseString, // ON, OFF, ALL @@ -140,7 +140,7 @@ func ConvertGlobalStatus(key string, value sql.RawBytes) (interface{}, error) { return nil, nil } - if conv, ok := GlobalStatusConversions[key]; ok { + if conv, ok := globalStatusConversions[key]; ok { return conv(value) } @@ -152,7 +152,7 @@ func ConvertGlobalVariables(key string, value sql.RawBytes) (interface{}, error) return nil, nil } - if conv, ok := GlobalVariableConversions[key]; ok { + if conv, ok := globalVariableConversions[key]; ok { return conv(value) }