From 0f2bf2d7a413f13d201e83e631535df993a9fed3 Mon Sep 17 00:00:00 2001 From: arkbriar Date: Wed, 24 Apr 2024 17:49:01 +0800 Subject: [PATCH 1/2] feat: expose stats Signed-off-by: arkbriar --- .github/workflows/go.yml | 8 +-- filechannel.go | 30 ++++++++++ go.mod | 2 +- go.sum | 1 + internal/filechannel/filechannel.go | 30 +++++++++- stats.go | 48 +++++++++++++++ stats_test.go | 91 +++++++++++++++++++++++++++++ 7 files changed, 204 insertions(+), 6 deletions(-) create mode 100644 stats.go create mode 100644 stats_test.go diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 9fc3b08..ca3f66b 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -14,15 +14,15 @@ jobs: build: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Go - uses: actions/setup-go@v4 + uses: actions/setup-go@v5 with: - go-version: '1.21' + go-version: '1.22' - name: Run golangci-lint - uses: golangci/golangci-lint-action@v3.7.0 + uses: golangci/golangci-lint-action@v4 - name: Build run: go build -v ./... diff --git a/filechannel.go b/filechannel.go index 755c052..b516195 100644 --- a/filechannel.go +++ b/filechannel.go @@ -23,6 +23,8 @@ import ( // Sender sends bytes to file channel. type Sender interface { + SenderStats + // Send bytes to file channel. Data will be finally persistent on disk. Send(context.Context, []byte) error @@ -32,6 +34,8 @@ type Sender interface { // Receiver receives bytes from file channel in the sending order. type Receiver interface { + ReceiverStats + // Recv bytes from file channel. Recv(context.Context) ([]byte, error) @@ -51,6 +55,8 @@ type AckReceiver interface { // FileChannel is the interface for a file-based persistent channel. type FileChannel interface { + Stats + // Tx creates a Sender. Sender is thread safe. // It's possible to have multiple senders at the same time. Tx() Sender @@ -89,6 +95,7 @@ var ( // Compiler fence. var _ AckFileChannel = &fileChannel{} +var _ Stats = &fileChannel{} type fileChannel struct { wRefLock sync.Mutex @@ -98,6 +105,14 @@ type fileChannel struct { inner *filechannel.FileChannel } +func (f *fileChannel) FlushOffset() uint64 { + return f.inner.FlushOffset() +} + +func (f *fileChannel) DiskUsage() (uint64, error) { + return f.inner.DiskUsage() +} + func (f *fileChannel) Close() error { f.wRefLock.Lock() defer f.wRefLock.Unlock() @@ -110,6 +125,10 @@ func (f *fileChannel) Close() error { return err } +func (f *fileChannel) writeOffset() uint64 { + return f.inner.WriteOffset() +} + func (f *fileChannel) send(bytes []byte) error { f.wLock.Lock() defer f.wLock.Unlock() @@ -175,11 +194,16 @@ func OpenAckFileChannel(dir string, opts ...Option) (AckFileChannel, error) { // Compiler fence. var _ Sender = &fileChannelSender{} +var _ SenderStats = &fileChannelSender{} type fileChannelSender struct { inner *fileChannel } +func (s *fileChannelSender) WriteOffset() uint64 { + return s.inner.writeOffset() +} + func (s *fileChannelSender) Close() error { if s.inner != nil { s.inner.closeTx() @@ -200,11 +224,16 @@ func (s *fileChannelSender) Send(ctx context.Context, p []byte) error { // Compiler fence. var _ Receiver = &fileChannelReceiver{} +var _ ReceiverStats = &fileChannelReceiver{} type fileChannelReceiver struct { inner *filechannel.Iterator } +func (r *fileChannelReceiver) ReadOffset() uint64 { + return r.inner.Offset() +} + func (r *fileChannelReceiver) Close() error { return r.inner.Close() } @@ -215,6 +244,7 @@ func (r *fileChannelReceiver) Recv(ctx context.Context) ([]byte, error) { // Compiler fence. var _ AckReceiver = &fileChannelAckReceiver{} +var _ ReceiverStats = &fileChannelAckReceiver{} type fileChannelAckReceiver struct { fileChannelReceiver diff --git a/go.mod b/go.mod index bd762bb..f25e536 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.21.4 require ( github.com/elliotchance/orderedmap/v2 v2.2.0 github.com/gofrs/flock v0.8.1 - github.com/klauspost/compress v1.17.3 + github.com/klauspost/compress v1.17.8 github.com/stretchr/testify v1.8.4 ) diff --git a/go.sum b/go.sum index 27fd3ad..810c95c 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,7 @@ github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA= github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= diff --git a/internal/filechannel/filechannel.go b/internal/filechannel/filechannel.go index c03bb77..2698fd6 100644 --- a/internal/filechannel/filechannel.go +++ b/internal/filechannel/filechannel.go @@ -1,4 +1,4 @@ -// Copyright 2023 RisingWave Labs +// Copyright 2023-2024 RisingWave Labs // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -25,6 +25,7 @@ import ( "math" "os" "path" + "path/filepath" "regexp" "slices" "strconv" @@ -160,6 +161,10 @@ type Iterator struct { buf *bytes.Buffer } +func (it *Iterator) Offset() uint64 { + return it.offset +} + func (it *Iterator) updateOffset(offset uint64) error { if it.offset == math.MaxUint64 { it.offset = offset @@ -1306,3 +1311,26 @@ func (fc *FileChannel) IteratorAcknowledgable() *Iterator { } return NewIterator(fc.segmentManager, fc.position, false) } + +func (fc *FileChannel) WriteOffset() uint64 { + return fc.currentOffset +} + +func (fc *FileChannel) FlushOffset() uint64 { + return fc.position.Get() +} + +func (fc *FileChannel) DiskUsage() (uint64, error) { + var total uint64 + err := filepath.Walk(fc.dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() || !segmentFilePattern.MatchString(info.Name()) { + return nil + } + total += uint64(info.Size()) + return nil + }) + return total, err +} diff --git a/stats.go b/stats.go new file mode 100644 index 0000000..7a0212f --- /dev/null +++ b/stats.go @@ -0,0 +1,48 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package filechannel + +// ReceiverStats is the interface for getting the stats of a receiver. +type ReceiverStats interface { + // ReadOffset returns the offset of the last read message. + // Note that the offset is local to the receiver. It will only change + // when the receiver reads a message. + // + // The initial offset is math.MaxUint64 to indicate that no message has + // been read. + ReadOffset() uint64 +} + +// SenderStats is the interface for getting the stats of a sender. +type SenderStats interface { + // WriteOffset returns the offset of the last written message. + // Note that the offset is not the byte offset in the file. It's the + // offset of the message in the channel. The offset will change no matter + // a message is written by the sender, or the other senders of the same + // channel. + WriteOffset() uint64 +} + +// Stats is the interface for getting the stats of a file channel. +type Stats interface { + // DiskUsage returns the disk usage of the file channel. + // Note that calling DiskUsage() is an expensive operation. + DiskUsage() (uint64, error) + + // FlushOffset returns the offset of the last flushed message. + // Messages with offset less than the flush offset are guaranteed to be + // seen by the readers. + FlushOffset() uint64 +} diff --git a/stats_test.go b/stats_test.go new file mode 100644 index 0000000..1909c13 --- /dev/null +++ b/stats_test.go @@ -0,0 +1,91 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package filechannel + +import ( + "context" + "math" + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFileChannel_Stats(t *testing.T) { + tmpDir := mkdirTemp(t) + defer os.RemoveAll(tmpDir) + + fch, err := OpenFileChannel(tmpDir) + if !assert.NoError(t, err) { + t.FailNow() + } + defer fch.Close() + + // Assert disk usage == 64 (header size). + usage, err := fch.DiskUsage() + if !assert.NoError(t, err) { + t.FailNow() + } + assert.Equal(t, uint64(64), usage) + + msg := []byte("Hello world!") + + tx := fch.Tx() + defer tx.Close() + + // Assert sender offset == 0. + assert.Equal(t, uint64(0), tx.WriteOffset()) + + err = tx.Send(context.Background(), msg) + if !assert.NoError(t, err) { + t.FailNow() + } + + // Assert sender offset != 0. + assert.NotEqual(t, uint64(0), tx.WriteOffset()) + + rx := fch.Rx() + defer rx.Close() + + // Assert reader offset == math.MaxUint64. + assert.Equal(t, uint64(math.MaxUint64), rx.ReadOffset()) + + p, err := rx.Recv(context.Background()) + if !assert.NoError(t, err) { + t.FailNow() + } + if !assert.Equal(t, msg, p) { + t.FailNow() + } + + // Assert reader offset != 0. + assert.NotEqual(t, uint64(0), rx.ReadOffset()) + + // Assert reader offset == sender offset. + assert.Equal(t, tx.WriteOffset(), rx.ReadOffset()) + + // Recv happened means the file channel has flushed. + // Now we can examine the disk usage and flush offset. + + // Assert flush offset != 0. + assert.NotEqual(t, uint64(0), fch.FlushOffset()) + + // Assert disk usage > 64. + usage, err = fch.DiskUsage() + if !assert.NoError(t, err) { + t.FailNow() + } + assert.NotEqual(t, uint64(64), usage) +} From 14c4cffeca17a873875b73a6a1276f082f3e2024 Mon Sep 17 00:00:00 2001 From: arkbriar Date: Wed, 24 Apr 2024 17:56:58 +0800 Subject: [PATCH 2/2] Try fixing CI Signed-off-by: arkbriar --- .github/workflows/go.yml | 1 + go.sum | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index ca3f66b..0f527cc 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -20,6 +20,7 @@ jobs: uses: actions/setup-go@v5 with: go-version: '1.22' + cache: false - name: Run golangci-lint uses: golangci/golangci-lint-action@v4 diff --git a/go.sum b/go.sum index 810c95c..91ffd70 100644 --- a/go.sum +++ b/go.sum @@ -4,8 +4,7 @@ github.com/elliotchance/orderedmap/v2 v2.2.0 h1:7/2iwO98kYT4XkOjA9mBEIwvi4KpGB4c github.com/elliotchance/orderedmap/v2 v2.2.0/go.mod h1:85lZyVbpGaGvHvnKa7Qhx7zncAdBIBq6u56Hb1PRU5Q= github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= -github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA= -github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=