From 8286df1a704fc3d36d595ce0f3ebb9c7ab189bcf Mon Sep 17 00:00:00 2001 From: arkbriar Date: Fri, 26 Jul 2024 18:00:57 +0800 Subject: [PATCH 1/2] fix: fix a serious bug caused by misunderstanding of orderedmap Signed-off-by: arkbriar --- go.mod | 2 + go.sum | 4 ++ internal/filechannel/filechannel.go | 51 ++++++++++++------------ internal/filechannel/filechannel_test.go | 28 +++++++++++++ internal/filechannel/utils.go | 25 ++++++++++++ 5 files changed, 85 insertions(+), 25 deletions(-) create mode 100644 internal/filechannel/utils.go diff --git a/go.mod b/go.mod index f25e536..d28affd 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.21.4 require ( github.com/elliotchance/orderedmap/v2 v2.2.0 github.com/gofrs/flock v0.8.1 + github.com/ironpark/skiplist v0.0.0-20230103051251-d63941a7d606 github.com/klauspost/compress v1.17.8 github.com/stretchr/testify v1.8.4 ) @@ -12,6 +13,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15 // indirect golang.org/x/sys v0.15.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 91ffd70..96bcb9f 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/elliotchance/orderedmap/v2 v2.2.0 h1:7/2iwO98kYT4XkOjA9mBEIwvi4KpGB4c github.com/elliotchance/orderedmap/v2 v2.2.0/go.mod h1:85lZyVbpGaGvHvnKa7Qhx7zncAdBIBq6u56Hb1PRU5Q= github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= +github.com/ironpark/skiplist v0.0.0-20230103051251-d63941a7d606 h1:wAlKAaIDI0lz5hiRVLhpPclpRm+Foqud9Aw+ujyNic0= +github.com/ironpark/skiplist v0.0.0-20230103051251-d63941a7d606/go.mod h1:4anVKuA54EQY/g+NGk+fB2QnIB+kUqRuQ1VkHSnsjmI= github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= 
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= @@ -15,6 +17,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15 h1:5oN1Pz/eDhCpbMbLstvIPa0b/BEQo6g6nwV3pLjfM6w= +golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/filechannel/filechannel.go b/internal/filechannel/filechannel.go index 2698fd6..c697a3a 100644 --- a/internal/filechannel/filechannel.go +++ b/internal/filechannel/filechannel.go @@ -16,6 +16,7 @@ package filechannel import ( "bytes" + "cmp" "context" "encoding/binary" "errors" @@ -32,8 +33,8 @@ import ( "sync" "time" - "github.com/elliotchance/orderedmap/v2" "github.com/gofrs/flock" + "github.com/ironpark/skiplist" "github.com/klauspost/compress/snappy" "github.com/risingwavelabs/filechannel/internal/condvar" @@ -155,7 +156,7 @@ type Iterator struct { autoAck bool readerIndex uint32 pendingAckCount int - pendingAck *orderedmap.OrderedMap[uint32, int] + pendingAck skiplist.SkipList[uint32, int] lastErr error buf *bytes.Buffer @@ -294,7 +295,7 @@ func (it *Iterator) readNext() ([]byte, error) { } it.pendingAckCount++ - it.pendingAck.Set(it.segmentIndex, it.pendingAck.GetOrDefault(it.segmentIndex, 0)+1) + it.pendingAck.Set(it.segmentIndex, 
getOrDefault(it.pendingAck, it.segmentIndex, 0)+1) it.offset += uint64(MessageHeaderBinarySize) + uint64(it.buf.Len()) msg := it.buf.Bytes() @@ -320,7 +321,7 @@ func (it *Iterator) Ack(n int) error { for n > 0 { if n >= f.Value { n -= f.Value - it.pendingAck.Delete(f.Key) + it.pendingAck.Remove(f.Key()) f = it.pendingAck.Front() } else { f.Value -= n @@ -330,7 +331,7 @@ func (it *Iterator) Ack(n int) error { curSegmentIndex := it.segmentIndex if f != nil { - curSegmentIndex = f.Key + curSegmentIndex = f.Key() } if curSegmentIndex > it.readerIndex { @@ -444,7 +445,7 @@ func NewIterator(manager *SegmentManager, position *Position, autoAck bool) *Ite readerIndex: reader, buf: bytes.NewBuffer(make([]byte, 0, 4096)), autoAck: autoAck, - pendingAck: orderedmap.NewOrderedMap[uint32, int](), + pendingAck: skiplist.New[uint32, int](cmp.Compare[uint32]), } } @@ -512,8 +513,8 @@ type SegmentManager struct { readMu sync.RWMutex readCond *condvar.Cond - pinFreq *orderedmap.OrderedMap[uint32, int] - readFreq *orderedmap.OrderedMap[uint32, int] + pinFreq skiplist.SkipList[uint32, int] + readFreq skiplist.SkipList[uint32, int] maxReadIndex int64 watermark uint32 } @@ -531,7 +532,7 @@ func (sm *SegmentManager) NewReader() uint32 { defer sm.readMu.Unlock() beginIndex := sm.watermark - sm.readFreq.Set(beginIndex, sm.readFreq.GetOrDefault(beginIndex, 0)+1) + sm.readFreq.Set(beginIndex, getOrDefault(sm.readFreq, beginIndex, 0)+1) return beginIndex } @@ -540,7 +541,7 @@ func (sm *SegmentManager) Pin(index uint32) bool { defer sm.readMu.Unlock() if index >= sm.watermark { - sm.pinFreq.Set(index, sm.pinFreq.GetOrDefault(index, 0)+1) + sm.pinFreq.Set(index, getOrDefault(sm.pinFreq, index, 0)+1) return true } @@ -551,14 +552,14 @@ func (sm *SegmentManager) Unpin(index uint32) { sm.readMu.Lock() defer sm.readMu.Unlock() - v, ok := sm.pinFreq.Get(index) + v, ok := sm.pinFreq.GetValue(index) if !ok { panic("unpin non-existing segment") } - isFront := sm.pinFreq.Front().Key == index + isFront 
:= sm.pinFreq.Front().Key() == index if v == 1 { - sm.pinFreq.Delete(index) + sm.pinFreq.Remove(index) } else { sm.pinFreq.Set(index, v-1) } @@ -576,11 +577,11 @@ func (sm *SegmentManager) updateWatermark() { if readFreqEmpty && pinFreqEmpty { sm.watermark = uint32(sm.maxReadIndex + 1) } else if readFreqEmpty { - sm.watermark = sm.pinFreq.Front().Key + sm.watermark = sm.pinFreq.Front().Key() } else if pinFreqEmpty { - sm.watermark = sm.readFreq.Front().Key + sm.watermark = sm.readFreq.Front().Key() } else { - sm.watermark = min(sm.pinFreq.Front().Key, sm.readFreq.Front().Key) + sm.watermark = min(sm.pinFreq.Front().Key(), sm.readFreq.Front().Key()) } if prevWatermark != sm.watermark { @@ -592,21 +593,21 @@ func (sm *SegmentManager) AdvanceReader(prev uint32, delta uint32) (uint32, uint sm.readMu.Lock() defer sm.readMu.Unlock() - v, ok := sm.readFreq.Get(prev) + v, ok := sm.readFreq.GetValue(prev) if !ok { panic("advancing a non-existing reader") } - isFront := sm.readFreq.Front().Key == prev + isFront := sm.readFreq.Front().Key() == prev if v == 1 { - sm.readFreq.Delete(prev) + sm.readFreq.Remove(prev) } else { sm.readFreq.Set(prev, v-1) } next := prev + delta sm.maxReadIndex = max(int64(next-1), sm.maxReadIndex) - sm.readFreq.Set(next, sm.readFreq.GetOrDefault(next, 0)+1) + sm.readFreq.Set(next, getOrDefault(sm.readFreq, next, 0)+1) // If the minimum reader index is deleted, there's a chance to // advance the watermark. 
@@ -621,14 +622,14 @@ func (sm *SegmentManager) CloseReader(cur uint32) uint32 { sm.readMu.Lock() defer sm.readMu.Unlock() - v, ok := sm.readFreq.Get(cur) + v, ok := sm.readFreq.GetValue(cur) if !ok { panic("closing a non-existing reader") } - isFront := sm.readFreq.Front().Key == cur + isFront := sm.readFreq.Front().Key() == cur if v == 1 { - sm.readFreq.Delete(cur) + sm.readFreq.Remove(cur) } else { sm.readFreq.Set(cur, v-1) } @@ -724,8 +725,8 @@ func (sm *SegmentManager) IncSegmentIndex() uint32 { func NewSegmentManager(dir string) *SegmentManager { sm := &SegmentManager{ dir: dir, - readFreq: orderedmap.NewOrderedMap[uint32, int](), - pinFreq: orderedmap.NewOrderedMap[uint32, int](), + readFreq: skiplist.New[uint32, int](cmp.Compare[uint32]), + pinFreq: skiplist.New[uint32, int](cmp.Compare[uint32]), maxReadIndex: -1, } cond := condvar.NewCond(sm.readMu.RLocker()) diff --git a/internal/filechannel/filechannel_test.go b/internal/filechannel/filechannel_test.go index 0b8fe2e..0493780 100644 --- a/internal/filechannel/filechannel_test.go +++ b/internal/filechannel/filechannel_test.go @@ -247,6 +247,34 @@ func TestFileChannel_ReadCompressed(t *testing.T) { assert.NoError(t, it.Close()) } +func TestFileChannel_ReadCompressed_HoldDeleted(t *testing.T) { + fc := setup(t, "test_file_channel_read_compressed", RotateThreshold(1<<20)) + defer teardown(t, fc, true) + + itCreatedBeforeCompression := fc.Iterator() + + const payloadSize, totalSize = 128, 10 << 20 + msgNum := totalSize / payloadSize + payload := magicPayload(payloadSize) + + writeAll(t, fc, func(_ int) []byte { return payload }, msgNum) + + // Wait until the first segment is compressed. 
+ checkFileChannelDir(t, fc.dir, func(entries []os.DirEntry) bool { + return slices.ContainsFunc(entries, func(entry os.DirEntry) bool { + return entry.Name() == "segment.0.z" + }) + }, 10*time.Second) + + itCreatedAfterCompression := fc.Iterator() + + readAll(t, itCreatedBeforeCompression, func(_ int) []byte { return payload }, msgNum, 10*time.Second) + readAll(t, itCreatedAfterCompression, func(_ int) []byte { return payload }, msgNum, 10*time.Second) + + assert.NoError(t, itCreatedBeforeCompression.Close()) + assert.NoError(t, itCreatedAfterCompression.Close()) +} + func testFileChannelWithRandomStrings(t *testing.T, rand *rand.Rand, minLen, maxLen, size int, parallelism int, opts ...Option) { fc := setup(t, fmt.Sprintf("file_channel_benchmark_random_%d_%d_%d", minLen, maxLen, size), opts...) defer teardown(t, fc, false) diff --git a/internal/filechannel/utils.go b/internal/filechannel/utils.go new file mode 100644 index 0000000..7395ef4 --- /dev/null +++ b/internal/filechannel/utils.go @@ -0,0 +1,25 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package filechannel + +import "github.com/ironpark/skiplist" + +func getOrDefault[K, V any](sl skiplist.SkipList[K, V], key K, defaultValue V) V { + v, ok := sl.GetValue(key) + if !ok { + return defaultValue + } + return v +} From b77498e37c42816c4d3c8c5c10dc68aaeb8d3824 Mon Sep 17 00:00:00 2001 From: arkbriar Date: Fri, 26 Jul 2024 18:02:05 +0800 Subject: [PATCH 2/2] go mod tidy Signed-off-by: arkbriar --- go.mod | 1 - go.sum | 2 -- 2 files changed, 3 deletions(-) diff --git a/go.mod b/go.mod index d28affd..8f7cd37 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/risingwavelabs/filechannel go 1.21.4 require ( - github.com/elliotchance/orderedmap/v2 v2.2.0 github.com/gofrs/flock v0.8.1 github.com/ironpark/skiplist v0.0.0-20230103051251-d63941a7d606 github.com/klauspost/compress v1.17.8 diff --git a/go.sum b/go.sum index 96bcb9f..db3b890 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,5 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/elliotchance/orderedmap/v2 v2.2.0 h1:7/2iwO98kYT4XkOjA9mBEIwvi4KpGB4cyHeOFOnj4Vk= -github.com/elliotchance/orderedmap/v2 v2.2.0/go.mod h1:85lZyVbpGaGvHvnKa7Qhx7zncAdBIBq6u56Hb1PRU5Q= github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/ironpark/skiplist v0.0.0-20230103051251-d63941a7d606 h1:wAlKAaIDI0lz5hiRVLhpPclpRm+Foqud9Aw+ujyNic0=