Skip to content

Commit

Permalink
Merge pull request #105 from ktock/fs-p
Browse files Browse the repository at this point in the history
Improve file read performance
  • Loading branch information
AkihiroSuda authored Jun 4, 2020
2 parents 7c2e019 + b3c5173 commit 8f92e35
Show file tree
Hide file tree
Showing 11 changed files with 639 additions and 278 deletions.
295 changes: 232 additions & 63 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package cache

import (
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"sync"
Expand All @@ -28,117 +28,211 @@ import (
"github.com/pkg/errors"
)

const (
defaultMaxLRUCacheEntry = 10
defaultMaxCacheFds = 10
)

type DirectoryCacheConfig struct {
MaxLRUCacheEntry int `toml:"max_lru_cache_entry"`
MaxCacheFds int `toml:"max_cache_fds"`
SyncAdd bool `toml:"sync_add"`
}

// TODO: contents validation.

type BlobCache interface {
Fetch(blobHash string) ([]byte, error)
Add(blobHash string, p []byte)
Add(key string, p []byte, opts ...Option)
FetchAt(key string, offset int64, p []byte, opts ...Option) (n int, err error)
}

type dirOpt struct {
syncAdd bool
type cacheOpt struct {
direct bool
}

type DirOption func(o *dirOpt) *dirOpt
type Option func(o *cacheOpt) *cacheOpt

func SyncAdd() DirOption {
return func(o *dirOpt) *dirOpt {
o.syncAdd = true
// When Direct option is specified for FetchAt and Add methods, these operation
// won't use on-memory caches. When you know that the targeting value won't be
// used immediately, you can prevent the limited space of on-memory caches from
// being polluted by these unimportant values.
func Direct() Option {
return func(o *cacheOpt) *cacheOpt {
o.direct = true
return o
}
}

func NewDirectoryCache(directory string, memCacheSize int, opts ...DirOption) (BlobCache, error) {
opt := &dirOpt{}
for _, o := range opts {
opt = o(opt)
func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache, error) {
maxEntry := config.MaxLRUCacheEntry
if maxEntry == 0 {
maxEntry = defaultMaxLRUCacheEntry
}
maxFds := config.MaxCacheFds
if maxFds == 0 {
maxFds = defaultMaxCacheFds
}
if err := os.MkdirAll(directory, os.ModePerm); err != nil {
return nil, err
}
dc := &directoryCache{
cache: lru.New(memCacheSize),
cache: newObjectCache(maxEntry),
fileCache: newObjectCache(maxFds),
directory: directory,
bufPool: sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
},
}
if opt.syncAdd {
dc.syncAdd = true
dc.cache.finalize = func(value interface{}) {
dc.bufPool.Put(value)
}
dc.fileCache.finalize = func(value interface{}) {
value.(*os.File).Close()
}
dc.syncAdd = config.SyncAdd
return dc, nil
}

// directoryCache is a cache implementation which backend is a directory.
type directoryCache struct {
cache *lru.Cache
cacheMu sync.Mutex
cache *objectCache
fileCache *objectCache
directory string
syncAdd bool
fileMu sync.Mutex

bufPool sync.Pool

syncAdd bool
}

func (dc *directoryCache) Fetch(blobHash string) (p []byte, err error) {
dc.cacheMu.Lock()
defer dc.cacheMu.Unlock()
func (dc *directoryCache) FetchAt(key string, offset int64, p []byte, opts ...Option) (n int, err error) {
opt := &cacheOpt{}
for _, o := range opts {
opt = o(opt)
}

if cache, ok := dc.cache.Get(blobHash); ok {
p, ok := cache.([]byte)
if ok {
return p, nil
if !opt.direct {
// Get data from memory
if b, done, ok := dc.cache.get(key); ok {
defer done()
data := b.(*bytes.Buffer).Bytes()
if int64(len(data)) < offset {
return 0, fmt.Errorf("invalid offset %d exceeds chunk size %d",
offset, len(data))
}
return copy(p, data[offset:]), nil
}
}

c := filepath.Join(dc.directory, blobHash[:2], blobHash)
if _, err := os.Stat(c); err != nil {
return nil, errors.Wrapf(err, "Missed cache %q", c)
// Get data from disk. If the file is already opened, use it.
if f, done, ok := dc.fileCache.get(key); ok {
defer done()
return f.(*os.File).ReadAt(p, offset)
}
}

file, err := os.Open(c)
// Open the cache file and read the target region
// TODO: If the target cache is write-in-progress, should we wait for the completion
// or simply report the cache miss?
file, err := os.Open(dc.cachePath(key))
if err != nil {
return nil, errors.Wrapf(err, "Failed to Open cached blob file %q", c)
return 0, errors.Wrapf(err, "failed to open blob file for %q", key)
}
if n, err = file.ReadAt(p, offset); err == io.EOF {
err = nil
}
defer file.Close()

if p, err = ioutil.ReadAll(file); err != nil && err != io.EOF {
return nil, errors.Wrapf(err, "failed to read cached data %q", c)
// Cache the opened file for future use. If "direct" option is specified, this
// won't be done. This option is useful for preventing file cache from being
// polluted by data that won't be accessed immediately.
if opt.direct || !dc.fileCache.add(key, file) {
file.Close()
}
dc.cache.Add(blobHash, p)

return
// TODO: should we cache the entire file data on memory?
// but making I/O (possibly huge) on every fetching
// might be costly.

return n, err
}

func (dc *directoryCache) Add(blobHash string, p []byte) {
// Copy the original data for avoiding the cached contents to be edited accidentally
p2 := make([]byte, len(p))
copy(p2, p)
p = p2
func (dc *directoryCache) Add(key string, p []byte, opts ...Option) {
opt := &cacheOpt{}
for _, o := range opts {
opt = o(opt)
}

dc.cacheMu.Lock()
dc.cache.Add(blobHash, p)
dc.cacheMu.Unlock()
if !opt.direct {
// Cache the passed data on memory. This enables to serve this data even
// during writing it to the disk. If "direct" option is specified, this
// won't be done. This option is useful for preventing memory cache from being
// polluted by data that won't be accessed immediately.
b := dc.bufPool.Get().(*bytes.Buffer)
b.Reset()
b.Write(p)
if !dc.cache.add(key, b) {
dc.bufPool.Put(b) // Already exists. No need to cache.
}
}

// Cache the passed data to disk.
b2 := dc.bufPool.Get().(*bytes.Buffer)
b2.Reset()
b2.Write(p)
addFunc := func() {
dc.fileMu.Lock()
defer dc.fileMu.Unlock()
defer dc.bufPool.Put(b2)

// Check if cache exists.
c := filepath.Join(dc.directory, blobHash[:2], blobHash)
var (
c = dc.cachePath(key)
wip = dc.wipPath(key)
)
if _, err := os.Stat(wip); err == nil {
return // Write in progress
}
if _, err := os.Stat(c); err == nil {
return // Already exists.
}

// Write the contents to a temporary file
if err := os.MkdirAll(filepath.Dir(wip), os.ModePerm); err != nil {
fmt.Printf("Warning: Failed to Create blob cache directory %q: %v\n", c, err)
return
}
wipfile, err := os.Create(wip)
if err != nil {
fmt.Printf("Warning: failed to prepare temp file for storing cache %q", key)
return
}
defer func() {
wipfile.Close()
os.Remove(wipfile.Name())
}()
want := b2.Len()
if _, err := io.CopyN(wipfile, b2, int64(want)); err != nil {
fmt.Printf("Warning: failed to write cache: %v\n", err)
return
}

// Create cache file
// Commit the cache contents
if err := os.MkdirAll(filepath.Dir(c), os.ModePerm); err != nil {
fmt.Printf("Warning: Failed to Create blob cache directory %q: %v\n", c, err)
return
}
f, err := os.Create(c)
if err := os.Rename(wipfile.Name(), c); err != nil {
fmt.Printf("Warning: failed to commit cache to %q: %v\n", c, err)
return
}
file, err := os.Open(c)
if err != nil {
fmt.Printf("Warning: could not create a cache file at %q: %v\n", c, err)
fmt.Printf("Warning: failed to open cache on %q: %v\n", c, err)
return
}
defer f.Close()
if n, err := f.Write(p); err != nil || n != len(p) {
fmt.Printf("Warning: failed to write cache: %d(wrote)/%d(expected): %v\n",
n, len(p), err)

// Cache the opened file for future use. If "direct" option is specified, this
// won't be done. This option is useful for preventing file cache from being
// polluted by data that won't be accessed immediately.
if opt.direct || !dc.fileCache.add(key, file) {
file.Close()
}
}

Expand All @@ -149,6 +243,81 @@ func (dc *directoryCache) Add(blobHash string, p []byte) {
}
}

func (dc *directoryCache) cachePath(key string) string {
return filepath.Join(dc.directory, key[:2], key)
}

func (dc *directoryCache) wipPath(key string) string {
return filepath.Join(dc.directory, key[:2], "w", key)
}

func newObjectCache(maxEntries int) *objectCache {
oc := &objectCache{
cache: lru.New(maxEntries),
}
oc.cache.OnEvicted = func(key lru.Key, value interface{}) {
value.(*object).release() // Decrease ref count incremented in add operation.
}
return oc
}

type objectCache struct {
cache *lru.Cache
cacheMu sync.Mutex
finalize func(interface{})
}

func (oc *objectCache) get(key string) (value interface{}, done func(), ok bool) {
oc.cacheMu.Lock()
defer oc.cacheMu.Unlock()
o, ok := oc.cache.Get(key)
if !ok {
return nil, nil, false
}
o.(*object).use()
return o.(*object).v, func() { o.(*object).release() }, true
}

func (oc *objectCache) add(key string, value interface{}) bool {
oc.cacheMu.Lock()
defer oc.cacheMu.Unlock()
if _, ok := oc.cache.Get(key); ok {
return false // TODO: should we swap the object?
}
o := &object{
v: value,
finalize: oc.finalize,
}
o.use() // Keep this object having at least 1 ref count (will be decreased on eviction)
oc.cache.Add(key, o)
return true
}

type object struct {
v interface{}

refCounts int64
finalize func(interface{})

mu sync.Mutex
}

func (o *object) use() {
o.mu.Lock()
defer o.mu.Unlock()
o.refCounts++
}

func (o *object) release() {
o.mu.Lock()
defer o.mu.Unlock()
o.refCounts--
if o.refCounts <= 0 && o.finalize != nil {
// nobody will refer this object
o.finalize(o.v)
}
}

func NewMemoryCache() BlobCache {
return &memoryCache{
membuf: map[string]string{},
Expand All @@ -161,19 +330,19 @@ type memoryCache struct {
mu sync.Mutex
}

func (mc *memoryCache) Fetch(blobHash string) ([]byte, error) {
func (mc *memoryCache) FetchAt(key string, offset int64, p []byte, opts ...Option) (n int, err error) {
mc.mu.Lock()
defer mc.mu.Unlock()

cache, ok := mc.membuf[blobHash]
cache, ok := mc.membuf[key]
if !ok {
return nil, fmt.Errorf("Missed cache: %q", blobHash)
return 0, fmt.Errorf("Missed cache: %q", key)
}
return []byte(cache), nil
return copy(p, cache[offset:]), nil
}

func (mc *memoryCache) Add(blobHash string, p []byte) {
func (mc *memoryCache) Add(key string, p []byte, opts ...Option) {
mc.mu.Lock()
defer mc.mu.Unlock()
mc.membuf[blobHash] = string(p)
mc.membuf[key] = string(p)
}
Loading

0 comments on commit 8f92e35

Please sign in to comment.