Skip to content

Commit

Permalink
fix: improved shard deletion (#24602)
Browse files Browse the repository at this point in the history
Avoid unnecessarily deleting series from the series file
Try harder to delete series from InMem indices
Log all errors on shard deletion

Closes #24834
  • Loading branch information
davidby-influx authored Mar 26, 2024
1 parent bc80e88 commit 8ff06d5
Showing 1 changed file with 33 additions and 13 deletions.
46 changes: 33 additions & 13 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ func (s *Store) DeleteShard(shardID uint64) error {
return ErrShardNotFound
}

// Remove the shard from Store so it's not returned to callers requesting
// Remove the shard from Store, so it's not returned to callers requesting
// shards. Also mark that this shard is currently being deleted in a separate
// map so that we do not have to retain the global store lock while deleting
// files.
Expand Down Expand Up @@ -840,7 +840,6 @@ func (s *Store) DeleteShard(shardID uint64) error {
defer s.mu.Unlock()
delete(s.epochs, shardID)
delete(s.pendingShardDeletes, shardID)
s.databases[db].removeIndexType(sh.IndexType())
}()

// Get the shard's local bitset of series IDs.
Expand All @@ -851,16 +850,23 @@ func (s *Store) DeleteShard(shardID uint64) error {

ss := index.SeriesIDSet()

s.walkShards(shards, func(sh *Shard) error {
err = s.walkShards(shards, func(sh *Shard) error {
index, err := sh.Index()
if err != nil {
s.Logger.Error("cannot find shard index", zap.Uint64("shard_id", sh.ID()), zap.Error(err))
return err
}

ss.Diff(index.SeriesIDSet())
return nil
})

if err != nil {
// We couldn't get the index for a shard. Rather than deleting series which may
// exist in that shard as well as in the current shard, we stop the current deletion
return err
}

// Remove any remaining series in the set from the series file, as they don't
// exist in any of the database's remaining shards.
if ss.Cardinality() > 0 {
Expand All @@ -872,7 +878,7 @@ func (s *Store) DeleteShard(shardID uint64) error {
var keyBuf []byte // Series key buffer.
var name []byte
var tagsBuf models.Tags // Buffer for tags container.
var err error
var errs []error

ss.ForEach(func(id uint64) {
skey := sfile.SeriesKey(id) // Series File series key
Expand All @@ -881,22 +887,32 @@ func (s *Store) DeleteShard(shardID uint64) error {
}

name, tagsBuf = ParseSeriesKeyInto(skey, tagsBuf)
keyBuf = keyBuf[:0]
keyBuf = models.AppendMakeKey(keyBuf, name, tagsBuf)
if err = index.DropSeriesGlobal(keyBuf); err != nil {
return
if tmpErr := index.DropSeriesGlobal(keyBuf); tmpErr != nil {
sfile.Logger.Error(
"cannot drop series",
zap.Uint64("series_id", id),
zap.String("key", string(keyBuf)),
zap.Error(tmpErr))
errs = append(errs, tmpErr)
}
})

if err != nil {
return err
if len(errs) != 0 {
return errors.Join(errs...)
}
}

ss.ForEach(func(id uint64) {
sfile.DeleteSeriesID(id)
if err := sfile.DeleteSeriesID(id); err != nil {
sfile.Logger.Error(
"cannot delete series in shard",
zap.Uint64("series_id", id),
zap.Uint64("shard_id", shardID),
zap.Error(err))
}
})
}

}

// enter the epoch tracker
Expand All @@ -916,9 +932,13 @@ func (s *Store) DeleteShard(shardID uint64) error {
// Remove the on-disk shard data.
if err := os.RemoveAll(sh.path); err != nil {
return err
} else if err = os.RemoveAll(sh.walPath); err != nil {
return err
} else {
// Remove index type from the database on success
s.databases[db].removeIndexType(sh.IndexType())
return nil
}

return os.RemoveAll(sh.walPath)
}

// DeleteDatabase will close all shards associated with a database and remove the directory and files from disk.
Expand Down

0 comments on commit 8ff06d5

Please sign in to comment.