Skip to content

Commit

Permalink
refactor(repository): added fs.DirectoryIterator (kopia#3365)
Browse files Browse the repository at this point in the history
* refactor(repository): added fs.DirectoryIterator

This significantly reduces the number of small allocations while
taking snapshots of lots of files, which leads to faster snapshots.

```
$ runbench --kopia-exe ~/go/bin/kopia \
   --compare-to-exe ~/go/bin/kopia-baseline --min-duration 30s \
   ./snapshot-linux-parallel-4.sh
DIFF duration: current:5.1 baseline:5.8 change:-13.0 %
DIFF repo_size: current:1081614127.6 baseline:1081615302.8 change:-0.0 %
DIFF num_files: current:60.0 baseline:60.0 change:0%
DIFF avg_heap_objects: current:4802666.0 baseline:4905741.8 change:-2.1 %
DIFF avg_heap_bytes: current:737397275.2 baseline:715263289.6 change:+3.1 %
DIFF avg_ram: current:215.0 baseline:211.5 change:+1.6 %
DIFF max_ram: current:294.8 baseline:311.4 change:-5.3 %
DIFF avg_cpu: current:167.3 baseline:145.3 change:+15.1 %
DIFF max_cpu: current:227.2 baseline:251.0 change:-9.5 %
```

* changed `Next()` API

* mechanical move of the iterator to its own file

* clarified comment

* pr feedback

* mechanical move of all localfs dependencies on os.FileInfo to a separate file

* Update fs/entry.go

Co-authored-by: ashmrtn <[email protected]>

* Update fs/entry_dir_iterator.go

Co-authored-by: Julio Lopez <[email protected]>

* doc: clarified valid results from Next()

---------

Co-authored-by: ashmrtn <[email protected]>
Co-authored-by: Julio Lopez <[email protected]>
  • Loading branch information
3 people authored Oct 5, 2023
1 parent 6602772 commit c8d1b22
Show file tree
Hide file tree
Showing 27 changed files with 524 additions and 514 deletions.
19 changes: 16 additions & 3 deletions cli/command_ls.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,22 @@ func (c *commandList) run(ctx context.Context, rep repo.Repository) error {
}

func (c *commandList) listDirectory(ctx context.Context, d fs.Directory, prefix, indent string) error {
if err := d.IterateEntries(ctx, func(innerCtx context.Context, e fs.Entry) error {
return c.printDirectoryEntry(innerCtx, e, prefix, indent)
}); err != nil {
iter, err := d.Iterate(ctx)
if err != nil {
return err //nolint:wrapcheck
}
defer iter.Close()

e, err := iter.Next(ctx)
for e != nil {
if err2 := c.printDirectoryEntry(ctx, e, prefix, indent); err2 != nil {
return err2
}

e, err = iter.Next(ctx)
}

if err != nil {
return err //nolint:wrapcheck
}

Expand Down
2 changes: 1 addition & 1 deletion fs/cachefs/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (c *Cache) IterateEntries(ctx context.Context, d fs.Directory, w EntryWrapp
return nil
}

return d.IterateEntries(ctx, callback) //nolint:wrapcheck
return fs.IterateEntries(ctx, d, callback) //nolint:wrapcheck
}

func (c *Cache) getEntriesFromCacheLocked(ctx context.Context, id string) []fs.Entry {
Expand Down
90 changes: 69 additions & 21 deletions fs/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,56 @@ type StreamingFile interface {
// Directory represents contents of a directory.
type Directory interface {
Entry

// Child returns the entry with the given name.
// NOTE(review): presumably reports ErrEntryNotFound when nothing matches, as
// IterateEntriesAndFindChild does - confirm for each implementation.
Child(ctx context.Context, name string) (Entry, error)
// NOTE(review): this callback-based method looks like the pre-refactor API that
// the diff view shows next to its replacement (Iterate); confirm whether it is
// still part of the interface before relying on it.
IterateEntries(ctx context.Context, cb func(context.Context, Entry) error) error
// Iterate returns a DirectoryIterator over the entries of this directory.
// The caller must Close the returned iterator to release its resources.
Iterate(ctx context.Context) (DirectoryIterator, error)
// SupportsMultipleIterations returns true if the Directory supports iterating
// through the entries multiple times. Otherwise it returns false.
SupportsMultipleIterations() bool
}

// IterateEntries walks the entries of the provided directory, invoking cb for
// each one in turn. It stops at the first error returned by cb and returns that
// error unchanged; otherwise it returns whatever error the iterator reported
// when it signaled the end of iteration (nil on success).
func IterateEntries(ctx context.Context, dir Directory, cb func(context.Context, Entry) error) error {
	it, openErr := dir.Iterate(ctx)
	if openErr != nil {
		return openErr //nolint:wrapcheck
	}

	defer it.Close()

	for {
		entry, nextErr := it.Next(ctx)
		if entry == nil {
			// A nil entry ends the iteration; nextErr tells success from failure.
			return nextErr //nolint:wrapcheck
		}

		if cbErr := cb(ctx, entry); cbErr != nil {
			return cbErr
		}
	}
}

// DirectoryIterator iterates entries in a directory.
//
// The client is expected to call Next() in a loop until it returns a nil entry to signal
// end of iteration or until an error has occurred.
//
// Valid results:
//
// (nil,nil) - end of iteration, success
// (entry,nil) - iteration in progress, success
// (nil,err) - iteration stopped, failure
//
// The behavior of calling Next() after iteration has signaled its end is undefined.
//
// To release any resources associated with iteration the client must call Close().
type DirectoryIterator interface {
// Next returns the next entry, or a nil entry once iteration has ended.
// The accompanying error distinguishes a clean end (nil) from a failure.
Next(ctx context.Context) (Entry, error)
// Close releases any resources associated with the iteration.
Close()
}

// DirectoryWithSummary is optionally implemented by Directory implementations that provide a summary.
type DirectoryWithSummary interface {
Summary(ctx context.Context) (*DirectorySummary, error)
Expand All @@ -78,14 +121,22 @@ type ErrorEntry interface {
ErrorInfo() error
}

// GetAllEntries uses Iterate to return all entries in a Directory.
//
// The returned slice is never nil when iteration could be started. If the
// iterator stops with an error, the entries collected so far are returned
// together with that error.
func GetAllEntries(ctx context.Context, d Directory) ([]Entry, error) {
	entries := []Entry{}

	iter, err := d.Iterate(ctx)
	if err != nil {
		return nil, err //nolint:wrapcheck
	}

	defer iter.Close()

	// Single pass over the directory: a nil entry marks the end of iteration
	// and err then carries the iterator's final status.
	cur, err := iter.Next(ctx)
	for cur != nil {
		entries = append(entries, cur)
		cur, err = iter.Next(ctx)
	}

	return entries, err //nolint:wrapcheck
}
Expand All @@ -96,30 +147,27 @@ var ErrEntryNotFound = errors.New("entry not found")
// IterateEntriesAndFindChild iterates through entries from a directory and returns one by name.
// This is a convenience function that may be helpful in implementations of Directory.Child().
func IterateEntriesAndFindChild(ctx context.Context, d Directory, name string) (Entry, error) {
type errStop struct {
error
iter, err := d.Iterate(ctx)
if err != nil {
return nil, err //nolint:wrapcheck
}

var result Entry
defer iter.Close()

err := d.IterateEntries(ctx, func(c context.Context, e Entry) error {
if result == nil && e.Name() == name {
result = e
return errStop{errors.New("")}
cur, err := iter.Next(ctx)
for cur != nil {
if cur.Name() == name {
return cur, nil
}
return nil
})

var stopped errStop
if err != nil && !errors.As(err, &stopped) {
return nil, errors.Wrap(err, "error reading directory")
cur, err = iter.Next(ctx)
}

if result == nil {
return nil, ErrEntryNotFound
if err != nil {
return nil, err //nolint:wrapcheck
}

return result, nil
return nil, ErrEntryNotFound
}

// MaxFailedEntriesPerDirectorySummary is the maximum number of failed entries per directory summary.
Expand Down
30 changes: 30 additions & 0 deletions fs/entry_dir_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package fs

import "context"

// staticIterator is a DirectoryIterator over a fixed, in-memory slice of entries.
type staticIterator struct {
	cur     int     // index of the next entry to hand out
	entries []Entry // entries returned by Next, in order
	err     error   // final error reported once all entries have been returned
}

// Close is a no-op: a staticIterator holds no resources.
func (it *staticIterator) Close() {
}

// Next returns the remaining entries one at a time with a nil error. Once the
// entries are exhausted it returns a nil entry together with the final error
// given to StaticIterator, i.e. (nil, nil) on success or (nil, err) on failure.
func (it *staticIterator) Next(ctx context.Context) (Entry, error) {
	if it.cur < len(it.entries) {
		v := it.entries[it.cur]
		it.cur++

		// The final error is deliberately withheld while entries remain so
		// that callers never observe an (entry, err) pair.
		return v, nil
	}

	return nil, it.err
}

// StaticIterator returns a DirectoryIterator which returns the provided
// entries in order followed by a given final error.
// It is not safe to concurrently access directory iterator.
func StaticIterator(entries []Entry, err error) DirectoryIterator {
	return &staticIterator{cur: 0, entries: entries, err: err}
}
85 changes: 73 additions & 12 deletions fs/ignorefs/ignorefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bufio"
"context"
"strings"
"sync"

"github.com/pkg/errors"

Expand Down Expand Up @@ -147,28 +148,81 @@ func (d *ignoreDirectory) DirEntryOrNil(ctx context.Context) (*snapshot.DirEntry
return nil, nil
}

func (d *ignoreDirectory) IterateEntries(ctx context.Context, callback func(ctx context.Context, entry fs.Entry) error) error {
type ignoreDirIterator struct {
//nolint:containedctx
ctx context.Context
d *ignoreDirectory
inner fs.DirectoryIterator
thisContext *ignoreContext
}

func (i *ignoreDirIterator) Next(ctx context.Context) (fs.Entry, error) {
cur, err := i.inner.Next(ctx)

for cur != nil {
//nolint:contextcheck
if wrapped, ok := i.d.maybeWrappedChildEntry(i.ctx, i.thisContext, cur); ok {
return wrapped, nil
}

cur, err = i.inner.Next(ctx)
}

return nil, err //nolint:wrapcheck
}

func (i *ignoreDirIterator) Close() {
i.inner.Close()

*i = ignoreDirIterator{}
ignoreDirIteratorPool.Put(i)
}

func (d *ignoreDirectory) Iterate(ctx context.Context) (fs.DirectoryIterator, error) {
if d.skipCacheDirectory(ctx, d.relativePath, d.policyTree) {
return nil
return fs.StaticIterator(nil, nil), nil
}

thisContext, err := d.buildContext(ctx)
if err != nil {
return err
return nil, err
}

//nolint:wrapcheck
return d.Directory.IterateEntries(ctx, func(ctx context.Context, e fs.Entry) error {
if wrapped, ok := d.maybeWrappedChildEntry(ctx, thisContext, e); ok {
return callback(ctx, wrapped)
}
inner, err := d.Directory.Iterate(ctx)
if err != nil {
return nil, err //nolint:wrapcheck
}

it := ignoreDirIteratorPool.Get().(*ignoreDirIterator) //nolint:forcetypeassert
it.ctx = ctx
it.d = d
it.inner = inner
it.thisContext = thisContext

return it, nil
}

//nolint:gochecknoglobals
var ignoreDirectoryPool = sync.Pool{
New: func() any { return &ignoreDirectory{} },
}

return nil
})
//nolint:gochecknoglobals
var ignoreDirIteratorPool = sync.Pool{
New: func() any { return &ignoreDirIterator{} },
}

// Close closes the wrapped Directory, then zeroes the receiver and returns it
// to ignoreDirectoryPool for reuse.
// NOTE(review): because the value is cleared and pooled, it presumably must not
// be used after Close - confirm against callers.
func (d *ignoreDirectory) Close() {
d.Directory.Close()

// Clear every field before pooling so the pooled value holds no references.
*d = ignoreDirectory{}
ignoreDirectoryPool.Put(d)
}

func (d *ignoreDirectory) maybeWrappedChildEntry(ctx context.Context, ic *ignoreContext, e fs.Entry) (fs.Entry, bool) {
if !ic.shouldIncludeByName(ctx, d.relativePath+"/"+e.Name(), e, d.policyTree) {
s := d.relativePath + "/" + e.Name()

if !ic.shouldIncludeByName(ctx, s, e, d.policyTree) {
return nil, false
}

Expand All @@ -181,7 +235,14 @@ func (d *ignoreDirectory) maybeWrappedChildEntry(ctx context.Context, ic *ignore
}

if dir, ok := e.(fs.Directory); ok {
return &ignoreDirectory{d.relativePath + "/" + e.Name(), ic, d.policyTree.Child(e.Name()), dir}, true
id := ignoreDirectoryPool.Get().(*ignoreDirectory) //nolint:forcetypeassert

id.relativePath = s
id.parentContext = ic
id.policyTree = d.policyTree.Child(e.Name())
id.Directory = dir

return id, true
}

return e, true
Expand Down
2 changes: 1 addition & 1 deletion fs/ignorefs/ignorefs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ func walkTree(t *testing.T, dir fs.Directory) []string {
walk = func(path string, d fs.Directory) error {
output = append(output, path+"/")

return d.IterateEntries(testlogging.Context(t), func(innerCtx context.Context, e fs.Entry) error {
return fs.IterateEntries(testlogging.Context(t), d, func(innerCtx context.Context, e fs.Entry) error {
relPath := path + "/" + e.Name()

if subdir, ok := e.(fs.Directory); ok {
Expand Down
Loading

0 comments on commit c8d1b22

Please sign in to comment.