Skip to content

Commit

Permalink
tarfs: fix a data race condition
Browse files Browse the repository at this point in the history
Fix a data race condition
WARNING: DATA RACE
Write at 0x00c000178428 by goroutine 27:
  github.com/containerd/nydus-snapshotter/pkg/remote/remotes/docker.(*httpReadSeeker).Close()
      /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/remote/remotes/docker/httpreadseeker.go:87 +0x57
  github.com/containerd/nydus-snapshotter/pkg/tarfs.(*Manager).blobProcess.func2.1()
      /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/tarfs/tarfs.go:339 +0x48
  runtime.deferreturn()
      /opt/hostedtoolcache/go/1.20.1/x64/src/runtime/panic.go:476 +0x32
  github.com/containerd/nydus-snapshotter/pkg/tarfs.(*Manager).blobProcess.func3()
      /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/tarfs/tarfs.go:394 +0x71

Previous read at 0x00c000178428 by goroutine 40:
  github.com/containerd/nydus-snapshotter/pkg/remote/remotes/docker.(*httpReadSeeker).Read()
      /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/remote/remotes/docker/httpreadseeker.go:48 +0x68
  bufio.(*Reader).Read()
      /opt/hostedtoolcache/go/1.20.1/x64/src/bufio/bufio.go:223 +0x2c3
  github.com/containerd/containerd/archive/compression.(*bufferedReader).Read()
      /home/runner/go/pkg/mod/github.com/containerd/[email protected]/archive/compression/compression.go:113 +0xa4
  io.copyBuffer()
      /opt/hostedtoolcache/go/1.20.1/x64/src/io/io.go:427 +0x28d
  io.Copy()
      /opt/hostedtoolcache/go/1.20.1/x64/src/io/io.go:386 +0x88
  os.genericReadFrom()
      /opt/hostedtoolcache/go/1.20.1/x64/src/os/file.go:161 +0x34
  os.(*File).ReadFrom()
      /opt/hostedtoolcache/go/1.20.1/x64/src/os/file.go:155 +0x324
  io.copyBuffer()
      /opt/hostedtoolcache/go/1.20.1/x64/src/io/io.go:413 +0x1c5
  io.Copy()
      /opt/hostedtoolcache/go/1.20.1/x64/src/io/io.go:386 +0x84
  os/exec.(*Cmd).childStdin.func1()
      /opt/hostedtoolcache/go/1.20.1/x64/src/os/exec/exec.go:511 +0x45
  os/exec.(*Cmd).Start.func2()
      /opt/hostedtoolcache/go/1.20.1/x64/src/os/exec/exec.go:717 +0x42
  os/exec.(*Cmd).Start.func3()
      /opt/hostedtoolcache/go/1.20.1/x64/src/os/exec/exec.go:729 +0x47

Goroutine 27 (running) created at:
  github.com/containerd/nydus-snapshotter/pkg/tarfs.(*Manager).blobProcess()
      /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/tarfs/tarfs.go:393 +0x9dd
  github.com/containerd/nydus-snapshotter/pkg/tarfs.(*Manager).PrepareLayer()
      /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/tarfs/tarfs.go:465 +0x444
  github.com/containerd/nydus-snapshotter/pkg/tarfs.TestPrepareLayer()
      /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/tarfs/tarfs_test.go:33 +0x188
  testing.tRunner()
      /opt/hostedtoolcache/go/1.20.1/x64/src/testing/testing.go:1576 +0x216
  testing.(*T).Run.func1()
      /opt/hostedtoolcache/go/1.20.1/x64/src/testing/testing.go:1629 +0x47

Goroutine 40 (finished) created at:
  os/exec.(*Cmd).Start()
      /opt/hostedtoolcache/go/1.20.1/x64/src/os/exec/exec.go:716 +0xf8e
  github.com/containerd/containerd/archive/compression.cmdStream()
      /home/runner/go/pkg/mod/github.com/containerd/[email protected]/archive/compression/compression.go:284 +0x36f
  github.com/containerd/containerd/archive/compression.gzipDecompress()
      /home/runner/go/pkg/mod/github.com/containerd/[email protected]/archive/compression/compression.go:272 +0x152
  github.com/containerd/containerd/archive/compression.DecompressStream()
      /home/runner/go/pkg/mod/github.com/containerd/[email protected]/archive/compression/compression.go:203 +0x3e4
  github.com/containerd/nydus-snapshotter/pkg/tarfs.(*Manager).blobProcess.func2()
      /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/tarfs/tarfs.go:341 +0x1b1
  github.com/containerd/nydus-snapshotter/pkg/tarfs.(*Manager).blobProcess.func3()
      /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/tarfs/tarfs.go:394 +0x71
