Merge remote-tracking branch 'origin/master' into 536-new-match-fields
HeadHunter483 committed Nov 27, 2023
2 parents be9b114 + 54cf379 commit 1666454
Showing 30 changed files with 589 additions and 68 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -69,7 +69,7 @@ jobs:
           cache: true
 
       - name: Run Kafka
-        run: docker compose -f ./e2e/kafka_file/docker-compose-kafka.yml up -d
+        run: docker compose -f ./e2e/kafka_file/docker-compose.yml up -d
 
       - name: Run Clickhouse
         run: docker compose -f ./e2e/file_clickhouse/docker-compose.yml up -d
2 changes: 1 addition & 1 deletion README.md
@@ -42,7 +42,7 @@ TBD: throughput on production servers.
 
 **Input**: [dmesg](plugin/input/dmesg/README.md), [fake](plugin/input/fake/README.md), [file](plugin/input/file/README.md), [http](plugin/input/http/README.md), [journalctl](plugin/input/journalctl/README.md), [k8s](plugin/input/k8s/README.md), [kafka](plugin/input/kafka/README.md)
 
-**Action**: [add_file_name](plugin/action/add_file_name/README.md), [add_host](plugin/action/add_host/README.md), [convert_date](plugin/action/convert_date/README.md), [convert_log_level](plugin/action/convert_log_level/README.md), [debug](plugin/action/debug/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [join](plugin/action/join/README.md), [join_template](plugin/action/join_template/README.md), [json_decode](plugin/action/json_decode/README.md), [json_encode](plugin/action/json_encode/README.md), [keep_fields](plugin/action/keep_fields/README.md), [mask](plugin/action/mask/README.md), [modify](plugin/action/modify/README.md), [move](plugin/action/move/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [set_time](plugin/action/set_time/README.md), [throttle](plugin/action/throttle/README.md)
+**Action**: [add_file_name](plugin/action/add_file_name/README.md), [add_host](plugin/action/add_host/README.md), [convert_date](plugin/action/convert_date/README.md), [convert_log_level](plugin/action/convert_log_level/README.md), [debug](plugin/action/debug/README.md), [discard](plugin/action/discard/README.md), [flatten](plugin/action/flatten/README.md), [join](plugin/action/join/README.md), [join_template](plugin/action/join_template/README.md), [json_decode](plugin/action/json_decode/README.md), [json_encode](plugin/action/json_encode/README.md), [keep_fields](plugin/action/keep_fields/README.md), [mask](plugin/action/mask/README.md), [modify](plugin/action/modify/README.md), [move](plugin/action/move/README.md), [parse_es](plugin/action/parse_es/README.md), [parse_re2](plugin/action/parse_re2/README.md), [remove_fields](plugin/action/remove_fields/README.md), [rename](plugin/action/rename/README.md), [set_time](plugin/action/set_time/README.md), [split](plugin/action/split/README.md), [throttle](plugin/action/throttle/README.md)
 
 **Output**: [clickhouse](plugin/output/clickhouse/README.md), [devnull](plugin/output/devnull/README.md), [elasticsearch](plugin/output/elasticsearch/README.md), [file](plugin/output/file/README.md), [gelf](plugin/output/gelf/README.md), [kafka](plugin/output/kafka/README.md), [postgres](plugin/output/postgres/README.md), [s3](plugin/output/s3/README.md), [splunk](plugin/output/splunk/README.md), [stdout](plugin/output/stdout/README.md)
 
1 change: 1 addition & 0 deletions _sidebar.md
@@ -42,6 +42,7 @@
   - [remove_fields](plugin/action/remove_fields/README.md)
   - [rename](plugin/action/rename/README.md)
   - [set_time](plugin/action/set_time/README.md)
+  - [split](plugin/action/split/README.md)
   - [throttle](plugin/action/throttle/README.md)
 
 - Output
1 change: 1 addition & 0 deletions cmd/file.d/file.d.go
@@ -34,6 +34,7 @@ import (
     _ "github.com/ozontech/file.d/plugin/action/remove_fields"
     _ "github.com/ozontech/file.d/plugin/action/rename"
     _ "github.com/ozontech/file.d/plugin/action/set_time"
+    _ "github.com/ozontech/file.d/plugin/action/split"
     _ "github.com/ozontech/file.d/plugin/action/throttle"
     _ "github.com/ozontech/file.d/plugin/input/dmesg"
     _ "github.com/ozontech/file.d/plugin/input/fake"
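The blank `_` imports above are Go's side-effect registration idiom: importing a plugin package runs its init(), which registers the plugin's factory by name, so nothing from the package is referenced directly. A self-contained sketch of the pattern (the registry names here are illustrative, not file.d's actual API):

package main

import "fmt"

// actionRegistry mimics the kind of registry a blank import populates.
var actionRegistry = map[string]func() string{}

func register(name string, factory func() string) {
	actionRegistry[name] = factory
}

// In the real code, each plugin package carries an init() like this, so a
// `_` import alone is enough to make the plugin reachable by name.
func init() {
	register("split", func() string { return "split plugin instance" })
}

func main() {
	fmt.Println(actionRegistry["split"]()) // pipelines look plugins up by name
}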
File renamed without changes.
22 changes: 22 additions & 0 deletions e2e/split_join/config.yml
@@ -0,0 +1,22 @@
pipelines:
  split_join:
    settings:
      event_timeout: 1h
      capacity: 128
    input:
      type: file
      offsets_op: reset
      maintenance_interval: 1m
    actions:
      - type: debug
        message: input event sample
      - type: split
        field: data
      - type: join
        field: message
        start: '/^start/'
        continue: '/^continue/'
      - type: debug
        message: output event sample
    output:
      type: kafka
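For context on this pipeline: each input line carries a `data` array with five elements; `split` fans the array out into five child events, and `join` then merges the `start `/`continue` pair into one event, leaving four events per input line (the `arrayLen` constant in the test below). Taking the test's sample line:

{ "data": [ { "first": "1" }, { "message": "start " }, { "message": "continue" }, { "second": "2" }, { "third": "3" } ] }

the pipeline emits:

{"first":"1"}
{"message":"start continue"}
{"second":"2"}
{"third":"3"}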
19 changes: 19 additions & 0 deletions e2e/split_join/handler.go
@@ -0,0 +1,19 @@
package split_join

import (
	"github.com/Shopify/sarama"
)

type handlerFunc func(message *sarama.ConsumerMessage)

func (h handlerFunc) Setup(_ sarama.ConsumerGroupSession) error { return nil }

func (h handlerFunc) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

func (h handlerFunc) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		h(msg)
		session.MarkMessage(msg, "")
	}
	return nil
}
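`handlerFunc` is a function adapter: any plain closure over `*sarama.ConsumerMessage` satisfies `sarama.ConsumerGroupHandler`, since `Setup` and `Cleanup` are no-ops and `ConsumeClaim` forwards each record to the closure and marks it consumed. A minimal usage sketch, assuming `ctx`, `consumer`, and `topic` are in scope as they are in `Validate` below:

err := consumer.Consume(ctx, []string{topic}, handlerFunc(func(msg *sarama.ConsumerMessage) {
	fmt.Printf("record: %s\n", msg.Value) // every consumed Kafka record lands here
}))
// Consume returns when ctx is cancelled or the consumer group rebalances.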
120 changes: 120 additions & 0 deletions e2e/split_join/split_join.go
@@ -0,0 +1,120 @@
package split_join

import (
	"context"
	"fmt"
	"os"
	"path"
	"path/filepath"
	"strings"
	"testing"
	"time"

	"github.com/Shopify/sarama"
	"github.com/ozontech/file.d/cfg"
	"github.com/stretchr/testify/require"
)

const (
	brokerHost = "localhost:9092"
	group      = "file_d_test_split_join_client"

	arrayLen = 4
	sample   = `{ "data": [ { "first": "1" }, { "message": "start " }, { "message": "continue" }, { "second": "2" }, { "third": "3" } ] }`

	messages = 10
)

type Config struct {
	inputDir string
	consumer sarama.ConsumerGroup
	topic    string
}

func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) {
	r := require.New(t)

	c.inputDir = t.TempDir()
	offsetsDir := t.TempDir()
	c.topic = fmt.Sprintf("file_d_test_split_join_%d", time.Now().UnixNano())
	t.Logf("generated topic: %s", c.topic)

	input := conf.Pipelines[pipelineName].Raw.Get("input")
	input.Set("watching_dir", c.inputDir)
	input.Set("filename_pattern", "input.log")
	input.Set("offsets_file", filepath.Join(offsetsDir, "offsets.yaml"))

	output := conf.Pipelines[pipelineName].Raw.Get("output")
	output.Set("brokers", []string{brokerHost})
	output.Set("default_topic", c.topic)

	addrs := []string{brokerHost}
	config := sarama.NewConfig()
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	admin, err := sarama.NewClusterAdmin(addrs, config)
	r.NoError(err)
	r.NoError(admin.CreateTopic(c.topic, &sarama.TopicDetail{
		NumPartitions:     1,
		ReplicationFactor: 1,
	}, false))

	c.consumer, err = sarama.NewConsumerGroup(addrs, group, config)
	r.NoError(err)
}

func (c *Config) Send(t *testing.T) {
	file, err := os.Create(path.Join(c.inputDir, "input.log"))
	require.NoError(t, err)
	defer func(file *os.File) {
		_ = file.Close()
	}(file)

	for i := 0; i < messages; i++ {
		_, err = file.WriteString(sample + "\n")
		require.NoError(t, err)
	}
}

func (c *Config) Validate(t *testing.T) {
	r := require.New(t)

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	expectedEventsCount := messages * arrayLen

	strBuilder := strings.Builder{}
	gotEvents := 0
	done := make(chan struct{})

	go func() {
		r.NoError(c.consumer.Consume(ctx, []string{c.topic}, handlerFunc(func(msg *sarama.ConsumerMessage) {
			strBuilder.Write(msg.Value)
			strBuilder.WriteString("\n")
			gotEvents++
			if gotEvents == expectedEventsCount {
				close(done)
			}
		})))
	}()

	select {
	case <-done:
	case <-ctx.Done():
		r.Failf("test timed out", "got: %v, expected: %v, consumed: %s", gotEvents, expectedEventsCount, strBuilder.String())

Check warning on line 105 in e2e/split_join/split_join.go

View check run for this annotation

Codecov / codecov/patch

e2e/split_join/split_join.go#L104-L105

Added lines #L104 - L105 were not covered by tests
	}

	got := strBuilder.String()

	expected := strings.Repeat(`{"first":"1"}
{"message":"start continue"}
{"second":"2"}
{"third":"3"}
`,
		messages)

	r.Equal(len(expected), len(got))
	r.Equal(expected, got)
	r.Equal(expectedEventsCount, gotEvents)
}
7 changes: 7 additions & 0 deletions e2e/start_work_test.go
@@ -15,6 +15,7 @@ import (
     "github.com/ozontech/file.d/e2e/http_file"
     "github.com/ozontech/file.d/e2e/join_throttle"
     "github.com/ozontech/file.d/e2e/kafka_file"
+    "github.com/ozontech/file.d/e2e/split_join"
     "github.com/ozontech/file.d/fd"
     _ "github.com/ozontech/file.d/plugin/action/add_host"
     _ "github.com/ozontech/file.d/plugin/action/convert_date"
@@ -34,6 +35,7 @@ import (
     _ "github.com/ozontech/file.d/plugin/action/remove_fields"
     _ "github.com/ozontech/file.d/plugin/action/rename"
     _ "github.com/ozontech/file.d/plugin/action/set_time"
+    _ "github.com/ozontech/file.d/plugin/action/split"
     _ "github.com/ozontech/file.d/plugin/action/throttle"
     _ "github.com/ozontech/file.d/plugin/input/dmesg"
     _ "github.com/ozontech/file.d/plugin/input/fake"
@@ -109,6 +111,11 @@ func TestE2EStabilityWorkCase(t *testing.T) {
         },
         cfgPath: "./join_throttle/config.yml",
     },
+    {
+        name:    "split_join",
+        e2eTest: &split_join.Config{},
+        cfgPath: "./split_join/config.yml",
+    },
     {
         name:    "file_clickhouse",
         e2eTest: &file_clickhouse.Config{},
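Each e2e case plugs into the harness through the same three-phase contract that `split_join.Config` implements above. A sketch of the implied interface (the exact name in start_work_test.go may differ):

type e2eTest interface {
	// Configure adjusts the pipeline config before file.d starts.
	Configure(t *testing.T, conf *cfg.Config, pipelineName string)
	// Send produces the test input once the pipeline is running.
	Send(t *testing.T)
	// Validate blocks until the output is observed and asserts on it.
	Validate(t *testing.T)
}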
55 changes: 43 additions & 12 deletions pipeline/batch.go
@@ -19,7 +19,9 @@ const (
 )
 
 type Batch struct {
-    Events []*Event
+    events []*Event
+    // hasIterableEvents is true if the Batch contains iterable events
+    hasIterableEvents bool
 
     // eventsSize contains total size of the Events in bytes
     eventsSize int
@@ -34,6 +36,13 @@
     status BatchStatus
 }
 
+func NewPreparedBatch(events []*Event) *Batch {
+    b := &Batch{}
+    b.reset()
+    b.events = events
+    return b
+}
+
 func newBatch(maxSizeCount, maxSizeBytes int, timeout time.Duration) *Batch {
     if maxSizeCount < 0 {
         logger.Fatalf("why batch max count less than 0?")
@@ -45,30 +54,41 @@ func newBatch(maxSizeCount, maxSizeBytes int, timeout time.Duration) *Batch {
         logger.Fatalf("batch limits are not set")
     }
 
-    return &Batch{
+    b := &Batch{
         maxSizeCount: maxSizeCount,
         maxSizeBytes: maxSizeBytes,
         timeout:      timeout,
-        Events:       make([]*Event, 0, maxSizeCount),
+        events:       make([]*Event, 0, maxSizeCount),
     }
+    b.reset()
+
+    return b
 }
 
 func (b *Batch) reset() {
-    b.Events = b.Events[:0]
+    b.events = b.events[:0]
     b.eventsSize = 0
     b.status = BatchStatusNotReady
+    b.hasIterableEvents = false
     b.startTime = time.Now()
 }
 
 func (b *Batch) append(e *Event) {
-    b.Events = append(b.Events, e)
+    b.hasIterableEvents = b.hasIterableEvents || !e.IsChildParentKind()
+
+    b.events = append(b.events, e)
     b.eventsSize += e.Size
 }
 
 func (b *Batch) updateStatus() BatchStatus {
-    l := len(b.Events)
+    l := len(b.events)
+    if len(b.events) == 0 {
+        // batch is empty
+        return BatchStatusNotReady
+    }
+
     switch {
-    case (b.maxSizeCount != 0 && l == b.maxSizeCount) || (b.maxSizeBytes != 0 && b.maxSizeBytes <= b.eventsSize):
+    case (b.maxSizeCount != 0 && l >= b.maxSizeCount) || (b.maxSizeBytes != 0 && b.maxSizeBytes <= b.eventsSize):
         b.status = BatchStatusMaxSizeExceeded
     case l > 0 && time.Since(b.startTime) > b.timeout:
         b.status = BatchStatusTimeoutExceeded
@@ -78,6 +98,15 @@ func (b *Batch) updateStatus() BatchStatus {
     return b.status
 }
 
+func (b *Batch) ForEach(cb func(event *Event)) {
+    for _, event := range b.events {
+        if event.IsChildParentKind() {
+            continue
+        }
+        cb(event)
+    }
+}
+
 type Batcher struct {
     opts BatcherOptions
 
@@ -171,9 +200,11 @@ func (b *Batcher) work() {
     for batch := range b.fullBatches {
         b.workersInProgress.Inc()
 
-        now := time.Now()
-        b.opts.OutFn(&data, batch)
-        b.batchOutFnSeconds.Observe(time.Since(now).Seconds())
+        if batch.hasIterableEvents {
+            now := time.Now()
+            b.opts.OutFn(&data, batch)
+            b.batchOutFnSeconds.Observe(time.Since(now).Seconds())
+        }
 
         status := b.commitBatch(batch)
 
@@ -213,8 +244,8 @@ func (b *Batcher) commitBatch(batch *Batch) BatchStatus {
     b.commitSeq++
     b.commitWaitingSeconds.Observe(time.Since(now).Seconds())
 
-    for i := range batch.Events {
-        b.opts.Controller.Commit(batch.Events[i])
+    for i := range batch.events {
+        b.opts.Controller.Commit(batch.events[i])
     }
 
     status := batch.status
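With `Events` unexported, output plugins can no longer range over the batch slice themselves; `ForEach` is the replacement, and it skips child-parent events, which is also why `work()` now invokes `OutFn` only when the batch has iterable events. A minimal sketch of the new calling pattern (`flush` and `serialize` are hypothetical helpers, not part of this commit):

func flush(batch *pipeline.Batch) {
	// ForEach visits only iterable events; child-parent events are still
	// committed by the batcher but never reach the output.
	batch.ForEach(func(event *pipeline.Event) {
		serialize(event) // hypothetical per-event output work
	})
}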