Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make resolving timeout configurable #602

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions fs/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type Config struct {
ResolveResultEntry int `toml:"resolve_result_entry"` // deprecated
PrefetchSize int64 `toml:"prefetch_size"`
PrefetchTimeoutSec int64 `toml:"prefetch_timeout_sec"`
ResolveTimeoutSec int64 `toml:"resolve_timeout_sec"`
ResolveRequestTimeoutSec int64 `toml:"resolve_request_timeout_sec"`
NoPrefetch bool `toml:"noprefetch"`
NoBackgroundFetch bool `toml:"no_background_fetch"`
Debug bool `toml:"debug"`
Expand Down
18 changes: 13 additions & 5 deletions fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,10 @@ import (
)

const (
defaultFuseTimeout = time.Second
defaultMaxConcurrency = 2
fusermountBin = "fusermount"
defaultFuseTimeout = time.Second
defaultResolveTimeoutSec = 60
defaultMaxConcurrency = 2
fusermountBin = "fusermount"
)

type Option func(*options)
Expand Down Expand Up @@ -122,6 +123,11 @@ func NewFilesystem(root string, cfg config.Config, opts ...Option) (_ snapshot.F
entryTimeout = defaultFuseTimeout
}

resolveTimeout := time.Duration(cfg.ResolveTimeoutSec) * time.Second
if resolveTimeout == 0 {
resolveTimeout = time.Duration(defaultResolveTimeoutSec) * time.Second
}

metadataStore := fsOpts.metadataStore
if metadataStore == nil {
metadataStore = memorymetadata.NewReader
Expand Down Expand Up @@ -163,6 +169,7 @@ func NewFilesystem(root string, cfg config.Config, opts ...Option) (_ snapshot.F
metricsController: c,
attrTimeout: attrTimeout,
entryTimeout: entryTimeout,
resolveTimeout: resolveTimeout,
}, nil
}

Expand All @@ -181,6 +188,7 @@ type filesystem struct {
metricsController *layermetrics.Controller
attrTimeout time.Duration
entryTimeout time.Duration
resolveTimeout time.Duration
}

func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[string]string) (retErr error) {
Expand Down Expand Up @@ -255,8 +263,8 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
case err := <-errChan:
log.G(ctx).WithError(err).Debug("failed to resolve layer")
return errors.Wrapf(err, "failed to resolve layer")
case <-time.After(30 * time.Second):
log.G(ctx).Debug("failed to resolve layer (timeout)")
case <-time.After(fs.resolveTimeout):
log.G(ctx).WithField("timeout(sec)", fs.resolveTimeout.Seconds()).Debug("failed to resolve layer (timeout)")
return fmt.Errorf("failed to resolve layer (timeout)")
}
defer func() {
Expand Down
97 changes: 76 additions & 21 deletions fs/layer/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ const (
defaultMaxLRUCacheEntry = 10
defaultMaxCacheFds = 10
defaultPrefetchTimeoutSec = 10
defaultResolveTimeoutSec = 60
defaultResolveRequestTimeoutSec = 30
memoryCacheType = "memory"
)

Expand Down Expand Up @@ -117,6 +119,8 @@ type Resolver struct {
rootDir string
resolver *remote.Resolver
prefetchTimeout time.Duration
resolveTimeout time.Duration
resolveRequestTimeout time.Duration
layerCache *cacheutil.TTLCache
layerCacheMu sync.Mutex
blobCache *cacheutil.TTLCache
Expand All @@ -137,6 +141,14 @@ func NewResolver(root string, backgroundTaskManager *task.BackgroundTaskManager,
if prefetchTimeout == 0 {
prefetchTimeout = defaultPrefetchTimeoutSec * time.Second
}
resolveTimeout := time.Duration(cfg.ResolveTimeoutSec) * time.Second
if resolveTimeout == 0 {
resolveTimeout = time.Duration(defaultResolveTimeoutSec) * time.Second
}
resolveRequestTimeout := time.Duration(cfg.ResolveRequestTimeoutSec) * time.Second
if resolveRequestTimeout == 0 {
resolveRequestTimeout = time.Duration(defaultResolveRequestTimeoutSec) * time.Second
}

// layerCache caches resolved layers for future use. This is useful in a use-case where
// the filesystem resolves and caches all layers in an image (not only queried one) in parallel,
Expand Down Expand Up @@ -171,6 +183,8 @@ func NewResolver(root string, backgroundTaskManager *task.BackgroundTaskManager,
layerCache: layerCache,
blobCache: blobCache,
prefetchTimeout: prefetchTimeout,
resolveTimeout: resolveTimeout,
resolveRequestTimeout: resolveRequestTimeout,
backgroundTaskManager: backgroundTaskManager,
config: cfg,
resolveLock: new(namedmutex.NamedMutex),
Expand Down Expand Up @@ -236,6 +250,8 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs
defer r.resolveLock.Unlock(name)

ctx = log.WithLogger(ctx, log.G(ctx).WithField("src", name))
ctx, cancel := context.WithTimeout(ctx, r.resolveTimeout)
defer cancel()

// First, try to retrieve this layer from the underlying cache.
r.layerCacheMu.Lock()
Expand All @@ -256,7 +272,7 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs
log.G(ctx).Debugf("resolving")

// Resolve the blob.
blobR, err := r.resolveBlob(ctx, hosts, refspec, desc)
blobR, err := r.resolveBlob(ctx, hosts, refspec, desc, name)
if err != nil {
return nil, errors.Wrapf(err, "failed to resolve the blob")
}
Expand All @@ -280,11 +296,62 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs
// Each file's read operation is a prioritized task and all background tasks
// will be stopped during the execution so this can avoid being disturbed for
// NW traffic by background tasks.
var (
isTimeout bool
isTimeoutMu sync.Mutex
// Use context during resolving, to make it cancellable
curR = readerAtFunc(func(p []byte, offset int64) (n int, err error) {
ctx, cancel := context.WithTimeout(ctx, r.resolveRequestTimeout)
defer cancel()
n, err = blobR.ReadAt(p, offset, remote.WithContext(ctx))
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
isTimeoutMu.Lock()
isTimeout = true
isTimeoutMu.Unlock()
}
return n, err
})
curRMu sync.Mutex
)
sr := io.NewSectionReader(readerAtFunc(func(p []byte, offset int64) (n int, err error) {
r.backgroundTaskManager.DoPrioritizedTask()
defer r.backgroundTaskManager.DonePrioritizedTask()
return blobR.ReadAt(p, offset)
curRMu.Lock()
br := curR
curRMu.Unlock()
return br.ReadAt(p, offset)
}), 0, blobR.Size())
vr, err := r.newReader(sr, desc, fsCache, esgzOpts...)
if err != nil {
isTimeoutMu.Lock()
discardBlob := isTimeout
isTimeoutMu.Unlock()
if discardBlob {
r.blobCacheMu.Lock()
r.blobCache.Remove(name)
r.blobCacheMu.Unlock()
}
return nil, errors.Wrap(err, "failed to read layer")
}
// do not propagate context after resolve is done
curRMu.Lock()
curR = readerAtFunc(func(p []byte, offset int64) (n int, err error) { return blobR.ReadAt(p, offset) })
curRMu.Unlock()

// Combine layer information together and cache it.
l := newLayer(r, desc, blobR, vr)
r.layerCacheMu.Lock()
cachedL, done2, added := r.layerCache.Add(name, l)
r.layerCacheMu.Unlock()
if !added {
l.close() // layer already exists in the cache. discrad this.
}

log.G(ctx).Debugf("resolved")
return &layerRef{cachedL.(*layer), done2}, nil
}

func (r *Resolver) newReader(sr *io.SectionReader, desc ocispec.Descriptor, fsCache cache.BlobCache, esgzOpts ...metadata.Option) (*reader.VerifiableReader, error) {
// define telemetry hooks to measure latency metrics inside estargz package
telemetry := metadata.Telemetry{
GetFooterLatency: func(start time.Time) {
Expand All @@ -306,36 +373,24 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs
if err != nil {
return nil, errors.Wrap(err, "failed to read layer")
}

// Combine layer information together and cache it.
l := newLayer(r, desc, blobR, vr)
r.layerCacheMu.Lock()
cachedL, done2, added := r.layerCache.Add(name, l)
r.layerCacheMu.Unlock()
if !added {
l.close() // layer already exists in the cache. discrad this.
}

log.G(ctx).Debugf("resolved")
return &layerRef{cachedL.(*layer), done2}, nil
return vr, nil
}

// resolveBlob resolves a blob based on the passed layer blob information.
func (r *Resolver) resolveBlob(ctx context.Context, hosts source.RegistryHosts, refspec reference.Spec, desc ocispec.Descriptor) (_ *blobRef, retErr error) {
name := refspec.String() + "/" + desc.Digest.String()

func (r *Resolver) resolveBlob(ctx context.Context, hosts source.RegistryHosts, refspec reference.Spec, desc ocispec.Descriptor, cacheKey string) (_ *blobRef, retErr error) {
// Try to retrieve the blob from the underlying cache.
r.blobCacheMu.Lock()
c, done, ok := r.blobCache.Get(name)
c, done, ok := r.blobCache.Get(cacheKey)
r.blobCacheMu.Unlock()
if ok {
if blob := c.(remote.Blob); blob.Check() == nil {
blob := c.(remote.Blob)
if err := blob.Check(); err == nil {
return &blobRef{blob, done}, nil
}
// invalid blob. discard this.
done()
r.blobCacheMu.Lock()
r.blobCache.Remove(name)
r.blobCache.Remove(cacheKey)
r.blobCacheMu.Unlock()
}

Expand All @@ -355,7 +410,7 @@ func (r *Resolver) resolveBlob(ctx context.Context, hosts source.RegistryHosts,
return nil, errors.Wrap(err, "failed to resolve the source")
}
r.blobCacheMu.Lock()
cachedB, done, added := r.blobCache.Add(name, b)
cachedB, done, added := r.blobCache.Add(cacheKey, b)
r.blobCacheMu.Unlock()
if !added {
b.Close() // blob already exists in the cache. discard this.
Expand Down
13 changes: 10 additions & 3 deletions store/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ const (
prepareSucceeded = "true"
prepareFailed = "false"

defaultMaxConcurrency = 2
defaultMaxConcurrency = 2
defaultResolveTimeoutSec = 60
)

func NewLayerManager(ctx context.Context, root string, hosts source.RegistryHosts, metadataStore metadata.Store, cfg config.Config) (*LayerManager, error) {
Expand All @@ -71,6 +72,10 @@ func NewLayerManager(ctx context.Context, root string, hosts source.RegistryHost
if ns != nil {
metrics.Register(ns)
}
resolveTimeout := time.Duration(cfg.ResolveTimeoutSec) * time.Second
if resolveTimeout == 0 {
resolveTimeout = time.Duration(defaultResolveTimeoutSec) * time.Second
}
return &LayerManager{
refPool: refPool,
hosts: hosts,
Expand All @@ -85,6 +90,7 @@ func NewLayerManager(ctx context.Context, root string, hosts source.RegistryHost
resolveLock: new(namedmutex.NamedMutex),
layer: make(map[string]map[string]layer.Layer),
refcounter: make(map[string]map[string]int),
resolveTimeout: resolveTimeout,
}, nil
}

Expand All @@ -102,6 +108,7 @@ type LayerManager struct {
disableVerification bool
metricsController *layermetrics.Controller
resolveLock *namedmutex.NamedMutex
resolveTimeout time.Duration

layer map[string]map[string]layer.Layer
refcounter map[string]map[string]int
Expand Down Expand Up @@ -220,8 +227,8 @@ func (r *LayerManager) getLayer(ctx context.Context, refspec reference.Spec, dgs
case err := <-errChan:
log.G(ctx).WithError(err).Debug("failed to resolve layer")
return nil, errors.Wrapf(err, "failed to resolve layer")
case <-time.After(30 * time.Second):
log.G(ctx).Debug("failed to resolve layer (timeout)")
case <-time.After(r.resolveTimeout):
log.G(ctx).WithField("timeout(sec)", r.resolveTimeout.Seconds()).Debug("failed to resolve layer (timeout)")
return nil, fmt.Errorf("failed to resolve layer (timeout)")
}

Expand Down