==================
    testing.go:1446: race detected during execution of test

Signed-off-by: Jiang Liu <[email protected]>
  • Loading branch information
jiangliu committed Dec 7, 2023
1 parent 54e2afb commit e0124e7
Showing 1 changed file with 26 additions and 9 deletions.
35 changes: 26 additions & 9 deletions pkg/tarfs/tarfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strings"
"sync"
"syscall"
"time"

"github.com/containerd/containerd/archive/compression"
"github.com/containerd/containerd/log"
Expand Down Expand Up @@ -213,7 +214,7 @@ func (t *Manager) getBlobStream(ctx context.Context, remote *remote.Remote, ref
}

// generate tar file and layer bootstrap, return if this blob is an empty blob
func (t *Manager) generateBootstrap(tarReader io.Reader, snapshotID, layerBlobID, upperDirPath string) (err error) {
func (t *Manager) generateBootstrap(tarReader io.Reader, snapshotID, layerBlobID, upperDirPath string, w *sync.WaitGroup) (err error) {
snapshotImageDir := filepath.Join(upperDirPath, "image")
if err := os.MkdirAll(snapshotImageDir, 0750); err != nil {
return errors.Wrapf(err, "create data dir %s for tarfs snapshot", snapshotImageDir)
Expand All @@ -235,20 +236,33 @@ func (t *Manager) generateBootstrap(tarReader io.Reader, snapshotID, layerBlobID
defer os.Remove(layerTarFileTmp)

fifoName := filepath.Join(upperDirPath, "layer_"+snapshotID+"_"+"tar.fifo")
if err = syscall.Mkfifo(fifoName, 0644); err != nil {
if err = syscall.Mkfifo(fifoName, 0640); err != nil {
return err
}
defer os.Remove(fifoName)

w.Add(1)
go func() {
fifoFile, err := os.OpenFile(fifoName, os.O_WRONLY, os.ModeNamedPipe)
if err != nil {
log.L.Warnf("can not open fifo file, err %v", err)
return
defer w.Done()

var fifoFile *os.File
for i := 1; i < 100 && fifoFile == nil; i++ {
file, err := os.OpenFile(fifoName, os.O_RDWR, os.ModeNamedPipe)
switch {
case err == nil:
fifoFile = file
case os.IsNotExist(err) || os.IsPermission(err):
log.L.Warnf("open fifo file, %v", err)
return
default:
log.L.Warnf("open fifo file, %v", err)
time.Sleep(time.Duration(i) * 10 * time.Millisecond)
}
}
defer fifoFile.Close()

if _, err := io.Copy(fifoFile, io.TeeReader(tarReader, tarFile)); err != nil {
log.L.Warnf("tar stream copy err %v", err)
log.L.Warnf("tar stream copy, %v", err)
}
}()

Expand Down Expand Up @@ -338,6 +352,9 @@ func (t *Manager) blobProcess(ctx context.Context, snapshotID, ref string,
process := func(rc io.ReadCloser, remote *remote.Remote) error {
defer rc.Close()

var w sync.WaitGroup
defer w.Wait()

ds, err := compression.DecompressStream(rc)
if err != nil {
return epilog(err, "unpack layer blob stream for tarfs")
Expand All @@ -351,7 +368,7 @@ func (t *Manager) blobProcess(ctx context.Context, snapshotID, ref string,
}
digester := digest.Canonical.Digester()
dr := io.TeeReader(ds, digester.Hash())
err = t.generateBootstrap(dr, snapshotID, layerBlobID, upperDirPath)
err = t.generateBootstrap(dr, snapshotID, layerBlobID, upperDirPath, &w)
switch {
case err != nil && !errdefs.IsAlreadyExists(err):
return epilog(err, "generate tarfs from image layer blob")
Expand All @@ -362,7 +379,7 @@ func (t *Manager) blobProcess(ctx context.Context, snapshotID, ref string,
return epilog(nil, msg)
}
} else {
err = t.generateBootstrap(ds, snapshotID, layerBlobID, upperDirPath)
err = t.generateBootstrap(ds, snapshotID, layerBlobID, upperDirPath, &w)
if err != nil && !errdefs.IsAlreadyExists(err) {
return epilog(err, "generate tarfs data from image layer blob")
}
Expand Down

0 comments on commit e0124e7

Please sign in to comment.