diff --git a/pkg/tarfs/tarfs.go b/pkg/tarfs/tarfs.go index 90382c1f68..a60a0b44af 100755 --- a/pkg/tarfs/tarfs.go +++ b/pkg/tarfs/tarfs.go @@ -20,6 +20,7 @@ import ( "strings" "sync" "syscall" + "time" "github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/log" @@ -213,7 +214,7 @@ func (t *Manager) getBlobStream(ctx context.Context, remote *remote.Remote, ref } // generate tar file and layer bootstrap, return if this blob is an empty blob -func (t *Manager) generateBootstrap(tarReader io.Reader, snapshotID, layerBlobID, upperDirPath string) (err error) { +func (t *Manager) generateBootstrap(tarReader io.Reader, snapshotID, layerBlobID, upperDirPath string, w *sync.WaitGroup) (err error) { snapshotImageDir := filepath.Join(upperDirPath, "image") if err := os.MkdirAll(snapshotImageDir, 0750); err != nil { return errors.Wrapf(err, "create data dir %s for tarfs snapshot", snapshotImageDir) @@ -235,20 +236,33 @@ func (t *Manager) generateBootstrap(tarReader io.Reader, snapshotID, layerBlobID defer os.Remove(layerTarFileTmp) fifoName := filepath.Join(upperDirPath, "layer_"+snapshotID+"_"+"tar.fifo") - if err = syscall.Mkfifo(fifoName, 0644); err != nil { + if err = syscall.Mkfifo(fifoName, 0640); err != nil { return err } defer os.Remove(fifoName) + w.Add(1) go func() { - fifoFile, err := os.OpenFile(fifoName, os.O_WRONLY, os.ModeNamedPipe) - if err != nil { - log.L.Warnf("can not open fifo file, err %v", err) - return + defer w.Done() + + var fifoFile *os.File + for i := 1; i < 100 && fifoFile == nil; i++ { + file, err := os.OpenFile(fifoName, os.O_RDWR, os.ModeNamedPipe) + switch { + case err == nil: + fifoFile = file + case os.IsNotExist(err) || os.IsPermission(err): + log.L.Warnf("open fifo file, %v", err) + return + default: + log.L.Warnf("open fifo file, %v", err) + time.Sleep(time.Duration(i) * 10 * time.Millisecond) + } } defer fifoFile.Close() + if _, err := io.Copy(fifoFile, io.TeeReader(tarReader, tarFile)); err != nil { - log.L.Warnf("tar stream copy err %v", err) + log.L.Warnf("tar stream copy, %v", err) } }() @@ -338,6 +352,9 @@ func (t *Manager) blobProcess(ctx context.Context, snapshotID, ref string, process := func(rc io.ReadCloser, remote *remote.Remote) error { defer rc.Close() + var w sync.WaitGroup + defer w.Wait() + ds, err := compression.DecompressStream(rc) if err != nil { return epilog(err, "unpack layer blob stream for tarfs") @@ -351,7 +368,7 @@ func (t *Manager) blobProcess(ctx context.Context, snapshotID, ref string, } digester := digest.Canonical.Digester() dr := io.TeeReader(ds, digester.Hash()) - err = t.generateBootstrap(dr, snapshotID, layerBlobID, upperDirPath) + err = t.generateBootstrap(dr, snapshotID, layerBlobID, upperDirPath, &w) switch { case err != nil && !errdefs.IsAlreadyExists(err): return epilog(err, "generate tarfs from image layer blob") @@ -362,7 +379,7 @@ func (t *Manager) blobProcess(ctx context.Context, snapshotID, ref string, return epilog(nil, msg) } } else { - err = t.generateBootstrap(ds, snapshotID, layerBlobID, upperDirPath) + err = t.generateBootstrap(ds, snapshotID, layerBlobID, upperDirPath, &w) if err != nil && !errdefs.IsAlreadyExists(err) { return epilog(err, "generate tarfs data from image layer blob") }