diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 769f03555..8f9cc5d57 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -69,7 +69,7 @@ jobs:
cache: true
- name: Run Kafka
- run: docker compose -f ./e2e/kafka_file/docker-compose-kafka.yml up -d
+ run: docker compose -f ./e2e/kafka_file/docker-compose.yml up -d
- name: Run Clickhouse
run: docker compose -f ./e2e/file_clickhouse/docker-compose.yml up -d
diff --git a/README.md b/README.md
index 4a3c25363..f073524b3 100755
--- a/README.md
+++ b/README.md
@@ -42,7 +42,7 @@ TBD: throughput on production servers.
**Input**: [dmesg](plugin/input/dmesg/README.md), [fake](plugin/input/fake/README.md), [file](plugin/input/file/README.md), [http](plugin/input/http/README.md), [journalctl](plugin/input/journalctl/README.md), [k8s](plugin/input/k8s/README.md), [kafka](plugin/input/kafka/README.md)
-**Action**: [add_file_name](plugin/action/add_file_name/README.md), [add_host](plugin/action/add_host/README.md), [convert_date](plugin/action/convert_date/README.md), [convert_log_level](plugin/action/convert_log_level/README.md), [debug](plugin/action/debug/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [join](plugin/action/join/README.md), [join_template](plugin/action/join_template/README.md), [json_decode](plugin/action/json_decode/README.md), [json_encode](plugin/action/json_encode/README.md), [keep_fields](plugin/action/keep_fields/README.md), [mask](plugin/action/mask/README.md), [modify](plugin/action/modify/README.md), [move](plugin/action/move/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [set_time](plugin/action/set_time/README.md), [throttle](plugin/action/throttle/README.md)
+**Action**: [add_file_name](plugin/action/add_file_name/README.md), [add_host](plugin/action/add_host/README.md), [convert_date](plugin/action/convert_date/README.md), [convert_log_level](plugin/action/convert_log_level/README.md), [debug](plugin/action/debug/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [join](plugin/action/join/README.md), [join_template](plugin/action/join_template/README.md), [json_decode](plugin/action/json_decode/README.md), [json_encode](plugin/action/json_encode/README.md), [keep_fields](plugin/action/keep_fields/README.md), [mask](plugin/action/mask/README.md), [modify](plugin/action/modify/README.md), [move](plugin/action/move/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [set_time](plugin/action/set_time/README.md), [split](plugin/action/split/README.md), [throttle](plugin/action/throttle/README.md)
**Output**: [clickhouse](plugin/output/clickhouse/README.md), [devnull](plugin/output/devnull/README.md), [elasticsearch](plugin/output/elasticsearch/README.md), [file](plugin/output/file/README.md), [gelf](plugin/output/gelf/README.md), [kafka](plugin/output/kafka/README.md), [postgres](plugin/output/postgres/README.md), [s3](plugin/output/s3/README.md), [splunk](plugin/output/splunk/README.md), [stdout](plugin/output/stdout/README.md)
diff --git a/_sidebar.md b/_sidebar.md
index 2d9cab01b..3b3716536 100644
--- a/_sidebar.md
+++ b/_sidebar.md
@@ -42,6 +42,7 @@
- [remove_fields](plugin/action/remove_fields/README.md)
- [rename](plugin/action/rename/README.md)
- [set_time](plugin/action/set_time/README.md)
+ - [split](plugin/action/split/README.md)
- [throttle](plugin/action/throttle/README.md)
- Output
diff --git a/cmd/file.d/file.d.go b/cmd/file.d/file.d.go
index ae0fb18f8..898be2c0d 100644
--- a/cmd/file.d/file.d.go
+++ b/cmd/file.d/file.d.go
@@ -34,6 +34,7 @@ import (
_ "github.com/ozontech/file.d/plugin/action/remove_fields"
_ "github.com/ozontech/file.d/plugin/action/rename"
_ "github.com/ozontech/file.d/plugin/action/set_time"
+ _ "github.com/ozontech/file.d/plugin/action/split"
_ "github.com/ozontech/file.d/plugin/action/throttle"
_ "github.com/ozontech/file.d/plugin/input/dmesg"
_ "github.com/ozontech/file.d/plugin/input/fake"
diff --git a/e2e/kafka_file/docker-compose-kafka.yml b/e2e/kafka_file/docker-compose.yml
similarity index 100%
rename from e2e/kafka_file/docker-compose-kafka.yml
rename to e2e/kafka_file/docker-compose.yml
diff --git a/e2e/split_join/config.yml b/e2e/split_join/config.yml
new file mode 100644
index 000000000..5aac2602c
--- /dev/null
+++ b/e2e/split_join/config.yml
@@ -0,0 +1,22 @@
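+# Split+join e2e pipeline: split the "data" array into child events, then join the "start"/"continue" messages back into one.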
+pipelines:
+ split_join:
+ settings:
+ event_timeout: 1h
+ capacity: 128
+ input:
+ type: file
+ offsets_op: reset
+ maintenance_interval: 1m
+ actions:
+ - type: debug
+ message: input event sample
+ - type: split
+ field: data
+ - type: join
+ field: message
+ start: '/^start/'
+ continue: '/^continue/'
+ - type: debug
+ message: output event sample
+ output:
+ type: kafka
diff --git a/e2e/split_join/handler.go b/e2e/split_join/handler.go
new file mode 100644
index 000000000..bd102d28e
--- /dev/null
+++ b/e2e/split_join/handler.go
@@ -0,0 +1,19 @@
+package split_join
+
+import (
+ "github.com/Shopify/sarama"
+)
+
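+// handlerFunc adapts a plain function to the sarama.ConsumerGroupHandler interface.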
+type handlerFunc func(message *sarama.ConsumerMessage)
+
+func (h handlerFunc) Setup(_ sarama.ConsumerGroupSession) error { return nil }
+
+func (h handlerFunc) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
+
+func (h handlerFunc) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+ for msg := range claim.Messages() {
+ h(msg)
+ session.MarkMessage(msg, "")
+ }
+ return nil
+}
diff --git a/e2e/split_join/split_join.go b/e2e/split_join/split_join.go
new file mode 100644
index 000000000..74d3be0ab
--- /dev/null
+++ b/e2e/split_join/split_join.go
@@ -0,0 +1,120 @@
+package split_join
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "path"
+ "path/filepath"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/Shopify/sarama"
+ "github.com/ozontech/file.d/cfg"
+ "github.com/stretchr/testify/require"
+)
+
+const (
+ brokerHost = "localhost:9092"
+ group = "file_d_test_split_join_client"
+
+ // events produced per sample line: the array holds 5 objects, and the "start "/"continue" pair is joined into one event
+ arrayLen = 4
+ sample = `{ "data": [ { "first": "1" }, { "message": "start " }, { "message": "continue" }, { "second": "2" }, { "third": "3" } ] }`
+
+ messages = 10
+)
+
+type Config struct {
+ inputDir string
+ consumer sarama.ConsumerGroup
+ topic string
+}
+
+func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) {
+ r := require.New(t)
+
+ c.inputDir = t.TempDir()
+ offsetsDir := t.TempDir()
+ c.topic = fmt.Sprintf("file_d_test_split_join_%d", time.Now().UnixNano())
+ t.Logf("generated topic: %s", c.topic)
+
+ input := conf.Pipelines[pipelineName].Raw.Get("input")
+ input.Set("watching_dir", c.inputDir)
+ input.Set("filename_pattern", "input.log")
+ input.Set("offsets_file", filepath.Join(offsetsDir, "offsets.yaml"))
+
+ output := conf.Pipelines[pipelineName].Raw.Get("output")
+ output.Set("brokers", []string{brokerHost})
+ output.Set("default_topic", c.topic)
+
+ addrs := []string{brokerHost}
+ config := sarama.NewConfig()
+ config.Consumer.Offsets.Initial = sarama.OffsetOldest
+
+ admin, err := sarama.NewClusterAdmin(addrs, config)
+ r.NoError(err)
+ r.NoError(admin.CreateTopic(c.topic, &sarama.TopicDetail{
+ NumPartitions: 1,
+ ReplicationFactor: 1,
+ }, false))
+
+ c.consumer, err = sarama.NewConsumerGroup(addrs, group, config)
+ r.NoError(err)
+}
+
+func (c *Config) Send(t *testing.T) {
+ file, err := os.Create(path.Join(c.inputDir, "input.log"))
+ require.NoError(t, err)
+ defer func(file *os.File) {
+ _ = file.Close()
+ }(file)
+
+ for i := 0; i < messages; i++ {
+ _, err = file.WriteString(sample + "\n")
+ require.NoError(t, err)
+ }
+}
+
+func (c *Config) Validate(t *testing.T) {
+ r := require.New(t)
+
+ ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+ defer cancel()
+
+ expectedEventsCount := messages * arrayLen
+
+ strBuilder := strings.Builder{}
+ gotEvents := 0
+ done := make(chan struct{})
+
+ go func() {
+ r.NoError(c.consumer.Consume(ctx, []string{c.topic}, handlerFunc(func(msg *sarama.ConsumerMessage) {
+ strBuilder.Write(msg.Value)
+ strBuilder.WriteString("\n")
+ gotEvents++
+ if gotEvents == expectedEventsCount {
+ close(done)
+ }
+ })))
+ }()
+
+ select {
+ case <-done:
+ case <-ctx.Done():
+ r.Failf("test timed out", "got: %v, expected: %v, consumed: %s", gotEvents, expectedEventsCount, strBuilder.String())
+ }
+
+ got := strBuilder.String()
+
+ expected := strings.Repeat(`{"first":"1"}
+{"message":"start continue"}
+{"second":"2"}
+{"third":"3"}
+`,
+ messages)
+
+ r.Equal(len(expected), len(got))
+ r.Equal(expected, got)
+ r.Equal(expectedEventsCount, gotEvents)
+}
diff --git a/e2e/start_work_test.go b/e2e/start_work_test.go
index 0d9e08b7e..eb92039ec 100644
--- a/e2e/start_work_test.go
+++ b/e2e/start_work_test.go
@@ -15,6 +15,7 @@ import (
"github.com/ozontech/file.d/e2e/http_file"
"github.com/ozontech/file.d/e2e/join_throttle"
"github.com/ozontech/file.d/e2e/kafka_file"
+ "github.com/ozontech/file.d/e2e/split_join"
"github.com/ozontech/file.d/fd"
_ "github.com/ozontech/file.d/plugin/action/add_host"
_ "github.com/ozontech/file.d/plugin/action/convert_date"
@@ -34,6 +35,7 @@ import (
_ "github.com/ozontech/file.d/plugin/action/remove_fields"
_ "github.com/ozontech/file.d/plugin/action/rename"
_ "github.com/ozontech/file.d/plugin/action/set_time"
+ _ "github.com/ozontech/file.d/plugin/action/split"
_ "github.com/ozontech/file.d/plugin/action/throttle"
_ "github.com/ozontech/file.d/plugin/input/dmesg"
_ "github.com/ozontech/file.d/plugin/input/fake"
@@ -109,6 +111,11 @@ func TestE2EStabilityWorkCase(t *testing.T) {
},
cfgPath: "./join_throttle/config.yml",
},
+ {
+ name: "split_join",
+ e2eTest: &split_join.Config{},
+ cfgPath: "./split_join/config.yml",
+ },
{
name: "file_clickhouse",
e2eTest: &file_clickhouse.Config{},
diff --git a/pipeline/batch.go b/pipeline/batch.go
index e06637941..5b3a2dec7 100644
--- a/pipeline/batch.go
+++ b/pipeline/batch.go
@@ -19,7 +19,9 @@ const (
)
type Batch struct {
- Events []*Event
+ events []*Event
+ // hasIterableEvents is true if the Batch contains at least one iterable (non-parent) event
+ hasIterableEvents bool
// eventsSize contains total size of the Events in bytes
eventsSize int
@@ -34,6 +36,13 @@ type Batch struct {
status BatchStatus
}
+func NewPreparedBatch(events []*Event) *Batch {
+ b := &Batch{}
+ b.reset()
+ b.events = events
+ return b
+}
+
func newBatch(maxSizeCount, maxSizeBytes int, timeout time.Duration) *Batch {
if maxSizeCount < 0 {
logger.Fatalf("why batch max count less than 0?")
@@ -45,30 +54,41 @@ func newBatch(maxSizeCount, maxSizeBytes int, timeout time.Duration) *Batch {
logger.Fatalf("batch limits are not set")
}
- return &Batch{
+ b := &Batch{
maxSizeCount: maxSizeCount,
maxSizeBytes: maxSizeBytes,
timeout: timeout,
- Events: make([]*Event, 0, maxSizeCount),
+ events: make([]*Event, 0, maxSizeCount),
}
+ b.reset()
+
+ return b
}
func (b *Batch) reset() {
- b.Events = b.Events[:0]
+ b.events = b.events[:0]
b.eventsSize = 0
b.status = BatchStatusNotReady
+ b.hasIterableEvents = false
b.startTime = time.Now()
}
func (b *Batch) append(e *Event) {
- b.Events = append(b.Events, e)
+ b.hasIterableEvents = b.hasIterableEvents || !e.IsChildParentKind()
+
+ b.events = append(b.events, e)
b.eventsSize += e.Size
}
func (b *Batch) updateStatus() BatchStatus {
- l := len(b.Events)
+ l := len(b.events)
+ if l == 0 {
+ // batch is empty
+ return BatchStatusNotReady
+ }
+
switch {
- case (b.maxSizeCount != 0 && l == b.maxSizeCount) || (b.maxSizeBytes != 0 && b.maxSizeBytes <= b.eventsSize):
+ case (b.maxSizeCount != 0 && l >= b.maxSizeCount) || (b.maxSizeBytes != 0 && b.maxSizeBytes <= b.eventsSize):
b.status = BatchStatusMaxSizeExceeded
case l > 0 && time.Since(b.startTime) > b.timeout:
b.status = BatchStatusTimeoutExceeded
@@ -78,6 +98,15 @@ func (b *Batch) updateStatus() BatchStatus {
return b.status
}
+func (b *Batch) ForEach(cb func(event *Event)) {
+ for _, event := range b.events {
+ if event.IsChildParentKind() {
+ continue
+ }
+ cb(event)
+ }
+}
+
type Batcher struct {
opts BatcherOptions
@@ -171,9 +200,11 @@ func (b *Batcher) work() {
for batch := range b.fullBatches {
b.workersInProgress.Inc()
- now := time.Now()
- b.opts.OutFn(&data, batch)
- b.batchOutFnSeconds.Observe(time.Since(now).Seconds())
+ if batch.hasIterableEvents {
+ now := time.Now()
+ b.opts.OutFn(&data, batch)
+ b.batchOutFnSeconds.Observe(time.Since(now).Seconds())
+ }
status := b.commitBatch(batch)
@@ -213,8 +244,8 @@ func (b *Batcher) commitBatch(batch *Batch) BatchStatus {
b.commitSeq++
b.commitWaitingSeconds.Observe(time.Since(now).Seconds())
- for i := range batch.Events {
- b.opts.Controller.Commit(batch.Events[i])
+ for i := range batch.events {
+ b.opts.Controller.Commit(batch.events[i])
}
status := batch.status
diff --git a/pipeline/event.go b/pipeline/event.go
index 2631fed0d..97639d6a2 100644
--- a/pipeline/event.go
+++ b/pipeline/event.go
@@ -14,6 +14,8 @@ import (
type Event struct {
kind Kind
+ children []*Event
+
Root *insaneJSON.Root
Buf []byte
@@ -33,17 +35,19 @@ type Event struct {
}
const (
- eventStagePool = 0
- eventStageInput = 1
- eventStageStream = 2
- eventStageProcessor = 3
- eventStageOutput = 4
+ eventStagePool = iota
+ eventStageInput
+ eventStageStream
+ eventStageProcessor
+ eventStageOutput
)
type Kind byte
const (
EventKindRegular Kind = iota
+ eventKindChild       // spawned from a parent event (e.g. by the split action)
+ eventKindChildParent // event that has spawned child events; output plugins skip it
EventKindTimeout
EventKindUnlock
)
@@ -54,6 +58,10 @@ func (k Kind) String() string {
return "REGULAR"
case EventKindTimeout:
return "TIMEOUT"
+ case eventKindChildParent:
+ return "PARENT"
+ case eventKindChild:
+ return "CHILD"
case EventKindUnlock:
return "UNLOCK"
}
@@ -117,6 +125,7 @@ func (e *Event) reset(avgEventSize int) {
e.next = nil
e.action = 0
e.stream = nil
+ e.children = e.children[:0]
e.kind = EventKindRegular
}
@@ -148,6 +157,22 @@ func (e *Event) IsTimeoutKind() bool {
return e.kind == EventKindTimeout
}
+func (e *Event) SetChildKind() {
+ e.kind = eventKindChild
+}
+
+func (e *Event) IsChildKind() bool {
+ return e.kind == eventKindChild
+}
+
+func (e *Event) SetChildParentKind() {
+ e.kind = eventKindChildParent
+}
+
+func (e *Event) IsChildParentKind() bool {
+ return e.kind == eventKindChildParent
+}
+
func (e *Event) parseJSON(json []byte) error {
return e.Root.DecodeBytes(json)
}
diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go
index d95a81c4c..239b3fa41 100644
--- a/pipeline/pipeline.go
+++ b/pipeline/pipeline.go
@@ -17,6 +17,7 @@ import (
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/pipeline/antispam"
"github.com/prometheus/client_golang/prometheus"
+ insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@@ -52,6 +53,7 @@ type InputPluginController interface {
type ActionPluginController interface {
Propagate(event *Event) // throw held event back to pipeline
+ Spawn(parent *Event, nodes []*insaneJSON.Node) // spawn the nodes as child events of the parent
}
type OutputPluginController interface {
@@ -503,7 +505,7 @@ func (p *Pipeline) Error(err string) {
}
func (p *Pipeline) finalize(event *Event, notifyInput bool, backEvent bool) {
- if event.IsTimeoutKind() {
+ if event.IsTimeoutKind() || event.IsChildKind() {
return
}
@@ -526,6 +528,9 @@ func (p *Pipeline) finalize(event *Event, notifyInput bool, backEvent bool) {
p.eventLogMu.Unlock()
}
+ for _, e := range event.children {
+ insaneJSON.Release(e.Root)
+ }
p.eventPool.back(event)
}
diff --git a/pipeline/processor.go b/pipeline/processor.go
index e7543395c..492e3f6f8 100644
--- a/pipeline/processor.go
+++ b/pipeline/processor.go
@@ -2,6 +2,7 @@ package pipeline
import (
"github.com/ozontech/file.d/logger"
+ insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/atomic"
"go.uber.org/zap"
)
@@ -10,17 +11,19 @@ type ActionResult int
const (
// ActionPass pass event to the next action in a pipeline
- ActionPass ActionResult = 0
+ ActionPass ActionResult = iota
// ActionCollapse skip further processing of event and request next event from the same stream and source as current
// plugin may receive event with EventKindTimeout if it takes to long to read next event from same stream
- ActionCollapse ActionResult = 2
+ ActionCollapse
// ActionDiscard skip further processing of event and request next event from any stream and source
- ActionDiscard ActionResult = 1
+ ActionDiscard
// ActionHold hold event in a plugin and request next event from the same stream and source as current.
// same as ActionCollapse but held event should be manually committed or returned into pipeline.
// check out Commit()/Propagate() functions in InputPluginController.
// plugin may receive event with EventKindTimeout if it takes to long to read next event from same stream.
- ActionHold ActionResult = 3
+ ActionHold
+ // ActionBreak skip further processing of event and pass it to an output.
+ ActionBreak
)
type eventStatus string
@@ -32,6 +35,7 @@ const (
eventStatusDiscarded eventStatus = "discarded"
eventStatusCollapse eventStatus = "collapsed"
eventStatusHold eventStatus = "held"
+ eventStatusBroke eventStatus = "broke"
)
func allEventStatuses() []eventStatus {
@@ -187,11 +191,17 @@ func (p *processor) doActions(event *Event) (isPassed bool, lastAction int) {
p.actionWatcher.setEventBefore(index, event)
- switch action.Do(event) {
+ result := action.Do(event)
+ switch result {
case ActionPass:
p.countEvent(event, index, eventStatusPassed)
p.tryResetBusy(index)
p.actionWatcher.setEventAfter(index, event, eventStatusPassed)
+ case ActionBreak:
+ p.countEvent(event, index, eventStatusBroke)
+ p.tryResetBusy(index)
+ p.actionWatcher.setEventAfter(index, event, eventStatusBroke)
+ return true, index
case ActionDiscard:
p.countEvent(event, index, eventStatusDiscarded)
p.tryResetBusy(index)
@@ -346,6 +356,43 @@ func (p *processor) Propagate(event *Event) {
p.processSequence(event)
}
+// Spawn creates a child event for each of the given nodes and runs it through the remaining actions.
+// Any attempt to ActionHold or ActionCollapse a child is flushed by a timeout event.
+func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) {
+ parent.SetChildParentKind()
+ nextActionIdx := parent.action + 1
+
+ for _, node := range nodes {
+ // we can't reuse the parent event (via insaneJSON.Root{Node: child})
+ // because of the nil decoder
+ child := &Event{Root: insaneJSON.Spawn()}
+ parent.children = append(parent.children, child)
+ child.Root.MutateToNode(node)
+ child.SetChildKind()
+ child.action = nextActionIdx
+
+ ok, _ := p.doActions(child)
+ if ok {
+ child.stage = eventStageOutput
+ p.output.Out(child)
+ }
+ }
+
+ if p.busyActionsTotal == 0 {
+ return
+ }
+
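+ // flush busy actions (e.g. join) with timeout events so they release any held child events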
+ for i, busy := range p.busyActions {
+ if !busy {
+ continue
+ }
+
+ timeout := newTimeoutEvent(parent.stream)
+ timeout.action = i
+ p.doActions(timeout)
+ }
+}
+
func (p *processor) RecoverFromPanic() {
p.recoverFromPanic()
}
diff --git a/plugin/README.md b/plugin/README.md
index db6385eeb..16eb84962 100755
--- a/plugin/README.md
+++ b/plugin/README.md
@@ -495,6 +495,31 @@ The resulting event could look like:
It adds time field to the event.
[More details...](plugin/action/set_time/README.md)
+## split
+It splits an array of objects into separate events.
+
+For example:
+```json
+{
+ "data": [
+ { "message": "go" },
+ { "message": "rust" },
+ { "message": "c++" }
+ ]
+}
+```
+
+Split produces:
+```json
+{ "message": "go" },
+{ "message": "rust" },
+{ "message": "c++" }
+```
+
+The parent event is discarded.
+If the value of the JSON field is not an array of objects, the event is passed unchanged.
+
+[More details...](plugin/action/split/README.md)
## throttle
It discards the events if pipeline throughput gets higher than a configured threshold.
diff --git a/plugin/action/README.md b/plugin/action/README.md
index 40374c9b5..6debdc504 100755
--- a/plugin/action/README.md
+++ b/plugin/action/README.md
@@ -330,6 +330,31 @@ The resulting event could look like:
It adds time field to the event.
[More details...](plugin/action/set_time/README.md)
+## split
+It splits an array of objects into separate events.
+
+For example:
+```json
+{
+ "data": [
+ { "message": "go" },
+ { "message": "rust" },
+ { "message": "c++" }
+ ]
+}
+```
+
+Split produces:
+```json
+{ "message": "go" },
+{ "message": "rust" },
+{ "message": "c++" }
+```
+
+The parent event is discarded.
+If the value of the JSON field is not an array of objects, the event is passed unchanged.
+
+[More details...](plugin/action/split/README.md)
## throttle
It discards the events if pipeline throughput gets higher than a configured threshold.
diff --git a/plugin/action/split/README.idoc.md b/plugin/action/split/README.idoc.md
new file mode 100644
index 000000000..20f771399
--- /dev/null
+++ b/plugin/action/split/README.idoc.md
@@ -0,0 +1,5 @@
+# Split plugin
+@introduction
+
+### Config params
+@config-params|description
diff --git a/plugin/action/split/README.md b/plugin/action/split/README.md
new file mode 100755
index 000000000..6ebb83a5c
--- /dev/null
+++ b/plugin/action/split/README.md
@@ -0,0 +1,33 @@
+# Split plugin
+It splits an array of objects into separate events.
+
+For example:
+```json
+{
+ "data": [
+ { "message": "go" },
+ { "message": "rust" },
+ { "message": "c++" }
+ ]
+}
+```
+
+Split produces:
+```json
+{ "message": "go" },
+{ "message": "rust" },
+{ "message": "c++" }
+```
+
+The parent event is discarded.
+If the value of the JSON field is not an array of objects, the event is passed unchanged.
+
+### Config params
+**`field`** *`cfg.FieldSelector`*
+
+Path to the array of objects.
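+
+A minimal action config sketch (the `data` field name is illustrative):
+```yaml
+actions:
+  - type: split
+    field: data
+```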
+
+
+
+
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
\ No newline at end of file
diff --git a/plugin/action/split/split.go b/plugin/action/split/split.go
new file mode 100644
index 000000000..c0a7401fa
--- /dev/null
+++ b/plugin/action/split/split.go
@@ -0,0 +1,105 @@
+package split
+
+import (
+ "github.com/ozontech/file.d/cfg"
+ "github.com/ozontech/file.d/fd"
+ "github.com/ozontech/file.d/pipeline"
+ insaneJSON "github.com/vitkovskii/insane-json"
+ "go.uber.org/zap"
+)
+
+/*{ introduction
+It splits an array of objects into separate events.
+
+For example:
+```json
+{
+ "data": [
+ { "message": "go" },
+ { "message": "rust" },
+ { "message": "c++" }
+ ]
+}
+```
+
+Split produces:
+```json
+{ "message": "go" },
+{ "message": "rust" },
+{ "message": "c++" }
+```
+
+The parent event is discarded.
+If the value of the JSON field is not an array of objects, the event is passed unchanged.
+}*/
+
+type Plugin struct {
+ config *Config
+ logger *zap.Logger
+ pluginController pipeline.ActionPluginController
+}
+
+// ! config-params
+// ^ config-params
+type Config struct {
+ // > @3@4@5@6
+ // >
+ // > Path to the array of objects.
+ Field cfg.FieldSelector `json:"field" parse:"selector"` // *
+ Field_ []string
+}
+
+func init() {
+ fd.DefaultPluginRegistry.RegisterAction(&pipeline.PluginStaticInfo{
+ Type: "split",
+ Factory: factory,
+ })
+}
+
+func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
+ return &Plugin{}, &Config{}
+}
+
+func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {
+ p.config = config.(*Config)
+ p.logger = params.Logger.Desugar()
+ p.pluginController = params.Controller
+}
+
+func (p *Plugin) Stop() {
+}
+
+func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
+ if event.IsChildKind() {
+ return pipeline.ActionPass
+ }
+
+ data := event.Root.Dig(p.config.Field_...)
+ if data == nil {
+ return pipeline.ActionPass
+ }
+
+ if !data.IsArray() {
+ p.logger.Warn("skip an event because field is not an array", zap.String("type", data.TypeStr()))
+ return pipeline.ActionPass
+ }
+
+ nodeArray := data.AsArray()
+ children := make([]*insaneJSON.Node, 0, len(nodeArray))
+ for _, elem := range nodeArray {
+ if !elem.IsObject() {
+ p.logger.Warn("skip an event because it is not an object", zap.String("type", data.TypeStr()))
+ continue
+ }
+ children = append(children, elem)
+ }
+
+ if len(children) == 0 {
+ // zero array or an array that does not contain objects
+ return pipeline.ActionPass
+ }
+
+ p.pluginController.Spawn(event, children)
+
+ return pipeline.ActionBreak
+}
diff --git a/plugin/action/split/split_test.go b/plugin/action/split/split_test.go
new file mode 100644
index 000000000..58da1dcd4
--- /dev/null
+++ b/plugin/action/split/split_test.go
@@ -0,0 +1,54 @@
+package split
+
+import (
+ "strings"
+ "sync"
+ "testing"
+
+ "github.com/ozontech/file.d/pipeline"
+ "github.com/ozontech/file.d/test"
+ "github.com/stretchr/testify/require"
+)
+
+func TestPlugin_Do(t *testing.T) {
+ const field = "data"
+ config := test.NewConfig(&Config{Field: field}, nil)
+ p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false))
+ wg := &sync.WaitGroup{}
+
+ const children = 6
+ const parents = 2
+ const total = children + parents
+ wg.Add(total)
+
+ splitted := make([]string, 0)
+ output.SetOutFn(func(e *pipeline.Event) {
+ defer wg.Done()
+ if e.IsChildParentKind() {
+ return
+ }
+
+ splitted = append(splitted, strings.Clone(e.Root.Dig("message").AsString()))
+ })
+
+ input.In(0, "test.log", 0, []byte(`{
+ "data": [
+ { "message": "go" },
+ { "message": "rust" },
+ { "message": "c++" }
+ ]
+}`))
+ input.In(0, "test.log", 0, []byte(`{
+ "data": [
+ { "message": "python" },
+ { "message": "ruby" },
+ { "message": "js" }
+ ]
+}`))
+
+ wg.Wait()
+ p.Stop()
+
+ require.Equal(t, children, len(splitted))
+ require.Equal(t, []string{"go", "rust", "c++", "python", "ruby", "js"}, splitted)
+}
diff --git a/plugin/input/file/provider.go b/plugin/input/file/provider.go
index 8625292f9..7c90ce712 100644
--- a/plugin/input/file/provider.go
+++ b/plugin/input/file/provider.go
@@ -183,7 +183,7 @@ func (jp *jobProvider) commit(event *pipeline.Event) {
job.mu.Lock()
- // commit offsets only not ignored AND regular events
- if !event.IsRegularKind() || event.SeqID <= job.ignoreEventsLE {
+ // commit offsets only for not-ignored regular and parent events
+ if (!event.IsRegularKind() && !event.IsChildParentKind()) || event.SeqID <= job.ignoreEventsLE {
job.mu.Unlock()
return
}
diff --git a/plugin/output/clickhouse/clickhouse.go b/plugin/output/clickhouse/clickhouse.go
index bfd3a39b6..626f47396 100644
--- a/plugin/output/clickhouse/clickhouse.go
+++ b/plugin/output/clickhouse/clickhouse.go
@@ -412,7 +412,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
data := (*workerData).(data)
data.reset()
- for _, event := range batch.Events {
+ batch.ForEach(func(event *pipeline.Event) {
for _, col := range data.cols {
node := event.Root.Dig(col.Name)
@@ -436,7 +436,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
}
}
}
- }
+ })
var err error
for try := 0; try < p.config.Retry; try++ {
diff --git a/plugin/output/elasticsearch/elasticsearch.go b/plugin/output/elasticsearch/elasticsearch.go
index 7b0ede2f7..e5380bbcd 100644
--- a/plugin/output/elasticsearch/elasticsearch.go
+++ b/plugin/output/elasticsearch/elasticsearch.go
@@ -252,9 +252,9 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
}
data.outBuf = data.outBuf[:0]
- for _, event := range batch.Events {
+ batch.ForEach(func(event *pipeline.Event) {
data.outBuf = p.appendEvent(data.outBuf, event)
- }
+ })
for {
if err := p.send(data.outBuf); err != nil {
diff --git a/plugin/output/file/file.go b/plugin/output/file/file.go
index 819086859..8311e08da 100644
--- a/plugin/output/file/file.go
+++ b/plugin/output/file/file.go
@@ -196,10 +196,10 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
outBuf := data.outBuf[:0]
- for _, event := range batch.Events {
+ batch.ForEach(func(event *pipeline.Event) {
outBuf, _ = event.Encode(outBuf)
outBuf = append(outBuf, byte('\n'))
- }
+ })
data.outBuf = outBuf
p.write(outBuf)
diff --git a/plugin/output/gelf/gelf.go b/plugin/output/gelf/gelf.go
index 05a39e18c..cabc9f533 100644
--- a/plugin/output/gelf/gelf.go
+++ b/plugin/output/gelf/gelf.go
@@ -236,11 +236,13 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
outBuf := data.outBuf[:0]
encodeBuf := data.encodeBuf[:0]
- for _, event := range batch.Events {
+
+ batch.ForEach(func(event *pipeline.Event) {
encodeBuf = p.formatEvent(encodeBuf, event)
outBuf, _ = event.Encode(outBuf)
outBuf = append(outBuf, byte(0))
- }
+ })
+
data.outBuf = outBuf
data.encodeBuf = encodeBuf
diff --git a/plugin/output/kafka/kafka.go b/plugin/output/kafka/kafka.go
index 07f856cf5..0aaff88d4 100644
--- a/plugin/output/kafka/kafka.go
+++ b/plugin/output/kafka/kafka.go
@@ -187,7 +187,8 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
outBuf := data.outBuf[:0]
start := 0
- for i, event := range batch.Events {
+ i := 0
+ batch.ForEach(func(event *pipeline.Event) {
outBuf, start = event.Encode(outBuf)
topic := p.config.DefaultTopic
@@ -203,11 +204,12 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
}
data.messages[i].Value = outBuf[start:]
data.messages[i].Topic = topic
- }
+ i++
+ })
data.outBuf = outBuf
- err := p.producer.SendMessages(data.messages[:len(batch.Events)])
+ err := p.producer.SendMessages(data.messages[:i])
if err != nil {
errs := err.(sarama.ProducerErrors)
for _, e := range errs {
diff --git a/plugin/output/kafka/kafka_test.go b/plugin/output/kafka/kafka_test.go
index 8cd658aba..552a494a3 100644
--- a/plugin/output/kafka/kafka_test.go
+++ b/plugin/output/kafka/kafka_test.go
@@ -122,11 +122,9 @@ func FuzzKafka(f *testing.F) {
t: t,
}
- data := pipeline.Batch{
- Events: []*pipeline.Event{
- newEvent(t, topicField, topicVal, key, val),
- },
- }
- p.out(&worker, &data)
+ data := pipeline.NewPreparedBatch([]*pipeline.Event{
+ newEvent(t, topicField, topicVal, key, val),
+ })
+ p.out(&worker, data)
})
}
diff --git a/plugin/output/postgres/postgres.go b/plugin/output/postgres/postgres.go
index fc6a730ba..db7cd120e 100644
--- a/plugin/output/postgres/postgres.go
+++ b/plugin/output/postgres/postgres.go
@@ -263,10 +263,10 @@ func (p *Plugin) out(_ *pipeline.WorkerData, batch *pipeline.Batch) {
uniqFields := p.queryBuilder.GetUniqueFields()
// Deduplicate events, pg can't do upsert with duplication.
- uniqueEventsMap := make(map[string]struct{}, len(batch.Events))
+ uniqueEventsMap := make(map[string]struct{}, p.config.BatchSize_)
var anyValidValue bool
- for _, event := range batch.Events {
+ batch.ForEach(func(event *pipeline.Event) {
fieldValues, uniqueID, err := p.processEvent(event, pgFields, uniqFields)
if err != nil {
switch {
@@ -281,7 +281,7 @@ func (p *Plugin) out(_ *pipeline.WorkerData, batch *pipeline.Batch) {
p.logger.Fatalf("undefined error: %w", err)
}
- continue
+ return
}
// passes here only if event valid.
@@ -295,8 +295,7 @@ func (p *Plugin) out(_ *pipeline.WorkerData, batch *pipeline.Batch) {
builder = builder.Values(fieldValues...)
anyValidValue = true
}
- }
-
+ })
builder = builder.Suffix(p.queryBuilder.GetPostfix()).PlaceholderFormat(sq.Dollar)
// no valid events passed.
diff --git a/plugin/output/postgres/postgres_test.go b/plugin/output/postgres/postgres_test.go
index f9129d2f5..5c2f53159 100644
--- a/plugin/output/postgres/postgres_test.go
+++ b/plugin/output/postgres/postgres_test.go
@@ -92,7 +92,7 @@ func TestPrivateOut(t *testing.T) {
p.registerMetrics(metric.New("test", prometheus.NewRegistry()))
- batch := &pipeline.Batch{Events: []*pipeline.Event{{Root: root}}}
+ batch := pipeline.NewPreparedBatch([]*pipeline.Event{{Root: root}})
p.out(nil, batch)
}
@@ -167,7 +167,7 @@ func TestPrivateOutWithRetry(t *testing.T) {
p.registerMetrics(metric.New("test", prometheus.NewRegistry()))
- batch := &pipeline.Batch{Events: []*pipeline.Event{{Root: root}}}
+ batch := pipeline.NewPreparedBatch([]*pipeline.Event{{Root: root}})
p.out(nil, batch)
}
@@ -220,7 +220,7 @@ func TestPrivateOutNoGoodEvents(t *testing.T) {
p.registerMetrics(metric.New("test", prometheus.NewRegistry()))
- batch := &pipeline.Batch{Events: []*pipeline.Event{{Root: root}}}
+ batch := pipeline.NewPreparedBatch([]*pipeline.Event{{Root: root}})
p.out(nil, batch)
}
@@ -300,11 +300,11 @@ func TestPrivateOutDeduplicatedEvents(t *testing.T) {
p.registerMetrics(metric.New("test", prometheus.NewRegistry()))
- batch := &pipeline.Batch{Events: []*pipeline.Event{
+ batch := pipeline.NewPreparedBatch([]*pipeline.Event{
{Root: root},
{Root: rootDuplication},
{Root: rootDuplicationMore},
- }}
+ })
p.out(nil, batch)
}
@@ -366,7 +366,7 @@ func TestPrivateOutWrongTypeInField(t *testing.T) {
p.registerMetrics(metric.New("test", prometheus.NewRegistry()))
- batch := &pipeline.Batch{Events: []*pipeline.Event{{Root: root}}}
+ batch := pipeline.NewPreparedBatch([]*pipeline.Event{{Root: root}})
p.out(nil, batch)
}
@@ -471,13 +471,13 @@ func TestPrivateOutFewUniqueEventsYetWithDeduplicationEventsAnpooladEvents(t *te
p.registerMetrics(metric.New("test", prometheus.NewRegistry()))
- batch := &pipeline.Batch{Events: []*pipeline.Event{
+ batch := pipeline.NewPreparedBatch([]*pipeline.Event{
{Root: root},
{Root: rootDuplication},
{Root: rootDuplicationMore},
{Root: secondUniqueRoot},
{Root: badRoot},
- }}
+ })
p.out(nil, batch)
}
diff --git a/plugin/output/splunk/splunk.go b/plugin/output/splunk/splunk.go
index 25c8f7270..25c150014 100644
--- a/plugin/output/splunk/splunk.go
+++ b/plugin/output/splunk/splunk.go
@@ -152,11 +152,11 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
root := insaneJSON.Spawn()
outBuf := data.outBuf[:0]
- for _, event := range batch.Events {
+ batch.ForEach(func(event *pipeline.Event) {
root.AddField("event").MutateToNode(event.Root.Node)
outBuf = root.Encode(outBuf)
_ = root.DecodeString("{}")
- }
+ })
insaneJSON.Release(root)
data.outBuf = outBuf
diff --git a/plugin/output/splunk/splunk_test.go b/plugin/output/splunk/splunk_test.go
index fab7aa13a..2661ecf48 100644
--- a/plugin/output/splunk/splunk_test.go
+++ b/plugin/output/splunk/splunk_test.go
@@ -51,18 +51,13 @@ func TestSplunk(t *testing.T) {
logger: zap.NewExample().Sugar(),
}
- batch := pipeline.Batch{
- Events: []*pipeline.Event{
- {
- Root: input,
- },
- {
- Root: input,
- }},
- }
+ batch := pipeline.NewPreparedBatch([]*pipeline.Event{
+ {Root: input},
+ {Root: input},
+ })
data := pipeline.WorkerData(nil)
- plugin.out(&data, &batch)
+ plugin.out(&data, batch)
assert.Equal(t, testCase.expected+testCase.expected, string(response))
})