From 11a6d87c1377533bf46ee59bbfc2e13f965d60f7 Mon Sep 17 00:00:00 2001 From: reshke Date: Fri, 7 Jun 2024 07:07:38 +0000 Subject: [PATCH] Stat basic for client connections --- cmd/yproxy/main.go | 4 ++-- config/instance.go | 4 ++++ pkg/client/client.go | 29 +++++++++++++++++++++++++++++ pkg/core/core.go | 34 +++++++++++++++++++++++++++++++--- pkg/proc/interaction.go | 10 ++++++++++ 5 files changed, 76 insertions(+), 5 deletions(-) diff --git a/cmd/yproxy/main.go b/cmd/yproxy/main.go index f504504..84fa47c 100644 --- a/cmd/yproxy/main.go +++ b/cmd/yproxy/main.go @@ -25,7 +25,7 @@ var rootCmd = &cobra.Command{ instanceCnf := config.InstanceConfig() - instance := core.Instance{} + instance := core.NewInstance() ylogger.ReloadLogger(instanceCnf.LogPath) if logLevel == "" { @@ -35,7 +35,7 @@ var rootCmd = &cobra.Command{ return instance.Run(instanceCnf) }, - Version: pkg.YproxyVersionRevision, + Version: pkg.YproxyVersionRevision, } func init() { diff --git a/config/instance.go b/config/instance.go index 283409e..31a56aa 100644 --- a/config/instance.go +++ b/config/instance.go @@ -59,12 +59,16 @@ func initInstanceConfig(file *os.File, cfgInstance *Instance) error { const ( DefaultStorageConcurrency = 100 + DefaultStatPort = 7432 ) func EmbedDefaults(cfgInstance *Instance) { if cfgInstance.StorageCnf.StorageConcurrency == 0 { cfgInstance.StorageCnf.StorageConcurrency = DefaultStorageConcurrency } + if cfgInstance.StatPort == 0 { + cfgInstance.StatPort = DefaultStatPort + } } func LoadInstanceConfig(cfgPath string) (err error) { diff --git a/pkg/client/client.go b/pkg/client/client.go index 24a666b..9bb698d 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -13,11 +13,40 @@ type YproxyClient interface { ID() uint ReplyError(err error, msg string) error GetRW() io.ReadWriteCloser + + SetOPType(optype byte) + OPType() byte + + SetExternalFilePath(path string) + ExternalFilePath() string + Close() error } type YClient struct { Conn net.Conn + op byte + path string +} + +// ExternalFilePath implements YproxyClient. +func (y *YClient) ExternalFilePath() string { + return y.path +} + +// SetExternalFilePath implements YproxyClient. +func (y *YClient) SetExternalFilePath(path string) { + y.path = path +} + +// OPType implements YproxyClient. +func (y *YClient) OPType() byte { + return y.op +} + +// SetOPType implements YproxyClient. +func (y *YClient) SetOPType(optype byte) { + y.op = optype } // Close implements YproxyClient. diff --git a/pkg/core/core.go b/pkg/core/core.go index bc8e597..66ea913 100644 --- a/pkg/core/core.go +++ b/pkg/core/core.go @@ -11,6 +11,7 @@ import ( "github.com/yezzey-gp/yproxy/config" "github.com/yezzey-gp/yproxy/pkg/client" + "github.com/yezzey-gp/yproxy/pkg/clientpool" "github.com/yezzey-gp/yproxy/pkg/crypt" "github.com/yezzey-gp/yproxy/pkg/proc" "github.com/yezzey-gp/yproxy/pkg/sdnotifier" @@ -19,7 +20,13 @@ import ( ) type Instance struct { - crypter crypt.Crypter + pool clientpool.Pool +} + +func NewInstance() *Instance { + return &Instance{ + pool: clientpool.NewClientPool(), + } } func (i *Instance) Run(instanceCnf *config.Instance) error { @@ -56,6 +63,7 @@ func (i *Instance) Run(instanceCnf *config.Instance) error { } }() + /* dispatch statistic server */ go func() { listener, err := net.Listen("tcp", fmt.Sprintf("localhost:%v", instanceCnf.StatPort)) @@ -72,8 +80,17 @@ func (i *Instance) Run(instanceCnf *config.Instance) error { } ylogger.Zero.Debug().Str("addr", clConn.LocalAddr().String()).Msg("accepted client connection") - } + clConn.Write([]byte("Hello from stats server!!\n")) + clConn.Write([]byte("Client id | Optype | External Path \n")) + + i.pool.ClientPoolForeach(func(cl client.YproxyClient) error { + _, err := clConn.Write([]byte(fmt.Sprintf("%v | %v | %v\n", cl.ID(), cl.OPType(), cl.ExternalFilePath()))) + return err + }) + + clConn.Close() + } }() listener, err := net.Listen("unix", instanceCnf.SocketPath) @@ -120,6 +137,17 @@ func (i *Instance) Run(instanceCnf *config.Instance) error { ylogger.Zero.Error().Err(err).Msg("failed to accept connection") } ylogger.Zero.Debug().Str("addr", clConn.LocalAddr().String()).Msg("accepted client connection") - go proc.ProcConn(s, cr, client.NewYClient(clConn)) + go func() { + ycl := client.NewYClient(clConn) + i.pool.Put(ycl) + if err := proc.ProcConn(s, cr, ycl); err != nil { + ylogger.Zero.Debug().Uint("id", ycl.ID()).Err(err).Msg("got error serving client") + } + _, err := i.pool.Pop(ycl.ID()) + if err != nil { + // ?? wtf + ylogger.Zero.Error().Uint("id", ycl.ID()).Err(err).Msg("got error erasing client from pool") + } + }() } } diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index 71121ec..5d2dbd4 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -29,12 +29,16 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl ylogger.Zero.Debug().Str("msg-type", tp.String()).Msg("recieved client request") + ycl.SetOPType(byte(tp)) + switch tp { case message.MessageTypeCat: // omit first byte msg := message.CatMessage{} msg.Decode(body) + ycl.SetExternalFilePath(msg.Name) + yr := NewYRetryReader(NewRestartReader(s, msg.Name), msg.StartOffset) var contentReader io.Reader @@ -61,6 +65,8 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl msg := message.PutMessage{} msg.Decode(body) + ycl.SetExternalFilePath(msg.Name) + var w io.WriteCloser r, w := io.Pipe() @@ -145,6 +151,8 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl msg := message.ListMessage{} msg.Decode(body) + ycl.SetExternalFilePath(msg.Prefix) + objectMetas, err := s.ListPath(msg.Prefix) if err != nil { _ = ycl.ReplyError(fmt.Errorf("could not list objects: %s", err), "failed to compelete request") @@ -176,6 +184,8 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl msg := message.CopyMessage{} msg.Decode(body) + ycl.SetExternalFilePath(msg.Name) + //get config for old bucket instanceCnf, err := config.ReadInstanceConfig(msg.OldCfgPath) if err != nil {