From bfcaae7b55bc11bed9ca0d516dbeab6955f25688 Mon Sep 17 00:00:00 2001 From: margothequeen <71291229+margothequeen@users.noreply.github.com> Date: Fri, 9 Aug 2024 19:09:53 +0500 Subject: [PATCH] interconnection (#48) * added gool interconnection * gool interconnection rebasing * updated server dispatching * updated gitignore * removed useless files + modified yaml * fixed yaml --- .gitignore | 1 + cmd/client/main.go | 58 +++++++++++++++++++++++++++++++++++++ config/instance.go | 11 +++---- examples/yproxy.yaml | 3 +- go.mod | 9 ++++-- pkg/core/core.go | 33 +++++++++++++++++++++ pkg/message/gool_message.go | 52 +++++++++++++++++++++++++++++++++ pkg/message/message.go | 3 ++ pkg/proc/interaction.go | 31 ++++++++++++++++++++ 9 files changed, 192 insertions(+), 9 deletions(-) create mode 100644 pkg/message/gool_message.go diff --git a/.gitignore b/.gitignore index 099ca41..d1fa225 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ vendor devbin/* +yezzey.log diff --git a/cmd/client/main.go b/cmd/client/main.go index 0a65828..151da04 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -272,6 +272,62 @@ var listCmd = &cobra.Command{ RunE: Runner(listFunc), } +var goolCmd = &cobra.Command{ + Use: "gool", + Short: "gool", + RunE: func(cmd *cobra.Command, args []string) error { + + err := config.LoadInstanceConfig(cfgPath) + if err != nil { + ylogger.Zero.Debug().Err(err) + return err + } + + instanceCnf := config.InstanceConfig() + + con, err := net.Dial("unix", instanceCnf.InterconnectSocketPath) + + if err != nil { + ylogger.Zero.Debug().Err(err) + return err + } + + defer con.Close() + msg := message.NewGoolMessage(args[0]).Encode() + _, err = con.Write(msg) + if err != nil { + ylogger.Zero.Debug().Err(err) + return err + } + + ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed gool message") + + ycl := client.NewYClient(con) + r := proc.NewProtoReader(ycl) + + done := false + for { + if done { + break + } + tp, _, err := r.ReadPacket() + if err != nil { + return err + } + + switch tp { + case message.MessageTypeReadyForQuery: + ylogger.Zero.Debug().Bytes("msg", msg).Msg("got RFQ message") + done = true + default: + return fmt.Errorf("incorrect gool") + } + } + + return nil + }, +} + func init() { rootCmd.PersistentFlags().StringVarP(&cfgPath, "config", "c", "/etc/yproxy/yproxy.yaml", "path to yproxy config file") rootCmd.PersistentFlags().StringVarP(&logLevel, "log-level", "l", "", "log level") @@ -289,12 +345,14 @@ func init() { rootCmd.AddCommand(putCmd) rootCmd.AddCommand(listCmd) + rootCmd.AddCommand(goolCmd) deleteCmd.PersistentFlags().IntVarP(&segmentPort, "port", "p", 6000, "port that segment is listening on") deleteCmd.PersistentFlags().IntVarP(&segmentNum, "segnum", "s", 0, "logical number of a segment") deleteCmd.PersistentFlags().BoolVarP(&confirm, "confirm", "", false, "confirm deletion") deleteCmd.PersistentFlags().BoolVarP(&garbage, "garbage", "g", false, "delete garbage") rootCmd.AddCommand(deleteCmd) + } func main() { diff --git a/config/instance.go b/config/instance.go index f7083ab..0e8b0e7 100644 --- a/config/instance.go +++ b/config/instance.go @@ -17,11 +17,12 @@ type Instance struct { CryptoCnf Crypto `json:"crypto" toml:"crypto" yaml:"crypto"` - LogPath string `json:"log_path" toml:"log_path" yaml:"log_path"` - LogLevel string `json:"log_level" toml:"log_level" yaml:"log_level"` - SocketPath string `json:"socket_path" toml:"socket_path" yaml:"socket_path"` - StatPort int `json:"stat_port" toml:"stat_port" yaml:"stat_port"` - PsqlPort int `json:"psql_port" toml:"psql_port" yaml:"psql_port"` + LogPath string `json:"log_path" toml:"log_path" yaml:"log_path"` + LogLevel string `json:"log_level" toml:"log_level" yaml:"log_level"` + SocketPath string `json:"socket_path" toml:"socket_path" yaml:"socket_path"` + StatPort int `json:"stat_port" toml:"stat_port" yaml:"stat_port"` + PsqlPort int `json:"psql_port" toml:"psql_port" yaml:"psql_port"` + InterconnectSocketPath string `json:"interconnect_socket_path" toml:"interconnect_socket_path" yaml:"interconnect_socket_path"` SystemdNotificationsDebug bool `json:"sd_notifications_debug" toml:"sd_notifications_debug" yaml:"sd_notifications_debug"` systemdSocketPath string diff --git a/examples/yproxy.yaml b/examples/yproxy.yaml index b2302e6..8b6b891 100644 --- a/examples/yproxy.yaml +++ b/examples/yproxy.yaml @@ -1,7 +1,8 @@ socket_path: "/tmp/yezzey.sock" +interconnect_socket_path: "tmp/gool.sock" storage: storage_endpoint: "https://storage.yandexcloud.net" storage_prefix: "wal-e/6/" storage_bucket: "gpyezzey" - storage_region: "us-west-2" + storage_region: "us-west-2" \ No newline at end of file diff --git a/go.mod b/go.mod index d226e8b..02215d7 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/yezzey-gp/yproxy -go 1.21.4 +go 1.21 require ( github.com/BurntSushi/toml v1.3.2 @@ -27,6 +27,11 @@ require ( github.com/shopspring/decimal v1.4.0 // indirect ) +require ( + golang.org/x/crypto v0.17.0 // indirect + golang.org/x/sys v0.15.0 // indirect +) + require ( github.com/cloudflare/circl v1.3.3 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -38,7 +43,5 @@ require ( github.com/mattn/go-isatty v0.0.19 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/spf13/pflag v1.0.5 // indirect - golang.org/x/crypto v0.17.0 // indirect - golang.org/x/sys v0.15.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/pkg/core/core.go b/pkg/core/core.go index f629612..9306b74 100644 --- a/pkg/core/core.go +++ b/pkg/core/core.go @@ -13,6 +13,7 @@ import ( "github.com/yezzey-gp/yproxy/pkg/client" "github.com/yezzey-gp/yproxy/pkg/clientpool" "github.com/yezzey-gp/yproxy/pkg/crypt" + "github.com/yezzey-gp/yproxy/pkg/message" "github.com/yezzey-gp/yproxy/pkg/proc" "github.com/yezzey-gp/yproxy/pkg/sdnotifier" "github.com/yezzey-gp/yproxy/pkg/storage" @@ -55,6 +56,8 @@ func (i *Instance) Run(instanceCnf *config.Instance) error { go func() { defer os.Remove(instanceCnf.SocketPath) + + defer os.Remove(instanceCnf.InterconnectSocketPath) defer cancelCtx() for { @@ -145,6 +148,36 @@ func (i *Instance) Run(instanceCnf *config.Instance) error { return err } + iclistener, err := net.Listen("unix", instanceCnf.InterconnectSocketPath) + + ylogger.Zero.Debug().Msg("try to start interconnect socket listener") + if err != nil { + ylogger.Zero.Error().Err(err).Msg("failed to start interconnect socket listener") + return err + } + + i.DispatchServer(iclistener, func(clConn net.Conn) { + defer clConn.Close() + ycl := client.NewYClient(clConn) + r := proc.NewProtoReader(ycl) + + mt, _, err := r.ReadPacket() + + if err != nil { + ylogger.Zero.Error().Err(err).Msg("failed to accept interconnection") + } + + switch mt { + case message.MessageTypeGool: + msg := message.ReadyForQueryMessage{} + _, _ = ycl.GetRW().Write(msg.Encode()) + default: + ycl.ReplyError(fmt.Errorf("wrong message type"), "") + + } + ylogger.Zero.Debug().Msg("interconnection closed") + }) + notifier, err := sdnotifier.NewNotifier(instanceCnf.GetSystemdSocketPath(), instanceCnf.SystemdNotificationsDebug) if err != nil { ylogger.Zero.Error().Err(err).Msg("failed to initialize systemd notifier") diff --git a/pkg/message/gool_message.go b/pkg/message/gool_message.go new file mode 100644 index 0000000..73c31a4 --- /dev/null +++ b/pkg/message/gool_message.go @@ -0,0 +1,52 @@ +package message + +import ( + "bytes" + "encoding/binary" +) + +type GoolMessage struct { + Name string +} + +func (c *GoolMessage) Encode() []byte { + bt := []byte{ + byte(MessageTypeGool), + 0, + 0, + 0, + } + + bt = append(bt, []byte("GOOL "+c.Name)...) + bt = append(bt, 0) + ln := len(bt) + 8 + + bs := make([]byte, 8) + binary.BigEndian.PutUint64(bs, uint64(ln)) + return append(bs, bt...) +} + +func (c *GoolMessage) Decode(body []byte) { + c.Name = c.GetGoolName(body[4:]) +} + +func (c *GoolMessage) GetGoolName(b []byte) string { + buff := bytes.NewBufferString("") + + for i := 0; i < len(b); i++ { + if b[i] == 0 { + break + } + buff.WriteByte(b[i]) + } + + return buff.String() +} + +var _ ProtoMessage = &GoolMessage{} + +func NewGoolMessage(name string) *GoolMessage { + return &GoolMessage{ + Name: name, + } +} diff --git a/pkg/message/message.go b/pkg/message/message.go index 64febcb..ddd0f36 100644 --- a/pkg/message/message.go +++ b/pkg/message/message.go @@ -20,6 +20,7 @@ const ( MessageTypeObjectMeta = MessageType(49) MessageTypePatch = MessageType(50) MessageTypeCopy = MessageType(51) + MessageTypeGool = MessageType(52) DecryptMessage = RequestEncryption(1) NoDecryptMessage = RequestEncryption(0) @@ -50,6 +51,8 @@ func (m MessageType) String() string { return "OBJECT META" case MessageTypeCopy: return "COPY" + case MessageTypeGool: + return "GOOL" } return "UNKNOWN" } diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index b3f81da..ba2207d 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -357,6 +357,8 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl if !msg.Confirm { ylogger.Zero.Warn().Msg("It was a dry-run, nothing was deleted") } + case message.MessageTypeGool: + return ProcMotion(s, cr, ycl) default: ylogger.Zero.Error().Any("type", tp).Msg("unknown message type") @@ -367,3 +369,32 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl return nil } + +func ProcMotion(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyClient) error { + + defer func() { + _ = ycl.Close() + }() + + pr := NewProtoReader(ycl) + tp, body, err := pr.ReadPacket() + if err != nil { + _ = ycl.ReplyError(err, "failed to read request packet") + return err + } + + ylogger.Zero.Debug().Str("msg-type", tp.String()).Msg("recieved client request") + + ycl.SetOPType(byte(tp)) + + msg := message.GoolMessage{} + msg.Decode(body) + + ylogger.Zero.Info().Msg("recieved client gool succ") + + _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()) + if err != nil { + _ = ycl.ReplyError(err, "failed to gool") + } + return nil +}