diff --git a/cache/cache.go b/cache/cache.go
index 47c49a384..11641d6f9 100644
--- a/cache/cache.go
+++ b/cache/cache.go
@@ -17,9 +17,9 @@ package cache
 
 import (
+	"bytes"
 	"fmt"
 	"io"
-	"io/ioutil"
 	"os"
 	"path/filepath"
 	"sync"
 
@@ -28,117 +28,211 @@ import (
 	"github.com/pkg/errors"
 )
 
+const (
+	defaultMaxLRUCacheEntry = 10
+	defaultMaxCacheFds      = 10
+)
+
+type DirectoryCacheConfig struct {
+	MaxLRUCacheEntry int  `toml:"max_lru_cache_entry"`
+	MaxCacheFds      int  `toml:"max_cache_fds"`
+	SyncAdd          bool `toml:"sync_add"`
+}
+
 // TODO: contents validation.
 type BlobCache interface {
-	Fetch(blobHash string) ([]byte, error)
-	Add(blobHash string, p []byte)
+	Add(key string, p []byte, opts ...Option)
+	FetchAt(key string, offset int64, p []byte, opts ...Option) (n int, err error)
 }
 
-type dirOpt struct {
-	syncAdd bool
+type cacheOpt struct {
+	direct bool
 }
 
-type DirOption func(o *dirOpt) *dirOpt
+type Option func(o *cacheOpt) *cacheOpt
 
-func SyncAdd() DirOption {
-	return func(o *dirOpt) *dirOpt {
-		o.syncAdd = true
+// When the Direct option is specified for the FetchAt and Add methods, these
+// operations won't use the on-memory caches. When you know that the target
+// value won't be used immediately, this prevents the limited space of the
+// on-memory caches from being polluted by such unimportant values.
+func Direct() Option {
+	return func(o *cacheOpt) *cacheOpt {
+		o.direct = true
 		return o
 	}
 }
 
-func NewDirectoryCache(directory string, memCacheSize int, opts ...DirOption) (BlobCache, error) {
-	opt := &dirOpt{}
-	for _, o := range opts {
-		opt = o(opt)
+func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache, error) {
+	maxEntry := config.MaxLRUCacheEntry
+	if maxEntry == 0 {
+		maxEntry = defaultMaxLRUCacheEntry
+	}
+	maxFds := config.MaxCacheFds
+	if maxFds == 0 {
+		maxFds = defaultMaxCacheFds
 	}
 	if err := os.MkdirAll(directory, os.ModePerm); err != nil {
 		return nil, err
 	}
 	dc := &directoryCache{
-		cache:     lru.New(memCacheSize),
+		cache:     newObjectCache(maxEntry),
+		fileCache: newObjectCache(maxFds),
 		directory: directory,
+		bufPool: sync.Pool{
+			New: func() interface{} {
+				return new(bytes.Buffer)
+			},
+		},
 	}
-	if opt.syncAdd {
-		dc.syncAdd = true
+	dc.cache.finalize = func(value interface{}) {
+		dc.bufPool.Put(value)
 	}
+	dc.fileCache.finalize = func(value interface{}) {
+		value.(*os.File).Close()
+	}
+	dc.syncAdd = config.SyncAdd
 	return dc, nil
 }
 
 // directoryCache is a cache implementation which backend is a directory.
type directoryCache struct { - cache *lru.Cache - cacheMu sync.Mutex + cache *objectCache + fileCache *objectCache directory string - syncAdd bool - fileMu sync.Mutex + + bufPool sync.Pool + + syncAdd bool } -func (dc *directoryCache) Fetch(blobHash string) (p []byte, err error) { - dc.cacheMu.Lock() - defer dc.cacheMu.Unlock() +func (dc *directoryCache) FetchAt(key string, offset int64, p []byte, opts ...Option) (n int, err error) { + opt := &cacheOpt{} + for _, o := range opts { + opt = o(opt) + } - if cache, ok := dc.cache.Get(blobHash); ok { - p, ok := cache.([]byte) - if ok { - return p, nil + if !opt.direct { + // Get data from memory + if b, done, ok := dc.cache.get(key); ok { + defer done() + data := b.(*bytes.Buffer).Bytes() + if int64(len(data)) < offset { + return 0, fmt.Errorf("invalid offset %d exceeds chunk size %d", + offset, len(data)) + } + return copy(p, data[offset:]), nil } - } - c := filepath.Join(dc.directory, blobHash[:2], blobHash) - if _, err := os.Stat(c); err != nil { - return nil, errors.Wrapf(err, "Missed cache %q", c) + // Get data from disk. If the file is already opened, use it. + if f, done, ok := dc.fileCache.get(key); ok { + defer done() + return f.(*os.File).ReadAt(p, offset) + } } - file, err := os.Open(c) + // Open the cache file and read the target region + // TODO: If the target cache is write-in-progress, should we wait for the completion + // or simply report the cache miss? + file, err := os.Open(dc.cachePath(key)) if err != nil { - return nil, errors.Wrapf(err, "Failed to Open cached blob file %q", c) + return 0, errors.Wrapf(err, "failed to open blob file for %q", key) + } + if n, err = file.ReadAt(p, offset); err == io.EOF { + err = nil } - defer file.Close() - if p, err = ioutil.ReadAll(file); err != nil && err != io.EOF { - return nil, errors.Wrapf(err, "failed to read cached data %q", c) + // Cache the opened file for future use. If "direct" option is specified, this + // won't be done. This option is useful for preventing file cache from being + // polluted by data that won't be accessed immediately. + if opt.direct || !dc.fileCache.add(key, file) { + file.Close() } - dc.cache.Add(blobHash, p) - return + // TODO: should we cache the entire file data on memory? + // but making I/O (possibly huge) on every fetching + // might be costly. + + return n, err } -func (dc *directoryCache) Add(blobHash string, p []byte) { - // Copy the original data for avoiding the cached contents to be edited accidentally - p2 := make([]byte, len(p)) - copy(p2, p) - p = p2 +func (dc *directoryCache) Add(key string, p []byte, opts ...Option) { + opt := &cacheOpt{} + for _, o := range opts { + opt = o(opt) + } - dc.cacheMu.Lock() - dc.cache.Add(blobHash, p) - dc.cacheMu.Unlock() + if !opt.direct { + // Cache the passed data on memory. This enables to serve this data even + // during writing it to the disk. If "direct" option is specified, this + // won't be done. This option is useful for preventing memory cache from being + // polluted by data that won't be accessed immediately. + b := dc.bufPool.Get().(*bytes.Buffer) + b.Reset() + b.Write(p) + if !dc.cache.add(key, b) { + dc.bufPool.Put(b) // Already exists. No need to cache. + } + } + // Cache the passed data to disk. + b2 := dc.bufPool.Get().(*bytes.Buffer) + b2.Reset() + b2.Write(p) addFunc := func() { - dc.fileMu.Lock() - defer dc.fileMu.Unlock() + defer dc.bufPool.Put(b2) - // Check if cache exists. 
-		c := filepath.Join(dc.directory, blobHash[:2], blobHash)
+		var (
+			c   = dc.cachePath(key)
+			wip = dc.wipPath(key)
+		)
+		if _, err := os.Stat(wip); err == nil {
+			return // Write in progress
+		}
 		if _, err := os.Stat(c); err == nil {
+			return // Already exists.
+		}
+
+		// Write the contents to a temporary file
+		if err := os.MkdirAll(filepath.Dir(wip), os.ModePerm); err != nil {
+			fmt.Printf("Warning: failed to create blob cache directory %q: %v\n", wip, err)
+			return
+		}
+		wipfile, err := os.Create(wip)
+		if err != nil {
+			fmt.Printf("Warning: failed to prepare temp file for storing cache %q: %v\n", key, err)
+			return
+		}
+		defer func() {
+			wipfile.Close()
+			os.Remove(wipfile.Name())
+		}()
+		want := b2.Len()
+		if _, err := io.CopyN(wipfile, b2, int64(want)); err != nil {
+			fmt.Printf("Warning: failed to write cache: %v\n", err)
 			return
 		}
 
-		// Create cache file
+		// Commit the cache contents
 		if err := os.MkdirAll(filepath.Dir(c), os.ModePerm); err != nil {
 			fmt.Printf("Warning: Failed to Create blob cache directory %q: %v\n", c, err)
 			return
 		}
-		f, err := os.Create(c)
+		if err := os.Rename(wipfile.Name(), c); err != nil {
+			fmt.Printf("Warning: failed to commit cache to %q: %v\n", c, err)
+			return
+		}
+		file, err := os.Open(c)
 		if err != nil {
-			fmt.Printf("Warning: could not create a cache file at %q: %v\n", c, err)
+			fmt.Printf("Warning: failed to open cache on %q: %v\n", c, err)
 			return
 		}
-		defer f.Close()
-		if n, err := f.Write(p); err != nil || n != len(p) {
-			fmt.Printf("Warning: failed to write cache: %d(wrote)/%d(expected): %v\n",
-				n, len(p), err)
+
+		// Cache the opened file for future use. If "direct" option is specified, this
+		// won't be done. This option is useful for preventing file cache from being
+		// polluted by data that won't be accessed immediately.
+		if opt.direct || !dc.fileCache.add(key, file) {
+			file.Close()
 		}
 	}
 
@@ -149,6 +243,81 @@ func (dc *directoryCache) Add(blobHash string, p []byte) {
 	}
 }
 
+func (dc *directoryCache) cachePath(key string) string {
+	return filepath.Join(dc.directory, key[:2], key)
+}
+
+func (dc *directoryCache) wipPath(key string) string {
+	return filepath.Join(dc.directory, key[:2], "w", key)
+}
+
+func newObjectCache(maxEntries int) *objectCache {
+	oc := &objectCache{
+		cache: lru.New(maxEntries),
+	}
+	oc.cache.OnEvicted = func(key lru.Key, value interface{}) {
+		value.(*object).release() // Decrease ref count incremented in add operation.
+	}
+	return oc
+}
+
+type objectCache struct {
+	cache    *lru.Cache
+	cacheMu  sync.Mutex
+	finalize func(interface{})
+}
+
+func (oc *objectCache) get(key string) (value interface{}, done func(), ok bool) {
+	oc.cacheMu.Lock()
+	defer oc.cacheMu.Unlock()
+	o, ok := oc.cache.Get(key)
+	if !ok {
+		return nil, nil, false
+	}
+	o.(*object).use()
+	return o.(*object).v, func() { o.(*object).release() }, true
+}
+
+func (oc *objectCache) add(key string, value interface{}) bool {
+	oc.cacheMu.Lock()
+	defer oc.cacheMu.Unlock()
+	if _, ok := oc.cache.Get(key); ok {
+		return false // TODO: should we swap the object?
+ } + o := &object{ + v: value, + finalize: oc.finalize, + } + o.use() // Keep this object having at least 1 ref count (will be decreased on eviction) + oc.cache.Add(key, o) + return true +} + +type object struct { + v interface{} + + refCounts int64 + finalize func(interface{}) + + mu sync.Mutex +} + +func (o *object) use() { + o.mu.Lock() + defer o.mu.Unlock() + o.refCounts++ +} + +func (o *object) release() { + o.mu.Lock() + defer o.mu.Unlock() + o.refCounts-- + if o.refCounts <= 0 && o.finalize != nil { + // nobody will refer this object + o.finalize(o.v) + } +} + func NewMemoryCache() BlobCache { return &memoryCache{ membuf: map[string]string{}, @@ -161,19 +330,19 @@ type memoryCache struct { mu sync.Mutex } -func (mc *memoryCache) Fetch(blobHash string) ([]byte, error) { +func (mc *memoryCache) FetchAt(key string, offset int64, p []byte, opts ...Option) (n int, err error) { mc.mu.Lock() defer mc.mu.Unlock() - cache, ok := mc.membuf[blobHash] + cache, ok := mc.membuf[key] if !ok { - return nil, fmt.Errorf("Missed cache: %q", blobHash) + return 0, fmt.Errorf("Missed cache: %q", key) } - return []byte(cache), nil + return copy(p, cache[offset:]), nil } -func (mc *memoryCache) Add(blobHash string, p []byte) { +func (mc *memoryCache) Add(key string, p []byte, opts ...Option) { mc.mu.Lock() defer mc.mu.Unlock() - mc.membuf[blobHash] = string(p) + mc.membuf[key] = string(p) } diff --git a/cache/cache_test.go b/cache/cache_test.go index 362597ee5..8aa5df308 100644 --- a/cache/cache_test.go +++ b/cache/cache_test.go @@ -42,7 +42,10 @@ func TestDirectoryCache(t *testing.T) { if err != nil { t.Fatalf("failed to make tempdir: %v", err) } - c, err := NewDirectoryCache(tmp, 10, SyncAdd()) + c, err := NewDirectoryCache(tmp, DirectoryCacheConfig{ + MaxLRUCacheEntry: 10, + SyncAdd: true, + }) if err != nil { t.Fatalf("failed to make cache: %v", err) } @@ -56,7 +59,10 @@ func TestDirectoryCache(t *testing.T) { if err != nil { t.Fatalf("failed to make tempdir: %v", err) } - c, err := NewDirectoryCache(tmp, 1, SyncAdd()) + c, err := NewDirectoryCache(tmp, DirectoryCacheConfig{ + MaxLRUCacheEntry: 1, + SyncAdd: true, + }) if err != nil { t.Fatalf("failed to make cache: %v", err) } @@ -144,28 +150,35 @@ func digestFor(content string) string { func hit(sample string) check { return func(t *testing.T, c BlobCache) { - d := digestFor(sample) - p, err := c.Fetch(d) - if err != nil { - t.Errorf("failed to fetch blob %q: %v", d, err) - return - } - if len(p) != len(sample) { - t.Errorf("fetched size %d; want %d", len(p), len(sample)) - return - } - df := digestFor(string(p)) - if df != d { - t.Errorf("fetched digest %q(%q); want %q(%q)", - df, string(p), d, sample) - } + // test whole blob + key := digestFor(sample) + testChunk(t, c, key, 0, sample) + + // test a chunk + chunk := len(sample) / 3 + testChunk(t, c, key, int64(chunk), sample[chunk:2*chunk]) + } +} + +func testChunk(t *testing.T, c BlobCache, key string, offset int64, sample string) { + p := make([]byte, len(sample)) + if n, err := c.FetchAt(key, offset, p); err != nil { + t.Errorf("failed to fetch blob %q: %v", key, err) + return + } else if n != len(sample) { + t.Errorf("fetched size %d; want %d", len(p), len(sample)) + return + } + if digestFor(sample) != digestFor(string(p)) { + t.Errorf("fetched %q; want %q", string(p), sample) } } func miss(sample string) check { return func(t *testing.T, c BlobCache) { d := digestFor(sample) - _, err := c.Fetch(d) + p := make([]byte, len(sample)) + _, err := c.FetchAt(d, 0, p) if err == nil { t.Errorf("hit 
blob %q but must be missed: %v", d, err) return diff --git a/stargz/fs.go b/stargz/fs.go index 2f12f7bfc..c50c66eda 100644 --- a/stargz/fs.go +++ b/stargz/fs.go @@ -71,15 +71,15 @@ import ( ) const ( - blockSize = 512 + blockSize = 4096 memoryCacheType = "memory" whiteoutPrefix = ".wh." whiteoutOpaqueDir = whiteoutPrefix + whiteoutPrefix + ".opq" opaqueXattr = "trusted.overlay.opaque" opaqueXattrValue = "y" stateDirName = ".stargz-snapshotter" - defaultLRUCacheEntry = 100 defaultResolveResultEntry = 100 + defaultPrefetchTimeoutSec = 10 statFileMode = syscall.S_IFREG | 0400 // -r-------- stateDirMode = syscall.S_IFDIR | 0500 // dr-x------ @@ -104,27 +104,38 @@ type Config struct { remote.ResolverConfig `toml:"resolver"` remote.BlobConfig `toml:"blob"` keychain.KubeconfigKeychainConfig `toml:"kubeconfig_keychain"` + cache.DirectoryCacheConfig `toml:"directory_cache"` HTTPCacheType string `toml:"http_cache_type"` FSCacheType string `toml:"filesystem_cache_type"` - LRUCacheEntry int `toml:"lru_max_entry"` ResolveResultEntry int `toml:"resolve_result_entry"` PrefetchSize int64 `toml:"prefetch_size"` + PrefetchTimeoutSec int64 `toml:"prefetch_timeout_sec"` NoPrefetch bool `toml:"noprefetch"` Debug bool `toml:"debug"` } -func NewFilesystem(ctx context.Context, root string, config *Config) (snbase.FileSystem, error) { - maxEntry := config.LRUCacheEntry - if maxEntry == 0 { - maxEntry = defaultLRUCacheEntry - } - httpCache, err := getCache(config.HTTPCacheType, filepath.Join(root, "httpcache"), maxEntry) - if err != nil { - return nil, err +func NewFilesystem(ctx context.Context, root string, config *Config) (_ snbase.FileSystem, err error) { + var httpCache cache.BlobCache + if config.HTTPCacheType == memoryCacheType { + httpCache = cache.NewMemoryCache() + } else { + if httpCache, err = cache.NewDirectoryCache( + filepath.Join(root, "http"), + config.DirectoryCacheConfig, + ); err != nil { + return nil, errors.Wrap(err, "failed to prepare HTTP cache") + } } - fsCache, err := getCache(config.FSCacheType, filepath.Join(root, "fscache"), maxEntry) - if err != nil { - return nil, err + var fsCache cache.BlobCache + if config.FSCacheType == memoryCacheType { + fsCache = cache.NewMemoryCache() + } else { + if fsCache, err = cache.NewDirectoryCache( + filepath.Join(root, "fscache"), + config.DirectoryCacheConfig, + ); err != nil { + return nil, errors.Wrap(err, "failed to prepare filesystem cache") + } } keychain := authn.NewMultiKeychain( authn.DefaultKeychain, @@ -134,12 +145,17 @@ func NewFilesystem(ctx context.Context, root string, config *Config) (snbase.Fil if resolveResultEntry == 0 { resolveResultEntry = defaultResolveResultEntry } + prefetchTimeout := time.Duration(config.PrefetchTimeoutSec) * time.Second + if prefetchTimeout == 0 { + prefetchTimeout = defaultPrefetchTimeoutSec * time.Second + } return &filesystem{ resolver: remote.NewResolver(keychain, config.ResolverConfig), blobConfig: config.BlobConfig, httpCache: httpCache, fsCache: fsCache, prefetchSize: config.PrefetchSize, + prefetchTimeout: prefetchTimeout, noprefetch: config.NoPrefetch, debug: config.Debug, layer: make(map[string]*layer), @@ -148,20 +164,13 @@ func NewFilesystem(ctx context.Context, root string, config *Config) (snbase.Fil }, nil } -// getCache gets a cache corresponding to specified type. 
-func getCache(ctype, dir string, maxEntry int) (cache.BlobCache, error) { - if ctype == memoryCacheType { - return cache.NewMemoryCache(), nil - } - return cache.NewDirectoryCache(dir, maxEntry) -} - type filesystem struct { resolver *remote.Resolver blobConfig remote.BlobConfig httpCache cache.BlobCache fsCache cache.BlobCache prefetchSize int64 + prefetchTimeout time.Duration noprefetch bool debug bool layer map[string]*layer @@ -245,7 +254,9 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s // Check() for this layer waits for the prefetch completion. We recreate // RoundTripper to avoid disturbing other NW-related operations. if !fs.noprefetch { + l.doPrefetch() go func() { + defer l.donePrefetch() fs.backgroundTaskManager.DoPrioritizedTask() defer fs.backgroundTaskManager.DonePrioritizedTask() tr, err := fetchTr() @@ -275,11 +286,17 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s retN, retErr = 0, err return } - retN, retErr = l.blob.ReadAt(p, offset, remote.WithContext(ctx), remote.WithRoundTripper(tr)) + retN, retErr = l.blob.ReadAt( + p, + offset, + remote.WithContext(ctx), // Make cancellable + remote.WithRoundTripper(tr), // Use dedicated Transport + remote.WithCacheOpts(cache.Direct()), // Do not pollute mem cache + ) }, 120*time.Second) return }), 0, l.blob.Size()) - if err := l.reader.CacheTarGzWithReader(br); err != nil { + if err := l.reader.CacheTarGzWithReader(br, cache.Direct()); err != nil { logCtx.WithError(err).Debug("failed to fetch whole layer") return } @@ -349,7 +366,7 @@ func (fs *filesystem) resolve(ctx context.Context, ref, digest string) *resolveR return nil, errors.Wrap(err, "failed to read layer") } - return newLayer(blob, gr, root), nil + return newLayer(blob, gr, root, fs.prefetchTimeout), nil }) } @@ -371,7 +388,7 @@ func (fs *filesystem) Check(ctx context.Context, mountpoint string) error { } // Wait for prefetch compeletion - if err := l.waitForPrefetchCompletion(10 * time.Second); err != nil { + if err := l.waitForPrefetchCompletion(); err != nil { logCtx.WithError(err).Warn("failed to sync with prefetch completion") } @@ -507,26 +524,33 @@ func (rr *resolveResult) isInProgress() bool { return rr.progress.isInProgress() } -func newLayer(blob remote.Blob, r reader.Reader, root *stargz.TOCEntry) *layer { +func newLayer(blob remote.Blob, r reader.Reader, root *stargz.TOCEntry, prefetchTimeout time.Duration) *layer { return &layer{ - blob: blob, - reader: r, - root: root, - prefetchWaiter: newWaiter(), + blob: blob, + reader: r, + root: root, + prefetchWaiter: newWaiter(), + prefetchTimeout: prefetchTimeout, } } type layer struct { - blob remote.Blob - reader reader.Reader - root *stargz.TOCEntry - prefetchWaiter *waiter + blob remote.Blob + reader reader.Reader + root *stargz.TOCEntry + prefetchWaiter *waiter + prefetchTimeout time.Duration } -func (l *layer) prefetch(prefetchSize int64, opts ...remote.Option) error { +func (l *layer) doPrefetch() { l.prefetchWaiter.start() - defer l.prefetchWaiter.done() +} + +func (l *layer) donePrefetch() { + l.prefetchWaiter.done() +} +func (l *layer) prefetch(prefetchSize int64, opts ...remote.Option) error { if _, ok := l.reader.Lookup(NoPrefetchLandmark); ok { // do not prefetch this layer return nil @@ -545,15 +569,15 @@ func (l *layer) prefetch(prefetchSize int64, opts ...remote.Option) error { return l.blob.ReadAt(p, off, opts...) 
}), 0, prefetchSize) err := l.reader.CacheTarGzWithReader(pr) - if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { + if err != nil && errors.Cause(err) != io.EOF && errors.Cause(err) != io.ErrUnexpectedEOF { return errors.Wrap(err, "failed to cache prefetched layer") } return nil } -func (l *layer) waitForPrefetchCompletion(timeout time.Duration) error { - return l.prefetchWaiter.wait(timeout) +func (l *layer) waitForPrefetchCompletion() error { + return l.prefetchWaiter.wait(l.prefetchTimeout) } func newWaiter() *waiter { diff --git a/stargz/fs_test.go b/stargz/fs_test.go index f4156b0c5..8266c4946 100644 --- a/stargz/fs_test.go +++ b/stargz/fs_test.go @@ -41,6 +41,7 @@ import ( "testing" "time" + "github.com/containerd/stargz-snapshotter/cache" "github.com/containerd/stargz-snapshotter/stargz/reader" "github.com/containerd/stargz-snapshotter/stargz/remote" "github.com/containerd/stargz-snapshotter/task" @@ -62,7 +63,7 @@ func TestCheck(t *testing.T) { bb := &breakBlob{} fs := &filesystem{ layer: map[string]*layer{ - "test": newLayer(bb, nopreader{}, nil), + "test": newLayer(bb, nopreader{}, nil, time.Second), }, backgroundTaskManager: task.NewBackgroundTaskManager(1, time.Millisecond), } @@ -79,9 +80,9 @@ func TestCheck(t *testing.T) { type nopreader struct{} -func (r nopreader) OpenFile(name string) (io.ReaderAt, error) { return nil, nil } -func (r nopreader) Lookup(name string) (*stargz.TOCEntry, bool) { return nil, false } -func (r nopreader) CacheTarGzWithReader(ir io.Reader) error { return nil } +func (r nopreader) OpenFile(name string) (io.ReaderAt, error) { return nil, nil } +func (r nopreader) Lookup(name string) (*stargz.TOCEntry, bool) { return nil, false } +func (r nopreader) CacheTarGzWithReader(ir io.Reader, opts ...cache.Option) error { return nil } type breakBlob struct { success bool @@ -885,7 +886,7 @@ func TestPrefetch(t *testing.T) { if err != nil { t.Fatalf("failed to make stargz reader: %v", err) } - l := newLayer(blob, gr, nil) + l := newLayer(blob, gr, nil, time.Second) prefetchSize := int64(0) if tt.prefetchSize != nil { prefetchSize = tt.prefetchSize(l) @@ -966,22 +967,22 @@ type testCache struct { mu sync.Mutex } -func (tc *testCache) Fetch(blobHash string) ([]byte, error) { +func (tc *testCache) FetchAt(key string, offset int64, p []byte, opts ...cache.Option) (int, error) { tc.mu.Lock() defer tc.mu.Unlock() - cache, ok := tc.membuf[blobHash] + cache, ok := tc.membuf[key] if !ok { - return nil, fmt.Errorf("Missed cache: %q", blobHash) + return 0, fmt.Errorf("Missed cache: %q", key) } - return []byte(cache), nil + return copy(p, cache[offset:]), nil } -func (tc *testCache) Add(blobHash string, p []byte) { +func (tc *testCache) Add(key string, p []byte, opts ...cache.Option) { tc.mu.Lock() defer tc.mu.Unlock() - tc.membuf[blobHash] = string(p) - tc.t.Logf(" cached [%s...]: %q", blobHash[:8], string(p)) + tc.membuf[key] = string(p) + tc.t.Logf(" cached [%s...]: %q", key[:8], string(p)) } func TestWaiter(t *testing.T) { diff --git a/stargz/reader/reader.go b/stargz/reader/reader.go index d4a64c45c..f2a35103e 100644 --- a/stargz/reader/reader.go +++ b/stargz/reader/reader.go @@ -24,11 +24,14 @@ package reader import ( "archive/tar" + "bytes" "compress/gzip" "crypto/sha256" "fmt" "io" + "io/ioutil" "strings" + "sync" "github.com/containerd/stargz-snapshotter/cache" "github.com/google/crfs/stargz" @@ -38,7 +41,7 @@ import ( type Reader interface { OpenFile(name string) (io.ReaderAt, error) Lookup(name string) (*stargz.TOCEntry, bool) - 
CacheTarGzWithReader(r io.Reader) error + CacheTarGzWithReader(r io.Reader, opts ...cache.Option) error } func NewReader(sr *io.SectionReader, cache cache.BlobCache) (Reader, *stargz.TOCEntry, error) { @@ -56,13 +59,19 @@ func NewReader(sr *io.SectionReader, cache cache.BlobCache) (Reader, *stargz.TOC r: r, sr: sr, cache: cache, + bufPool: sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, + }, }, root, nil } type reader struct { - r *stargz.Reader - sr *io.SectionReader - cache cache.BlobCache + r *stargz.Reader + sr *io.SectionReader + cache cache.BlobCache + bufPool sync.Pool } func (gr *reader) OpenFile(name string) (io.ReaderAt, error) { @@ -80,6 +89,7 @@ func (gr *reader) OpenFile(name string) (io.ReaderAt, error) { r: gr.r, cache: gr.cache, ra: sr, + gr: gr, }, nil } @@ -87,10 +97,10 @@ func (gr *reader) Lookup(name string) (*stargz.TOCEntry, bool) { return gr.r.Lookup(name) } -func (gr *reader) CacheTarGzWithReader(r io.Reader) error { +func (gr *reader) CacheTarGzWithReader(r io.Reader, opts ...cache.Option) error { gzr, err := gzip.NewReader(r) if err != nil { - return err + return errors.Wrapf(err, "failed to get gzip reader") } defer gzr.Close() tr := tar.NewReader(gzr) @@ -98,7 +108,7 @@ func (gr *reader) CacheTarGzWithReader(r io.Reader) error { h, err := tr.Next() if err != nil { if err != io.EOF { - return err + return errors.Wrapf(err, "failed to read next tar entry") } break } @@ -116,20 +126,42 @@ func (gr *reader) CacheTarGzWithReader(r io.Reader) error { if !ok { break } - id := genID(fe.Digest, ce.ChunkOffset, ce.ChunkSize) - if cacheData, err := gr.cache.Fetch(id); err != nil || len(cacheData) != int(ce.ChunkSize) { - // make sure that this range is at ce.ChunkOffset for ce.ChunkSize - if nr != ce.ChunkOffset { - return fmt.Errorf("invalid offset %d != %d", nr, ce.ChunkOffset) - } - data := make([]byte, int(ce.ChunkSize)) + // make sure that this range is at ce.ChunkOffset for ce.ChunkSize + if nr != ce.ChunkOffset { + return fmt.Errorf("invalid offset %d != %d", nr, ce.ChunkOffset) + } - // Cache this chunk (offset: ce.ChunkOffset, size: ce.ChunkSize) - if _, err := io.ReadFull(tr, data); err != nil && err != io.EOF { - return err + // Check if the target chunks exists in the cache + id := genID(fe.Digest, ce.ChunkOffset, ce.ChunkSize) + if _, err := gr.cache.FetchAt(id, 0, nil, opts...); err != nil { + // missed cache, needs to fetch and add it to the cache + b := gr.bufPool.Get().(*bytes.Buffer) + b.Reset() + b.Grow(int(ce.ChunkSize)) + if _, err := io.CopyN(b, tr, ce.ChunkSize); err != nil { + gr.bufPool.Put(b) + return errors.Wrapf(err, + "failed to read file payload of %q (offset:%d,size:%d)", + h.Name, ce.ChunkOffset, ce.ChunkSize) + } + if int64(b.Len()) != ce.ChunkSize { + gr.bufPool.Put(b) + return fmt.Errorf("unexpected copied data size %d; want %d", + b.Len(), ce.ChunkSize) } - gr.cache.Add(id, data) + gr.cache.Add(id, b.Bytes()[:ce.ChunkSize], opts...) 
+				gr.bufPool.Put(b)
+
+				nr += ce.ChunkSize
+				continue
+			}
+
+			// Discard the target chunk
+			if _, err := io.CopyN(ioutil.Discard, tr, ce.ChunkSize); err != nil {
+				return errors.Wrapf(err,
+					"failed to discard file payload of %q (offset:%d,size:%d)",
+					h.Name, ce.ChunkOffset, ce.ChunkSize)
 			}
 			nr += ce.ChunkSize
 		}
@@ -143,6 +175,7 @@ type file struct {
 	ra    io.ReaderAt
 	r     *stargz.Reader
 	cache cache.BlobCache
+	gr    *reader
 }
 
 // ReadAt reads chunks from the stargz file with trying to fetch as many chunks
@@ -154,40 +187,51 @@ func (sf *file) ReadAt(p []byte, offset int64) (int, error) {
 		if !ok {
 			break
 		}
-		id := genID(sf.digest, ce.ChunkOffset, ce.ChunkSize)
-		if cached, err := sf.cache.Fetch(id); err == nil && int64(len(cached)) == ce.ChunkSize {
-			nr += copy(p[nr:], cached[offset+int64(nr)-ce.ChunkOffset:])
-		} else {
-			var (
-				ip          []byte
-				tmp         bool
-				lowerUnread = positive(offset - ce.ChunkOffset)
-				upperUnread = positive(ce.ChunkOffset + ce.ChunkSize - (offset + int64(len(p))))
-			)
-			if lowerUnread == 0 && upperUnread == 0 {
-				ip = p[nr : int64(nr)+ce.ChunkSize]
-			} else {
-				// Use temporally buffer for aligning this chunk
-				ip = make([]byte, ce.ChunkSize)
-				tmp = true
-			}
+		var (
+			id           = genID(sf.digest, ce.ChunkOffset, ce.ChunkSize)
+			lowerDiscard = positive(offset - ce.ChunkOffset)
+			upperDiscard = positive(ce.ChunkOffset + ce.ChunkSize - (offset + int64(len(p))))
+			expectedSize = ce.ChunkSize - upperDiscard - lowerDiscard
+		)
+
+		// Check if the content exists in the cache
+		n, err := sf.cache.FetchAt(id, lowerDiscard, p[nr:int64(nr)+expectedSize])
+		if err == nil && int64(n) == expectedSize {
+			nr += n
+			continue
+		}
+
+		// We missed the cache. Take it from the underlying reader.
+		// We read the whole chunk here and add it to the cache so that following
+		// reads against neighboring chunks can take the data without decompression.
+ if lowerDiscard == 0 && upperDiscard == 0 { + // We can directly store the result to the given buffer + ip := p[nr : int64(nr)+ce.ChunkSize] n, err := sf.ra.ReadAt(ip, ce.ChunkOffset) if err != nil && err != io.EOF { return 0, errors.Wrap(err, "failed to read data") - } else if int64(n) != ce.ChunkSize { - return 0, fmt.Errorf("invalid chunk size %d; want %d", n, ce.ChunkSize) - } - if tmp { - // Write temporally buffer to resulting slice - n = copy(p[nr:], ip[lowerUnread:ce.ChunkSize-upperUnread]) - if int64(n) != ce.ChunkSize-upperUnread-lowerUnread { - return 0, fmt.Errorf("unexpected final data size %d; want %d", - n, ce.ChunkSize-upperUnread-lowerUnread) - } } sf.cache.Add(id, ip) nr += n + continue + } + + // Use temporally buffer for aligning this chunk + b := sf.gr.bufPool.Get().(*bytes.Buffer) + b.Reset() + b.Grow(int(ce.ChunkSize)) + ip := b.Bytes()[:ce.ChunkSize] + if _, err := sf.ra.ReadAt(ip, ce.ChunkOffset); err != nil && err != io.EOF { + sf.gr.bufPool.Put(b) + return 0, errors.Wrap(err, "failed to read data") + } + sf.cache.Add(id, ip) + n = copy(p[nr:], ip[lowerDiscard:ce.ChunkSize-upperDiscard]) + sf.gr.bufPool.Put(b) + if int64(n) != expectedSize { + return 0, fmt.Errorf("unexpected final data size %d; want %d", n, expectedSize) } + nr += n } return nr, nil diff --git a/stargz/reader/reader_test.go b/stargz/reader/reader_test.go index af3b6f05a..ee9936e6e 100644 --- a/stargz/reader/reader_test.go +++ b/stargz/reader/reader_test.go @@ -101,11 +101,11 @@ func (br *breakReaderAt) ReadAt(p []byte, off int64) (int, error) { type nopCache struct{} -func (nc *nopCache) Fetch(blobHash string) ([]byte, error) { - return nil, fmt.Errorf("Missed cache: %q", blobHash) +func (nc *nopCache) FetchAt(key string, offset int64, p []byte, opts ...cache.Option) (int, error) { + return 0, fmt.Errorf("Missed cache: %q", key) } -func (nc *nopCache) Add(blobHash string, p []byte) {} +func (nc *nopCache) Add(key string, p []byte, opts ...cache.Option) {} type testCache struct { membuf map[string]string @@ -113,22 +113,22 @@ type testCache struct { mu sync.Mutex } -func (tc *testCache) Fetch(blobHash string) ([]byte, error) { +func (tc *testCache) FetchAt(key string, offset int64, p []byte, opts ...cache.Option) (int, error) { tc.mu.Lock() defer tc.mu.Unlock() - cache, ok := tc.membuf[blobHash] + cache, ok := tc.membuf[key] if !ok { - return nil, fmt.Errorf("Missed cache: %q", blobHash) + return 0, fmt.Errorf("Missed cache: %q", key) } - return []byte(cache), nil + return copy(p, cache[offset:]), nil } -func (tc *testCache) Add(blobHash string, p []byte) { +func (tc *testCache) Add(key string, p []byte, opts ...cache.Option) { tc.mu.Lock() defer tc.mu.Unlock() - tc.membuf[blobHash] = string(p) - tc.t.Logf(" cached [%s...]: %q", blobHash[:8], string(p)) + tc.membuf[key] = string(p) + tc.t.Logf(" cached [%s...]: %q", key[:8], string(p)) } type region struct{ b, e int64 } @@ -220,8 +220,9 @@ func TestFileReadAt(t *testing.T) { if !ok { break } - data, err := f.cache.Fetch(genID(f.digest, ce.ChunkOffset, ce.ChunkSize)) - if err != nil || len(data) != int(ce.ChunkSize) { + data := make([]byte, ce.ChunkSize) + n, err := f.cache.FetchAt(genID(f.digest, ce.ChunkOffset, ce.ChunkSize), 0, data) + if err != nil || n != int(ce.ChunkSize) { t.Errorf("missed cache of offset=%d, size=%d: %v(got size=%d)", ce.ChunkOffset, ce.ChunkSize, err, n) return } diff --git a/stargz/remote/blob.go b/stargz/remote/blob.go index b2b8e1a15..d4b1e8be0 100644 --- a/stargz/remote/blob.go +++ b/stargz/remote/blob.go @@ -62,6 
+62,8 @@ type blob struct { fetchedRegionSet regionSet fetchedRegionSetMu sync.Mutex + + resolver *Resolver } func (b *blob) Authn(tr http.RoundTripper) (http.RoundTripper, error) { @@ -96,13 +98,24 @@ func (b *blob) FetchedSize() int64 { } func (b *blob) Cache(offset int64, size int64, opts ...Option) error { + var cacheOpts options + for _, o := range opts { + o(&cacheOpts) + } + + b.fetcherMu.Lock() + fr := b.fetcher + b.fetcherMu.Unlock() + fetchReg := region{floor(offset, b.chunkSize), ceil(offset+size-1, b.chunkSize) - 1} discard := make(map[region]io.Writer) b.walkChunks(fetchReg, func(reg region) error { - discard[reg] = ioutil.Discard // do not read chunks (only cached) + if _, err := b.cache.FetchAt(fr.genID(reg), 0, nil, cacheOpts.cacheOpts...); err != nil { + discard[reg] = ioutil.Discard + } return nil }) - if err := b.fetchRange(discard, opts...); err != nil { + if err := b.fetchRange(discard, &cacheOpts); err != nil { return err } @@ -120,36 +133,75 @@ func (b *blob) ReadAt(p []byte, offset int64, opts ...Option) (int, error) { // Make the buffer chunk aligned allRegion := region{floor(offset, b.chunkSize), ceil(offset+int64(len(p))-1, b.chunkSize) - 1} allData := make(map[region]io.Writer) + var putBufs []*bytes.Buffer + defer func() { + for _, bf := range putBufs { + b.resolver.bufPool.Put(bf) + } + }() + + var readAtOpts options + for _, o := range opts { + o(&readAtOpts) + } + + // Fetcher can be suddenly updated so we take and use the snapshot of it for + // consistency. + b.fetcherMu.Lock() + fr := b.fetcher + b.fetcherMu.Unlock() + var commits []func() error b.walkChunks(allRegion, func(chunk region) error { var ( - ip []byte - base = positive(chunk.b - offset) - lowerUnread = positive(offset - chunk.b) - upperUnread = positive(chunk.e + 1 - (offset + int64(len(p)))) + base = positive(chunk.b - offset) + lowerUnread = positive(offset - chunk.b) + upperUnread = positive(chunk.e + 1 - (offset + int64(len(p)))) + expectedSize = chunk.size() - upperUnread - lowerUnread ) + + // Check if the content exists in the cache + n, err := b.cache.FetchAt(fr.genID(chunk), lowerUnread, p[base:base+expectedSize], readAtOpts.cacheOpts...) + if err == nil && n == int(expectedSize) { + return nil + } + + // We missed cache. Take it from remote registry. + // We get the whole chunk here and add it to the cache so that following + // reads against neighboring chunks can take the data without making HTTP requests. if lowerUnread == 0 && upperUnread == 0 { - ip = p[base : base+chunk.size()] + // We can directly store the result in the given buffer + allData[chunk] = &byteWriter{ + p: p[base : base+chunk.size()], + } } else { // Use temporally buffer for aligning this chunk - ip = make([]byte, chunk.size()) + bf := b.resolver.bufPool.Get().(*bytes.Buffer) + putBufs = append(putBufs, bf) + bf.Reset() + bf.Grow(int(chunk.size())) + allData[chunk] = bf + + // Function for committing the buffered chunk into the result slice. 
commits = append(commits, func() error { - n := copy(p[base:], ip[lowerUnread:chunk.size()-upperUnread]) - if int64(n) != chunk.size()-upperUnread-lowerUnread { + if int64(bf.Len()) != chunk.size() { return fmt.Errorf("unexpected data size %d; want %d", - n, chunk.size()-upperUnread-lowerUnread) + bf.Len(), chunk.size()) + } + bb := bf.Bytes()[:chunk.size()] + n := copy(p[base:], bb[lowerUnread:chunk.size()-upperUnread]) + if int64(n) != expectedSize { + return fmt.Errorf("invalid copied data size %d; want %d", + n, expectedSize) } return nil }) } - allData[chunk] = &byteWriter{ - p: ip, - } return nil }) // Read required data - if err := b.fetchRange(allData, opts...); err != nil { + if err := b.fetchRange(allData, &readAtOpts); err != nil { return 0, err } @@ -172,40 +224,27 @@ func (b *blob) ReadAt(p []byte, offset int64, opts ...Option) (int, error) { } // fetchRange fetches all specified chunks from local cache and remote blob. -func (b *blob) fetchRange(allData map[region]io.Writer, opts ...Option) error { +func (b *blob) fetchRange(allData map[region]io.Writer, opts *options) error { + if len(allData) == 0 { + return nil + } + // Fetcher can be suddenly updated so we take and use the snapshot of it for // consistency. b.fetcherMu.Lock() fr := b.fetcher b.fetcherMu.Unlock() - // Read data from cache - fetched := make(map[region]bool) - for chunk := range allData { - data, err := b.cache.Fetch(fr.genID(chunk)) - if err != nil || int64(len(data)) != chunk.size() { - fetched[chunk] = false // missed cache, needs to fetch remotely. - continue - } - if n, err := io.Copy(allData[chunk], bytes.NewReader(data)); err != nil { - return err - } else if n != chunk.size() { - return fmt.Errorf("unexpected cached data size %d; want %d", n, chunk.size()) - } - } - if len(fetched) == 0 { - // We successfully served whole range from cache - return nil - } - // request missed regions var req []region - for reg := range fetched { + fetched := make(map[region]bool) + for reg := range allData { req = append(req, reg) + fetched[reg] = false } ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - mr, err := fr.fetch(ctx, req, opts...) + mr, err := fr.fetch(ctx, req, opts) if err != nil { return err } @@ -224,23 +263,34 @@ func (b *blob) fetchRange(allData map[region]io.Writer, opts ...Option) error { return errors.Wrapf(err, "failed to read multipart resp") } if err := b.walkChunks(reg, func(chunk region) error { - data := make([]byte, chunk.size()) - if _, err := io.ReadFull(p, data); err != nil { + + // Prepare the temporary buffer + bf := b.resolver.bufPool.Get().(*bytes.Buffer) + defer b.resolver.bufPool.Put(bf) + bf.Reset() + bf.Grow(int(chunk.size())) + w := io.Writer(bf) + + // If this chunk is one of the targets, write the content to the + // passed reader too. + if _, ok := fetched[chunk]; ok { + w = io.MultiWriter(bf, allData[chunk]) + } + + // Copy the target chunk + if _, err := io.CopyN(w, p, chunk.size()); err != nil { return err + } else if int64(bf.Len()) != chunk.size() { + return fmt.Errorf("unexpected fetched data size %d; want %d", + bf.Len(), chunk.size()) } - b.cache.Add(fr.genID(chunk), data) + + // Add the target chunk to the cache + b.cache.Add(fr.genID(chunk), bf.Bytes()[:chunk.size()], opts.cacheOpts...) 
b.fetchedRegionSetMu.Lock() b.fetchedRegionSet.add(chunk) b.fetchedRegionSetMu.Unlock() - if _, ok := fetched[chunk]; ok { - fetched[chunk] = true - if n, err := io.Copy(allData[chunk], bytes.NewReader(data)); err != nil { - return errors.Wrap(err, "failed to write chunk to buffer") - } else if n != chunk.size() { - return fmt.Errorf("unexpected fetched data size %d; want %d", - n, chunk.size()) - } - } + fetched[chunk] = true return nil }); err != nil { return errors.Wrapf(err, "failed to get chunks") diff --git a/stargz/remote/blob_test.go b/stargz/remote/blob_test.go index 28cfd38bc..25ca1ec25 100644 --- a/stargz/remote/blob_test.go +++ b/stargz/remote/blob_test.go @@ -37,6 +37,8 @@ import ( "sync" "testing" "time" + + "github.com/containerd/stargz-snapshotter/cache" ) const ( @@ -213,8 +215,9 @@ func checkAllCached(t *testing.T, r *blob, offset, size int64) { cn := 0 whole := region{floor(offset, r.chunkSize), ceil(offset+size-1, r.chunkSize) - 1} if err := r.walkChunks(whole, func(reg region) error { - data, err := r.cache.Fetch(r.fetcher.genID(reg)) - if err != nil || int64(len(data)) != reg.size() { + data := make([]byte, reg.size()) + n, err := r.cache.FetchAt(r.fetcher.genID(reg), 0, data) + if err != nil || int64(n) != reg.size() { return fmt.Errorf("missed cache of region={%d,%d}(size=%d): %v", reg.b, reg.e, reg.size(), err) } @@ -279,6 +282,13 @@ func makeBlob(t *testing.T, size int64, chunkSize int64, fn RoundTripFunc) *blob size: size, chunkSize: chunkSize, cache: &testCache{membuf: map[string]string{}, t: t}, + resolver: &Resolver{ + bufPool: sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, + }, + }, } } @@ -288,22 +298,22 @@ type testCache struct { mu sync.Mutex } -func (tc *testCache) Fetch(blobHash string) ([]byte, error) { +func (tc *testCache) FetchAt(key string, offset int64, p []byte, opts ...cache.Option) (int, error) { tc.mu.Lock() defer tc.mu.Unlock() - cache, ok := tc.membuf[blobHash] + cache, ok := tc.membuf[key] if !ok { - return nil, fmt.Errorf("Missed cache: %q", blobHash) + return 0, fmt.Errorf("Missed cache: %q", key) } - return []byte(cache), nil + return copy(p, cache[offset:]), nil } -func (tc *testCache) Add(blobHash string, p []byte) { +func (tc *testCache) Add(key string, p []byte, opts ...cache.Option) { tc.mu.Lock() defer tc.mu.Unlock() - tc.membuf[blobHash] = string(p) - tc.t.Logf(" cached [%s...]: %q", blobHash[:8], string(p)) + tc.membuf[key] = string(p) + tc.t.Logf(" cached [%s...]: %q", key[:8], string(p)) } func TestCheckInterval(t *testing.T) { diff --git a/stargz/remote/resolver.go b/stargz/remote/resolver.go index c28c08714..b8d36cec4 100644 --- a/stargz/remote/resolver.go +++ b/stargz/remote/resolver.go @@ -23,6 +23,7 @@ package remote import ( + "bytes" "context" "crypto/sha256" "fmt" @@ -84,6 +85,11 @@ func NewResolver(keychain authn.Keychain, config ResolverConfig) *Resolver { trPool: lru.New(poolEntry), keychain: keychain, config: config, + bufPool: sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, + }, } } @@ -93,6 +99,7 @@ type Resolver struct { trPoolMu sync.Mutex keychain authn.Keychain config ResolverConfig + bufPool sync.Pool } func (r *Resolver) Resolve(ref, digest string, cache cache.BlobCache, config BlobConfig) (Blob, error) { @@ -124,6 +131,7 @@ func (r *Resolver) Resolve(ref, digest string, cache cache.BlobCache, config Blo cache: cache, lastCheck: time.Now(), checkInterval: checkInterval, + resolver: r, }, nil } @@ -352,7 +360,7 @@ type multipartReadCloser interface { Close() error 
 }
 
-func (f *fetcher) fetch(ctx context.Context, rs []region, opts ...Option) (multipartReadCloser, error) {
+func (f *fetcher) fetch(ctx context.Context, rs []region, opts *options) (multipartReadCloser, error) {
 	if len(rs) == 0 {
 		return nil, fmt.Errorf("no request queried")
 	}
@@ -362,16 +370,11 @@ func (f *fetcher) fetch(ctx context.Context, rs []region, opts ...Option) (multi
 	var (
 		tr              = f.tr
 		singleRangeMode = f.isSingleRangeMode()
 	)
 
-	// Parse options
-	var opt options
-	for _, o := range opts {
-		o(&opt)
+	if opts.ctx != nil {
+		ctx = opts.ctx
 	}
-	if opt.ctx != nil {
-		ctx = opt.ctx
-	}
-	if opt.tr != nil {
-		tr = opt.tr
+	if opts.tr != nil {
+		tr = opts.tr
 	}
 
 	// squash requesting chunks for reducing the total size of request header
@@ -428,8 +431,8 @@ func (f *fetcher) fetch(ctx context.Context, rs []region, opts ...Option) (multi
 		}
 		return singlePartReader(reg, res.Body), nil
 	} else if !singleRangeMode {
-		f.singleRangeMode()              // fallbacks to singe range request mode
-		return f.fetch(ctx, rs, opts...) // retries with the single range mode
+		f.singleRangeMode()           // falls back to single range request mode
+		return f.fetch(ctx, rs, opts) // retries with the single range mode
 	}
 
 	return nil, fmt.Errorf("unexpected status code on %q: %v", f.url, res.Status)
@@ -548,8 +551,9 @@ func parseRange(header string) (region, error) {
 type Option func(*options)
 
 type options struct {
-	ctx context.Context
-	tr  http.RoundTripper
+	ctx       context.Context
+	tr        http.RoundTripper
+	cacheOpts []cache.Option
 }
 
 func WithContext(ctx context.Context) Option {
@@ -563,3 +567,9 @@ func WithRoundTripper(tr http.RoundTripper) Option {
 		opts.tr = tr
 	}
 }
+
+func WithCacheOpts(cacheOpts ...cache.Option) Option {
+	return func(opts *options) {
+		opts.cacheOpts = cacheOpts
+	}
+}
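
Reviewer note: below is a minimal usage sketch of the reworked cache API introduced in cache/cache.go above. Only DirectoryCacheConfig, NewDirectoryCache, Add, FetchAt and Direct come from this patch; the keys, contents and the cache directory are made-up values for illustration.

package main

import (
	"fmt"

	"github.com/containerd/stargz-snapshotter/cache"
)

func main() {
	// Zero values fall back to defaultMaxLRUCacheEntry/defaultMaxCacheFds (10 each).
	c, err := cache.NewDirectoryCache("/tmp/stargz-cache", cache.DirectoryCacheConfig{
		MaxLRUCacheEntry: 100,
		MaxCacheFds:      20,
		SyncAdd:          true, // write the blob to disk before Add returns
	})
	if err != nil {
		panic(err)
	}

	// Without Direct(), Add also populates the on-memory LRU cache.
	c.Add("deadbeefchunk", []byte("hello, stargz"))

	// Direct() bypasses the on-memory caches for data that won't be read soon,
	// e.g. chunks stored during prefetch.
	c.Add("cafebabechunk", []byte("prefetched data"), cache.Direct())

	// FetchAt copies up to len(p) bytes of the cached chunk, starting at offset.
	p := make([]byte, 6)
	n, err := c.FetchAt("deadbeefchunk", 7, p)
	fmt.Println(n, err, string(p[:n])) // 6 <nil> stargz
}

With SyncAdd left false, the disk write happens on a background goroutine; the chunk can still be served right away because the on-memory copy is added synchronously.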
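
Also for reviewers: a test-style sketch (it has to live in package cache, since the types are unexported) of the reference-counting contract behind the new objectCache that backs both the on-memory buffer cache and the file-descriptor cache. The file path, key and helper name are hypothetical; the point is that finalize runs only after the entry has been evicted from the LRU and every get() caller has invoked its done() callback.

package cache

import "os"

// sketchObjectCacheUsage is illustrative only; it is not part of this patch.
func sketchObjectCacheUsage() error {
	oc := newObjectCache(10)
	oc.finalize = func(v interface{}) {
		v.(*os.File).Close() // runs once nobody references the entry anymore
	}

	f, err := os.Open("/tmp/example-blob") // hypothetical file
	if err != nil {
		return err
	}
	if !oc.add("example-key", f) {
		f.Close() // an entry for this key already exists; drop the duplicate
	}

	if v, done, ok := oc.get("example-key"); ok {
		// get() bumped the ref count, so even if the entry is evicted from the
		// LRU concurrently, the finalizer cannot close the file before done().
		defer done()
		if _, err := v.(*os.File).Stat(); err != nil {
			return err
		}
	}
	return nil
}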
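
Finally, a sketch of how a caller is expected to wire the new knobs (the directory_cache settings and prefetch_timeout_sec) when constructing the filesystem in stargz/fs.go. The import path of the stargz package and the root directory are assumptions based on the paths touched by this patch; the field names come from the Config changes above.

package main

import (
	"context"

	"github.com/containerd/stargz-snapshotter/cache"
	"github.com/containerd/stargz-snapshotter/stargz"
)

func main() {
	cfg := &stargz.Config{
		HTTPCacheType:      "memory", // keep the HTTP-level cache on memory
		PrefetchTimeoutSec: 30,       // Check() waits this long for prefetch (default: 10s)
		DirectoryCacheConfig: cache.DirectoryCacheConfig{
			MaxLRUCacheEntry: 100, // chunks kept on memory per cache
			MaxCacheFds:      50,  // opened cache files kept per cache
		},
	}
	fs, err := stargz.NewFilesystem(context.Background(), "/var/lib/stargz-snapshotter", cfg)
	if err != nil {
		panic(err)
	}
	_ = fs // plugged into the snapshotter exactly as before
}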