Skip to content

Commit

Permalink
mute package
Browse files Browse the repository at this point in the history
  • Loading branch information
lspgn committed Dec 9, 2023
1 parent f9b74f0 commit 0fbc506
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 14 deletions.
22 changes: 8 additions & 14 deletions cmd/goflow2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ var (
Format = flag.String("format", "json", fmt.Sprintf("Choose the format (available: %s)", strings.Join(format.GetFormats(), ", ")))
Transport = flag.String("transport", "file", fmt.Sprintf("Choose the transport (available: %s)", strings.Join(transport.GetTransports(), ", ")))

TransportErrCt = flag.Int("transport.err.cnt", 10, "Maximum transport errors per batch")
TransportErrInt = flag.Duration("transport.err.int", time.Second*10, "Maximum transport errors interval")
ErrCnt = flag.Int("err.cnt", 10, "Maximum errors per batch for muting")
ErrInt = flag.Duration("err.int", time.Second*10, "Maximum errors interval for muting")

Addr = flag.String("addr", ":8080", "HTTP server address")

Expand Down Expand Up @@ -321,9 +321,7 @@ func main() {
transportErr = transportErrorFct.Errors()
}

var trErrCtr int
lastTrErr := time.Now().UTC()
maxTrErrLog := *TransportErrCt
bm := utils.NewBatchMute(*ErrInt, *ErrCnt)

for {
select {
Expand All @@ -334,16 +332,12 @@ func main() {
return
}

curTime := time.Now().UTC()
if *TransportErrInt > 0 && curTime.Sub(lastTrErr) > *TransportErrInt {
lastTrErr = curTime
trErrCtr = 0
}
trErrCtr += 1

if trErrCtr == maxTrErrLog {
muted, skipped := bm.Increment()
if muted && skipped == 0 {
log.Warn("too many transport errors, muting")
} else if trErrCtr < maxTrErrLog || maxTrErrLog == 0 {
} else if !muted && skipped > 0 {
log.Warnf("skipped %d transport errors", skipped)
} else if !muted {
l := log.WithError(err)
l.Error("transport error")
}
Expand Down
43 changes: 43 additions & 0 deletions utils/mute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package utils

import (
"time"
)

type BatchMute struct {
batchTime time.Time
resetInterval time.Duration
ctr int
max int
}

func (b *BatchMute) increment(val int, t time.Time) (muted bool, skipped int) {

if b.max == 0 || b.resetInterval == 0 {
return muted, skipped
}

if b.ctr >= b.max {
skipped = b.ctr - b.max
}

if t.Sub(b.batchTime) > b.resetInterval {
b.ctr = 0
b.batchTime = t
}
b.ctr += val

return b.max > 0 && b.ctr > b.max, skipped
}

func (b *BatchMute) Increment() (muting bool, skipped int) {
return b.increment(1, time.Now().UTC())
}

func NewBatchMute(resetInterval time.Duration, max int) *BatchMute {
return &BatchMute{
batchTime: time.Now().UTC(),
resetInterval: resetInterval,
max: max,
}
}
51 changes: 51 additions & 0 deletions utils/mute_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package utils

import (
"testing"
"time"
)

func TestBatchMute(t *testing.T) {
tm := time.Date(2023, time.November, 10, 23, 0, 0, 0, time.UTC)
bm := BatchMute{
batchTime: tm,
resetInterval: time.Second * 10,
max: 5,
}

for i := 0; i < 20; i++ {
tm = tm.Add(time.Second)
t.Log(bm.increment(1, tm))
}

}

func TestBatchMuteZero(t *testing.T) {
tm := time.Date(2023, time.November, 10, 23, 0, 0, 0, time.UTC)
bm := BatchMute{
batchTime: tm,
resetInterval: time.Second * 10,
max: 0,
}

for i := 0; i < 20; i++ {
tm = tm.Add(time.Second)
t.Log(bm.increment(1, tm))
}

}

func TestBatchMuteInterval(t *testing.T) {
tm := time.Date(2023, time.November, 10, 23, 0, 0, 0, time.UTC)
bm := BatchMute{
batchTime: tm,
resetInterval: 0,
max: 5,
}

for i := 0; i < 20; i++ {
tm = tm.Add(time.Second)
t.Log(bm.increment(1, tm))
}

}

0 comments on commit 0fbc506

Please sign in to comment.