Skip to content

s3fifo add ghost fifo #21653

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 39 commits into from
May 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
7cdb069
s3fifo
cpegeric Apr 4, 2025
f285c50
Merge branch 'main' into s3fifo
mergify[bot] Apr 4, 2025
39962f2
cleanup s3fifo
cpegeric Apr 7, 2025
3657f06
Merge branch 's3fifo' of github.com:cpegeric/matrixone into s3fifo
cpegeric Apr 7, 2025
1221a24
update
cpegeric Apr 8, 2025
c74b9ba
update
cpegeric Apr 8, 2025
57fdc12
add license
cpegeric Apr 8, 2025
ed93655
remove shards
cpegeric Apr 8, 2025
85b5127
update comment
cpegeric Apr 8, 2025
308a877
fix sca
cpegeric Apr 8, 2025
1f35b2a
add shardmap and cleanup mutex
cpegeric Apr 9, 2025
748d4f6
fix sca
cpegeric Apr 9, 2025
9548e7c
gofmt
cpegeric Apr 9, 2025
d0e0f5e
use RWMutex
cpegeric Apr 9, 2025
64ff0b0
revert queue changes
cpegeric Apr 9, 2025
64c377a
update
cpegeric Apr 9, 2025
bd1dcd3
Merge branch 'main' into s3fifo-merge
cpegeric Apr 10, 2025
203959b
cleanup
cpegeric Apr 10, 2025
375ba77
fix sca
cpegeric Apr 10, 2025
062d820
cleanup
cpegeric Apr 10, 2025
eb79a8c
more comments
cpegeric Apr 10, 2025
2a18a98
bug fix and add s3fifo related tests
cpegeric Apr 10, 2025
7ba1be8
set min target = 1
cpegeric Apr 10, 2025
6742e29
test push to main when ghost has entries
cpegeric Apr 10, 2025
2c2ad79
fix evict criteria
cpegeric Apr 10, 2025
cf2120b
Revert "fix evict criteria"
cpegeric Apr 10, 2025
a677266
reuse the old evict
cpegeric Apr 10, 2025
4f9c530
add bench2
cpegeric Apr 11, 2025
8ab887e
update
cpegeric Apr 11, 2025
48b9a96
update
cpegeric Apr 11, 2025
dec93cc
b.N
cpegeric Apr 11, 2025
894e10e
add license
cpegeric Apr 11, 2025
452d372
Merge branch 'main' into s3fifo
cpegeric Apr 22, 2025
71db09e
add disable_s3fifo
cpegeric Apr 29, 2025
f65a319
add disable_s3fifo
cpegeric Apr 29, 2025
e5a38e3
set 32K per data
cpegeric May 1, 2025
a596684
improve the locking
cpegeric May 6, 2025
4439fc0
use defer unlock
cpegeric May 6, 2025
a82d397
fix sca test and merge fix
cpegeric May 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/mo-service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (c *Config) setDefaultValue() error {

func (c *Config) initMetaCache() {
if c.MetaCache.MemoryCapacity > 0 {
objectio.InitMetaCache(int64(c.MetaCache.MemoryCapacity))
objectio.InitMetaCache(int64(c.MetaCache.MemoryCapacity), c.MetaCache.DisableS3Fifo)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func (c *ServiceConfig) setDefaultValue() error {

func (c *ServiceConfig) initMetaCache() {
if c.MetaCache.MemoryCapacity > 0 {
objectio.InitMetaCache(int64(c.MetaCache.MemoryCapacity))
objectio.InitMetaCache(int64(c.MetaCache.MemoryCapacity), c.MetaCache.DisableS3Fifo)
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/fileservice/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type CacheConfig struct {
RemoteCacheEnabled bool `toml:"remote-cache-enabled"`
RPC morpc.Config `toml:"rpc"`
CheckOverlaps bool `toml:"check-overlaps"`
DisableS3Fifo bool `toml:"disable-s3fifo"`

QueryClient client.QueryClient `json:"-"`
KeyRouterFactory KeyRouterFactory[pb.CacheKey] `json:"-"`
Expand Down
2 changes: 1 addition & 1 deletion pkg/fileservice/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func Test_readCache(t *testing.T) {
slowCacheReadThreshold = time.Second

size := int64(128)
m := NewMemCache(fscache.ConstCapacity(size), nil, nil, "")
m := NewMemCache(fscache.ConstCapacity(size), nil, nil, "", false)
defer m.Close(ctx)

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*3)
Expand Down
2 changes: 2 additions & 0 deletions pkg/fileservice/disk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func NewDiskCache(
asyncLoad bool,
cacheDataAllocator CacheDataAllocator,
name string,
disable_s3fifo bool,
) (ret *DiskCache, err error) {

err = os.MkdirAll(path, 0755)
Expand Down Expand Up @@ -119,6 +120,7 @@ func NewDiskCache(
)
}
},
disable_s3fifo,
),
}
ret.updatingPaths.Cond = sync.NewCond(new(sync.Mutex))
Expand Down
24 changes: 14 additions & 10 deletions pkg/fileservice/disk_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestDiskCache(t *testing.T) {
})

// new
cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "")
cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "", false)
assert.Nil(t, err)
defer cache.Close(ctx)

Expand Down Expand Up @@ -130,7 +130,7 @@ func TestDiskCache(t *testing.T) {
testRead(cache)

// new cache instance and read
cache, err = NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "")
cache, err = NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "", false)
assert.Nil(t, err)
defer cache.Close(ctx)

