From b88b8cb82b4efd7bb875fe7c284dd039dfad00f0 Mon Sep 17 00:00:00 2001 From: zhijian Date: Tue, 13 Aug 2024 13:56:50 +0800 Subject: [PATCH] set the upload flag to false if the upload fails --- pkg/chunk/cached_store.go | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/pkg/chunk/cached_store.go b/pkg/chunk/cached_store.go index da348ddccd56..109ec16171b7 100644 --- a/pkg/chunk/cached_store.go +++ b/pkg/chunk/cached_store.go @@ -945,31 +945,31 @@ func parseObjOrigSize(key string) int { return l } -func (store *cachedStore) uploadStagingFile(key string, stagingPath string) { +func (store *cachedStore) uploadStagingFile(key string, stagingPath string) error { store.currentUpload <- true defer func() { <-store.currentUpload }() if !store.canUpload() { - return + return nil } store.pendingMutex.Lock() item, ok := store.pendingKeys[key] store.pendingMutex.Unlock() if !ok { logger.Debugf("Key %s is not needed, drop it", key) - return + return nil } blen := parseObjOrigSize(key) f, err := openCacheFile(stagingPath, blen, store.conf.CacheChecksum) if err != nil { if store.isPendingValid(key) { logger.Errorf("Open staging file %s: %s", stagingPath, err) - } else { - logger.Debugf("Key %s is not needed, drop it", key) + return err } - return + logger.Debugf("Key %s is not needed, drop it", key) + return nil } block := NewOffPage(blen) _, err = f.ReadAt(block.Data, 0) @@ -977,12 +977,12 @@ func (store *cachedStore) uploadStagingFile(key string, stagingPath string) { if err != nil { block.Release() logger.Errorf("Read staging file %s: %s", stagingPath, err) - return + return err } if !store.isPendingValid(key) { block.Release() logger.Debugf("Key %s is not needed, drop it", key) - return + return nil } store.stageBlockDelay.Add(time.Since(item.ts).Seconds()) @@ -997,9 +997,8 @@ func (store *cachedStore) uploadStagingFile(key string, stagingPath string) { logger.Warnf("failed to remove stage %s, in upload staging file", stagingPath) } } - } else { - item.uploading = false } + return err } func (store *cachedStore) addDelayedStaging(key, stagingPath string, added time.Time, force bool) bool { @@ -1057,7 +1056,14 @@ func (store *cachedStore) scanDelayedStaging() { func (store *cachedStore) uploader() { for it := range store.pendingCh { - store.uploadStagingFile(it.key, it.fpath) + if err := store.uploadStagingFile(it.key, it.fpath); err != nil { + store.pendingMutex.Lock() + item, ok := store.pendingKeys[it.key] + store.pendingMutex.Unlock() + if ok { + item.uploading = false + } + } } }