Skip to content

Commit

Permalink
Implement move object for s3 storage and other fixups (#61)
Browse files Browse the repository at this point in the history
  • Loading branch information
reshke authored Oct 3, 2024
1 parent b23d038 commit 5ddca9c
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 4 deletions.
6 changes: 3 additions & 3 deletions pkg/proc/delete_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func (dh *BasicDeleteHandler) ListGarbageFiles(msg message.DeleteMessage) ([]str
return nil, errors.Wrap(err, "could not get virtual and expire indexes")
}
ylogger.Zero.Info().Msg("recieved virtual index and expire index")
ylogger.Zero.Debug().Int("virtual", len(vi)).Msg("vi count")
ylogger.Zero.Debug().Int("expire", len(ei)).Msg("ei count")
ylogger.Zero.Debug().Int("virtual", len(vi)).Msg("virtual index match count")
ylogger.Zero.Debug().Int("expire", len(ei)).Msg("exprire index match count")

filesToDelete := make([]string, 0)
for i := 0; i < len(objectMetas); i++ {
Expand All @@ -102,7 +102,7 @@ func (dh *BasicDeleteHandler) ListGarbageFiles(msg message.DeleteMessage) ([]str
ylogger.Zero.Debug().Str("file", objectMetas[i].Path).
Bool("file in expire index", ok).
Bool("lsn is less than in first backup", lsn < firstBackupLSN).
Msg("file will be deleted")
Msg("file does not persisnt in virtual index, not needed for PITR, so will be deleted")
filesToDelete = append(filesToDelete, objectMetas[i].Path)
}
}
Expand Down
14 changes: 13 additions & 1 deletion pkg/proc/interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,19 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl
BackupInterractor: backupHandler,
}

ylogger.Zero.Debug().Str("Name", msg.Name).Bool("garb", msg.Garbage).Bool("confirm", msg.Confirm).Msg("requested to remove external chunk")
if msg.Garbage {
ylogger.Zero.Debug().
Str("Name", msg.Name).
Uint64("port", msg.Port).
Uint64("segment", msg.Segnum).
Bool("confirm", msg.Confirm).Msg("requested to perform external storage VACUUM")
} else {
ylogger.Zero.Debug().
Str("Name", msg.Name).
Uint64("port", msg.Port).
Uint64("segment", msg.Segnum).
Bool("confirm", msg.Confirm).Msg("requested to remove external chunk")
}

if msg.Garbage {
err = dh.HandleDeleteGarbage(msg)
Expand Down
39 changes: 39 additions & 0 deletions pkg/storage/s3storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,42 @@ func (s *S3StorageInteractor) DeleteObject(key string) error {
ylogger.Zero.Debug().Str("path", key).Msg("deleted object")
return nil
}

func (s *S3StorageInteractor) SScopyObject(from string, to string) error {
sess, err := s.pool.GetSession(context.TODO())
if err != nil {
ylogger.Zero.Err(err).Msg("failed to acquire s3 session")
return err
}
ylogger.Zero.Debug().Msg("aquired session for server-side copy")

if !strings.HasPrefix(from, s.cnf.StoragePrefix) {
from = path.Join(s.cnf.StoragePrefix, from)
}

if !strings.HasPrefix(to, s.cnf.StoragePrefix) {
to = path.Join(s.cnf.StoragePrefix, to)
}

inp := s3.CopyObjectInput{
Bucket: &s.cnf.StorageBucket,
CopySource: aws.String(path.Join(s.cnf.StorageBucket, from)),
Key: aws.String(to),
}

_, err = sess.CopyObject(&inp)
if err != nil {
ylogger.Zero.Err(err).Msg("failed to copy object")
return err
}
ylogger.Zero.Debug().Str("path-from", from).Str("path-to", to).Msg("copied object")

return nil
}

func (s *S3StorageInteractor) MoveObject(from string, to string) error {
if err := s.SScopyObject(from, to); err != nil {
return err
}
return s.DeleteObject(from)
}

0 comments on commit 5ddca9c

Please sign in to comment.