diff --git a/pkg/meta/base.go b/pkg/meta/base.go index 51188f4dfc33..453ab0a42906 100644 --- a/pkg/meta/base.go +++ b/pkg/meta/base.go @@ -2941,16 +2941,13 @@ func (m *baseMeta) NewDirStream(ctx Context, inode Ino, plus bool, initEntries [ } type dirBatch struct { + isEnd bool offset int cursor interface{} entries []*Entry indexes map[string]int } -func (b *dirBatch) empty() bool { - return b == nil || len(b.entries) == 0 -} - func (b *dirBatch) contain(offset int) bool { if b == nil { return false @@ -2977,6 +2974,9 @@ type dirStream struct { func (s *dirStream) fetch(ctx Context, offset int) (*dirBatch, error) { var cursor interface{} if s.batch != nil && s.batch.predecessor(offset) { + if s.batch.isEnd { + return &dirBatch{isEnd: true, entries: []*Entry{}, indexes: make(map[string]int), offset: offset}, nil + } cursor = s.batch.cursor } nextCursor, entries, err := s.fetcher(ctx, s.inode, cursor, offset, DirBatchNum, s.plus) @@ -2987,11 +2987,15 @@ func (s *dirStream) fetch(ctx Context, offset int) (*dirBatch, error) { entries = []*Entry{} nextCursor = cursor } + isEnd := false + if len(entries) < DirBatchNum { + isEnd = true + } indexes := make(map[string]int, len(entries)) for i, e := range entries { indexes[string(e.Name)] = i } - return &dirBatch{offset: offset, entries: entries, cursor: nextCursor, indexes: indexes}, nil + return &dirBatch{isEnd: isEnd, offset: offset, cursor: nextCursor, entries: entries, indexes: indexes}, nil } func (s *dirStream) List(ctx Context, offset int) ([]*Entry, syscall.Errno) { @@ -3002,10 +3006,10 @@ func (s *dirStream) List(ctx Context, offset int) ([]*Entry, syscall.Errno) { var err error s.Lock() + defer s.Unlock() if !s.batch.contain(offset) { s.batch, err = s.fetch(ctx, offset) } - s.Unlock() if err != nil { return nil, errno(err) @@ -3016,7 +3020,7 @@ func (s *dirStream) List(ctx Context, offset int) ([]*Entry, syscall.Errno) { } func (s *dirStream) delete(name string) { - if s.batch.empty() { + if s.batch == nil || len(s.batch.entries) == 0 { return } diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index 4eeed29a2088..9aa4734a2ad5 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -4451,7 +4451,15 @@ type dbDirStream struct { dirStream } -func (s *dbDirStream) Insert(inode Ino, name string, attr *Attr) {} +func (s *dbDirStream) Insert(inode Ino, name string, attr *Attr) { + s.Lock() + defer s.Unlock() + if s.batch == nil || !s.batch.isEnd { + return + } + s.batch.entries = append(s.batch.entries, &Entry{Inode: inode, Name: []byte(name), Attr: attr}) + s.batch.indexes[name] = len(s.batch.entries) - 1 +} func (s *dbDirStream) Delete(name string) { s.Lock() diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index 1ba1a7cbf52f..4da70b6b4342 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -3788,17 +3788,23 @@ type kvDirStream struct { func (s *kvDirStream) Insert(inode Ino, name string, attr *Attr) { s.Lock() defer s.Unlock() - if s.batch.empty() { + if s.batch == nil { return } - start, end := s.batch.entries[0].Name, []byte(s.batch.cursor.(string)) target := []byte(name) - if bytes.Compare(target, start) < 0 || bytes.Compare(target, end) >= 0 { - return + if !s.batch.isEnd { + if len(s.batch.entries) == 0 { + return + } + start, end := s.batch.entries[0].Name, []byte(s.batch.cursor.(string)) + if bytes.Compare(target, start) <= 0 || bytes.Compare(target, end) >= 0 { + return + } } + // TODO: maybe sort it - s.batch.entries = append(s.batch.entries, &Entry{Inode: inode, Name: []byte(name), Attr: attr}) + s.batch.entries = append(s.batch.entries, &Entry{Inode: inode, Name: target, Attr: attr}) s.batch.indexes[name] = len(s.batch.entries) - 1 } diff --git a/pkg/vfs/vfs_test.go b/pkg/vfs/vfs_test.go index 110253f71317..d3ba82854f6b 100644 --- a/pkg/vfs/vfs_test.go +++ b/pkg/vfs/vfs_test.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" "log" + "math/rand" "reflect" "slices" "strings" @@ -873,12 +874,19 @@ func TestInternalFile(t *testing.T) { } func TestReaddirCache(t *testing.T) { - testReaddirCache(t, "") - testReaddirCache(t, "sqlite3://") - testReaddirCache(t, "redis://127.0.0.1:6379/2") + for _, metaUri := range []string{"", "sqlite3://", "redis://127.0.0.1:6379/2"} { + testReaddirCache(t, metaUri, 20) + testReaddirCache(t, metaUri, 4096) + } } -func testReaddirCache(t *testing.T, metaUri string) { +func testReaddirCache(t *testing.T, metaUri string, batchNum int) { + old := meta.DirBatchNum + meta.DirBatchNum = batchNum + defer func() { + meta.DirBatchNum = old + }() + v, _ := createTestVFS(nil, metaUri) ctx := NewLogContext(meta.Background) @@ -1045,7 +1053,61 @@ func testReaddirStreaming(t *testing.T, metaUri string) { } func TestReadDirSteaming(t *testing.T) { - testReaddirStreaming(t, "memkv://") - testReaddirStreaming(t, "sqlite3://") - testReaddirStreaming(t, "redis://127.0.0.1:6379/2") + for _, metaUri := range []string{"", "sqlite3://", "redis://127.0.0.1:6379/2"} { + testReaddirStreaming(t, metaUri) + } +} + +func TestReaddir(t *testing.T) { + extra := rand.Intn(meta.DirBatchNum) + for _, metaUri := range []string{"", "sqlite3://", "redis://127.0.0.1:6379/2"} { + testReaddir(t, metaUri, 20, 0) + testReaddir(t, metaUri, 20, 5) + testReaddir(t, metaUri, 4*meta.DirBatchNum, 0) + testReaddir(t, metaUri, 4*meta.DirBatchNum, extra) + testReaddir(t, metaUri, 5*meta.DirBatchNum+extra, 0) + testReaddir(t, metaUri, 5*meta.DirBatchNum+extra, 2*meta.DirBatchNum) + } +} + +func testReaddir(t *testing.T, metaUri string, dirNum int, offset int) { + v, _ := createTestVFS(nil, metaUri) + ctx := NewLogContext(meta.Background) + + entry, st := v.Mkdir(ctx, 1, "testdir", 0777, 022) + if st != 0 { + t.Fatalf("mkdir testdir: %s", st) + } + + parent := entry.Inode + for i := 0; i < dirNum; i++ { + _, _ = v.Mkdir(ctx, parent, fmt.Sprintf("d%d", i), 0777, 022) + } + defer func() { + for i := 0; i < dirNum; i++ { + _ = v.Rmdir(ctx, parent, fmt.Sprintf("d%d", i)) + } + v.Rmdir(ctx, 1, "testdir") + }() + + fh, _ := v.Opendir(ctx, parent, 0) + defer v.Releasedir(ctx, parent, fh) + + readAll := func(ctx Context, parent Ino, fh uint64, off int) []*meta.Entry { + var entries []*meta.Entry + for { + ents, _, st := v.Readdir(ctx, parent, 0, off, fh, true) + require.Equal(t, st, syscall.Errno(0)) + if len(ents) == 0 { + break + } + off += len(ents) + entries = append(entries, ents...) + } + return entries + } + + entriesOne := readAll(ctx, parent, fh, offset) + entriesTwo := readAll(ctx, parent, fh, offset) + require.True(t, reflect.DeepEqual(entriesOne, entriesTwo)) }