Expand All @@ -139,7 +139,7 @@ func TestDiskCache(t *testing.T) {
assert.Equal(t, 1, numWritten)

// new cache instance and update
cache, err = NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "")
cache, err = NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "", false)
assert.Nil(t, err)
defer cache.Close(ctx)

Expand All @@ -159,7 +159,7 @@ func TestDiskCacheWriteAgain(t *testing.T) {
var counterSet perfcounter.CounterSet
ctx = perfcounter.WithCounterSet(ctx, &counterSet)

cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(4096), nil, false, nil, "")
cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(4096), nil, false, nil, "", false)
assert.Nil(t, err)
defer cache.Close(ctx)

Expand Down Expand Up @@ -224,7 +224,7 @@ func TestDiskCacheWriteAgain(t *testing.T) {
func TestDiskCacheFileCache(t *testing.T) {
dir := t.TempDir()
ctx := context.Background()
cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "")
cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "", false)
assert.Nil(t, err)
defer cache.Close(ctx)

Expand Down Expand Up @@ -284,7 +284,7 @@ func TestDiskCacheDirSize(t *testing.T) {

dir := t.TempDir()
capacity := 1 << 20
cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(int64(capacity)), nil, false, nil, "")
cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(int64(capacity)), nil, false, nil, "", false)
assert.Nil(t, err)
defer cache.Close(ctx)

