Skip to content

Commit

Permalink
interconnection (#48)
Browse files Browse the repository at this point in the history
* added gool interconnection

* gool interconnection rebasing

* updated server dispatching

* updated gitignore

* removed useless files + modified yaml

* fixed yaml
  • Loading branch information
margothequeen authored Aug 9, 2024
1 parent 403e564 commit bfcaae7
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 9 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
vendor
devbin/*
yezzey.log
58 changes: 58 additions & 0 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,62 @@ var listCmd = &cobra.Command{
RunE: Runner(listFunc),
}

var goolCmd = &cobra.Command{
Use: "gool",
Short: "gool",
RunE: func(cmd *cobra.Command, args []string) error {

err := config.LoadInstanceConfig(cfgPath)
if err != nil {
ylogger.Zero.Debug().Err(err)
return err
}

instanceCnf := config.InstanceConfig()

con, err := net.Dial("unix", instanceCnf.InterconnectSocketPath)

if err != nil {
ylogger.Zero.Debug().Err(err)
return err
}

defer con.Close()
msg := message.NewGoolMessage(args[0]).Encode()
_, err = con.Write(msg)
if err != nil {
ylogger.Zero.Debug().Err(err)
return err
}

ylogger.Zero.Debug().Bytes("msg", msg).Msg("constructed gool message")

ycl := client.NewYClient(con)
r := proc.NewProtoReader(ycl)

done := false
for {
if done {
break
}
tp, _, err := r.ReadPacket()
if err != nil {
return err
}

switch tp {
case message.MessageTypeReadyForQuery:
ylogger.Zero.Debug().Bytes("msg", msg).Msg("got RFQ message")
done = true
default:
return fmt.Errorf("incorrect gool")
}
}

return nil
},
}

func init() {
rootCmd.PersistentFlags().StringVarP(&cfgPath, "config", "c", "/etc/yproxy/yproxy.yaml", "path to yproxy config file")
rootCmd.PersistentFlags().StringVarP(&logLevel, "log-level", "l", "", "log level")
Expand All @@ -289,12 +345,14 @@ func init() {
rootCmd.AddCommand(putCmd)

rootCmd.AddCommand(listCmd)
rootCmd.AddCommand(goolCmd)

deleteCmd.PersistentFlags().IntVarP(&segmentPort, "port", "p", 6000, "port that segment is listening on")
deleteCmd.PersistentFlags().IntVarP(&segmentNum, "segnum", "s", 0, "logical number of a segment")
deleteCmd.PersistentFlags().BoolVarP(&confirm, "confirm", "", false, "confirm deletion")
deleteCmd.PersistentFlags().BoolVarP(&garbage, "garbage", "g", false, "delete garbage")
rootCmd.AddCommand(deleteCmd)

}

func main() {
Expand Down
11 changes: 6 additions & 5 deletions config/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ type Instance struct {

CryptoCnf Crypto `json:"crypto" toml:"crypto" yaml:"crypto"`

LogPath string `json:"log_path" toml:"log_path" yaml:"log_path"`
LogLevel string `json:"log_level" toml:"log_level" yaml:"log_level"`
SocketPath string `json:"socket_path" toml:"socket_path" yaml:"socket_path"`
StatPort int `json:"stat_port" toml:"stat_port" yaml:"stat_port"`
PsqlPort int `json:"psql_port" toml:"psql_port" yaml:"psql_port"`
LogPath string `json:"log_path" toml:"log_path" yaml:"log_path"`
LogLevel string `json:"log_level" toml:"log_level" yaml:"log_level"`
SocketPath string `json:"socket_path" toml:"socket_path" yaml:"socket_path"`
StatPort int `json:"stat_port" toml:"stat_port" yaml:"stat_port"`
PsqlPort int `json:"psql_port" toml:"psql_port" yaml:"psql_port"`
InterconnectSocketPath string `json:"interconnect_socket_path" toml:"interconnect_socket_path" yaml:"interconnect_socket_path"`

SystemdNotificationsDebug bool `json:"sd_notifications_debug" toml:"sd_notifications_debug" yaml:"sd_notifications_debug"`
systemdSocketPath string
Expand Down
3 changes: 2 additions & 1 deletion examples/yproxy.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
socket_path: "/tmp/yezzey.sock"
interconnect_socket_path: "tmp/gool.sock"

storage:
storage_endpoint: "https://storage.yandexcloud.net"
storage_prefix: "wal-e/6/"
storage_bucket: "gpyezzey"
storage_region: "us-west-2"
storage_region: "us-west-2"
9 changes: 6 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/yezzey-gp/yproxy

go 1.21.4
go 1.21

require (
github.com/BurntSushi/toml v1.3.2
Expand All @@ -27,6 +27,11 @@ require (
github.com/shopspring/decimal v1.4.0 // indirect
)

require (
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/sys v0.15.0 // indirect
)

require (
github.com/cloudflare/circl v1.3.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand All @@ -38,7 +43,5 @@ require (
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/sys v0.15.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
33 changes: 33 additions & 0 deletions pkg/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/yezzey-gp/yproxy/pkg/client"
"github.com/yezzey-gp/yproxy/pkg/clientpool"
"github.com/yezzey-gp/yproxy/pkg/crypt"
"github.com/yezzey-gp/yproxy/pkg/message"
"github.com/yezzey-gp/yproxy/pkg/proc"
"github.com/yezzey-gp/yproxy/pkg/sdnotifier"
"github.com/yezzey-gp/yproxy/pkg/storage"
Expand Down Expand Up @@ -55,6 +56,8 @@ func (i *Instance) Run(instanceCnf *config.Instance) error {

go func() {
defer os.Remove(instanceCnf.SocketPath)

defer os.Remove(instanceCnf.InterconnectSocketPath)
defer cancelCtx()

for {
Expand Down Expand Up @@ -145,6 +148,36 @@ func (i *Instance) Run(instanceCnf *config.Instance) error {
return err
}

iclistener, err := net.Listen("unix", instanceCnf.InterconnectSocketPath)

ylogger.Zero.Debug().Msg("try to start interconnect socket listener")
if err != nil {
ylogger.Zero.Error().Err(err).Msg("failed to start interconnect socket listener")
return err
}

i.DispatchServer(iclistener, func(clConn net.Conn) {
defer clConn.Close()
ycl := client.NewYClient(clConn)
r := proc.NewProtoReader(ycl)

mt, _, err := r.ReadPacket()

if err != nil {
ylogger.Zero.Error().Err(err).Msg("failed to accept interconnection")
}

switch mt {
case message.MessageTypeGool:
msg := message.ReadyForQueryMessage{}
_, _ = ycl.GetRW().Write(msg.Encode())
default:
ycl.ReplyError(fmt.Errorf("wrong message type"), "")

}
ylogger.Zero.Debug().Msg("interconnection closed")
})

notifier, err := sdnotifier.NewNotifier(instanceCnf.GetSystemdSocketPath(), instanceCnf.SystemdNotificationsDebug)
if err != nil {
ylogger.Zero.Error().Err(err).Msg("failed to initialize systemd notifier")
Expand Down
52 changes: 52 additions & 0 deletions pkg/message/gool_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package message

import (
"bytes"
"encoding/binary"
)

type GoolMessage struct {
Name string
}

func (c *GoolMessage) Encode() []byte {
bt := []byte{
byte(MessageTypeGool),
0,
0,
0,
}

bt = append(bt, []byte("GOOL "+c.Name)...)
bt = append(bt, 0)
ln := len(bt) + 8

bs := make([]byte, 8)
binary.BigEndian.PutUint64(bs, uint64(ln))
return append(bs, bt...)
}

func (c *GoolMessage) Decode(body []byte) {
c.Name = c.GetGoolName(body[4:])
}

func (c *GoolMessage) GetGoolName(b []byte) string {
buff := bytes.NewBufferString("")

for i := 0; i < len(b); i++ {
if b[i] == 0 {
break
}
buff.WriteByte(b[i])
}

return buff.String()
}

var _ ProtoMessage = &GoolMessage{}

func NewGoolMessage(name string) *GoolMessage {
return &GoolMessage{
Name: name,
}
}
3 changes: 3 additions & 0 deletions pkg/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
MessageTypeObjectMeta = MessageType(49)
MessageTypePatch = MessageType(50)
MessageTypeCopy = MessageType(51)
MessageTypeGool = MessageType(52)

DecryptMessage = RequestEncryption(1)
NoDecryptMessage = RequestEncryption(0)
Expand Down Expand Up @@ -50,6 +51,8 @@ func (m MessageType) String() string {
return "OBJECT META"
case MessageTypeCopy:
return "COPY"
case MessageTypeGool:
return "GOOL"
}
return "UNKNOWN"
}
31 changes: 31 additions & 0 deletions pkg/proc/interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl
if !msg.Confirm {
ylogger.Zero.Warn().Msg("It was a dry-run, nothing was deleted")
}
case message.MessageTypeGool:
return ProcMotion(s, cr, ycl)

default:
ylogger.Zero.Error().Any("type", tp).Msg("unknown message type")
Expand All @@ -367,3 +369,32 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl

return nil
}

func ProcMotion(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyClient) error {

defer func() {
_ = ycl.Close()
}()

pr := NewProtoReader(ycl)
tp, body, err := pr.ReadPacket()
if err != nil {
_ = ycl.ReplyError(err, "failed to read request packet")
return err
}

ylogger.Zero.Debug().Str("msg-type", tp.String()).Msg("recieved client request")

ycl.SetOPType(byte(tp))

msg := message.GoolMessage{}
msg.Decode(body)

ylogger.Zero.Info().Msg("recieved client gool succ")

_, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode())
if err != nil {
_ = ycl.ReplyError(err, "failed to gool")
}
return nil
}

0 comments on commit bfcaae7

Please sign in to comment.