Skip to content

Commit

Permalink
Adjust vacuum API (#62)
Browse files Browse the repository at this point in the history
* Adjust vacuum API

* Fxies
  • Loading branch information
reshke authored Oct 3, 2024
1 parent 5ddca9c commit ffc32ae
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 21 deletions.
2 changes: 2 additions & 0 deletions config/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type Instance struct {

CryptoCnf Crypto `json:"crypto" toml:"crypto" yaml:"crypto"`

VacuumCnf Vacuum `json:"vacuum" toml:"vacuum" yaml:"vacuum"`

LogPath string `json:"log_path" toml:"log_path" yaml:"log_path"`
LogLevel string `json:"log_level" toml:"log_level" yaml:"log_level"`
SocketPath string `json:"socket_path" toml:"socket_path" yaml:"socket_path"`
Expand Down
5 changes: 5 additions & 0 deletions config/vacuum.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package config

type Vacuum struct {
CheckBackup bool `json:"check_backup,default=true" toml:"check_backup,default=true" yaml:"check_backup,default=true"`
}
2 changes: 1 addition & 1 deletion pkg/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (i *Instance) Run(instanceCnf *config.Instance) error {
defer clConn.Close()
ycl := client.NewYClient(clConn)
i.pool.Put(ycl)
if err := proc.ProcConn(s, cr, ycl); err != nil {
if err := proc.ProcConn(s, cr, ycl, &instanceCnf.VacuumCnf); err != nil {
ylogger.Zero.Debug().Uint("id", ycl.ID()).Err(err).Msg("got error serving client")
}
_, err := i.pool.Pop(ycl.ID())
Expand Down
17 changes: 12 additions & 5 deletions pkg/message/delete_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
)

type DeleteMessage struct { //seg port
Name string
Port uint64
Segnum uint64
Confirm bool
Garbage bool
Name string
Port uint64
Segnum uint64
Confirm bool
Garbage bool
CrazyDrop bool
}

var _ ProtoMessage = &DeleteMessage{}
Expand Down Expand Up @@ -39,6 +40,9 @@ func (c *DeleteMessage) Encode() []byte {
if c.Garbage {
bt[2] = 1
}
if c.CrazyDrop {
bt[3] = 1
}

bt = append(bt, []byte(c.Name)...)
bt = append(bt, 0)
Expand All @@ -64,6 +68,9 @@ func (c *DeleteMessage) Decode(body []byte) {
if body[2] == 1 {
c.Garbage = true
}
if body[3] == 1 {
c.CrazyDrop = true
}
c.Name = c.GetDeleteName(body[4:])
c.Port = binary.BigEndian.Uint64(body[len(body)-16 : len(body)-8])
c.Segnum = binary.BigEndian.Uint64(body[len(body)-8:])
Expand Down
2 changes: 1 addition & 1 deletion pkg/mock/backups.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/mock/database.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 44 additions & 10 deletions pkg/proc/delete_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package proc

import (
"fmt"
"path"
"strings"

"github.com/pkg/errors"
"github.com/yezzey-gp/yproxy/config"
"github.com/yezzey-gp/yproxy/pkg/backups"
"github.com/yezzey-gp/yproxy/pkg/database"
"github.com/yezzey-gp/yproxy/pkg/message"
Expand All @@ -22,6 +24,8 @@ type BasicDeleteHandler struct {
BackupInterractor backups.BackupInterractor
DbInterractor database.DatabaseInterractor
StorageInterractor storage.StorageInteractor

Cnf *config.Vacuum
}

func (dh *BasicDeleteHandler) HandleDeleteGarbage(msg message.DeleteMessage) error {
Expand All @@ -39,10 +43,26 @@ func (dh *BasicDeleteHandler) HandleDeleteGarbage(msg message.DeleteMessage) err
for len(fileList) > 0 && retryCount < 10 {
retryCount++
for i := 0; i < len(fileList); i++ {
filePathParts := strings.Split(fileList[i], "/")
err = dh.StorageInterractor.MoveObject(fileList[i], fmt.Sprintf("segments_005/seg%d/basebackups_005/yezzey/trash/%s", msg.Segnum, filePathParts[len(filePathParts)-1]))

if msg.CrazyDrop {
ylogger.Zero.Debug().Str("path", fileList[i]).Msg("simply delete without any 'plan B'")
err = dh.StorageInterractor.DeleteObject(fileList[i])

} else {

filePathParts := strings.Split(fileList[i], "/")

destPath := path.Join(
"trash",
"segments_005",
fmt.Sprintf("seg%d", msg.Segnum),
"basebackups_005",
"yezzey", filePathParts[len(filePathParts)-1])

err = dh.StorageInterractor.MoveObject(fileList[i], destPath)
}
if err != nil {
ylogger.Zero.Warn().AnErr("err", err).Str("file", fileList[i]).Msg("failed to move file")
ylogger.Zero.Warn().AnErr("err", err).Str("file", fileList[i]).Msg("failed to obsolete file")
failed = append(failed, fileList[i])
}
}
Expand Down Expand Up @@ -70,11 +90,19 @@ func (dh *BasicDeleteHandler) HandleDeleteFile(msg message.DeleteMessage) error

func (dh *BasicDeleteHandler) ListGarbageFiles(msg message.DeleteMessage) ([]string, error) {
//get firsr backup lsn
firstBackupLSN, err := dh.BackupInterractor.GetFirstLSN(msg.Segnum)
if err != nil {
ylogger.Zero.Error().AnErr("err", err).Msg("failed to get first lsn") //return or just assume there are no backups?
var firstBackupLSN uint64
var err error

if dh.Cnf.CheckBackup {
firstBackupLSN, err = dh.BackupInterractor.GetFirstLSN(msg.Segnum)
if err != nil {
ylogger.Zero.Error().AnErr("err", err).Msg("failed to get first lsn") //return or just assume there are no backups?
}
ylogger.Zero.Info().Uint64("lsn", firstBackupLSN).Msg("first backup LSN")
} else {
firstBackupLSN = ^uint64(0)
ylogger.Zero.Info().Uint64("lsn", firstBackupLSN).Msg("omit first backup LSN")
}
ylogger.Zero.Info().Uint64("lsn", firstBackupLSN).Msg("first backup LSN")

//list files in storage
ylogger.Zero.Info().Str("path", msg.Name).Msg("going to list path")
Expand All @@ -96,13 +124,19 @@ func (dh *BasicDeleteHandler) ListGarbageFiles(msg message.DeleteMessage) ([]str
filesToDelete := make([]string, 0)
for i := 0; i < len(objectMetas); i++ {
reworkedName := ReworkFileName(objectMetas[i].Path)
ylogger.Zero.Debug().Str("reworked name", reworkedName).Msg("lookup chunk")

if vi[reworkedName] {
continue
}

lsn, ok := ei[reworkedName]
ylogger.Zero.Debug().Uint64("lsn", lsn).Uint64("backup lsn", firstBackupLSN).Msg("comparing lsn")
if !vi[reworkedName] && (lsn < firstBackupLSN || !ok) {
ylogger.Zero.Debug().Uint64("lsn", lsn).Uint64("backup lsn", firstBackupLSN).Str("path", objectMetas[i].Path).Msg("comparing lsn")
if lsn < firstBackupLSN || !ok {
ylogger.Zero.Debug().Str("file", objectMetas[i].Path).
Bool("file in expire index", ok).
Bool("lsn is less than in first backup", lsn < firstBackupLSN).
Msg("file does not persisnt in virtual index, not needed for PITR, so will be deleted")
Msg("file does not persisnt in virtual index, nor needed for PITR, so will be deleted")
filesToDelete = append(filesToDelete, objectMetas[i].Path)
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/proc/delete_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/yezzey-gp/yproxy/config"
"github.com/yezzey-gp/yproxy/pkg/message"
mock "github.com/yezzey-gp/yproxy/pkg/mock"
"github.com/yezzey-gp/yproxy/pkg/object"
Expand Down Expand Up @@ -90,6 +91,7 @@ func TestFilesToDeletion(t *testing.T) {
StorageInterractor: storage,
DbInterractor: database,
BackupInterractor: backup,
Cnf: &config.Vacuum{CheckBackup: true},
}

list, err := handler.ListGarbageFiles(msg)
Expand Down
5 changes: 3 additions & 2 deletions pkg/proc/interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func ProcessPutExtended(
return nil
}

func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyClient) error {
func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyClient, cnf *config.Vacuum) error {

defer func() {
_ = ycl.Close()
Expand Down Expand Up @@ -280,7 +280,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl
if err != nil {
return err
}
fmt.Printf("ok new conf: %v\n", instanceCnf)
ylogger.Zero.Info().Interface("cnf", instanceCnf).Msg("loaded new config")

//list objects
objectMetas, err := oldStorage.ListPath(msg.Name)
Expand Down Expand Up @@ -396,6 +396,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl
StorageInterractor: s,
DbInterractor: dbInterractor,
BackupInterractor: backupHandler,
Cnf: cnf,
}

if msg.Garbage {
Expand Down

0 comments on commit ffc32ae

Please sign in to comment.