Skip to content

Commit

Permalink
Merge pull request #1 from risingwavelabs/feat/stats
Browse files Browse the repository at this point in the history
feat: expose stats
  • Loading branch information
arkbriar authored Apr 30, 2024
2 parents c42e078 + 14c4cff commit 171dda6
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 8 deletions.
9 changes: 5 additions & 4 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version: '1.21'
go-version: '1.22'
cache: false

- name: Run golangci-lint
uses: golangci/golangci-lint-action@v3.7.0
uses: golangci/golangci-lint-action@v4

- name: Build
run: go build -v ./...
Expand Down
30 changes: 30 additions & 0 deletions filechannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

// Sender sends bytes to file channel.
type Sender interface {
SenderStats

// Send bytes to file channel. Data will be finally persistent on disk.
Send(context.Context, []byte) error

Expand All @@ -32,6 +34,8 @@ type Sender interface {

// Receiver receives bytes from file channel in the sending order.
type Receiver interface {
ReceiverStats

// Recv bytes from file channel.
Recv(context.Context) ([]byte, error)

Expand All @@ -51,6 +55,8 @@ type AckReceiver interface {

// FileChannel is the interface for a file-based persistent channel.
type FileChannel interface {
Stats

// Tx creates a Sender. Sender is thread safe.
// It's possible to have multiple senders at the same time.
Tx() Sender
Expand Down Expand Up @@ -89,6 +95,7 @@ var (

// Compiler fence.
var _ AckFileChannel = &fileChannel{}
var _ Stats = &fileChannel{}

type fileChannel struct {
wRefLock sync.Mutex
Expand All @@ -98,6 +105,14 @@ type fileChannel struct {
inner *filechannel.FileChannel
}

func (f *fileChannel) FlushOffset() uint64 {
return f.inner.FlushOffset()
}

func (f *fileChannel) DiskUsage() (uint64, error) {
return f.inner.DiskUsage()
}

func (f *fileChannel) Close() error {
f.wRefLock.Lock()
defer f.wRefLock.Unlock()
Expand All @@ -110,6 +125,10 @@ func (f *fileChannel) Close() error {
return err
}

func (f *fileChannel) writeOffset() uint64 {
return f.inner.WriteOffset()
}

func (f *fileChannel) send(bytes []byte) error {
f.wLock.Lock()
defer f.wLock.Unlock()
Expand Down Expand Up @@ -175,11 +194,16 @@ func OpenAckFileChannel(dir string, opts ...Option) (AckFileChannel, error) {

// Compiler fence.
var _ Sender = &fileChannelSender{}
var _ SenderStats = &fileChannelSender{}

type fileChannelSender struct {
inner *fileChannel
}

func (s *fileChannelSender) WriteOffset() uint64 {
return s.inner.writeOffset()
}

func (s *fileChannelSender) Close() error {
if s.inner != nil {
s.inner.closeTx()
Expand All @@ -200,11 +224,16 @@ func (s *fileChannelSender) Send(ctx context.Context, p []byte) error {

// Compiler fence.
var _ Receiver = &fileChannelReceiver{}
var _ ReceiverStats = &fileChannelReceiver{}

type fileChannelReceiver struct {
inner *filechannel.Iterator
}

func (r *fileChannelReceiver) ReadOffset() uint64 {
return r.inner.Offset()
}

func (r *fileChannelReceiver) Close() error {
return r.inner.Close()
}
Expand All @@ -215,6 +244,7 @@ func (r *fileChannelReceiver) Recv(ctx context.Context) ([]byte, error) {

// Compiler fence.
var _ AckReceiver = &fileChannelAckReceiver{}
var _ ReceiverStats = &fileChannelAckReceiver{}

type fileChannelAckReceiver struct {
fileChannelReceiver
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.21.4
require (
github.com/elliotchance/orderedmap/v2 v2.2.0
github.com/gofrs/flock v0.8.1
github.com/klauspost/compress v1.17.3
github.com/klauspost/compress v1.17.8
github.com/stretchr/testify v1.8.4
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ github.com/elliotchance/orderedmap/v2 v2.2.0 h1:7/2iwO98kYT4XkOjA9mBEIwvi4KpGB4c
github.com/elliotchance/orderedmap/v2 v2.2.0/go.mod h1:85lZyVbpGaGvHvnKa7Qhx7zncAdBIBq6u56Hb1PRU5Q=
github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw=
github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA=
github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand Down
30 changes: 29 additions & 1 deletion internal/filechannel/filechannel.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 RisingWave Labs
// Copyright 2023-2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@ import (
"math"
"os"
"path"
"path/filepath"
"regexp"
"slices"
"strconv"
Expand Down Expand Up @@ -160,6 +161,10 @@ type Iterator struct {
buf *bytes.Buffer
}

func (it *Iterator) Offset() uint64 {
return it.offset
}

func (it *Iterator) updateOffset(offset uint64) error {
if it.offset == math.MaxUint64 {
it.offset = offset
Expand Down Expand Up @@ -1306,3 +1311,26 @@ func (fc *FileChannel) IteratorAcknowledgable() *Iterator {
}
return NewIterator(fc.segmentManager, fc.position, false)
}

func (fc *FileChannel) WriteOffset() uint64 {
return fc.currentOffset
}

func (fc *FileChannel) FlushOffset() uint64 {
return fc.position.Get()
}

func (fc *FileChannel) DiskUsage() (uint64, error) {
var total uint64
err := filepath.Walk(fc.dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() || !segmentFilePattern.MatchString(info.Name()) {
return nil
}
total += uint64(info.Size())
return nil
})
return total, err
}
48 changes: 48 additions & 0 deletions stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package filechannel

// ReceiverStats is the interface for getting the stats of a receiver.
type ReceiverStats interface {
// ReadOffset returns the offset of the last read message.
// Note that the offset is local to the receiver. It will only change
// when the receiver reads a message.
//
// The initial offset is math.MaxUint64 to indicate that no message has
// been read.
ReadOffset() uint64
}

// SenderStats is the interface for getting the stats of a sender.
type SenderStats interface {
// WriteOffset returns the offset of the last written message.
// Note that the offset is not the byte offset in the file. It's the
// offset of the message in the channel. The offset will change no matter
// a message is written by the sender, or the other senders of the same
// channel.
WriteOffset() uint64
}

// Stats is the interface for getting the stats of a file channel.
type Stats interface {
// DiskUsage returns the disk usage of the file channel.
// Note that calling DiskUsage() is an expensive operation.
DiskUsage() (uint64, error)

// FlushOffset returns the offset of the last flushed message.
// Messages with offset less than the flush offset are guaranteed to be
// seen by the readers.
FlushOffset() uint64
}
91 changes: 91 additions & 0 deletions stats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package filechannel

import (
"context"
"math"
"os"
"testing"

"github.com/stretchr/testify/assert"
)

func TestFileChannel_Stats(t *testing.T) {
tmpDir := mkdirTemp(t)
defer os.RemoveAll(tmpDir)

fch, err := OpenFileChannel(tmpDir)
if !assert.NoError(t, err) {
t.FailNow()
}
defer fch.Close()

// Assert disk usage == 64 (header size).
usage, err := fch.DiskUsage()
if !assert.NoError(t, err) {
t.FailNow()
}
assert.Equal(t, uint64(64), usage)

msg := []byte("Hello world!")

tx := fch.Tx()
defer tx.Close()

// Assert sender offset == 0.
assert.Equal(t, uint64(0), tx.WriteOffset())

err = tx.Send(context.Background(), msg)
if !assert.NoError(t, err) {
t.FailNow()
}

// Assert sender offset != 0.
assert.NotEqual(t, uint64(0), tx.WriteOffset())

rx := fch.Rx()
defer rx.Close()

// Assert reader offset == math.MaxUint64.
assert.Equal(t, uint64(math.MaxUint64), rx.ReadOffset())

p, err := rx.Recv(context.Background())
if !assert.NoError(t, err) {
t.FailNow()
}
if !assert.Equal(t, msg, p) {
t.FailNow()
}

// Assert reader offset != 0.
assert.NotEqual(t, uint64(0), rx.ReadOffset())

// Assert reader offset == sender offset.
assert.Equal(t, tx.WriteOffset(), rx.ReadOffset())

// Recv happened means the file channel has flushed.
// Now we can examine the disk usage and flush offset.

// Assert flush offset != 0.
assert.NotEqual(t, uint64(0), fch.FlushOffset())

// Assert disk usage > 64.
usage, err = fch.DiskUsage()
if !assert.NoError(t, err) {
t.FailNow()
}
assert.NotEqual(t, uint64(64), usage)
}

0 comments on commit 171dda6

Please sign in to comment.