Skip to content

Commit

Permalink
add metric file_open_error_total
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitryRomanov committed Nov 30, 2023
1 parent 86db123 commit db74dcb
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 4 deletions.
4 changes: 3 additions & 1 deletion plugin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Plugin struct {

possibleOffsetCorruptionMetric *prometheus.CounterVec
alreadyWrittenEventsSkippedMetric *prometheus.CounterVec
errorOpenFileMetric *prometheus.CounterVec
}

type persistenceMode int
Expand Down Expand Up @@ -194,7 +195,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa

p.config.OffsetsFileTmp = p.config.OffsetsFile + ".atomic"

p.jobProvider = NewJobProvider(p.config, p.possibleOffsetCorruptionMetric, p.logger)
p.jobProvider = NewJobProvider(p.config, p.possibleOffsetCorruptionMetric, p.errorOpenFileMetric, p.logger)

ResetterRegistryInstance.AddResetter(params.PipelineName, p)

Expand All @@ -205,6 +206,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa
func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
p.possibleOffsetCorruptionMetric = ctl.RegisterCounter("input_file_possible_offset_corruptions_total", "Total number of possible offset corruptions")
p.alreadyWrittenEventsSkippedMetric = ctl.RegisterCounter("input_file_already_written_event_skipped_total", "Total number of skipped events that was already written")
p.errorOpenFileMetric = ctl.RegisterCounter("input_file_open_error_total", "Total number of file opening errors")
}

func (p *Plugin) startWorkers() {
Expand Down
5 changes: 4 additions & 1 deletion plugin/input/file/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type jobProvider struct {
// provider metrics

possibleOffsetCorruptionMetric *prometheus.CounterVec
errorOpenFileMetric *prometheus.CounterVec
}

type Job struct {
Expand Down Expand Up @@ -99,7 +100,7 @@ type symlinkInfo struct {
inode inodeID
}

func NewJobProvider(config *Config, possibleOffsetCorruptionMetric *prometheus.CounterVec, sugLogger *zap.SugaredLogger) *jobProvider {
func NewJobProvider(config *Config, possibleOffsetCorruptionMetric, errorOpenFileMetric *prometheus.CounterVec, sugLogger *zap.SugaredLogger) *jobProvider {
jp := &jobProvider{
config: config,
offsetDB: newOffsetDB(config.OffsetsFile, config.OffsetsFileTmp),
Expand All @@ -121,6 +122,7 @@ func NewJobProvider(config *Config, possibleOffsetCorruptionMetric *prometheus.C

logger: sugLogger,
possibleOffsetCorruptionMetric: possibleOffsetCorruptionMetric,
errorOpenFileMetric: errorOpenFileMetric,
}

jp.watcher = NewWatcher(
Expand Down Expand Up @@ -282,6 +284,7 @@ func (jp *jobProvider) refreshFile(stat os.FileInfo, filename string, symlink st
file, err := os.Open(filename)
if err != nil {
jp.logger.Warnf("file was already moved from creation place %s: %s", filename, err.Error())
jp.errorOpenFileMetric.WithLabelValues().Inc()
return
}

Expand Down
6 changes: 4 additions & 2 deletions plugin/input/file/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ func TestWorkerWork(t *testing.T) {
}
ctl := metric.New("test", prometheus.NewRegistry())
possibleOffsetCorruptionMetric := ctl.RegisterCounter("worker", "help_test")
jp := NewJobProvider(&Config{}, possibleOffsetCorruptionMetric, &zap.SugaredLogger{})
errorOpenFileMetric := ctl.RegisterCounter("worker", "help_test")
jp := NewJobProvider(&Config{}, possibleOffsetCorruptionMetric, errorOpenFileMetric, &zap.SugaredLogger{})
jp.jobsChan = make(chan *Job, 2)
jp.jobs = map[pipeline.SourceID]*Job{
1: job,
Expand Down Expand Up @@ -223,7 +224,8 @@ func TestWorkerWorkMultiData(t *testing.T) {

ctl := metric.New("test", prometheus.NewRegistry())
possibleOffsetCorruptionMetric := ctl.RegisterCounter("worker", "help_test")
jp := NewJobProvider(&Config{}, possibleOffsetCorruptionMetric, &zap.SugaredLogger{})
errorOpenFileMetric := ctl.RegisterCounter("worker", "help_test")
jp := NewJobProvider(&Config{}, possibleOffsetCorruptionMetric, errorOpenFileMetric, &zap.SugaredLogger{})
jp.jobsChan = make(chan *Job, 2)
jp.jobs = map[pipeline.SourceID]*Job{
1: job,
Expand Down

0 comments on commit db74dcb

Please sign in to comment.