diff --git a/plugin/input/file/file.go b/plugin/input/file/file.go index e452e4673..1d76f72d9 100644 --- a/plugin/input/file/file.go +++ b/plugin/input/file/file.go @@ -58,6 +58,7 @@ type Plugin struct { possibleOffsetCorruptionMetric *prometheus.CounterVec alreadyWrittenEventsSkippedMetric *prometheus.CounterVec + errorOpenFileMetric *prometheus.CounterVec } type persistenceMode int @@ -194,7 +195,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa p.config.OffsetsFileTmp = p.config.OffsetsFile + ".atomic" - p.jobProvider = NewJobProvider(p.config, p.possibleOffsetCorruptionMetric, p.logger) + p.jobProvider = NewJobProvider(p.config, p.possibleOffsetCorruptionMetric, p.errorOpenFileMetric, p.logger) ResetterRegistryInstance.AddResetter(params.PipelineName, p) @@ -205,6 +206,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa func (p *Plugin) registerMetrics(ctl *metric.Ctl) { p.possibleOffsetCorruptionMetric = ctl.RegisterCounter("input_file_possible_offset_corruptions_total", "Total number of possible offset corruptions") p.alreadyWrittenEventsSkippedMetric = ctl.RegisterCounter("input_file_already_written_event_skipped_total", "Total number of skipped events that was already written") + p.errorOpenFileMetric = ctl.RegisterCounter("input_file_open_errors_total", "Total number of file opening errors") } func (p *Plugin) startWorkers() { diff --git a/plugin/input/file/provider.go b/plugin/input/file/provider.go index 7c90ce712..f7be76b26 100644 --- a/plugin/input/file/provider.go +++ b/plugin/input/file/provider.go @@ -56,6 +56,7 @@ type jobProvider struct { // provider metrics possibleOffsetCorruptionMetric *prometheus.CounterVec + errorOpenFileMetric *prometheus.CounterVec } type Job struct { @@ -99,7 +100,7 @@ type symlinkInfo struct { inode inodeID } -func NewJobProvider(config *Config, possibleOffsetCorruptionMetric *prometheus.CounterVec, sugLogger *zap.SugaredLogger) *jobProvider { +func 
NewJobProvider(config *Config, possibleOffsetCorruptionMetric, errorOpenFileMetric *prometheus.CounterVec, sugLogger *zap.SugaredLogger) *jobProvider { jp := &jobProvider{ config: config, offsetDB: newOffsetDB(config.OffsetsFile, config.OffsetsFileTmp), @@ -121,6 +122,7 @@ func NewJobProvider(config *Config, possibleOffsetCorruptionMetric *prometheus.C logger: sugLogger, possibleOffsetCorruptionMetric: possibleOffsetCorruptionMetric, + errorOpenFileMetric: errorOpenFileMetric, } jp.watcher = NewWatcher( @@ -282,6 +284,7 @@ func (jp *jobProvider) refreshFile(stat os.FileInfo, filename string, symlink st file, err := os.Open(filename) if err != nil { jp.logger.Warnf("file was already moved from creation place %s: %s", filename, err.Error()) + jp.errorOpenFileMetric.WithLabelValues().Inc() return } diff --git a/plugin/input/file/worker_test.go b/plugin/input/file/worker_test.go index cc145a144..d0ddaf7dd 100644 --- a/plugin/input/file/worker_test.go +++ b/plugin/input/file/worker_test.go @@ -94,7 +94,8 @@ func TestWorkerWork(t *testing.T) { } ctl := metric.New("test", prometheus.NewRegistry()) possibleOffsetCorruptionMetric := ctl.RegisterCounter("worker", "help_test") - jp := NewJobProvider(&Config{}, possibleOffsetCorruptionMetric, &zap.SugaredLogger{}) + errorOpenFileMetric := ctl.RegisterCounter("worker_open_errors", "help_test") + jp := NewJobProvider(&Config{}, possibleOffsetCorruptionMetric, errorOpenFileMetric, &zap.SugaredLogger{}) jp.jobsChan = make(chan *Job, 2) jp.jobs = map[pipeline.SourceID]*Job{ 1: job, @@ -223,7 +224,8 @@ func TestWorkerWorkMultiData(t *testing.T) { ctl := metric.New("test", prometheus.NewRegistry()) possibleOffsetCorruptionMetric := ctl.RegisterCounter("worker", "help_test") - jp := NewJobProvider(&Config{}, possibleOffsetCorruptionMetric, &zap.SugaredLogger{}) + errorOpenFileMetric := ctl.RegisterCounter("worker_open_errors", "help_test") + jp := NewJobProvider(&Config{}, possibleOffsetCorruptionMetric, errorOpenFileMetric, &zap.SugaredLogger{}) jp.jobsChan 
= make(chan *Job, 2) jp.jobs = map[pipeline.SourceID]*Job{ 1: job,