diff --git a/Makefile b/Makefile index 689fa0c..ca4b0b5 100644 --- a/Makefile +++ b/Makefile @@ -17,6 +17,9 @@ unittest: mockgen: mockgen -source=pkg/proc/yrreader.go -destination=pkg/mock/proc/yrreader.go -package=mock + mockgen -source=pkg/database/database.go -destination=pkg/mock/database.go -package=mock + mockgen -source=pkg/backups/backups.go -destination=pkg/mock/backups.go -package=mock + mockgen -source=pkg/storage/storage.go -destination=pkg/mock/storage.go -package=mock version = $(shell git describe --tags --abbrev=0) package: diff --git a/cmd/client/main.go b/cmd/client/main.go index f396d07..0a65828 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -22,6 +22,10 @@ var logLevel string var decrypt bool var encrypt bool var offset uint64 +var segmentPort int +var segmentNum int +var confirm bool +var garbage bool // TODOV func Runner(f func(net.Conn, *config.Instance, []string) error) func(*cobra.Command, []string) error { @@ -179,12 +183,10 @@ func listFunc(con net.Conn, instanceCnf *config.Instance, args []string) error { meta.Decode(body) res = append(res, meta.Content...) - break case message.MessageTypeReadyForQuery: done = true - break default: - return fmt.Errorf("Incorrect message type: %s", tp.String()) + return fmt.Errorf("incorrect message type: %s", tp.String()) } } @@ -213,6 +215,49 @@ var copyCmd = &cobra.Command{ RunE: Runner(copyFunc), } +var deleteCmd = &cobra.Command{ + Use: "delete", + Short: "delete", + RunE: func(cmd *cobra.Command, args []string) error { + ylogger.Zero.Info().Msg("Execute delete command") + err := config.LoadInstanceConfig(cfgPath) + if err != nil { + return err + } + instanceCnf := config.InstanceConfig() + + con, err := net.Dial("unix", instanceCnf.SocketPath) + if err != nil { + return err + } + defer con.Close() + + ylogger.Zero.Info().Str("name", args[0]).Msg("delete") + msg := message.NewDeleteMessage(args[0], segmentPort, segmentNum, confirm, garbage).Encode() + _, err = con.Write(msg) + if err != nil { + return err + } + + ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed delete msg") + + client := client.NewYClient(con) + protoReader := proc.NewProtoReader(client) + + ansType, body, err := protoReader.ReadPacket() + if err != nil { + ylogger.Zero.Debug().Err(err).Msg("error while recieving answer") + return err + } + + if ansType != message.MessageTypeReadyForQuery { + return fmt.Errorf("failed to delete, msg: %v", body) + } + + return nil + }, +} + var putCmd = &cobra.Command{ Use: "put", Short: "put", @@ -244,6 +289,12 @@ func init() { rootCmd.AddCommand(putCmd) rootCmd.AddCommand(listCmd) + + deleteCmd.PersistentFlags().IntVarP(&segmentPort, "port", "p", 6000, "port that segment is listening on") + deleteCmd.PersistentFlags().IntVarP(&segmentNum, "segnum", "s", 0, "logical number of a segment") + deleteCmd.PersistentFlags().BoolVarP(&confirm, "confirm", "", false, "confirm deletion") + deleteCmd.PersistentFlags().BoolVarP(&garbage, "garbage", "g", false, "delete garbage") + rootCmd.AddCommand(deleteCmd) } func main() { diff --git a/go.mod b/go.mod index b824141..d226e8b 100644 --- a/go.mod +++ b/go.mod @@ -15,12 +15,16 @@ require ( gopkg.in/yaml.v2 v2.4.0 ) +require golang.org/x/text v0.14.0 // indirect + require ( - github.com/jackc/pgpassfile v1.0.0 // indirect - github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/cockroachdb/apd v1.1.0 // indirect + github.com/gofrs/uuid v4.4.0+incompatible // indirect + github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect github.com/kr/text v0.2.0 // indirect + github.com/lib/pq v1.10.9 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect - golang.org/x/text v0.14.0 // indirect + github.com/shopspring/decimal v1.4.0 // indirect ) require ( @@ -28,6 +32,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/mock v1.6.0 github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/jackc/pgx v3.6.2+incompatible github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect diff --git a/go.sum b/go.sum index c528825..ca60523 100644 --- a/go.sum +++ b/go.sum @@ -5,6 +5,8 @@ github.com/ProtonMail/go-crypto v0.0.0-20230923063757-afb1ddc0824c/go.mod h1:EjA github.com/bwesterb/go-ristretto v1.2.3/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0= github.com/cloudflare/circl v1.3.3 h1:fE/Qz0QdIGqeWfnwq0RE0R7MI51s0M2E4Ga9kq5AEMs= github.com/cloudflare/circl v1.3.3/go.mod h1:5XYMA4rFBvNIrhs50XuiBJ15vF2pZn4nnUKZrLbUZFA= +github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= +github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -12,14 +14,20 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA= +github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 h1:vr3AYkKovP8uR8AvSGGUK1IDqRa5lAAvEkZG1LKaCRc= +github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx v3.6.2+incompatible h1:2zP5OD7kiyR3xzRYMhOcXVvkDZsImVXfj+yIyTQf3/o= +github.com/jackc/pgx v3.6.2+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I= github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= @@ -30,6 +38,8 @@ github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= @@ -45,13 +55,13 @@ github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A= github.com/rs/zerolog v1.31.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= +github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/yezzey-gp/aws-sdk-go v0.1.0 h1:as6ANEva14gKdhWPjZy6qaGR+/WhP0HN4UMzDHLDqmU= @@ -127,6 +137,5 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/backups/backups.go b/pkg/backups/backups.go new file mode 100644 index 0000000..6245ad1 --- /dev/null +++ b/pkg/backups/backups.go @@ -0,0 +1,67 @@ +package backups + +import ( + "bytes" + "encoding/json" + "fmt" + "os/exec" + "strings" + + "github.com/yezzey-gp/yproxy/pkg/ylogger" +) + +type BackupLSN struct { + Lsn uint64 `json:"LSN"` +} + +//go:generate mockgen -destination=pkg/mock/backups.go -package=mock +type BackupInterractor interface { + GetFirstLSN(int) (uint64, error) +} + +type WalgBackupInterractor struct { //TODO: rewrite to using s3 instead of wal-g cmd +} + +// get lsn of the oldest backup +func (b *WalgBackupInterractor) GetFirstLSN(seg int) (uint64, error) { + cmd := exec.Command("/usr/bin/wal-g", "st", "ls", fmt.Sprintf("segments_005/seg%d/basebackups_005/", seg), "--config=/etc/wal-g/wal-g.yaml") + ylogger.Zero.Debug().Any("flags", cmd.Args).Msg("Command args") + var out bytes.Buffer + cmd.Stdout = &out + + err := cmd.Run() + if err != nil { + ylogger.Zero.Debug().AnErr("error", err).Msg("Failed to run st ls") + return 0, err + } + lines := strings.Split(out.String(), "\n") + + minLSN := BackupLSN{Lsn: ^uint64(0)} + for _, line := range lines { + if !strings.Contains(line, ".json") { + continue + } + parts := strings.Split(line, " ") + fileName := parts[len(parts)-1] + + ylogger.Zero.Debug().Str("file: %s", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, fileName)).Msg("check lsn in file") + catCmd := exec.Command("/usr/bin/wal-g", "st", "cat", fmt.Sprintf("segments_005/seg%d/basebackups_005/%s", seg, fileName), "--config=/etc/wal-g/wal-g.yaml") + + var catOut bytes.Buffer + catCmd.Stdout = &catOut + + err = catCmd.Run() + if err != nil { + ylogger.Zero.Debug().AnErr("error", err).Msg("Failed to run st cat") + return 0, err + } + lsn := BackupLSN{} + err = json.Unmarshal(catOut.Bytes(), &lsn) + + if lsn.Lsn < minLSN.Lsn { + minLSN.Lsn = lsn.Lsn + } + } + + return minLSN.Lsn, err +} diff --git a/pkg/database/database.go b/pkg/database/database.go new file mode 100644 index 0000000..a57a031 --- /dev/null +++ b/pkg/database/database.go @@ -0,0 +1,177 @@ +package database + +import ( + "fmt" + "strings" + + "github.com/jackc/pgx" + "github.com/jackc/pgx/pgtype" + "github.com/pkg/errors" + "github.com/yezzey-gp/yproxy/pkg/ylogger" +) + +//go:generate mockgen -destination=../mock/mock_database_interractor.go -package mock +type DatabaseInterractor interface { + GetVirtualExpireIndexes(int) (map[string]bool, map[string]uint64, error) +} + +type DatabaseHandler struct { +} + +type DB struct { + name string + tablespace pgtype.OID + oid pgtype.OID +} + +type Ei struct { + reloid pgtype.OID + relfileoid pgtype.OID + expireLsn string + fqnmd5 string +} + +func (database *DatabaseHandler) GetVirtualExpireIndexes(port int) (map[string]bool, map[string]uint64, error) { //TODO несколько баз + db, err := getDatabase(port) + if err != nil { + return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix + } + ylogger.Zero.Debug().Str("database name", db.name).Msg("recieved database") + conn, err := connectToDatabase(port, db.name) + if err != nil { + return nil, nil, err + } + defer conn.Close() //error + ylogger.Zero.Debug().Msg("connected to database") + + rows, err := conn.Query(`SELECT reloid, relfileoid, expire_lsn, fqnmd5 FROM yezzey.yezzey_expire_index WHERE expire_lsn != '0/0';`) + if err != nil { + return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix + } + defer rows.Close() + ylogger.Zero.Debug().Msg("executed select") + + c := make(map[string]uint64, 0) + for rows.Next() { + row := Ei{} + if err := rows.Scan(&row.reloid, &row.relfileoid, &row.expireLsn, &row.fqnmd5); err != nil { + return nil, nil, fmt.Errorf("unable to parse query output %v", err) + } + + lsn, err := pgx.ParseLSN(row.expireLsn) + if err != nil { + return nil, nil, fmt.Errorf("unable to parse query output %v", err) + } + + ylogger.Zero.Debug().Str("file", fmt.Sprintf("%d_%d_%s_%d_", db.tablespace, db.oid, row.fqnmd5, row.relfileoid)).Msg("added file to ei") + c[fmt.Sprintf("%d_%d_%s_%d_", db.tablespace, db.oid, row.fqnmd5, row.relfileoid)] = lsn + } + ylogger.Zero.Debug().Msg("read 1") + + rows2, err := conn.Query(`SELECT x_path FROM yezzey.yezzey_virtual_index;`) + if err != nil { + return nil, nil, fmt.Errorf("unable to get ao/aocs tables %v", err) //fix + } + defer rows2.Close() + ylogger.Zero.Debug().Msg("read 2") + + c2 := make(map[string]bool, 0) + for rows2.Next() { + xpath := "" + if err := rows2.Scan(&xpath); err != nil { + return nil, nil, fmt.Errorf("unable to parse query output %v", err) + } + p1 := strings.Split(xpath, "/") + p2 := p1[len(p1)-1] + p3 := strings.Split(p2, "_") + if len(p3) >= 4 { + p2 = fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3]) + } + c2[p2] = true + ylogger.Zero.Debug().Str("file", p2).Msg("added") + } + ylogger.Zero.Debug().Msg("read 3") + + return c2, c, err +} + +func getDatabase(port int) (DB, error) { + conn, err := connectToDatabase(port, "postgres") + if err != nil { + return DB{}, err + } + defer conn.Close() //error + ylogger.Zero.Debug().Msg("connected to db") + rows, err := conn.Query(`SELECT dattablespace, oid, datname FROM pg_database WHERE datallowconn;`) + if err != nil { + return DB{}, err + } + defer rows.Close() + ylogger.Zero.Debug().Msg("recieved db list") + + for rows.Next() { + row := DB{} + ylogger.Zero.Debug().Msg("cycle 1") + if err := rows.Scan(&row.tablespace, &row.oid, &row.name); err != nil { + return DB{}, err + } + ylogger.Zero.Debug().Msg("cycle 2") + ylogger.Zero.Debug().Str("db", row.name).Int("db", int(row.oid)).Int("db", int(row.tablespace)).Msg("database") + if row.name == "postgres" { + continue + } + + ylogger.Zero.Debug().Str("db", row.name).Msg("check database") + connDb, err := connectToDatabase(port, row.name) + if err != nil { + return DB{}, err + } + defer connDb.Close() //error + ylogger.Zero.Debug().Msg("cycle 3") + + rowsdb, err := connDb.Query(`SELECT exists(SELECT * FROM information_schema.schemata WHERE schema_name='yezzey');`) + if err != nil { + return DB{}, err + } + defer rowsdb.Close() + ylogger.Zero.Debug().Msg("cycle 4") + var ans bool + rowsdb.Next() + err = rowsdb.Scan(&ans) + if err != nil { + ylogger.Zero.Error().AnErr("error", err).Msg("error during yezzey check") + return DB{}, err + } + ylogger.Zero.Debug().Bool("result", ans).Msg("find yezzey schema") + if ans { + ylogger.Zero.Debug().Str("db", row.name).Msg("found yezzey schema in database") + ylogger.Zero.Debug().Int("db", int(row.oid)).Int("db", int(row.tablespace)).Msg("found yezzey schema in database") + return row, nil + } + + ylogger.Zero.Debug().Str("db", row.name).Msg("no yezzey schema in database") + } + return DB{}, fmt.Errorf("no yezzey schema across databases") +} + +func connectToDatabase(port int, database string) (*pgx.Conn, error) { + config, err := pgx.ParseEnvLibpq() + if err != nil { + return nil, errors.Wrap(err, "Connect: unable to read environment variables") + } + + config.Port = uint16(port) + config.Database = database + + config.RuntimeParams["gp_role"] = "utility" + conn, err := pgx.Connect(config) + if err != nil { + config.RuntimeParams["gp_session_role"] = "utility" + conn, err = pgx.Connect(config) + if err != nil { + fmt.Printf("error in connection %v", err) // delete this + return nil, err + } + } + return conn, nil +} diff --git a/pkg/message/delete_message.go b/pkg/message/delete_message.go index ccab831..599ba1c 100644 --- a/pkg/message/delete_message.go +++ b/pkg/message/delete_message.go @@ -5,15 +5,23 @@ import ( "encoding/binary" ) -type DeleteMessage struct { - Name string +type DeleteMessage struct { //seg port + Name string + Port int + Segnum int + Confirm bool + Garbage bool } var _ ProtoMessage = &DeleteMessage{} -func NewDeleteMessage(name string) *DeleteMessage { +func NewDeleteMessage(name string, port int, seg int, confirm bool, garbage bool) *DeleteMessage { return &DeleteMessage{ - Name: name, + Name: name, + Port: port, + Segnum: seg, + Confirm: confirm, + Garbage: garbage, } } @@ -25,17 +33,40 @@ func (c *DeleteMessage) Encode() []byte { 0, } + if c.Confirm { + bt[1] = 1 + } + if c.Garbage { + bt[2] = 1 + } + bt = append(bt, []byte(c.Name)...) bt = append(bt, 0) - ln := len(bt) + 8 + p := make([]byte, 8) + binary.BigEndian.PutUint64(p, uint64(c.Port)) + bt = append(bt, p...) + + p = make([]byte, 8) + binary.BigEndian.PutUint64(p, uint64(c.Segnum)) + bt = append(bt, p...) + + ln := len(bt) + 8 bs := make([]byte, 8) binary.BigEndian.PutUint64(bs, uint64(ln)) return append(bs, bt...) } func (c *DeleteMessage) Decode(body []byte) { + if body[1] == 1 { + c.Confirm = true + } + if body[2] == 1 { + c.Garbage = true + } c.Name = c.GetDeleteName(body[4:]) + c.Port = int(binary.BigEndian.Uint64(body[len(body)-16 : len(body)-8])) + c.Segnum = int(binary.BigEndian.Uint64(body[len(body)-8:])) } func (c *DeleteMessage) GetDeleteName(b []byte) string { diff --git a/pkg/message/message_test.go b/pkg/message/message_test.go index 19ea27b..8cb8734 100644 --- a/pkg/message/message_test.go +++ b/pkg/message/message_test.go @@ -181,3 +181,21 @@ func TestCopyMsg(t *testing.T) { assert.True(msg2.Decrypt) assert.True(msg2.Encrypt) } + +func TestDeleteMsg(t *testing.T) { + assert := assert.New(t) + + msg := message.NewDeleteMessage("myname/mynextname", 5432, 42, true, true) + body := msg.Encode() + + assert.Equal(body[8], byte(message.MessageTypeDelete)) + + msg2 := message.DeleteMessage{} + msg2.Decode(body[8:]) + + assert.Equal("myname/mynextname", msg2.Name) + assert.Equal(5432, msg2.Port) + assert.Equal(42, msg2.Segnum) + assert.True(msg2.Confirm) + assert.True(msg2.Garbage) +} diff --git a/pkg/mock/backups.go b/pkg/mock/backups.go new file mode 100644 index 0000000..530bb1b --- /dev/null +++ b/pkg/mock/backups.go @@ -0,0 +1,49 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/backups/backups.go + +// Package mock is a generated GoMock package. +package mock + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockBackupInterractor is a mock of BackupInterractor interface. +type MockBackupInterractor struct { + ctrl *gomock.Controller + recorder *MockBackupInterractorMockRecorder +} + +// MockBackupInterractorMockRecorder is the mock recorder for MockBackupInterractor. +type MockBackupInterractorMockRecorder struct { + mock *MockBackupInterractor +} + +// NewMockBackupInterractor creates a new mock instance. +func NewMockBackupInterractor(ctrl *gomock.Controller) *MockBackupInterractor { + mock := &MockBackupInterractor{ctrl: ctrl} + mock.recorder = &MockBackupInterractorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockBackupInterractor) EXPECT() *MockBackupInterractorMockRecorder { + return m.recorder +} + +// GetFirstLSN mocks base method. +func (m *MockBackupInterractor) GetFirstLSN(arg0 int) (uint64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetFirstLSN", arg0) + ret0, _ := ret[0].(uint64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetFirstLSN indicates an expected call of GetFirstLSN. +func (mr *MockBackupInterractorMockRecorder) GetFirstLSN(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFirstLSN", reflect.TypeOf((*MockBackupInterractor)(nil).GetFirstLSN), arg0) +} diff --git a/pkg/mock/database.go b/pkg/mock/database.go new file mode 100644 index 0000000..d04c573 --- /dev/null +++ b/pkg/mock/database.go @@ -0,0 +1,50 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/database/database.go + +// Package mock is a generated GoMock package. +package mock + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockDatabaseInterractor is a mock of DatabaseInterractor interface. +type MockDatabaseInterractor struct { + ctrl *gomock.Controller + recorder *MockDatabaseInterractorMockRecorder +} + +// MockDatabaseInterractorMockRecorder is the mock recorder for MockDatabaseInterractor. +type MockDatabaseInterractorMockRecorder struct { + mock *MockDatabaseInterractor +} + +// NewMockDatabaseInterractor creates a new mock instance. +func NewMockDatabaseInterractor(ctrl *gomock.Controller) *MockDatabaseInterractor { + mock := &MockDatabaseInterractor{ctrl: ctrl} + mock.recorder = &MockDatabaseInterractorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockDatabaseInterractor) EXPECT() *MockDatabaseInterractorMockRecorder { + return m.recorder +} + +// GetVirtualExpireIndexes mocks base method. +func (m *MockDatabaseInterractor) GetVirtualExpireIndexes(arg0 int) (map[string]bool, map[string]uint64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetVirtualExpireIndexes", arg0) + ret0, _ := ret[0].(map[string]bool) + ret1, _ := ret[1].(map[string]uint64) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// GetVirtualExpireIndexes indicates an expected call of GetVirtualExpireIndexes. +func (mr *MockDatabaseInterractorMockRecorder) GetVirtualExpireIndexes(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetVirtualExpireIndexes", reflect.TypeOf((*MockDatabaseInterractor)(nil).GetVirtualExpireIndexes), arg0) +} diff --git a/pkg/mock/storage.go b/pkg/mock/storage.go new file mode 100644 index 0000000..a733f0d --- /dev/null +++ b/pkg/mock/storage.go @@ -0,0 +1,300 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/storage/storage.go + +// Package mock is a generated GoMock package. +package mock + +import ( + io "io" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + storage "github.com/yezzey-gp/yproxy/pkg/storage" +) + +// MockStorageReader is a mock of StorageReader interface. +type MockStorageReader struct { + ctrl *gomock.Controller + recorder *MockStorageReaderMockRecorder +} + +// MockStorageReaderMockRecorder is the mock recorder for MockStorageReader. +type MockStorageReaderMockRecorder struct { + mock *MockStorageReader +} + +// NewMockStorageReader creates a new mock instance. +func NewMockStorageReader(ctrl *gomock.Controller) *MockStorageReader { + mock := &MockStorageReader{ctrl: ctrl} + mock.recorder = &MockStorageReaderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStorageReader) EXPECT() *MockStorageReaderMockRecorder { + return m.recorder +} + +// CatFileFromStorage mocks base method. +func (m *MockStorageReader) CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CatFileFromStorage", name, offset) + ret0, _ := ret[0].(io.ReadCloser) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CatFileFromStorage indicates an expected call of CatFileFromStorage. +func (mr *MockStorageReaderMockRecorder) CatFileFromStorage(name, offset interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CatFileFromStorage", reflect.TypeOf((*MockStorageReader)(nil).CatFileFromStorage), name, offset) +} + +// MockStorageWriter is a mock of StorageWriter interface. +type MockStorageWriter struct { + ctrl *gomock.Controller + recorder *MockStorageWriterMockRecorder +} + +// MockStorageWriterMockRecorder is the mock recorder for MockStorageWriter. +type MockStorageWriterMockRecorder struct { + mock *MockStorageWriter +} + +// NewMockStorageWriter creates a new mock instance. +func NewMockStorageWriter(ctrl *gomock.Controller) *MockStorageWriter { + mock := &MockStorageWriter{ctrl: ctrl} + mock.recorder = &MockStorageWriterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStorageWriter) EXPECT() *MockStorageWriterMockRecorder { + return m.recorder +} + +// PatchFile mocks base method. +func (m *MockStorageWriter) PatchFile(name string, r io.ReadSeeker, startOffset int64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PatchFile", name, r, startOffset) + ret0, _ := ret[0].(error) + return ret0 +} + +// PatchFile indicates an expected call of PatchFile. +func (mr *MockStorageWriterMockRecorder) PatchFile(name, r, startOffset interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PatchFile", reflect.TypeOf((*MockStorageWriter)(nil).PatchFile), name, r, startOffset) +} + +// PutFileToDest mocks base method. +func (m *MockStorageWriter) PutFileToDest(name string, r io.Reader) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PutFileToDest", name, r) + ret0, _ := ret[0].(error) + return ret0 +} + +// PutFileToDest indicates an expected call of PutFileToDest. +func (mr *MockStorageWriterMockRecorder) PutFileToDest(name, r interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutFileToDest", reflect.TypeOf((*MockStorageWriter)(nil).PutFileToDest), name, r) +} + +// MockStorageLister is a mock of StorageLister interface. +type MockStorageLister struct { + ctrl *gomock.Controller + recorder *MockStorageListerMockRecorder +} + +// MockStorageListerMockRecorder is the mock recorder for MockStorageLister. +type MockStorageListerMockRecorder struct { + mock *MockStorageLister +} + +// NewMockStorageLister creates a new mock instance. +func NewMockStorageLister(ctrl *gomock.Controller) *MockStorageLister { + mock := &MockStorageLister{ctrl: ctrl} + mock.recorder = &MockStorageListerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStorageLister) EXPECT() *MockStorageListerMockRecorder { + return m.recorder +} + +// ListPath mocks base method. +func (m *MockStorageLister) ListPath(prefix string) ([]*storage.ObjectInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListPath", prefix) + ret0, _ := ret[0].([]*storage.ObjectInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListPath indicates an expected call of ListPath. +func (mr *MockStorageListerMockRecorder) ListPath(prefix interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListPath", reflect.TypeOf((*MockStorageLister)(nil).ListPath), prefix) +} + +// MockStorageMover is a mock of StorageMover interface. +type MockStorageMover struct { + ctrl *gomock.Controller + recorder *MockStorageMoverMockRecorder +} + +// MockStorageMoverMockRecorder is the mock recorder for MockStorageMover. +type MockStorageMoverMockRecorder struct { + mock *MockStorageMover +} + +// NewMockStorageMover creates a new mock instance. +func NewMockStorageMover(ctrl *gomock.Controller) *MockStorageMover { + mock := &MockStorageMover{ctrl: ctrl} + mock.recorder = &MockStorageMoverMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStorageMover) EXPECT() *MockStorageMoverMockRecorder { + return m.recorder +} + +// DeleteObject mocks base method. +func (m *MockStorageMover) DeleteObject(key string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteObject", key) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteObject indicates an expected call of DeleteObject. +func (mr *MockStorageMoverMockRecorder) DeleteObject(key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockStorageMover)(nil).DeleteObject), key) +} + +// MoveObject mocks base method. +func (m *MockStorageMover) MoveObject(from, to string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MoveObject", from, to) + ret0, _ := ret[0].(error) + return ret0 +} + +// MoveObject indicates an expected call of MoveObject. +func (mr *MockStorageMoverMockRecorder) MoveObject(from, to interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveObject", reflect.TypeOf((*MockStorageMover)(nil).MoveObject), from, to) +} + +// MockStorageInteractor is a mock of StorageInteractor interface. +type MockStorageInteractor struct { + ctrl *gomock.Controller + recorder *MockStorageInteractorMockRecorder +} + +// MockStorageInteractorMockRecorder is the mock recorder for MockStorageInteractor. +type MockStorageInteractorMockRecorder struct { + mock *MockStorageInteractor +} + +// NewMockStorageInteractor creates a new mock instance. +func NewMockStorageInteractor(ctrl *gomock.Controller) *MockStorageInteractor { + mock := &MockStorageInteractor{ctrl: ctrl} + mock.recorder = &MockStorageInteractorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStorageInteractor) EXPECT() *MockStorageInteractorMockRecorder { + return m.recorder +} + +// CatFileFromStorage mocks base method. +func (m *MockStorageInteractor) CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CatFileFromStorage", name, offset) + ret0, _ := ret[0].(io.ReadCloser) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CatFileFromStorage indicates an expected call of CatFileFromStorage. +func (mr *MockStorageInteractorMockRecorder) CatFileFromStorage(name, offset interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CatFileFromStorage", reflect.TypeOf((*MockStorageInteractor)(nil).CatFileFromStorage), name, offset) +} + +// DeleteObject mocks base method. +func (m *MockStorageInteractor) DeleteObject(key string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteObject", key) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteObject indicates an expected call of DeleteObject. +func (mr *MockStorageInteractorMockRecorder) DeleteObject(key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockStorageInteractor)(nil).DeleteObject), key) +} + +// ListPath mocks base method. +func (m *MockStorageInteractor) ListPath(prefix string) ([]*storage.ObjectInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListPath", prefix) + ret0, _ := ret[0].([]*storage.ObjectInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListPath indicates an expected call of ListPath. +func (mr *MockStorageInteractorMockRecorder) ListPath(prefix interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListPath", reflect.TypeOf((*MockStorageInteractor)(nil).ListPath), prefix) +} + +// MoveObject mocks base method. +func (m *MockStorageInteractor) MoveObject(from, to string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MoveObject", from, to) + ret0, _ := ret[0].(error) + return ret0 +} + +// MoveObject indicates an expected call of MoveObject. +func (mr *MockStorageInteractorMockRecorder) MoveObject(from, to interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveObject", reflect.TypeOf((*MockStorageInteractor)(nil).MoveObject), from, to) +} + +// PatchFile mocks base method. +func (m *MockStorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffset int64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PatchFile", name, r, startOffset) + ret0, _ := ret[0].(error) + return ret0 +} + +// PatchFile indicates an expected call of PatchFile. +func (mr *MockStorageInteractorMockRecorder) PatchFile(name, r, startOffset interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PatchFile", reflect.TypeOf((*MockStorageInteractor)(nil).PatchFile), name, r, startOffset) +} + +// PutFileToDest mocks base method. +func (m *MockStorageInteractor) PutFileToDest(name string, r io.Reader) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PutFileToDest", name, r) + ret0, _ := ret[0].(error) + return ret0 +} + +// PutFileToDest indicates an expected call of PutFileToDest. +func (mr *MockStorageInteractorMockRecorder) PutFileToDest(name, r interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutFileToDest", reflect.TypeOf((*MockStorageInteractor)(nil).PutFileToDest), name, r) +} diff --git a/pkg/proc/delete_handler.go b/pkg/proc/delete_handler.go new file mode 100644 index 0000000..cb0174e --- /dev/null +++ b/pkg/proc/delete_handler.go @@ -0,0 +1,123 @@ +package proc + +import ( + "fmt" + "strings" + + "github.com/pkg/errors" + "github.com/yezzey-gp/yproxy/pkg/backups" + "github.com/yezzey-gp/yproxy/pkg/database" + "github.com/yezzey-gp/yproxy/pkg/message" + "github.com/yezzey-gp/yproxy/pkg/storage" + "github.com/yezzey-gp/yproxy/pkg/ylogger" +) + +//go:generate mockgen -destination=../../../test/mocks/mock_object.go -package mocks -build_flags -mod=readonly github.com/wal-g/wal-g/pkg/storages/storage Object +type DeleteHandler interface { + HandleDeleteGarbage(message.DeleteMessage) error + HandleDeleteFile(message.DeleteMessage) error +} + +type BasicDeleteHandler struct { + BackupInterractor backups.BackupInterractor + DbInterractor database.DatabaseInterractor + StorageInterractor storage.StorageInteractor +} + +func (dh *BasicDeleteHandler) HandleDeleteGarbage(msg message.DeleteMessage) error { + fileList, err := dh.ListGarbageFiles(msg) + if err != nil { + return errors.Wrap(err, "failed to delete file") + } + + if !msg.Confirm { //do not delete files if no confirmation flag provided + return nil + } + + var failed []string + retryCount := 0 + for len(fileList) > 0 && retryCount < 10 { + retryCount++ + for i := 0; i < len(fileList); i++ { + filePathParts := strings.Split(fileList[i], "/") + err = dh.StorageInterractor.MoveObject(fileList[i], fmt.Sprintf("segments_005/seg%d/basebackups_005/yezzey/trash/%s", msg.Segnum, filePathParts[len(filePathParts)-1])) + if err != nil { + ylogger.Zero.Warn().AnErr("err", err).Str("file", fileList[i]).Msg("failed to move file") + failed = append(failed, fileList[i]) + } + } + fileList = failed + failed = make([]string, 0) + } + + if len(fileList) > 0 { + ylogger.Zero.Error().Int("failed files count", len(fileList)).Msg("some files were not moved") + ylogger.Zero.Error().Any("failed files", fileList).Msg("failed to move some files") + return errors.Wrap(err, "failed to move some files") + } + + return nil +} + +func (dh *BasicDeleteHandler) HandleDeleteFile(msg message.DeleteMessage) error { + err := dh.StorageInterractor.DeleteObject(msg.Name) + if err != nil { + ylogger.Zero.Error().AnErr("err", err).Msg("failed to delete file") + return errors.Wrap(err, "failed to delete file") + } + return nil +} + +func (dh *BasicDeleteHandler) ListGarbageFiles(msg message.DeleteMessage) ([]string, error) { + //get firsr backup lsn + firstBackupLSN, err := dh.BackupInterractor.GetFirstLSN(msg.Segnum) + if err != nil { + ylogger.Zero.Error().AnErr("err", err).Msg("failed to get first lsn") //return or just assume there are no backups? + } + ylogger.Zero.Info().Uint64("lsn", firstBackupLSN).Msg("first backup LSN") + + //list files in storage + ylogger.Zero.Info().Str("path", msg.Name).Msg("going to list path") + objectMetas, err := dh.StorageInterractor.ListPath(msg.Name) + if err != nil { + return nil, errors.Wrap(err, "could not list objects") + } + ylogger.Zero.Info().Int("amount", len(objectMetas)).Msg("objects count") + + vi, ei, err := dh.DbInterractor.GetVirtualExpireIndexes(msg.Port) + if err != nil { + ylogger.Zero.Error().AnErr("err", err).Msg("failed to get indexes") + return nil, errors.Wrap(err, "could not get virtual and expire indexes") + } + ylogger.Zero.Info().Msg("recieved virtual index and expire index") + ylogger.Zero.Debug().Int("virtual", len(vi)).Msg("vi count") + ylogger.Zero.Debug().Int("expire", len(ei)).Msg("ei count") + + filesToDelete := make([]string, 0) + for i := 0; i < len(objectMetas); i++ { + reworkedName := ReworkFileName(objectMetas[i].Path) + lsn, ok := ei[reworkedName] + ylogger.Zero.Debug().Uint64("lsn", lsn).Uint64("backup lsn", firstBackupLSN).Msg("comparing lsn") + if !vi[reworkedName] && (lsn < firstBackupLSN || !ok) { + ylogger.Zero.Debug().Str("file", objectMetas[i].Path). + Bool("file in expire index", ok). + Bool("lsn is less than in first backup", lsn < firstBackupLSN). + Msg("file will be deleted") + filesToDelete = append(filesToDelete, objectMetas[i].Path) + } + } + + ylogger.Zero.Info().Int("amount", len(filesToDelete)).Msg("files will be deleted") + + return filesToDelete, nil +} + +func ReworkFileName(str string) string { + p1 := strings.Split(str, "/") + p2 := p1[len(p1)-1] + p3 := strings.Split(p2, "_") + if len(p3) >= 4 { + p2 = fmt.Sprintf("%s_%s_%s_%s_", p3[0], p3[1], p3[2], p3[3]) + } + return p2 +} diff --git a/pkg/proc/delete_handler_test.go b/pkg/proc/delete_handler_test.go new file mode 100644 index 0000000..a37995d --- /dev/null +++ b/pkg/proc/delete_handler_test.go @@ -0,0 +1,101 @@ +package proc_test + +import ( + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/yezzey-gp/yproxy/pkg/message" + mock "github.com/yezzey-gp/yproxy/pkg/mock" + "github.com/yezzey-gp/yproxy/pkg/proc" + "github.com/yezzey-gp/yproxy/pkg/storage" +) + +func TestReworkingName(t *testing.T) { + type TestCase struct { + input string + expected string + } + + testCases := []TestCase{ + { + input: "/segments_005/seg1/basebackups_005/yezzey/1663_16530_a4c5ad8305b83f07200b020694c36563_17660_1__DY_1_xlog_19649822496", + expected: "1663_16530_a4c5ad8305b83f07200b020694c36563_17660_", + }, + { + input: "1663_16530_a4c5ad8305b83f07200b020694c36563_17660_1__DY_1_xlog_19649822496", + expected: "1663_16530_a4c5ad8305b83f07200b020694c36563_17660_", + }, + { + input: "seg1/basebackups_005/yezzey/1663_16530_a4c5ad8305b83f07200b020694c36563_17660_1__DY_1_xlog_19649822496", + expected: "1663_16530_a4c5ad8305b83f07200b020694c36563_17660_", + }, + { + input: "1663_16530_a4c5ad8305b83f07200b020694c36563", + expected: "1663_16530_a4c5ad8305b83f07200b020694c36563", + }, + { + input: "1663___a4c5ad8305b83f07200b020694c36563___", + expected: "1663___a4c5ad8305b83f07200b020694c36563_", + }, + { + input: "file", + expected: "file", + }, + } + + for _, testCase := range testCases { + ans := proc.ReworkFileName(testCase.input) + assert.Equal(t, testCase.expected, ans) + } +} + +func TestFilesToDeletion(t *testing.T) { + ctrl := gomock.NewController(t) + + msg := message.DeleteMessage{ + Name: "path", + Port: 6000, + Segnum: 0, + Confirm: false, + } + + filesInStorage := []*storage.ObjectInfo{ + {Path: "1663_16530_not-deleted_18002_"}, + {Path: "1663_16530_deleted-after-backup_18002_"}, + {Path: "1663_16530_deleted-when-backup-start_18002_"}, + {Path: "1663_16530_deleted-before-backup_18002_"}, + {Path: "some_trash"}, + } + storage := mock.NewMockStorageInteractor(ctrl) + storage.EXPECT().ListPath(msg.Name).Return(filesInStorage, nil) + + backup := mock.NewMockBackupInterractor(ctrl) + backup.EXPECT().GetFirstLSN(msg.Segnum).Return(uint64(1337), nil) + + vi := map[string]bool{ + "1663_16530_not-deleted_18002_": true, + "1663_16530_deleted-after-backup_18002_": true, + "1663_16530_deleted-when-backup-start_18002_": true, + } + ei := map[string]uint64{ + "1663_16530_deleted-after-backup_18002_": uint64(1400), + "1663_16530_deleted-when-backup-start_18002_": uint64(1337), + "1663_16530_deleted-before-backup_18002_": uint64(1300), + } + database := mock.NewMockDatabaseInterractor(ctrl) + database.EXPECT().GetVirtualExpireIndexes(msg.Port).Return(vi, ei, nil) + + handler := proc.BasicDeleteHandler{ + StorageInterractor: storage, + DbInterractor: database, + BackupInterractor: backup, + } + + list, err := handler.ListGarbageFiles(msg) + + assert.NoError(t, err) + assert.Equal(t, 2, len(list)) + assert.Equal(t, "1663_16530_deleted-before-backup_18002_", list[0]) + assert.Equal(t, "some_trash", list[1]) +} diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index fbc4380..b3f81da 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -7,8 +7,10 @@ import ( "sync" "github.com/yezzey-gp/yproxy/config" + "github.com/yezzey-gp/yproxy/pkg/backups" "github.com/yezzey-gp/yproxy/pkg/client" "github.com/yezzey-gp/yproxy/pkg/crypt" + "github.com/yezzey-gp/yproxy/pkg/database" "github.com/yezzey-gp/yproxy/pkg/message" "github.com/yezzey-gp/yproxy/pkg/storage" "github.com/yezzey-gp/yproxy/pkg/ylogger" @@ -230,7 +232,6 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl retryCount++ for i := 0; i < len(objectMetas); i++ { path := strings.TrimPrefix(objectMetas[i].Path, instanceCnf.StorageCnf.StoragePrefix) - //get reader readerFromOldBucket := NewYRetryReader(NewRestartReader(oldStorage, path)) var fromReader io.Reader @@ -317,8 +318,48 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl fmt.Println("Copy finished successfully") ylogger.Zero.Info().Msg("Copy finished successfully") + case message.MessageTypeDelete: + //recieve message + msg := message.DeleteMessage{} + msg.Decode(body) + + ycl.SetExternalFilePath(msg.Name) + + dbInterractor := &database.DatabaseHandler{} + backupHandler := &backups.WalgBackupInterractor{} + + var dh DeleteHandler + dh = &BasicDeleteHandler{ + StorageInterractor: s, + DbInterractor: dbInterractor, + BackupInterractor: backupHandler, + } + + if msg.Garbage { + err = dh.HandleDeleteGarbage(msg) + if err != nil { + _ = ycl.ReplyError(err, "failed to finish operation") + return nil + } + } else { + err = dh.HandleDeleteFile(msg) + if err != nil { + _ = ycl.ReplyError(err, "failed to finish operation") + return nil + } + } + + if _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()); err != nil { + _ = ycl.ReplyError(err, "failed to upload") + return nil + } + ylogger.Zero.Info().Msg("Deleted garbage successfully") + if !msg.Confirm { + ylogger.Zero.Warn().Msg("It was a dry-run, nothing was deleted") + } + default: - ylogger.Zero.Error().Any("type", tp).Msg("what type is it") + ylogger.Zero.Error().Any("type", tp).Msg("unknown message type") _ = ycl.ReplyError(nil, "wrong request type") return nil diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index f190718..bbf6e2b 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "path" + "strings" "github.com/yezzey-gp/aws-sdk-go/aws" "github.com/yezzey-gp/aws-sdk-go/service/s3" @@ -136,3 +137,29 @@ func (s *S3StorageInteractor) ListPath(prefix string) ([]*ObjectInfo, error) { } return metas, nil } + +func (s *S3StorageInteractor) DeleteObject(key string) error { + sess, err := s.pool.GetSession(context.TODO()) + if err != nil { + ylogger.Zero.Err(err).Msg("failed to acquire s3 session") + return err + } + ylogger.Zero.Debug().Msg("aquired session") + + if !strings.HasPrefix(key, s.cnf.StoragePrefix) { + key = path.Join(s.cnf.StoragePrefix, key) + } + + input2 := s3.DeleteObjectInput{ + Bucket: &s.cnf.StorageBucket, + Key: aws.String(key), + } + + _, err = sess.DeleteObject(&input2) + if err != nil { + ylogger.Zero.Err(err).Msg("failed to delete old object") + return err + } + ylogger.Zero.Debug().Msg("deleted object") + return nil +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index f12ea6f..6f02559 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -24,6 +24,8 @@ type StorageMover interface { MoveObject(from string, to string) error DeleteObject(key string) error } + +//go:generate mockgen -destination=pkg/mock/storage.go -package=mock type StorageInteractor interface { StorageReader StorageWriter