Expand Down Expand Up @@ -347,6 +347,7 @@ func benchmarkDiskCacheWriteThenRead(
false,
nil,
"",
false,
)
if err != nil {
b.Fatal(err)
Expand Down Expand Up @@ -442,6 +443,7 @@ func benchmarkDiskCacheReadRandomOffsetAtLargeFile(
false,
nil,
"",
false,
)
if err != nil {
b.Fatal(err)
Expand Down Expand Up @@ -513,6 +515,7 @@ func BenchmarkDiskCacheMultipleIOEntries(b *testing.B) {
false,
nil,
"",
false,
)
if err != nil {
b.Fatal(err)
Expand Down Expand Up @@ -584,7 +587,7 @@ func TestDiskCacheClearFiles(t *testing.T) {
assert.Nil(t, err)
numFiles := len(files)

_, err = NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "")
_, err = NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "", false)
assert.Nil(t, err)

files, err = filepath.Glob(filepath.Join(dir, "*"))
Expand All @@ -598,7 +601,7 @@ func TestDiskCacheClearFiles(t *testing.T) {
func TestDiskCacheBadWrite(t *testing.T) {
dir := t.TempDir()
ctx := context.Background()
cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "")
cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "", false)
assert.Nil(t, err)

written, err := cache.writeFile(
Expand Down Expand Up @@ -633,6 +636,7 @@ func TestDiskCacheGlobalSizeHint(t *testing.T) {
false,
nil,
"test",
false,
)
assert.Nil(t, err)
defer cache.Close(ctx)
Expand Down Expand Up @@ -665,7 +669,7 @@ func TestDiskCacheGlobalSizeHint(t *testing.T) {

func TestDiskCacheSetFromFile(t *testing.T) {
ctx := context.Background()
cache, err := NewDiskCache(ctx, t.TempDir(), fscache.ConstCapacity(1<<30), nil, false, nil, "")
cache, err := NewDiskCache(ctx, t.TempDir(), fscache.ConstCapacity(1<<30), nil, false, nil, "", false)
require.Nil(t, err)
defer cache.Close(ctx)

Expand All @@ -690,7 +694,7 @@ func TestDiskCacheSetFromFile(t *testing.T) {

func TestDiskCacheQuotaExceeded(t *testing.T) {
ctx := context.Background()
cache, err := NewDiskCache(ctx, t.TempDir(), fscache.ConstCapacity(3), nil, false, nil, "")
cache, err := NewDiskCache(ctx, t.TempDir(), fscache.ConstCapacity(3), nil, false, nil, "", false)
require.Nil(t, err)
defer cache.Close(ctx)

Expand Down
130 changes: 130 additions & 0 deletions pkg/fileservice/fifocache/bench2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fifocache

import (
"context"
"math/rand/v2"
"runtime"
"sync"
"testing"
"time"

"github.com/matrixorigin/matrixone/pkg/fileservice/fscache"
)

// Simulated-workload parameters shared by the benchmarks below.
// NOTE(review): the original comment said "512M" but the value is
// 51,200,000 bytes (~51.2 MB) — confirm which size was intended.
const g_cache_size = 51200000 // cache capacity in bytes (~51.2 MB)
// NOTE(review): comment said "128K"; value is 128,000 bytes (~128 KB decimal).
// The cache therefore holds g_cache_size/g_item_size = 400 items.
const g_item_size = 128000 // bytes charged per cached item
const g_io_read_time = 20 * time.Microsecond // simulated backing-store read latency paid on a cache miss

// cache_read looks up key in cache. On a miss it sleeps for
// g_io_read_time to simulate the cost of reading from the backing
// store, then inserts the key so subsequent lookups hit.
func cache_read(ctx context.Context, cache *Cache[int64, int64], key int64) {
	if _, hit := cache.Get(ctx, key); hit {
		return
	}
	// Miss: pay the simulated IO penalty before populating the cache.
	time.Sleep(g_io_read_time)
	cache.Set(ctx, key, 0, g_item_size)
}

func get_rand(start int64, end int64, r *rand.Rand) int64 {
return start + r.Int64N(end-start)
}

// dataset_read runs b.N rounds of random cache reads over the key range
// [startkey, endkey), fanned out across one goroutine per CPU; round n is
// handled by worker n%ncpu. Each round touches (endkey-startkey) random
// keys via cache_read.
//
// Fix: the original passed the single *rand.Rand to every goroutine, but
// math/rand/v2's Rand is not safe for concurrent use — that is a data
// race. r is now used only on the spawning goroutine to derive seeds, and
// each worker gets its own generator.
func dataset_read(b *testing.B, ctx context.Context, cache *Cache[int64, int64], startkey int64, endkey int64, r *rand.Rand) {

	ncpu := runtime.NumCPU()
	var wg sync.WaitGroup

	for i := 0; i < ncpu; i++ {
		wg.Add(1)
		// Seeds drawn sequentially from r, before the goroutine starts,
		// so there is no concurrent access to r itself.
		seed1, seed2 := r.Uint64(), r.Uint64()
		go func(worker int) {
			defer wg.Done()
			rr := rand.New(rand.NewPCG(seed1, seed2))
			for n := 0; n < b.N; n++ {

				// Round-robin the b.N rounds over the workers.
				if n%ncpu != worker {
					continue
				}

				for range endkey - startkey {
					key := get_rand(startkey, endkey, rr)
					cache_read(ctx, cache, key)
				}
			}
		}(i)
	}

	wg.Wait()
}

func data_shift(b *testing.B, time int64) {
ctx := context.Background()
cache_size := g_cache_size
cache := New[int64, int64](fscache.ConstCapacity(int64(cache_size)), ShardInt[int64], nil, nil, nil, false)
r := rand.New(rand.NewPCG(1, 2))

offset := int64(0)
start := int64(0)
end := int64(g_cache_size) / int64(g_item_size) * time
d1 := []int64{start, end}
offset += end
d2 := []int64{offset + start, offset + end}
offset += end
d3 := []int64{offset + start, offset + end}

b.ResetTimer()

dataset_read(b, ctx, cache, d1[0], d1[1], r)
dataset_read(b, ctx, cache, d2[0], d2[1], r)
dataset_read(b, ctx, cache, d3[0], d3[1], r)
}

func data_readNx(b *testing.B, time int64) {
ctx := context.Background()
cache_size := g_cache_size
cache := New[int64, int64](fscache.ConstCapacity(int64(cache_size)), ShardInt[int64], nil, nil, nil, false)
start := int64(0)
end := int64(g_cache_size) / int64(g_item_size) * time
r := rand.New(rand.NewPCG(1, 2))

b.ResetTimer()
dataset_read(b, ctx, cache, start, end, r)
}

// BenchmarkSimCacheRead1x: working set exactly fills the cache.
func BenchmarkSimCacheRead1x(b *testing.B) {
	data_readNx(b, 1)
}

// BenchmarkSimCacheRead1xShift: three cache-sized datasets read in sequence.
func BenchmarkSimCacheRead1xShift(b *testing.B) {
	data_shift(b, 1)
}

// BenchmarkSimCacheRead2x: working set is twice the cache capacity.
func BenchmarkSimCacheRead2x(b *testing.B) {
	data_readNx(b, 2)
}

// BenchmarkSimCacheRead2xShift: three double-capacity datasets in sequence.
func BenchmarkSimCacheRead2xShift(b *testing.B) {
	data_shift(b, 2)
}

// BenchmarkSimCacheRead4x: working set is four times the cache capacity.
func BenchmarkSimCacheRead4x(b *testing.B) {
	data_readNx(b, 4)
}

// BenchmarkSimCacheRead4xShift: three quadruple-capacity datasets in sequence.
func BenchmarkSimCacheRead4xShift(b *testing.B) {
	data_shift(b, 4)
}
12 changes: 6 additions & 6 deletions pkg/fileservice/fifocache/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
func BenchmarkSequentialSet(b *testing.B) {
ctx := context.Background()
size := 65536
cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil)
cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil, false)
nElements := size * 16
b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand All @@ -37,7 +37,7 @@ func BenchmarkSequentialSet(b *testing.B) {
func BenchmarkParallelSet(b *testing.B) {
ctx := context.Background()
size := 65536
cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil)
cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil, false)
nElements := size * 16
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
Expand All @@ -50,7 +50,7 @@ func BenchmarkParallelSet(b *testing.B) {
func BenchmarkGet(b *testing.B) {
ctx := context.Background()
size := 65536
cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil)
cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil, false)
nElements := size * 16
for i := 0; i < nElements; i++ {
cache.Set(ctx, i, i, int64(1+i%3))
Expand All @@ -64,7 +64,7 @@ func BenchmarkGet(b *testing.B) {
func BenchmarkParallelGet(b *testing.B) {
ctx := context.Background()
size := 65536
cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil)
cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil, false)
nElements := size * 16
for i := 0; i < nElements; i++ {
cache.Set(ctx, i, i, int64(1+i%3))
Expand All @@ -80,7 +80,7 @@ func BenchmarkParallelGet(b *testing.B) {
func BenchmarkParallelGetOrSet(b *testing.B) {
ctx := context.Background()
size := 65536
cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil)
cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil, false)
nElements := size * 16
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
Expand All @@ -97,7 +97,7 @@ func BenchmarkParallelGetOrSet(b *testing.B) {
func BenchmarkParallelEvict(b *testing.B) {
ctx := context.Background()
size := 65536
cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil)
cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil, false)
nElements := size * 16
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
Expand Down
Loading
Loading