diff --git a/cmd/proxy.go b/cmd/proxy.go new file mode 100644 index 0000000..96d1949 --- /dev/null +++ b/cmd/proxy.go @@ -0,0 +1,171 @@ +package cmd + +import ( + "github.com/sloonz/uback/destinations" + "github.com/sloonz/uback/lib" + "github.com/sloonz/uback/sources" + + "io" + "net/rpc" + "os" + + "github.com/hashicorp/yamux" + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +type Destination struct { + dataStream *yamux.Stream +} + +func (d *Destination) ListBackups(args *destinations.ListBackupsArgs, reply *[]uback.Backup) error { + var err error + + dstOpts := newOptionsBuilder(&args.Options, nil).WithDestination() + if dstOpts.Error != nil { + return dstOpts.Error + } + + *reply, err = dstOpts.Destination.ListBackups() + if err != nil { + return err + } + + return nil +} + +func (d *Destination) RemoveBackup(args *destinations.RemoveBackupArgs, reply *struct{}) error { + dstOpts := newOptionsBuilder(&args.Options, nil).WithDestination() + if dstOpts.Error != nil { + return dstOpts.Error + } + + return dstOpts.Destination.RemoveBackup(args.Backup) +} + +func (d *Destination) SendBackup(args *destinations.SendBackupArgs, reply *struct{}) error { + dstOpts := newOptionsBuilder(&args.Options, nil).WithDestination() + if dstOpts.Error != nil { + return dstOpts.Error + } + + if err := dstOpts.Destination.SendBackup(args.Backup, d.dataStream); err != nil { + return err + } + + return d.dataStream.Close() +} + +func (d *Destination) ReceiveBackup(args *destinations.ReceiveBackupArgs, reply *struct{}) error { + dstOpts := newOptionsBuilder(&args.Options, nil).WithDestination() + if dstOpts.Error != nil { + return dstOpts.Error + } + + r, err := dstOpts.Destination.ReceiveBackup(args.Backup) + if err != nil { + return err + } + + if _, err = io.Copy(d.dataStream, r); err != nil { + return err + } + + if err = r.Close(); err != nil { + return err + } + + return d.dataStream.Close() +} + +type Source struct { + dataStream *yamux.Stream + backup io.ReadCloser +} + +func (s *Source) ListSnapshots(args *sources.ListSnapshotsArgs, reply *[]uback.Snapshot) error { + var err error + + srcOpts := newOptionsBuilder(&args.Options, nil).WithSource() + if srcOpts.Error != nil { + return srcOpts.Error + } + + *reply, err = srcOpts.Source.ListSnapshots() + if err != nil { + return err + } + return nil +} + +func (s *Source) RemoveSnapshot(args *sources.RemoveSnapshotArgs, reply *struct{}) error { + srcOpts := newOptionsBuilder(&args.Options, nil).WithSource() + if srcOpts.Error != nil { + return srcOpts.Error + } + + return srcOpts.Source.RemoveSnapshot(args.Snapshot) +} + +func (s *Source) CreateBackup(args *sources.CreateBackupArgs, reply *uback.Backup) error { + var err error + + srcOpts := newOptionsBuilder(&args.Options, nil).WithSource() + if srcOpts.Error != nil { + return srcOpts.Error + } + + *reply, s.backup, err = srcOpts.Source.CreateBackup(args.Snapshot) + return err +} + +func (s *Source) TransmitBackup(args *struct{}, reply *struct{}) error { + if _, err := io.Copy(s.dataStream, s.backup); err != nil { + return err + } + + if err := s.backup.Close(); err != nil { + return err + } + + return s.dataStream.Close() +} + +var ( + cmdProxy = &cobra.Command{ + Use: "proxy", + Hidden: true, + Run: func(cmd *cobra.Command, args []string) { + rwc := uback.ReadWriteCloser{ + ReadCloser: os.Stdin, + WriteCloser: os.Stdout, + } + + session, err := yamux.Server(&rwc, nil) + if err != nil { + logrus.Fatalf("Failed to start proxy server: %v", err) + } + defer session.Close() + + rpcStream, err := session.AcceptStream() + if err != nil { + logrus.Fatalf("Failed to start proxy server: %v", err) + } + + dataStream, err := session.AcceptStream() + if err != nil { + logrus.Fatalf("Failed to start proxy server: %v", err) + } + + if err = rpc.Register(&Destination{dataStream: dataStream}); err != nil { + logrus.Fatalf("Failed to start proxy server: %v", err) + } + + if err = rpc.Register(&Source{dataStream: dataStream}); err != nil { + logrus.Fatalf("Failed to start proxy server: %v", err) + } + + rpc.ServeConn(rpcStream) + }, + } +) diff --git a/cmd/root.go b/cmd/root.go index 7686b57..2f07d3b 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -54,7 +54,7 @@ func init() { }) rootCmd.PersistentFlags().StringVarP(&presetsDir, "presets-dir", "p", "", "path to presets directory") - rootCmd.AddCommand(cmdPreset, cmdBackup, cmdKey, cmdContainer, cmdList, cmdPrune, cmdFetch, cmdRestore, cmdVersion) + rootCmd.AddCommand(cmdPreset, cmdBackup, cmdKey, cmdContainer, cmdList, cmdPrune, cmdFetch, cmdRestore, cmdVersion, cmdProxy) } func Execute() { diff --git a/destinations/new.go b/destinations/new.go index 5768dfa..b95860c 100644 --- a/destinations/new.go +++ b/destinations/new.go @@ -16,6 +16,8 @@ func New(options *uback.Options) (uback.Destination, error) { return newObjectStorageDestination(options) case "command": return newCommandDestination(options) + case "proxy": + return newProxyDestination(options) default: return nil, fmt.Errorf("invalid destination type %v", options.String["Type"]) } diff --git a/destinations/proxy.go b/destinations/proxy.go new file mode 100644 index 0000000..1a34bd5 --- /dev/null +++ b/destinations/proxy.go @@ -0,0 +1,155 @@ +package destinations + +import ( + "github.com/sloonz/uback/lib" + + "errors" + "fmt" + "io" + "sync" + + "github.com/sirupsen/logrus" +) + +var ( + ErrProxyCommandMissing = errors.New("proxy destination: missing command") + proxyLog = logrus.WithFields(logrus.Fields{ + "destination": "proxy", + }) +) + +type ListBackupsArgs struct { + uback.Options +} + +type RemoveBackupArgs struct { + uback.Options + uback.Backup +} + +type SendBackupArgs struct { + uback.Options + uback.Backup +} + +type ReceiveBackupArgs struct { + uback.Options + uback.Backup +} + +type proxyDestination struct { + options *uback.Options + command []string +} + +func newProxyDestination(options *uback.Options) (uback.Destination, error) { + command := options.GetCommand("Command", nil) + if len(command) == 0 { + return nil, ErrProxyCommandMissing + } + + return &proxyDestination{options: options, command: command}, nil +} + +func (d *proxyDestination) ListBackups() ([]uback.Backup, error) { + session, rpcClient, _, err := uback.OpenProxy(proxyLog, d.command) + if err != nil { + return nil, fmt.Errorf("Failed to open proxy session: %v", err) + } + defer uback.CloseProxy(session, rpcClient) //nolint: errcheck + + var backups []uback.Backup + err = rpcClient.Call("Destination.ListBackups", &ListBackupsArgs{Options: uback.ProxiedOptions(d.options)}, &backups) + if err != nil { + return nil, err + } + + err = uback.CloseProxy(session, rpcClient) + if err != nil { + return nil, err + } + + return backups, nil +} + +func (d *proxyDestination) RemoveBackup(backup uback.Backup) error { + session, rpcClient, _, err := uback.OpenProxy(proxyLog, d.command) + if err != nil { + return fmt.Errorf("Failed to open proxy session: %v", err) + } + defer uback.CloseProxy(session, rpcClient) //nolint: errcheck + + return rpcClient.Call("Destination.RemoveBackup", &RemoveBackupArgs{Options: uback.ProxiedOptions(d.options), Backup: backup}, nil) +} + +func (d *proxyDestination) SendBackup(backup uback.Backup, data io.Reader) error { + session, rpcClient, dataStream, err := uback.OpenProxy(proxyLog, d.command) + if err != nil { + return fmt.Errorf("Failed to open proxy session: %v", err) + } + defer uback.CloseProxy(session, rpcClient) //nolint: errcheck + + call := rpcClient.Go("Destination.SendBackup", &SendBackupArgs{Options: uback.ProxiedOptions(d.options), Backup: backup}, nil, nil) + + if _, err := io.Copy(dataStream, data); err != nil { + return err + } + + if err := dataStream.Close(); err != nil { + return err + } + + <-call.Done + + return call.Error +} + +func (d *proxyDestination) ReceiveBackup(backup uback.Backup) (io.ReadCloser, error) { + session, rpcClient, dataStream, err := uback.OpenProxy(proxyLog, d.command) + if err != nil { + return nil, fmt.Errorf("Failed to open proxy session: %v", err) + } + + pr, pw := io.Pipe() + call := rpcClient.Go("Destination.ReceiveBackup", &ReceiveBackupArgs{Options: uback.ProxiedOptions(d.options), Backup: backup}, nil, nil) + + ch := make(chan error, 2) + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + <-call.Done + ch <- call.Error + }() + + wg.Add(1) + go func() { + defer wg.Done() + + if _, err := io.Copy(pw, dataStream); err != nil { + ch <- err + return + } + + if err := dataStream.Close(); err != nil { + ch <- err + return + } + }() + + go func() { + defer uback.CloseProxy(session, rpcClient) //nolint: errcheck + wg.Wait() + close(ch) + for err := range ch { + if err != nil { + pw.CloseWithError(err) + return + } + } + pw.Close() + }() + + return pr, nil +} diff --git a/doc/proxy.md b/doc/proxy.md new file mode 100644 index 0000000..24000ab --- /dev/null +++ b/doc/proxy.md @@ -0,0 +1,46 @@ +# Proxying + +## Overview + +Sometimes, you may want to produce backup data on a remote host or store +backup data to a remote host while still keeping `uback` configuration +on a third host. For example, you may want to use a `btrfs` source +on a remote host to produce the backup but store it locally on a `fs` +destination, or use a `tar` source locally and use a `fs` destination +on another host. You may combine both requirements, and on host A +get a backup from a `btrfs` source from host B and store it in a `fs` +destination on host C. + +Proxying means using another `uback` process to provide the source and/or +the destination while doing a backup. The other `uback` process may run +on another user, or in a container, or a remote host. + +Note that encryption and compression is done on the local process (not +the remote one). Also, restoring with proxy is not supported ; you must +use a direct source. + +## Usage + +For both sources and destinations : + +1. Use `proxy` as a source or destination type. + +2. Spawn the other `uback` instance by setting the `command` option to +`uback proxy`. + +3. Specify the proxyfied `type` and/or `command` option by prefixying +it with `proxy-`. + +## Examples + +Proxy a custom destination using ssh : + +``` +type=proxy,command="ssh root@example.com uback proxy",proxy-type=command,proxy-command=uback-custom-dest +``` + +Proxy a `btrfs` source using sudo : + +``` +type=proxy,command="sudo uback proxy",proxy-type=btrfs +``` diff --git a/go.mod b/go.mod index 105ef06..f6dfb04 100644 --- a/go.mod +++ b/go.mod @@ -1,22 +1,40 @@ module github.com/sloonz/uback -go 1.16 +go 1.18 require ( filippo.io/age v1.1.1 - github.com/Masterminds/goutils v1.1.1 // indirect - github.com/Masterminds/semver v1.5.0 // indirect github.com/Masterminds/sprig v2.22.0+incompatible github.com/gobuffalo/flect v1.0.2 github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 + github.com/hashicorp/yamux v0.1.1 + github.com/klauspost/compress v1.17.3 + github.com/minio/minio-go/v7 v7.0.64 + github.com/sirupsen/logrus v1.9.3 + github.com/spf13/cobra v1.8.0 +) + +require ( + github.com/Masterminds/goutils v1.1.1 // indirect + github.com/Masterminds/semver v1.5.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/google/uuid v1.4.0 // indirect github.com/huandu/xstrings v1.4.0 // indirect github.com/imdario/mergo v0.3.16 // indirect - github.com/klauspost/compress v1.17.3 + github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.2.6 // indirect - github.com/minio/minio-go/v7 v7.0.64 + github.com/minio/md5-simd v1.1.2 // indirect + github.com/minio/sha256-simd v1.0.1 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect - github.com/sirupsen/logrus v1.9.3 - github.com/spf13/cobra v1.8.0 + github.com/mitchellh/reflectwalk v1.0.2 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/rs/xid v1.5.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect + golang.org/x/crypto v0.16.0 // indirect golang.org/x/net v0.19.0 // indirect + golang.org/x/sys v0.15.0 // indirect + golang.org/x/text v0.14.0 // indirect + gopkg.in/ini.v1 v1.67.0 // indirect ) diff --git a/go.sum b/go.sum index 6dcd9f4..c0d75ed 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,5 @@ filippo.io/age v1.1.1 h1:pIpO7l151hCnQ4BdyBujnGP2YlUo0uj6sAVNHGBvXHg= filippo.io/age v1.1.1/go.mod h1:l03SrzDUrBkdBx8+IILdnn2KZysqQdbEBUQ4p3sqEQE= -filippo.io/edwards25519 v1.0.0/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7eHn5EwJns= github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI= github.com/Masterminds/goutils v1.1.1/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU= github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= @@ -18,9 +17,10 @@ github.com/gobuffalo/flect v1.0.2/go.mod h1:A5msMlrHtLqh9umBSnvabjsMrCcCpAyzglnD github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/yamux v0.1.1 h1:yrQxtgseBDrq9Y652vSRDvsKCJKOUD+GzTS4Y0Y8pvE= +github.com/hashicorp/yamux v0.1.1/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ= github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU= github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= @@ -29,12 +29,9 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= -github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA= github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= -github.com/klauspost/cpuid/v2 v2.2.3/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= -github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= @@ -72,61 +69,16 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.4.0/go.mod h1:3quD/ATkf6oY+rnes5c3ExXTbLc8mueNue5/DoinL80= -golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.3.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= -golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= -golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA= -golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= -golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= -golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= -golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= diff --git a/lib/proxy.go b/lib/proxy.go new file mode 100644 index 0000000..3295b85 --- /dev/null +++ b/lib/proxy.go @@ -0,0 +1,94 @@ +package uback + +import ( + "io" + "net/rpc" + "strings" + + "github.com/hashicorp/yamux" + "github.com/sirupsen/logrus" +) + +type ReadWriteCloser struct { + io.ReadCloser + io.WriteCloser +} + +func (rwc *ReadWriteCloser) Close() error { + if err := rwc.ReadCloser.Close(); err != nil { + _ = rwc.WriteCloser.Close() + return err + } + return rwc.WriteCloser.Close() +} + +func OpenProxy(logger *logrus.Entry, command []string) (*yamux.Session, *rpc.Client, *yamux.Stream, error) { + cmd := BuildCommand(command) + cmd.Stdout = nil + stdin, err := cmd.StdinPipe() + if err != nil { + return nil, nil, nil, err + } + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, nil, nil, err + } + + rwc := ReadWriteCloser{ + ReadCloser: stdout, + WriteCloser: stdin, + } + + err = StartCommand(logger, cmd) + if err != nil { + return nil, nil, nil, err + } + + session, err := yamux.Client(&rwc, nil) + if err != nil { + return nil, nil, nil, err + } + + rpcStream, err := session.OpenStream() + if err != nil { + return nil, nil, nil, err + } + + dataStream, err := session.OpenStream() + if err != nil { + return nil, nil, nil, err + } + + rpcClient := rpc.NewClient(rpcStream) + return session, rpcClient, dataStream, nil +} + +func CloseProxy(session *yamux.Session, rpcClient *rpc.Client) error { + err := rpcClient.Close() + if err != nil { + _ = session.Close() + return err + } + return session.Close() +} + +func ProxiedOptions(options *Options) Options { + opts := Options{ + String: make(map[string]string), + StrSlice: make(map[string][]string), + } + + for k, v := range options.String { + if k != "Proxy" && k != "Command" && k != "Type" { + opts.String[strings.TrimPrefix(k, "Proxy")] = v + } + } + + for k, v := range options.StrSlice { + if k != "Proxy" && k != "Command" && k != "Type" { + opts.StrSlice[strings.TrimPrefix(k, "Proxy")] = v + } + } + + return opts +} diff --git a/sources/new.go b/sources/new.go index 1083079..8dd9e34 100644 --- a/sources/new.go +++ b/sources/new.go @@ -21,6 +21,8 @@ func New(options *uback.Options) (src uback.Source, typ string, err error) { src, err = newMariaBackupSource(options) case "command": src, typ, err = newCommandSource(options) + case "proxy": + src, typ, err = newProxySource(options) default: return nil, "", fmt.Errorf("invalid source type %v", options.String["Type"]) } @@ -36,6 +38,9 @@ func NewForRestoration(options *uback.Options, typ string) (uback.Source, error) return newTarSourceForRestoration() case "mariabackup": return newMariaBackupSourceForRestoration(options) + case "proxy": + return nil, ErrProxyNoRestoration + // TODO: command not implemented yet ; have to implement it & unit tests default: if strings.HasPrefix(typ, "command:") { command, err := shlex.Split(typ[len("command:"):]) diff --git a/sources/proxy.go b/sources/proxy.go new file mode 100644 index 0000000..52b77dd --- /dev/null +++ b/sources/proxy.go @@ -0,0 +1,156 @@ +package sources + +import ( + "github.com/sloonz/uback/lib" + + "errors" + "fmt" + "io" + "sync" + + "github.com/sirupsen/logrus" +) + +var ( + ErrProxyCommandMissing = errors.New("proxy source: missing command") + ErrProxyMissingType = errors.New("proxy source: missing proxy-type") + ErrProxyNoRestoration = errors.New("proxy source: restoration not implemented") + proxyLog = logrus.WithFields(logrus.Fields{ + "source": "proxy", + }) +) + +type ListSnapshotsArgs struct { + uback.Options +} + +type RemoveSnapshotArgs struct { + uback.Options + uback.Snapshot +} + +type CreateBackupArgs struct { + uback.Options + *uback.Snapshot +} + +type proxySource struct { + options *uback.Options + command []string +} + +func newProxySource(options *uback.Options) (uback.Source, string, error) { + command := options.GetCommand("Command", nil) + if len(command) == 0 { + return nil, "", ErrProxyCommandMissing + } + + typ := "proxy" + prefix := "Proxy" + for typ == "proxy" { + var ok bool + typ, ok = options.String[prefix+"Type"] + if !ok { + return nil, "", ErrProxyMissingType + } else { + prefix += "Proxy" + } + } + + return &proxySource{options: options, command: command}, typ, nil +} + +// Part of uback.Source interface +func (s *proxySource) ListSnapshots() ([]uback.Snapshot, error) { + session, rpcClient, _, err := uback.OpenProxy(proxyLog, s.command) + if err != nil { + return nil, fmt.Errorf("Failed to open proxy session: %v", err) + } + defer uback.CloseProxy(session, rpcClient) //nolint: errcheck + + var snapshots []uback.Snapshot + err = rpcClient.Call("Source.ListSnapshots", &ListSnapshotsArgs{Options: uback.ProxiedOptions(s.options)}, &snapshots) + if err != nil { + return nil, err + } + + err = uback.CloseProxy(session, rpcClient) + if err != nil { + return nil, err + } + + return snapshots, nil +} + +// Part of uback.Source interface +func (s *proxySource) RemoveSnapshot(snapshot uback.Snapshot) error { + session, rpcClient, _, err := uback.OpenProxy(proxyLog, s.command) + if err != nil { + return fmt.Errorf("Failed to open proxy session: %v", err) + } + defer uback.CloseProxy(session, rpcClient) //nolint: errcheck + + return rpcClient.Call("Source.RemoveSnapshot", &RemoveSnapshotArgs{Options: uback.ProxiedOptions(s.options), Snapshot: snapshot}, nil) +} + +// Part of uback.Source interface +func (s *proxySource) CreateBackup(baseSnapshot *uback.Snapshot) (uback.Backup, io.ReadCloser, error) { + session, rpcClient, dataStream, err := uback.OpenProxy(proxyLog, s.command) + if err != nil { + return uback.Backup{}, nil, fmt.Errorf("Failed to open proxy session: %v", err) + } + + var backup uback.Backup + err = rpcClient.Call("Source.CreateBackup", &CreateBackupArgs{Options: uback.ProxiedOptions(s.options)}, &backup) + if err != nil { + return uback.Backup{}, nil, err + } + + pr, pw := io.Pipe() + call := rpcClient.Go("Source.TransmitBackup", struct{}{}, nil, nil) + + ch := make(chan error, 2) + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + <-call.Done + ch <- call.Error + }() + + wg.Add(1) + go func() { + defer wg.Done() + + if _, err := io.Copy(pw, dataStream); err != nil { + ch <- err + return + } + + if err := dataStream.Close(); err != nil { + ch <- err + return + } + }() + + go func() { + defer uback.CloseProxy(session, rpcClient) //nolint: errcheck + wg.Wait() + close(ch) + for err := range ch { + if err != nil { + pw.CloseWithError(err) + return + } + } + pw.Close() + }() + + return backup, pr, nil +} + +// Part of uback.Source interface +func (s *proxySource) RestoreBackup(targetDir string, backup uback.Backup, data io.Reader) error { + return ErrProxyNoRestoration +} diff --git a/tests/dest_proxy_tests.py b/tests/dest_proxy_tests.py new file mode 100644 index 0000000..380b9eb --- /dev/null +++ b/tests/dest_proxy_tests.py @@ -0,0 +1,9 @@ +from .common import * + +class DestProxyTests(unittest.TestCase, DestBaseTests): + def test_proxy_destination(self): + with tempfile.TemporaryDirectory() as d: + os.environ["PATH"] = ":".join((str(tests_path), os.environ["PATH"])) + source = f"type=tar,path={d}/source,key-file={d}/backup.pub,state-file={d}/state.json,snapshots-path={d}/snapshots,full-interval=weekly" + dest = f"id=test,type=proxy,proxy-type=fs,command={uback} proxy,path={d}/backups,@retention-policy=daily=3,key-file={d}/backup.key" + self._test_dest(d, source, dest) diff --git a/tests/src_proxy_tests.py b/tests/src_proxy_tests.py new file mode 100644 index 0000000..7e7e297 --- /dev/null +++ b/tests/src_proxy_tests.py @@ -0,0 +1,8 @@ +from .common import * + +class SrcProxyTests(unittest.TestCase, SrcBaseTests): + def test_proxy_source(self): + with tempfile.TemporaryDirectory() as d: + source = f"type=proxy,command={uback} proxy,proxy-type=tar,path={d}/source,key-file={d}/backup.pub,state-file={d}/state.json,snapshots-path={d}/snapshots,full-interval=weekly,proxy-command=tar --exclude=./c --exclude=./d" + dest = f"id=test,type=fs,path={d}/backups,@retention-policy=daily=3,key-file={d}/backup.key" + self._test_src(d, source, dest, test_ignore=True, test_delete=False)