Skip to content

Commit

Permalink
Stat basic for client connections
Browse files Browse the repository at this point in the history
  • Loading branch information
reshke committed Jun 7, 2024
1 parent 62ab574 commit 11a6d87
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 5 deletions.
4 changes: 2 additions & 2 deletions cmd/yproxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var rootCmd = &cobra.Command{

instanceCnf := config.InstanceConfig()

instance := core.Instance{}
instance := core.NewInstance()

ylogger.ReloadLogger(instanceCnf.LogPath)
if logLevel == "" {
Expand All @@ -35,7 +35,7 @@ var rootCmd = &cobra.Command{

return instance.Run(instanceCnf)
},
Version: pkg.YproxyVersionRevision,
Version: pkg.YproxyVersionRevision,
}

func init() {
Expand Down
4 changes: 4 additions & 0 deletions config/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,16 @@ func initInstanceConfig(file *os.File, cfgInstance *Instance) error {

const (
DefaultStorageConcurrency = 100
DefaultStatPort = 7432
)

func EmbedDefaults(cfgInstance *Instance) {
if cfgInstance.StorageCnf.StorageConcurrency == 0 {
cfgInstance.StorageCnf.StorageConcurrency = DefaultStorageConcurrency
}
if cfgInstance.StatPort == 0 {
cfgInstance.StatPort = DefaultStatPort
}
}

func LoadInstanceConfig(cfgPath string) (err error) {
Expand Down
29 changes: 29 additions & 0 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,40 @@ type YproxyClient interface {
ID() uint
ReplyError(err error, msg string) error
GetRW() io.ReadWriteCloser

SetOPType(optype byte)
OPType() byte

SetExternalFilePath(path string)
ExternalFilePath() string

Close() error
}

type YClient struct {
Conn net.Conn
op byte
path string
}

// ExternalFilePath implements YproxyClient.
func (y *YClient) ExternalFilePath() string {
return y.path
}

// SetExternalFilePath implements YproxyClient.
func (y *YClient) SetExternalFilePath(path string) {
y.path = path
}

// OPType implements YproxyClient.
func (y *YClient) OPType() byte {
return y.op
}

// SetOPType implements YproxyClient.
func (y *YClient) SetOPType(optype byte) {
y.op = optype
}

// Close implements YproxyClient.
Expand Down
34 changes: 31 additions & 3 deletions pkg/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/yezzey-gp/yproxy/config"
"github.com/yezzey-gp/yproxy/pkg/client"
"github.com/yezzey-gp/yproxy/pkg/clientpool"
"github.com/yezzey-gp/yproxy/pkg/crypt"
"github.com/yezzey-gp/yproxy/pkg/proc"
"github.com/yezzey-gp/yproxy/pkg/sdnotifier"
Expand All @@ -19,7 +20,13 @@ import (
)

type Instance struct {
crypter crypt.Crypter
pool clientpool.Pool
}

func NewInstance() *Instance {
return &Instance{
pool: clientpool.NewClientPool(),
}
}

func (i *Instance) Run(instanceCnf *config.Instance) error {
Expand Down Expand Up @@ -56,6 +63,7 @@ func (i *Instance) Run(instanceCnf *config.Instance) error {
}
}()

/* dispatch statistic server */
go func() {

listener, err := net.Listen("tcp", fmt.Sprintf("localhost:%v", instanceCnf.StatPort))
Expand All @@ -72,8 +80,17 @@ func (i *Instance) Run(instanceCnf *config.Instance) error {
}
ylogger.Zero.Debug().Str("addr", clConn.LocalAddr().String()).Msg("accepted client connection")

}
clConn.Write([]byte("Hello from stats server!!\n"))
clConn.Write([]byte("Client id | Optype | External Path \n"))

i.pool.ClientPoolForeach(func(cl client.YproxyClient) error {
_, err := clConn.Write([]byte(fmt.Sprintf("%v | %v | %v\n", cl.ID(), cl.OPType(), cl.ExternalFilePath())))

return err
})

clConn.Close()
}
}()

listener, err := net.Listen("unix", instanceCnf.SocketPath)
Expand Down Expand Up @@ -120,6 +137,17 @@ func (i *Instance) Run(instanceCnf *config.Instance) error {
ylogger.Zero.Error().Err(err).Msg("failed to accept connection")
}
ylogger.Zero.Debug().Str("addr", clConn.LocalAddr().String()).Msg("accepted client connection")
go proc.ProcConn(s, cr, client.NewYClient(clConn))
go func() {
ycl := client.NewYClient(clConn)
i.pool.Put(ycl)
if err := proc.ProcConn(s, cr, ycl); err != nil {
ylogger.Zero.Debug().Uint("id", ycl.ID()).Err(err).Msg("got error serving client")
}
_, err := i.pool.Pop(ycl.ID())
if err != nil {
// ?? wtf
ylogger.Zero.Error().Uint("id", ycl.ID()).Err(err).Msg("got error erasing client from pool")
}
}()
}
}
10 changes: 10 additions & 0 deletions pkg/proc/interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl

ylogger.Zero.Debug().Str("msg-type", tp.String()).Msg("recieved client request")

ycl.SetOPType(byte(tp))

switch tp {
case message.MessageTypeCat:
// omit first byte
msg := message.CatMessage{}
msg.Decode(body)

ycl.SetExternalFilePath(msg.Name)

yr := NewYRetryReader(NewRestartReader(s, msg.Name), msg.StartOffset)

var contentReader io.Reader
Expand All @@ -61,6 +65,8 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl
msg := message.PutMessage{}
msg.Decode(body)

ycl.SetExternalFilePath(msg.Name)

var w io.WriteCloser

r, w := io.Pipe()
Expand Down Expand Up @@ -145,6 +151,8 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl
msg := message.ListMessage{}
msg.Decode(body)

ycl.SetExternalFilePath(msg.Prefix)

objectMetas, err := s.ListPath(msg.Prefix)
if err != nil {
_ = ycl.ReplyError(fmt.Errorf("could not list objects: %s", err), "failed to compelete request")
Expand Down Expand Up @@ -176,6 +184,8 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl
msg := message.CopyMessage{}
msg.Decode(body)

ycl.SetExternalFilePath(msg.Name)

//get config for old bucket
instanceCnf, err := config.ReadInstanceConfig(msg.OldCfgPath)
if err != nil {
Expand Down

0 comments on commit 11a6d87

Please sign in to comment.