Skip to content

Commit

Permalink
optimise last call
Browse files Browse the repository at this point in the history
Signed-off-by: jiefenghuang <[email protected]>
  • Loading branch information
jiefenghuang committed Sep 26, 2024
1 parent 2e57e00 commit d1c4feb
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 20 deletions.
18 changes: 11 additions & 7 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -2941,16 +2941,13 @@ func (m *baseMeta) NewDirStream(ctx Context, inode Ino, plus bool, initEntries [
}

type dirBatch struct {
isEnd bool
offset int
cursor interface{}
entries []*Entry
indexes map[string]int
}

func (b *dirBatch) empty() bool {
return b == nil || len(b.entries) == 0
}

func (b *dirBatch) contain(offset int) bool {
if b == nil {
return false
Expand All @@ -2977,6 +2974,9 @@ type dirStream struct {
func (s *dirStream) fetch(ctx Context, offset int) (*dirBatch, error) {
var cursor interface{}
if s.batch != nil && s.batch.predecessor(offset) {
if s.batch.isEnd {
return &dirBatch{isEnd: true, entries: []*Entry{}, indexes: make(map[string]int), offset: offset}, nil
}
cursor = s.batch.cursor
}
nextCursor, entries, err := s.fetcher(ctx, s.inode, cursor, offset, DirBatchNum, s.plus)
Expand All @@ -2987,11 +2987,15 @@ func (s *dirStream) fetch(ctx Context, offset int) (*dirBatch, error) {
entries = []*Entry{}
nextCursor = cursor
}
isEnd := false
if len(entries) < DirBatchNum {
isEnd = true
}
indexes := make(map[string]int, len(entries))
for i, e := range entries {
indexes[string(e.Name)] = i
}
return &dirBatch{offset: offset, entries: entries, cursor: nextCursor, indexes: indexes}, nil
return &dirBatch{isEnd: isEnd, offset: offset, cursor: nextCursor, entries: entries, indexes: indexes}, nil
}

func (s *dirStream) List(ctx Context, offset int) ([]*Entry, syscall.Errno) {
Expand All @@ -3002,10 +3006,10 @@ func (s *dirStream) List(ctx Context, offset int) ([]*Entry, syscall.Errno) {

var err error
s.Lock()
defer s.Unlock()
if !s.batch.contain(offset) {
s.batch, err = s.fetch(ctx, offset)
}
s.Unlock()

if err != nil {
return nil, errno(err)
Expand All @@ -3016,7 +3020,7 @@ func (s *dirStream) List(ctx Context, offset int) ([]*Entry, syscall.Errno) {
}

func (s *dirStream) delete(name string) {
if s.batch.empty() {
if s.batch == nil || len(s.batch.entries) == 0 {
return
}

Expand Down
10 changes: 9 additions & 1 deletion pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -4451,7 +4451,15 @@ type dbDirStream struct {
dirStream
}

func (s *dbDirStream) Insert(inode Ino, name string, attr *Attr) {}
func (s *dbDirStream) Insert(inode Ino, name string, attr *Attr) {
s.Lock()
defer s.Unlock()
if s.batch == nil || !s.batch.isEnd {
return
}
s.batch.entries = append(s.batch.entries, &Entry{Inode: inode, Name: []byte(name), Attr: attr})
s.batch.indexes[name] = len(s.batch.entries) - 1
}

func (s *dbDirStream) Delete(name string) {
s.Lock()
Expand Down
16 changes: 11 additions & 5 deletions pkg/meta/tkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -3788,17 +3788,23 @@ type kvDirStream struct {
func (s *kvDirStream) Insert(inode Ino, name string, attr *Attr) {
s.Lock()
defer s.Unlock()
if s.batch.empty() {
if s.batch == nil {
return
}

start, end := s.batch.entries[0].Name, []byte(s.batch.cursor.(string))
target := []byte(name)
if bytes.Compare(target, start) < 0 || bytes.Compare(target, end) >= 0 {
return
if !s.batch.isEnd {
if len(s.batch.entries) == 0 {
return
}
start, end := s.batch.entries[0].Name, []byte(s.batch.cursor.(string))
if bytes.Compare(target, start) <= 0 || bytes.Compare(target, end) >= 0 {
return
}
}

// TODO: maybe sort it
s.batch.entries = append(s.batch.entries, &Entry{Inode: inode, Name: []byte(name), Attr: attr})
s.batch.entries = append(s.batch.entries, &Entry{Inode: inode, Name: target, Attr: attr})
s.batch.indexes[name] = len(s.batch.entries) - 1
}

Expand Down
76 changes: 69 additions & 7 deletions pkg/vfs/vfs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"log"
"math/rand"
"reflect"
"slices"
"strings"
Expand Down Expand Up @@ -873,12 +874,19 @@ func TestInternalFile(t *testing.T) {
}

func TestReaddirCache(t *testing.T) {
testReaddirCache(t, "")
testReaddirCache(t, "sqlite3://")
testReaddirCache(t, "redis://127.0.0.1:6379/2")
for _, metaUri := range []string{"", "sqlite3://", "redis://127.0.0.1:6379/2"} {
testReaddirCache(t, metaUri, 20)
testReaddirCache(t, metaUri, 4096)
}
}

func testReaddirCache(t *testing.T, metaUri string) {
func testReaddirCache(t *testing.T, metaUri string, batchNum int) {
old := meta.DirBatchNum
meta.DirBatchNum = batchNum
defer func() {
meta.DirBatchNum = old
}()

v, _ := createTestVFS(nil, metaUri)
ctx := NewLogContext(meta.Background)

Expand Down Expand Up @@ -1045,7 +1053,61 @@ func testReaddirStreaming(t *testing.T, metaUri string) {
}

func TestReadDirSteaming(t *testing.T) {
testReaddirStreaming(t, "memkv://")
testReaddirStreaming(t, "sqlite3://")
testReaddirStreaming(t, "redis://127.0.0.1:6379/2")
for _, metaUri := range []string{"", "sqlite3://", "redis://127.0.0.1:6379/2"} {
testReaddirStreaming(t, metaUri)
}
}

func TestReaddir(t *testing.T) {
extra := rand.Intn(meta.DirBatchNum)
for _, metaUri := range []string{"", "sqlite3://", "redis://127.0.0.1:6379/2"} {
testReaddir(t, metaUri, 20, 0)
testReaddir(t, metaUri, 20, 5)
testReaddir(t, metaUri, 4*meta.DirBatchNum, 0)
testReaddir(t, metaUri, 4*meta.DirBatchNum, extra)
testReaddir(t, metaUri, 5*meta.DirBatchNum+extra, 0)
testReaddir(t, metaUri, 5*meta.DirBatchNum+extra, 2*meta.DirBatchNum)
}
}

func testReaddir(t *testing.T, metaUri string, dirNum int, offset int) {
v, _ := createTestVFS(nil, metaUri)
ctx := NewLogContext(meta.Background)

entry, st := v.Mkdir(ctx, 1, "testdir", 0777, 022)
if st != 0 {
t.Fatalf("mkdir testdir: %s", st)
}

parent := entry.Inode
for i := 0; i < dirNum; i++ {
_, _ = v.Mkdir(ctx, parent, fmt.Sprintf("d%d", i), 0777, 022)
}
defer func() {
for i := 0; i < dirNum; i++ {
_ = v.Rmdir(ctx, parent, fmt.Sprintf("d%d", i))
}
v.Rmdir(ctx, 1, "testdir")
}()

fh, _ := v.Opendir(ctx, parent, 0)
defer v.Releasedir(ctx, parent, fh)

readAll := func(ctx Context, parent Ino, fh uint64, off int) []*meta.Entry {
var entries []*meta.Entry
for {
ents, _, st := v.Readdir(ctx, parent, 0, off, fh, true)
require.Equal(t, st, syscall.Errno(0))
if len(ents) == 0 {
break
}
off += len(ents)
entries = append(entries, ents...)
}
return entries
}

entriesOne := readAll(ctx, parent, fh, offset)
entriesTwo := readAll(ctx, parent, fh, offset)
require.True(t, reflect.DeepEqual(entriesOne, entriesTwo))
}

0 comments on commit d1c4feb

Please sign in to comment.