Skip to content

Commit

Permalink
set the upload flag to false if the upload fails
Browse files Browse the repository at this point in the history
  • Loading branch information
zhijian-pro committed Aug 13, 2024
1 parent eae3e78 commit b88b8cb
Showing 1 changed file with 17 additions and 11 deletions.
28 changes: 17 additions & 11 deletions pkg/chunk/cached_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,44 +945,44 @@ func parseObjOrigSize(key string) int {
return l
}

func (store *cachedStore) uploadStagingFile(key string, stagingPath string) {
func (store *cachedStore) uploadStagingFile(key string, stagingPath string) error {
store.currentUpload <- true
defer func() {
<-store.currentUpload
}()

if !store.canUpload() {
return
return nil
}
store.pendingMutex.Lock()
item, ok := store.pendingKeys[key]
store.pendingMutex.Unlock()
if !ok {
logger.Debugf("Key %s is not needed, drop it", key)
return
return nil
}
blen := parseObjOrigSize(key)
f, err := openCacheFile(stagingPath, blen, store.conf.CacheChecksum)
if err != nil {
if store.isPendingValid(key) {
logger.Errorf("Open staging file %s: %s", stagingPath, err)
} else {
logger.Debugf("Key %s is not needed, drop it", key)
return err
}
return
logger.Debugf("Key %s is not needed, drop it", key)
return nil
}
block := NewOffPage(blen)
_, err = f.ReadAt(block.Data, 0)
_ = f.Close()
if err != nil {
block.Release()
logger.Errorf("Read staging file %s: %s", stagingPath, err)
return
return err
}
if !store.isPendingValid(key) {
block.Release()
logger.Debugf("Key %s is not needed, drop it", key)
return
return nil
}

store.stageBlockDelay.Add(time.Since(item.ts).Seconds())
Expand All @@ -997,9 +997,8 @@ func (store *cachedStore) uploadStagingFile(key string, stagingPath string) {
logger.Warnf("failed to remove stage %s, in upload staging file", stagingPath)
}
}
} else {
item.uploading = false
}
return err
}

func (store *cachedStore) addDelayedStaging(key, stagingPath string, added time.Time, force bool) bool {
Expand Down Expand Up @@ -1057,7 +1056,14 @@ func (store *cachedStore) scanDelayedStaging() {

func (store *cachedStore) uploader() {
for it := range store.pendingCh {
store.uploadStagingFile(it.key, it.fpath)
if err := store.uploadStagingFile(it.key, it.fpath); err != nil {
store.pendingMutex.Lock()
item, ok := store.pendingKeys[it.key]
store.pendingMutex.Unlock()
if ok {
item.uploading = false
}
}
}
}

Expand Down

0 comments on commit b88b8cb

Please sign in to comment.