Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement move object for s3 storage and other fixups #61

Merged
merged 1 commit into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/proc/delete_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func (dh *BasicDeleteHandler) ListGarbageFiles(msg message.DeleteMessage) ([]str
return nil, errors.Wrap(err, "could not get virtual and expire indexes")
}
ylogger.Zero.Info().Msg("recieved virtual index and expire index")
ylogger.Zero.Debug().Int("virtual", len(vi)).Msg("vi count")
ylogger.Zero.Debug().Int("expire", len(ei)).Msg("ei count")
ylogger.Zero.Debug().Int("virtual", len(vi)).Msg("virtual index match count")
ylogger.Zero.Debug().Int("expire", len(ei)).Msg("exprire index match count")

filesToDelete := make([]string, 0)
for i := 0; i < len(objectMetas); i++ {
Expand All @@ -102,7 +102,7 @@ func (dh *BasicDeleteHandler) ListGarbageFiles(msg message.DeleteMessage) ([]str
ylogger.Zero.Debug().Str("file", objectMetas[i].Path).
Bool("file in expire index", ok).
Bool("lsn is less than in first backup", lsn < firstBackupLSN).
Msg("file will be deleted")
Msg("file does not persisnt in virtual index, not needed for PITR, so will be deleted")
filesToDelete = append(filesToDelete, objectMetas[i].Path)
}
}
Expand Down
14 changes: 13 additions & 1 deletion pkg/proc/interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,19 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl
BackupInterractor: backupHandler,
}

ylogger.Zero.Debug().Str("Name", msg.Name).Bool("garb", msg.Garbage).Bool("confirm", msg.Confirm).Msg("requested to remove external chunk")
if msg.Garbage {
ylogger.Zero.Debug().
Str("Name", msg.Name).
Uint64("port", msg.Port).
Uint64("segment", msg.Segnum).
Bool("confirm", msg.Confirm).Msg("requested to perform external storage VACUUM")
} else {
ylogger.Zero.Debug().
Str("Name", msg.Name).
Uint64("port", msg.Port).
Uint64("segment", msg.Segnum).
Bool("confirm", msg.Confirm).Msg("requested to remove external chunk")
}

if msg.Garbage {
err = dh.HandleDeleteGarbage(msg)
Expand Down
39 changes: 39 additions & 0 deletions pkg/storage/s3storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,42 @@ func (s *S3StorageInteractor) DeleteObject(key string) error {
ylogger.Zero.Debug().Str("path", key).Msg("deleted object")
return nil
}

func (s *S3StorageInteractor) SScopyObject(from string, to string) error {
sess, err := s.pool.GetSession(context.TODO())
if err != nil {
ylogger.Zero.Err(err).Msg("failed to acquire s3 session")
return err
}
ylogger.Zero.Debug().Msg("aquired session for server-side copy")

if !strings.HasPrefix(from, s.cnf.StoragePrefix) {
from = path.Join(s.cnf.StoragePrefix, from)
}

if !strings.HasPrefix(to, s.cnf.StoragePrefix) {
to = path.Join(s.cnf.StoragePrefix, to)
}

inp := s3.CopyObjectInput{
Bucket: &s.cnf.StorageBucket,
CopySource: aws.String(path.Join(s.cnf.StorageBucket, from)),
Key: aws.String(to),
}

_, err = sess.CopyObject(&inp)
if err != nil {
ylogger.Zero.Err(err).Msg("failed to copy object")
return err
}
ylogger.Zero.Debug().Str("path-from", from).Str("path-to", to).Msg("copied object")

return nil
}

func (s *S3StorageInteractor) MoveObject(from string, to string) error {
if err := s.SScopyObject(from, to); err != nil {
return err
}
return s.DeleteObject(from)
}
Loading