diff --git a/internal/cache/file/downloader/parallel_downloader.go b/internal/cache/file/downloader/parallel_downloader.go
new file mode 100644
index 0000000000..58e3f0befe
--- /dev/null
+++ b/internal/cache/file/downloader/parallel_downloader.go
@@ -0,0 +1,191 @@
+package downloader
+
+import (
+  "errors"
+  "fmt"
+  "io"
+  "os"
+  "strings"
+  "sync"
+
+  "github.com/googlecloudplatform/gcsfuse/v2/internal/cache/lru"
+  cacheutil "github.com/googlecloudplatform/gcsfuse/v2/internal/cache/util"
+  "github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
+  "github.com/googlecloudplatform/gcsfuse/v2/internal/monitor"
+  "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
+  "github.com/googlecloudplatform/gcsfuse/v2/internal/util"
+  "golang.org/x/net/context"
+)
+
+// rangeDownloader downloads the byte range [start, end) of the backing GCS
+// object and writes it into the cache file at the same offset.
+func (job *Job) rangeDownloader(start, end int64) (err error) {
+  newReader, err := job.bucket.NewReader(
+    job.cancelCtx,
+    &gcs.ReadObjectRequest{
+      Name:       job.object.Name,
+      Generation: job.object.Generation,
+      Range: &gcs.ByteRange{
+        Start: uint64(start),
+        Limit: uint64(end),
+      },
+      ReadCompressed: job.object.HasContentEncodingGzip(),
+    })
+  if err != nil {
+    return
+  }
+  monitor.CaptureGCSReadMetrics(job.cancelCtx, util.Sequential, end-start)
+
+  // Open the cache file and write the downloaded range into it.
+  cacheFile, err := os.OpenFile(job.fileSpec.Path, os.O_RDWR, job.fileSpec.FilePerm)
+  defer func() {
+    // On failure, close both handles and preserve the original error.
+    if err != nil {
+      _ = newReader.Close()
+      _ = cacheFile.Close()
+      return
+    }
+    err = newReader.Close()
+    if err != nil {
+      return
+    }
+    err = cacheFile.Close()
+  }()
+  if err != nil {
+    return
+  }
+
+  _, err = cacheFile.Seek(start, io.SeekStart)
+  if err != nil {
+    err = fmt.Errorf("rangeDownloader: error while seeking file handle, seek %d: %w", start, err)
+    return
+  }
+  // Copy the contents from NewReader to the cache file.
+  _, err = io.CopyN(cacheFile, newReader, end-start)
+  return
+}
+
+// downloadObjectInParallelAsync downloads the backing GCS object into a file
+// as part of the file cache, issuing multiple concurrent range reads through
+// the NewReader method of gcs.Bucket.
+//
+// Note: There can only be one async download running for a job at a time.
+// Acquires and releases LOCK(job.mu)
+func (job *Job) downloadObjectInParallelAsync() {
+  // Close job.doneCh, clear the cancelFunc & cancelCtx, and call the
+  // remove-job callback in every case - completion or failure.
+  defer func() {
+    job.cancelFunc()
+    close(job.doneCh)
+
+    job.mu.Lock()
+    if job.removeJobCallback != nil {
+      job.removeJobCallback()
+      job.removeJobCallback = nil
+    }
+    job.cancelCtx, job.cancelFunc = nil, nil
+    job.mu.Unlock()
+  }()
+
+  // Create, open and truncate the cache file the object will be written into.
+  cacheFile, err := cacheutil.CreateFile(job.fileSpec, os.O_TRUNC|os.O_WRONLY)
+  if err != nil {
+    err = fmt.Errorf("downloadObjectInParallelAsync: error in creating cache file: %w", err)
+    job.failWhileDownloading(err)
+    return
+  }
+  err = cacheFile.Truncate(int64(job.object.Size))
+  if err != nil {
+    err = fmt.Errorf("downloadObjectInParallelAsync: error while truncating the cache file: %w", err)
+    job.failWhileDownloading(err)
+    return
+  }
+  err = cacheFile.Close()
+  if err != nil {
+    err = fmt.Errorf("downloadObjectInParallelAsync: error while closing cache file: %w", err)
+    job.failWhileDownloading(err)
+    return
+  }
+
+  notifyInvalid := func() {
+    job.mu.Lock()
+    job.status.Name = Invalid
+    job.notifySubscribers()
+    job.mu.Unlock()
+  }
+
+  var start, end int64
+  end = int64(job.object.Size)
+  readRequestSize := int64(job.readRequestSizeMb * cacheutil.MiB)
+  oneStepTotalReadSize := readRequestSize * int64(job.downloadParallelism)
+
+  for {
+    select {
+    case <-job.cancelCtx.Done():
+      return
+    default:
+      if start < end {
+        // Download ranges in parallel, each at most readRequestSize bytes,
+        // using at most job.downloadParallelism goroutines per step.
+        asyncDownloaderErrors := make([]error, job.downloadParallelism)
+        wg := sync.WaitGroup{}
+        var singleStepRead int64
+        for jobNum := 0; (jobNum < job.downloadParallelism) && (singleStepRead < oneStepTotalReadSize) && (start < end); jobNum++ {
+          wg.Add(1)
+          rangeStart := start
+          rangeEnd := min(rangeStart+readRequestSize, end)
+          downloadFunc := func(wg *sync.WaitGroup, assignedJobNum, rangeStart, rangeEnd int64) {
+            defer wg.Done()
+            asyncDownloaderErrors[assignedJobNum] = job.rangeDownloader(rangeStart, rangeEnd)
+          }
+          singleStepRead += rangeEnd - rangeStart
+          start = rangeEnd
+          go downloadFunc(&wg, int64(jobNum), rangeStart, rangeEnd)
+        }
+        wg.Wait()
+        // If any of the goroutines failed, consider the async job failed.
+        for jobNum := 0; jobNum < job.downloadParallelism; jobNum++ {
+          if asyncDownloaderErrors[jobNum] != nil {
+            if errors.Is(asyncDownloaderErrors[jobNum], context.Canceled) {
+              notifyInvalid()
+              return
+            }
+            job.failWhileDownloading(asyncDownloaderErrors[jobNum])
+            return
+          }
+        }
+
+        job.mu.Lock()
+        job.status.Offset = start
+        err = job.updateFileInfoCache()
+        // Notify subscribers if the file info cache was updated.
+        if err == nil {
+          job.notifySubscribers()
+        } else if strings.Contains(err.Error(), lru.EntryNotExistErrMsg) {
+          // The download job expects an entry in the file info cache for the
+          // file it is downloading. If that entry is deleted in between, which
+          // is expected to happen at the time of eviction, the job should be
+          // marked Invalid instead of Failed.
+          job.status.Name = Invalid
+          job.notifySubscribers()
+          logger.Tracef("Job:%p (%s:/%s) is no longer valid due to absence of entry in file info cache.", job, job.bucket.Name(), job.object.Name)
+          job.mu.Unlock()
+          return
+        }
+        job.mu.Unlock()
+        // Change the status of the job in case of an error while updating the
+        // file info cache.
+        if err != nil {
+          job.failWhileDownloading(err)
+          return
+        }
+      } else {
+        job.mu.Lock()
+        job.status.Name = Completed
+        job.notifySubscribers()
+        job.mu.Unlock()
+        return
+      }
+    }
+  }
+}
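To make the control flow easier to review, here is a minimal, self-contained sketch of the bounded fan-out pattern the new file implements. It is not part of the patch: downloadInSteps and fetchRange are hypothetical stand-ins for the downloadObjectInParallelAsync loop and job.rangeDownloader, and the real code additionally caps each step at oneStepTotalReadSize bytes and updates the file info cache between steps.

// parallel_download_sketch.go - a minimal, runnable sketch (Go 1.21+ for the
// min builtin) of the bounded fan-out pattern used above. Hypothetical names;
// not part of the patch.
package main

import (
  "fmt"
  "sync"
)

// downloadInSteps splits [0, size) into chunkSize ranges and downloads at
// most parallelism ranges per step, collecting one error slot per goroutine
// and failing on the first non-nil error, mirroring the loop in the patch.
func downloadInSteps(size, chunkSize int64, parallelism int, fetchRange func(start, end int64) error) error {
  var start int64
  for start < size {
    errs := make([]error, parallelism)
    var wg sync.WaitGroup
    for i := 0; i < parallelism && start < size; i++ {
      rangeStart, rangeEnd := start, min(start+chunkSize, size)
      start = rangeEnd
      wg.Add(1)
      // Pass loop variables as arguments so each goroutine gets its own copy.
      go func(slot int, s, e int64) {
        defer wg.Done()
        errs[slot] = fetchRange(s, e)
      }(i, rangeStart, rangeEnd)
    }
    wg.Wait()
    // Inspect the per-goroutine error slots, as the patch does after wg.Wait().
    for _, err := range errs {
      if err != nil {
        return err
      }
    }
  }
  return nil
}

func main() {
  _ = downloadInSteps(10*1024, 4*1024, 2, func(s, e int64) error {
    fmt.Printf("downloaded [%d, %d)\n", s, e)
    return nil
  })
}

Because the cache file is pre-truncated to the object size and every goroutine writes a disjoint [start, end) range at its own offset through its own file handle, the concurrent writes in rangeDownloader need no extra locking.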