Journalctl: Async commit the Event #445

Open · wants to merge 17 commits into base: master
3 changes: 2 additions & 1 deletion pipeline/pipeline.go
@@ -611,8 +611,9 @@ func (p *Pipeline) expandProcs() {
p.logger.Warn("too many processors", zap.Int32("new", to))
}

lastProc := p.Procs[from-1]
for x := 0; x < int(to-from); x++ {
proc := p.newProc(p.Procs[from-1].id + x)
proc := p.newProc(lastProc.id + x) // ids continue from the last proc's id
p.Procs = append(p.Procs, proc)
proc.start(p.actionParams, p.logger.Sugar())
}
10 changes: 0 additions & 10 deletions pipeline/util.go
@@ -11,16 +11,6 @@ import (
insaneJSON "github.com/vitkovskii/insane-json"
)

// Clone deeply copies string
func CloneString(s string) string {
if s == "" {
return ""
}
b := make([]byte, len(s))
copy(b, s)
return *(*string)(unsafe.Pointer(&b))
}

// ByteToStringUnsafe converts a byte slice to a string without copying memory.
// The resulting string is mutable, which makes this method unsafe: use it with caution and never modify the provided byte slice.
func ByteToStringUnsafe(b []byte) string {
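The deleted helper is superseded by `strings.Clone` from the Go standard library (added in Go 1.18), which performs the same deep copy; the call sites below are migrated accordingly:

detached := strings.Clone(view) // copies the backing bytes; `view` is an illustrative name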
13 changes: 13 additions & 0 deletions plugin/input/journalctl/README.md
@@ -17,5 +17,18 @@ you can use any additional args.

<br>

**`persistence_mode`** *`string`* *`default=async`* *`options=async|sync`*

It defines how to save the offsets file:
* `async` – periodically saves the offsets using `async_interval`. The save operation is skipped if the offsets haven't changed. Suitable for most cases; it guarantees at-least-once delivery and adds almost no overhead.
* `sync` – saves offsets as part of event commitment. It's very slow but rules out event duplication in extreme situations such as power loss.

The save operation takes three steps (see the sketch below):
* Write all offsets to a temporary file;
* Call `fsync()` on it;
* Rename the temporary file over the original one.
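
For illustration (not part of this diff), here is a minimal sketch of such an atomic save. The `.tmp` suffix and the function name are assumptions; in the plugin itself the offsets are persisted via `offset.SaveYAML`.

package offsetsave // hypothetical package, for illustration only

import "os"

// saveOffsetsAtomically performs the three steps described above.
func saveOffsetsAtomically(path string, payload []byte) error {
	tmp := path + ".tmp"
	f, err := os.Create(tmp) // step 1: write a temporary file
	if err != nil {
		return err
	}
	if _, err := f.Write(payload); err != nil {
		f.Close()
		return err
	}
	if err := f.Sync(); err != nil { // step 2: fsync() flushes it to disk
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	// step 3: rename is atomic on POSIX filesystems, so readers see either
	// the old offsets file or the complete new one, never a torn write
	return os.Rename(tmp, path)
}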

<br>


<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
75 changes: 75 additions & 0 deletions plugin/input/journalctl/commiter.go
@@ -0,0 +1,75 @@
package journalctl

import (
"strings"
"sync"
"sync/atomic"

"github.com/ozontech/file.d/pipeline"
)

type SaveOffsetsFunc func(info offsetInfo)

type Commiter interface {
Commit(event *pipeline.Event)
Shutdown()
}

type SyncCommiter struct {
offset offsetInfo
save SaveOffsetsFunc
}

func NewSyncCommiter(save SaveOffsetsFunc) *SyncCommiter {
return &SyncCommiter{save: save}
}

var _ Commiter = &SyncCommiter{}

func (a *SyncCommiter) Commit(event *pipeline.Event) {
a.offset.set(strings.Clone(event.Root.Dig("__CURSOR").AsString()))
a.save(a.offset)
}

func (a *SyncCommiter) Shutdown() {
// do nothing: the offsets are already saved in the Commit func
}

type AsyncCommiter struct {
mu sync.Mutex

offset atomic.Pointer[offsetInfo]
debouncer Debouncer
save SaveOffsetsFunc
}

func NewAsyncCommiter(debouncer Debouncer, save SaveOffsetsFunc) *AsyncCommiter {
commiter := &AsyncCommiter{debouncer: debouncer, save: save}
commiter.offset.Store(&offsetInfo{})
return commiter
}

var _ Commiter = &AsyncCommiter{}

func (a *AsyncCommiter) Commit(event *pipeline.Event) {
offInfo := *a.offset.Load()
offInfo.set(strings.Clone(event.Root.Dig("__CURSOR").AsString()))
a.offset.Store(&offInfo)

// save offsets
a.mu.Lock()
defer a.mu.Unlock()

a.debouncer.Do(func() {
a.save(offInfo)
})
}

func (a *AsyncCommiter) Shutdown() {
a.mu.Lock()
defer a.mu.Unlock()

a.debouncer.Do(func() {
a.save(*a.offset.Load())
})
}
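
For orientation (not part of the diff), a hypothetical wiring of the two implementations; the `save` closure stands in for the plugin's real persistence callback, and the usual `time` import is assumed:

save := func(info offsetInfo) { /* persist info to the offsets file */ }

// sync: every Commit saves the offsets before returning
var c Commiter = NewSyncCommiter(save)

// async: saves are debounced to at most one per second,
// and Shutdown flushes whatever was committed last
c = NewAsyncCommiter(NewDebouncer(time.Second), save)
c.Shutdown() // flush on stop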
26 changes: 26 additions & 0 deletions plugin/input/journalctl/debouncer.go
@@ -0,0 +1,26 @@
package journalctl

import (
"time"
)

type Debouncer struct {
lastCall time.Time
// interval during which at most one Do callback may run
interval time.Duration
}

func NewDebouncer(interval time.Duration) Debouncer {
return Debouncer{interval: interval}
}

func (d *Debouncer) Do(cb func()) {
if d.Ready() {
cb()
d.lastCall = time.Now()
}
}

func (d *Debouncer) Ready() bool {
return time.Since(d.lastCall) > d.interval
}
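
A quick sketch of the intended behavior (hypothetical timings, not part of the diff; assumes the standard `time` and `fmt` imports). Note that Do performs no locking of its own, so concurrent callers must serialize access, as AsyncCommiter does with its mutex:

d := NewDebouncer(100 * time.Millisecond)
d.Do(func() { fmt.Println("saved") }) // runs and records lastCall
d.Do(func() { fmt.Println("saved") }) // skipped: the interval hasn't elapsed yet
time.Sleep(150 * time.Millisecond)
d.Do(func() { fmt.Println("saved") }) // runs again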
59 changes: 45 additions & 14 deletions plugin/input/journalctl/journalctl.go
@@ -3,8 +3,9 @@
package journalctl

import (
"sync/atomic"
"time"

"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/offset"
@@ -21,21 +22,29 @@ type Plugin struct {
params *pipeline.InputPluginParams
config *Config
reader *journalReader
offInfo atomic.Pointer[offsetInfo]
currentOffset int64
logger *zap.Logger

commiter Commiter

// plugin metrics

offsetErrorsMetric *prometheus.CounterVec
journalCtlStopErrorMetric *prometheus.CounterVec
readerErrorsMetric *prometheus.CounterVec
}

type Config struct {
// ! config-params
// ^ config-params
type persistenceMode int

const (
// ! "persistenceMode" #1 /`([a-z]+)`/
persistenceModeAsync persistenceMode = iota // * `async` – periodically saves the offsets using `async_interval`. The save operation is skipped if the offsets haven't changed. Suitable for most cases; it guarantees at-least-once delivery and adds almost no overhead.
persistenceModeSync // * `sync` – saves offsets as part of event commitment. It's very slow but rules out event duplication in extreme situations such as power loss.
)

// ! config-params
// ^ config-params
type Config struct {
// > @3@4@5@6
// >
// > The filename to store offsets of processed messages.
@@ -51,6 +60,24 @@ type Config struct {

// for testing mostly
MaxLines int `json:"max_lines"`

// > @3@4@5@6
// >
// > It defines how to save the offsets file:
// > @persistenceMode|comment-list
// >
// > The save operation takes three steps:
// > * Write all offsets to a temporary file;
// > * Call `fsync()` on it;
// > * Rename the temporary file over the original one.
PersistenceMode string `json:"persistence_mode" default:"async" options:"async|sync"` // *
PersistenceMode_ persistenceMode

// > @3@4@5@6
// >
// > Offsets saving interval. Only used if `persistence_mode` is set to `async`.
AsyncInterval cfg.Duration `json:"async_interval" default:"1s" parse:"duration"`
AsyncInterval_ time.Duration
}

type offsetInfo struct {
@@ -91,7 +118,15 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginParams) {
p.offsetErrorsMetric.WithLabelValues().Inc()
p.logger.Error("can't load offset file", zap.Error(err))
}
p.offInfo.Store(offInfo)

if p.config.PersistenceMode_ == persistenceModeAsync {
if p.config.AsyncInterval_ < 0 {
p.logger.Fatal("invalid async interval", zap.Duration("interval", p.config.AsyncInterval_))
}
p.commiter = NewAsyncCommiter(NewDebouncer(p.config.AsyncInterval_), p.sync)
} else {
p.commiter = NewSyncCommiter(p.sync)
}

readConfig := &journalReaderConfig{
output: p,
@@ -119,18 +154,14 @@ func (p *Plugin) Stop() {
p.logger.Error("can't stop journalctl cmd", zap.Error(err))
}

offsets := *p.offInfo.Load()
if err := offset.SaveYAML(p.config.OffsetsFile, offsets); err != nil {
p.offsetErrorsMetric.WithLabelValues().Inc()
p.logger.Error("can't save offset file", zap.Error(err))
}
p.commiter.Shutdown()
}

func (p *Plugin) Commit(event *pipeline.Event) {
offInfo := *p.offInfo.Load()
offInfo.set(pipeline.CloneString(event.Root.Dig("__CURSOR").AsString()))
p.offInfo.Store(&offInfo)
p.commiter.Commit(event)
}

func (p *Plugin) sync(offInfo offsetInfo) {
if err := offset.SaveYAML(p.config.OffsetsFile, offInfo); err != nil {
p.offsetErrorsMetric.WithLabelValues().Inc()
p.logger.Error("can't save offset file", zap.Error(err))
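Taken together, the new options can be exercised with a pipeline config along these lines (a hypothetical sketch; field names follow the `json` tags above):

pipelines:
  journald:
    input:
      type: journalctl
      offsets_file: /var/lib/file.d/journalctl.offsets
      persistence_mode: async  # or "sync" to save on every commit
      async_interval: 1s       # debounce interval for async saves
    output:
      type: stdout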
2 changes: 1 addition & 1 deletion plugin/output/kafka/kafka.go
@@ -189,7 +189,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
if p.config.UseTopicField {
fieldValue := event.Root.Dig(p.config.TopicField).AsString()
if fieldValue != "" {
topic = pipeline.CloneString(fieldValue)
topic = strings.Clone(fieldValue)
}
}

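The clone is needed because `AsString` returns a view into the event's internal buffer rather than a copy; retaining it beyond the event's lifetime would alias reused memory (the rationale implied by the removed `CloneString` helper). Schematically, with an illustrative field name:

raw := event.Root.Dig("topic").AsString() // a view into the event's buffer
topic := strings.Clone(raw)               // deep copy that is safe to retain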
3 changes: 2 additions & 1 deletion plugin/output/s3/s3.go
@@ -10,6 +10,7 @@ import (
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"

@@ -339,7 +340,7 @@ func (p *Plugin) Out(event *pipeline.Event) {

// getBucketName decides which s3 bucket shall receive event.
func (p *Plugin) getBucketName(event *pipeline.Event) string {
bucketName := pipeline.CloneString(event.Root.Dig(p.config.BucketEventField).AsString())
bucketName := strings.Clone(event.Root.Dig(p.config.BucketEventField).AsString())

// no BucketEventField in message, it's DefaultBucket, showtime
if bucketName == "" {