Skip to content

Commit

Permalink
Independent buffer flusher (#213)
Browse files Browse the repository at this point in the history
* Minimal working refactor

* Replace flush func with a more flexible clearer

* Move chunk size into buffer options

* Fix formatting

* Add docs for buffer configs

* Use Sprint() instead of string()
  • Loading branch information
camdencheek authored Nov 17, 2020
1 parent 9b77a31 commit 986cb26
Show file tree
Hide file tree
Showing 20 changed files with 592 additions and 379 deletions.
26 changes: 18 additions & 8 deletions docs/types/buffer.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@ cleanly, they will be saved to the agent's database.

### Memory Buffer Configuration

Memory buffers are configured by setting the `type` field of the `buffer` block on an output to `memory`. The only other
configurable field is `max_entries`, which is maximum number of entries that will be held in memory before blocking and
waiting for some entries to be flushed. The default value of `max_entries` is `1048576` (2^20).
Memory buffers are configured by setting the `type` field of the `buffer` block on an output to `memory`.

| Field | Default | Description |
| --- | --- | --- |
| `max_entries` | `1048576` (2^20) | The maximum number of entries stored in the memory buffer |
| `max_chunk_size` | 1000 | The maximum number of entries that will be read from the buffer in a single chunk |
| `max_chunk_delay` | 1s | The maximum amount of time that a reader will wait to batch entries into a chunk |

Example:
```yaml
Expand All @@ -23,6 +27,8 @@ Example:
buffer:
type: memory
max_entries: 10000
max_chunk_delay: 1s
max_chunk_size: 1000
```
Expand All @@ -43,11 +49,13 @@ be logs that are lost or a corruption of the database.
Disk buffers are configured by setting the `type` field of the `buffer` block on an output to `disk`. Other fields are described below:

| Field | Default | Description |
| --- | --- | --- |
| `max_size` | `4294967296` (4GiB) | The maximum size of the disk buffer file in bytes |
| `path` | required | The path to the directory which will contain the disk buffer data |
| `sync` | `true` | Whether to open the database files with the O_SYNC flag. Disabling this improves performance, but relaxes guarantees about log delivery. |
| Field | Default | Description |
| --- | --- | --- |
| `max_size` | `4294967296` (4GiB) | The maximum size of the disk buffer file in bytes |
| `max_chunk_size` | 1000 | The maximum number of entries that will be read from the buffer in a single chunk |
| `max_chunk_delay` | 1s | The maximum amount of time that a reader will wait to batch entries into a chunk |
| `path` | required | The path to the directory which will contain the disk buffer data |
| `sync` | `true` | Whether to open the database files with the O_SYNC flag. Disabling this improves performance, but relaxes guarantees about log delivery. |

Example:
```yaml
Expand All @@ -58,4 +66,6 @@ Example:
max_size: 10000000 # 10MB
path: /tmp/stanza_buffer
sync: true
max_chunk_delay: 1s
max_chunk_size: 1000
```
2 changes: 0 additions & 2 deletions docs/types/flusher.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,3 @@ Flushers are configured with the `flusher` block on output plugins.
| Field | Default | Description |
| --- | --- | --- |
| `max_concurrent` | `16` | The maximum number of goroutines flushing entries concurrently |
| `max_wait` | 1s | The maximum amount of time to wait for a chunk to fill before flushing it. Higher values can reduce load, but also increase delivery latency. |
| `max_chunk_entries` | 1000 | The maximum number of entries to flush in a single chunk. |
11 changes: 7 additions & 4 deletions operator/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import (
// Buffer is an interface for an entry buffer
type Buffer interface {
Add(context.Context, *entry.Entry) error
Read([]*entry.Entry) (FlushFunc, int, error)
ReadWait(context.Context, []*entry.Entry) (FlushFunc, int, error)
Read([]*entry.Entry) (Clearer, int, error)
ReadWait(context.Context, []*entry.Entry) (Clearer, int, error)
ReadChunk(context.Context) ([]*entry.Entry, Clearer, error)
Close() error
}

Expand Down Expand Up @@ -73,5 +74,7 @@ func (bc Config) MarshalJSON() ([]byte, error) {
return json.Marshal(bc.Builder)
}

// FlushFunc is a function that can be called to mark the returned entries as flushed
type FlushFunc func() error
// Clearer marks entries returned from a buffer read as flushed so the
// buffer can reclaim their space.
type Clearer interface {
	// MarkAllAsFlushed marks every entry from the read as flushed.
	MarkAllAsFlushed() error
	// MarkRangeAsFlushed marks the half-open index range [start, end)
	// of the read entries as flushed.
	MarkRangeAsFlushed(uint, uint) error
}
44 changes: 28 additions & 16 deletions operator/buffer/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package buffer
import (
"encoding/json"
"testing"
"time"

"github.com/observiq/stanza/operator/helper"
"github.com/stretchr/testify/require"
yaml "gopkg.in/yaml.v2"
)
Expand All @@ -22,8 +24,10 @@ func TestBufferUnmarshalYAML(t *testing.T) {
[]byte(`{"type": "memory", "max_entries": 30}`),
Config{
Builder: &MemoryBufferConfig{
Type: "memory",
MaxEntries: 30,
Type: "memory",
MaxEntries: 30,
MaxChunkDelay: helper.NewDuration(time.Second),
MaxChunkSize: 1000,
},
},
false,
Expand All @@ -34,10 +38,12 @@ func TestBufferUnmarshalYAML(t *testing.T) {
[]byte(`{"type": "disk", "max_size": 1234, "path": "/var/log/testpath"}`),
Config{
Builder: &DiskBufferConfig{
Type: "disk",
MaxSize: 1234,
Path: "/var/log/testpath",
Sync: true,
Type: "disk",
MaxSize: 1234,
Path: "/var/log/testpath",
Sync: true,
MaxChunkDelay: helper.NewDuration(time.Second),
MaxChunkSize: 1000,
},
},
false,
Expand All @@ -48,10 +54,12 @@ func TestBufferUnmarshalYAML(t *testing.T) {
[]byte(`{"type": "invalid"}`),
Config{
Builder: &DiskBufferConfig{
Type: "disk",
MaxSize: 1234,
Path: "/var/log/testpath",
Sync: true,
Type: "disk",
MaxSize: 1234,
Path: "/var/log/testpath",
Sync: true,
MaxChunkDelay: helper.NewDuration(time.Second),
MaxChunkSize: 1000,
},
},
true,
Expand All @@ -62,10 +70,12 @@ func TestBufferUnmarshalYAML(t *testing.T) {
[]byte(`{"type": 12}`),
Config{
Builder: &DiskBufferConfig{
Type: "disk",
MaxSize: 1234,
Path: "/var/log/testpath",
Sync: true,
Type: "disk",
MaxSize: 1234,
Path: "/var/log/testpath",
Sync: true,
MaxChunkDelay: helper.NewDuration(time.Second),
MaxChunkSize: 1000,
},
},
true,
Expand Down Expand Up @@ -104,8 +114,10 @@ func TestBuffer(t *testing.T) {
cfg := NewConfig()
expected := Config{
Builder: &MemoryBufferConfig{
Type: "memory",
MaxEntries: 1 << 20,
Type: "memory",
MaxEntries: 1 << 20,
MaxChunkDelay: helper.NewDuration(time.Second),
MaxChunkSize: 1000,
},
}
require.Equal(t, expected, cfg)
Expand Down
86 changes: 70 additions & 16 deletions operator/buffer/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/operator"
"github.com/observiq/stanza/operator/helper"
"golang.org/x/sync/semaphore"
)

Expand All @@ -30,14 +31,19 @@ type DiskBufferConfig struct {
// in cases like power failures or unclean shutdowns, logs may be lost or the
// database may become corrupted.
Sync bool `json:"sync" yaml:"sync"`

MaxChunkDelay helper.Duration `json:"max_delay" yaml:"max_delay"`
MaxChunkSize uint `json:"max_chunk_size" yaml:"max_chunk_size"`
}

// NewDiskBufferConfig creates a new default disk buffer config
func NewDiskBufferConfig() *DiskBufferConfig {
return &DiskBufferConfig{
Type: "disk",
MaxSize: 1 << 32, // 4GiB
Sync: true,
Type: "disk",
MaxSize: 1 << 32, // 4GiB
Sync: true,
MaxChunkDelay: helper.NewDuration(time.Second),
MaxChunkSize: 1000,
}
}

Expand All @@ -55,6 +61,8 @@ func (c DiskBufferConfig) Build(context operator.BuildContext, _ string) (Buffer
if err := b.Open(c.Path, c.Sync); err != nil {
return nil, err
}
b.maxChunkSize = c.MaxChunkSize
b.maxChunkDelay = c.MaxChunkDelay.Raw()
return b, nil
}

Expand Down Expand Up @@ -92,6 +100,9 @@ type DiskBuffer struct {

// copyBuffer is a pre-allocated byte slice that is used during compaction
copyBuffer []byte

maxChunkDelay time.Duration
maxChunkSize uint
}

// NewDiskBuffer creates a new DiskBuffer
Expand Down Expand Up @@ -209,7 +220,7 @@ func (d *DiskBuffer) addUnreadCount(i int64) {
// buffer to fill dst or the context is cancelled. This amortizes the cost of reading from the
// disk. It returns a function that, when called, marks the read entries as flushed, the
// number of entries read, and an error.
func (d *DiskBuffer) ReadWait(ctx context.Context, dst []*entry.Entry) (FlushFunc, int, error) {
func (d *DiskBuffer) ReadWait(ctx context.Context, dst []*entry.Entry) (Clearer, int, error) {
d.readerLock.Lock()
defer d.readerLock.Unlock()

Expand All @@ -229,15 +240,34 @@ LOOP:
return d.Read(dst)
}

// ReadChunk is a thin wrapper around ReadWait that simplifies the call at the expense of an extra allocation
// ReadChunk is a thin wrapper around ReadWait that simplifies the call at
// the expense of an extra allocation. It blocks until up to maxChunkSize
// entries have been batched (waiting at most maxChunkDelay per attempt),
// the context is cancelled, or a read error occurs. It returns the entries
// read, a Clearer for marking them flushed, and an error.
func (d *DiskBuffer) ReadChunk(ctx context.Context) ([]*entry.Entry, Clearer, error) {
	entries := make([]*entry.Entry, d.maxChunkSize)
	for {
		select {
		case <-ctx.Done():
			return nil, nil, ctx.Err()
		default:
		}

		// Cancel explicitly rather than with defer: a defer inside the
		// loop would keep every iteration's timeout context (and its
		// timer) alive until the function returns.
		chunkCtx, cancel := context.WithTimeout(ctx, d.maxChunkDelay)
		clearer, n, err := d.ReadWait(chunkCtx, entries)
		cancel()
		if n > 0 {
			return entries[:n], clearer, err
		}
		// A read error with no entries would otherwise cause a busy
		// retry loop; surface it to the caller instead.
		if err != nil {
			return nil, nil, err
		}
	}
}

// Read copies entries from the disk into the destination buffer. It returns a function that,
// when called, marks the entries as flushed, the number of entries read, and an error.
func (d *DiskBuffer) Read(dst []*entry.Entry) (f FlushFunc, i int, err error) {
func (d *DiskBuffer) Read(dst []*entry.Entry) (f Clearer, i int, err error) {
d.Lock()
defer d.Unlock()

// Return fast if there are no unread entries
if d.metadata.unreadCount == 0 {
return d.newFlushFunc(nil), 0, nil
return d.newClearer(nil), 0, nil
}

// Seek to the start of the range of unread entries
Expand Down Expand Up @@ -280,20 +310,44 @@ func (d *DiskBuffer) Read(dst []*entry.Entry) (f FlushFunc, i int, err error) {
// Remove the read entries from the unread count
d.addUnreadCount(-int64(readCount))

return d.newFlushFunc(newRead), readCount, nil
return d.newClearer(newRead), readCount, nil
}

// newFlushFunc returns a function that marks read entries as flushed
func (d *DiskBuffer) newFlushFunc(newRead []*readEntry) FlushFunc {
return func() error {
d.Lock()
for _, entry := range newRead {
entry.flushed = true
d.flushedBytes += entry.length
}
d.Unlock()
return d.checkCompact()
func (d *DiskBuffer) newClearer(newRead []*readEntry) Clearer {
return &diskClearer{
buffer: d,
readEntries: newRead,
}
}

// diskClearer implements Clearer for a batch of entries read from a
// DiskBuffer.
type diskClearer struct {
	// buffer is the DiskBuffer the entries were read from.
	buffer *DiskBuffer
	// readEntries are the entries returned by the read that produced
	// this clearer, in read order.
	readEntries []*readEntry
}

// MarkAllAsFlushed marks every entry in this read batch as flushed,
// accounts for the flushed bytes, then checks whether a compaction
// should run.
func (dc *diskClearer) MarkAllAsFlushed() error {
	buf := dc.buffer
	buf.Lock()
	for i := range dc.readEntries {
		re := dc.readEntries[i]
		re.flushed = true
		buf.flushedBytes += re.length
	}
	// Unlock before checkCompact so the buffer lock is not held while
	// a compaction is considered.
	buf.Unlock()
	return buf.checkCompact()
}

// MarkRangeAsFlushed marks the half-open index range [start, end) of this
// read batch as flushed, accounts for the flushed bytes, then checks
// whether a compaction should run. It returns an error if the range is
// reversed or out of bounds.
func (dc *diskClearer) MarkRangeAsFlushed(start, end uint) error {
	// Validate before slicing: a reversed range (start > end) or an
	// out-of-bounds end would otherwise panic on the slice expression
	// below rather than returning an error.
	if start > end || int(end) > len(dc.readEntries) {
		return fmt.Errorf("invalid range [%d, %d) for %d read entries", start, end, len(dc.readEntries))
	}

	dc.buffer.Lock()
	for _, re := range dc.readEntries[start:end] {
		re.flushed = true
		dc.buffer.flushedBytes += re.length
	}
	// Unlock before checkCompact so the buffer lock is not held while
	// a compaction is considered.
	dc.buffer.Unlock()
	return dc.buffer.checkCompact()
}

// checkCompact checks if a compaction should be performed, then kicks one off
Expand Down
16 changes: 8 additions & 8 deletions operator/buffer/disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,10 @@ func TestDiskBuffer(t *testing.T) {

// Read, flush, and compact
dst := make([]*entry.Entry, 1)
f, n, err := b.Read(dst)
c, n, err := b.Read(dst)
require.NoError(t, err)
require.Equal(t, 1, n)
f()
c.MarkAllAsFlushed()
require.NoError(t, b.Compact())

// Now there should be space for another entry
Expand Down Expand Up @@ -227,9 +227,9 @@ func TestDiskBuffer(t *testing.T) {
writes++
case j < 990:
readCount := (writes - reads) / 2
f := readN(t, b, readCount, reads)
c := readN(t, b, readCount, reads)
if j%2 == 0 {
f()
c.MarkAllAsFlushed()
}
reads += readCount
default:
Expand Down Expand Up @@ -280,11 +280,11 @@ func BenchmarkDiskBuffer(b *testing.B) {
ctx := context.Background()
for i := 0; i < b.N; {
ctx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
flush, n, err := buffer.ReadWait(ctx, dst)
c, n, err := buffer.ReadWait(ctx, dst)
cancel()
panicOnErr(err)
i += n
flush()
c.MarkAllAsFlushed()
}
}()

Expand Down Expand Up @@ -318,11 +318,11 @@ func BenchmarkDiskBuffer(b *testing.B) {
ctx := context.Background()
for i := 0; i < b.N; {
ctx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
flush, n, err := buffer.ReadWait(ctx, dst)
c, n, err := buffer.ReadWait(ctx, dst)
cancel()
panicOnErr(err)
i += n
flush()
c.MarkAllAsFlushed()
}
}()

Expand Down
Loading

0 comments on commit 986cb26

Please sign in to comment.