From 79d4b97db942e9165f1d5aabe83b62fbed03eba7 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Thu, 22 Jun 2023 14:14:29 +0400 Subject: [PATCH 01/10] stat: External statistic definition Using EndpointInfo, instead of Balance have two advantages: - Check the entrypoint is available; - Get node public key for statistic purposes. Any error now is a reason to return from the constructor and broke initialization. Signed-off-by: Evgenii Baidakov --- client/client.go | 31 ++++++++---- client/client_test.go | 7 ++- pool/pool.go | 82 ++++++++++++++++++++++--------- stat/stat.go | 110 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 198 insertions(+), 32 deletions(-) create mode 100644 stat/stat.go diff --git a/client/client.go b/client/client.go index 79f4c2fb..742e8bad 100644 --- a/client/client.go +++ b/client/client.go @@ -3,14 +3,12 @@ package client import ( "context" "crypto/tls" - "errors" "fmt" "time" - v2accounting "github.com/nspcc-dev/neofs-api-go/v2/accounting" - "github.com/nspcc-dev/neofs-api-go/v2/rpc" "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" + "github.com/nspcc-dev/neofs-sdk-go/stat" ) // Client represents virtual connection to the NeoFS network to communicate @@ -48,6 +46,9 @@ type Client struct { c client.Client server neoFSAPIServer + + endpoint string + nodeKey []byte } var errNonNeoSigner = fmt.Errorf("%w: expected ECDSA_DETERMINISTIC_SHA256 scheme", neofscrypto.ErrIncorrectSigner) @@ -90,6 +91,7 @@ func (c *Client) Dial(prm PrmDial) error { if prm.endpoint == "" { return ErrMissingServer } + c.endpoint = prm.endpoint if prm.timeoutDialSet { if prm.timeoutDial <= 0 { @@ -119,15 +121,13 @@ func (c *Client) Dial(prm PrmDial) error { prm.parentCtx = context.Background() } - // TODO: (neofs-api-go#382) perform generic dial stage of the client.Client - _, err := rpc.Balance(&c.c, new(v2accounting.BalanceRequest), - client.WithContext(prm.parentCtx), - ) - // return context errors since they signal about dial problem - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + endpointInfo, err := c.EndpointInfo(prm.parentCtx, PrmEndpointInfo{}) + if err != nil { return err } + c.nodeKey = endpointInfo.NodeInfo().PublicKey() + return nil } @@ -168,6 +168,12 @@ func (c *Client) Close() error { return c.c.Conn().Close() } +func (c *Client) sendStatistic(m stat.Method, elapsed time.Duration, err error) { + if c.prm.statisticCallback != nil { + c.prm.statisticCallback(c.nodeKey, c.endpoint, m, elapsed, err) + } +} + // PrmInit groups initialization parameters of Client instances. // // See also [New]. @@ -177,6 +183,8 @@ type PrmInit struct { cbRespInfo func(ResponseMetaInfo) error netMagic uint64 + + statisticCallback stat.OperationCallback } // SetDefaultSigner sets Client private signer to be used for the protocol @@ -196,6 +204,11 @@ func (x *PrmInit) SetResponseInfoCallback(f func(ResponseMetaInfo) error) { x.cbRespInfo = f } +// SetStatisticCallback makes the Client to pass [stat.OperationCallback] for the external statistic. +func (x *PrmInit) SetStatisticCallback(statisticCallback stat.OperationCallback) { + x.statisticCallback = statisticCallback +} + // PrmDial groups connection parameters for the Client. // // See also Dial. diff --git a/client/client_test.go b/client/client_test.go index e8696ecf..bc376514 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -6,6 +6,7 @@ import ( apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" + "github.com/nspcc-dev/neofs-sdk-go/crypto/test" "github.com/stretchr/testify/require" ) @@ -31,7 +32,11 @@ func newClient(t *testing.T, signer neofscrypto.Signer, server neoFSAPIServer) * } func TestClient_DialContext(t *testing.T) { - var c Client + var prmInit PrmInit + prmInit.SetDefaultSigner(test.RandomSignerRFC6979(t)) + + c, err := New(prmInit) + require.NoError(t, err) // try to connect to any host var prm PrmDial diff --git a/pool/pool.go b/pool/pool.go index 7cbf70e9..fbe07755 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -27,6 +27,7 @@ import ( oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/object/relations" "github.com/nspcc-dev/neofs-sdk-go/session" + "github.com/nspcc-dev/neofs-sdk-go/stat" "github.com/nspcc-dev/neofs-sdk-go/user" "go.uber.org/atomic" "go.uber.org/zap" @@ -236,6 +237,7 @@ type clientWrapper struct { prm wrapperPrm clientStatusMonitor + statisticCallback stat.OperationCallback } // wrapperPrm is params to create clientWrapper. @@ -247,6 +249,7 @@ type wrapperPrm struct { errorThreshold uint32 responseInfoCallback func(sdkClient.ResponseMetaInfo) error poolRequestInfoCallback func(RequestInfo) + statisticCallback stat.OperationCallback } // setAddress sets endpoint to connect in NeoFS network. @@ -285,31 +288,49 @@ func (x *wrapperPrm) setResponseInfoCallback(f func(sdkClient.ResponseMetaInfo) x.responseInfoCallback = f } +// setStatisticCallback set callback for external statistic. +func (x *wrapperPrm) setStatisticCallback(statisticCallback stat.OperationCallback) { + x.statisticCallback = statisticCallback +} + // getNewClient returns a new [sdkClient.Client] instance using internal parameters. -func (x *wrapperPrm) getNewClient() (*sdkClient.Client, error) { +func (x *wrapperPrm) getNewClient(statisticCallback stat.OperationCallback) (*sdkClient.Client, error) { var prmInit sdkClient.PrmInit prmInit.SetDefaultSigner(x.signer) prmInit.SetResponseInfoCallback(x.responseInfoCallback) + prmInit.SetStatisticCallback(statisticCallback) return sdkClient.New(prmInit) } // newWrapper creates a clientWrapper that implements the client interface. func newWrapper(prm wrapperPrm) (*clientWrapper, error) { - cl, err := prm.getNewClient() - if err != nil { - return nil, err - } - res := &clientWrapper{ - client: cl, clientStatusMonitor: newClientStatusMonitor(prm.address, prm.errorThreshold), prm: prm, + statisticCallback: prm.statisticCallback, } + // integrate clientWrapper middleware to handle errors and wrapped client health. + cl, err := prm.getNewClient(res.statisticMiddleware) + if err != nil { + return nil, err + } + + res.client = cl + return res, nil } +func (c *clientWrapper) statisticMiddleware(nodeKey []byte, endpoint string, method stat.Method, duration time.Duration, err error) { + c.incRequests(duration, MethodIndex(method)) + c.updateErrorRate(err) + + if c.statisticCallback != nil { + c.statisticCallback(nodeKey, endpoint, method, duration, err) + } +} + // dial establishes a connection to the server from the NeoFS network. // Returns an error describing failure reason. If failed, the client // SHOULD NOT be used. @@ -343,7 +364,7 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change wasHealthy = true } - cl, err := c.prm.getNewClient() + cl, err := c.prm.getNewClient(c.statisticMiddleware) if err != nil { c.setUnhealthy() return false, wasHealthy @@ -985,6 +1006,8 @@ type InitParameters struct { requestCallback func(RequestInfo) clientBuilder clientBuilder + + statisticCallback stat.OperationCallback } // SetSigner specifies default signer to be used for the protocol communication by default. @@ -1055,6 +1078,11 @@ func (x *InitParameters) isMissingClientBuilder() bool { return x.clientBuilder == nil } +// SetStatisticCallback makes the Pool to pass [stat.OperationCallback] for external statistic. +func (x *InitParameters) SetStatisticCallback(statisticCallback stat.OperationCallback) { + x.statisticCallback = statisticCallback +} + type rebalanceParameters struct { nodesParams []*nodesParam nodeRequestTimeout time.Duration @@ -1388,6 +1416,8 @@ type Pool struct { rebalanceParams rebalanceParameters clientBuilder clientBuilder logger *zap.Logger + + statisticCallback stat.OperationCallback } type innerPool struct { @@ -1480,21 +1510,22 @@ func NewPool(options InitParameters) (*Pool, error) { return nil, fmt.Errorf("couldn't create cache: %w", err) } - fillDefaultInitParams(&options, cache) + pool := &Pool{cache: cache} - pool := &Pool{ - signer: options.signer, - cache: cache, - logger: options.logger, - stokenDuration: options.sessionExpirationDuration, - rebalanceParams: rebalanceParameters{ - nodesParams: nodesParams, - nodeRequestTimeout: options.healthcheckTimeout, - clientRebalanceInterval: options.clientRebalanceInterval, - sessionExpirationDuration: options.sessionExpirationDuration, - }, - clientBuilder: options.clientBuilder, + // we need our middleware integration in clientBuilder + fillDefaultInitParams(&options, cache, pool.statisticMiddleware) + + pool.signer = options.signer + pool.logger = options.logger + pool.stokenDuration = options.sessionExpirationDuration + pool.rebalanceParams = rebalanceParameters{ + nodesParams: nodesParams, + nodeRequestTimeout: options.healthcheckTimeout, + clientRebalanceInterval: options.clientRebalanceInterval, + sessionExpirationDuration: options.sessionExpirationDuration, } + pool.clientBuilder = options.clientBuilder + pool.statisticCallback = options.statisticCallback return pool, nil } @@ -1567,7 +1598,7 @@ func (p *Pool) Dial(ctx context.Context) error { return nil } -func fillDefaultInitParams(params *InitParameters, cache *sessionCache) { +func fillDefaultInitParams(params *InitParameters, cache *sessionCache, statisticCallback stat.OperationCallback) { if params.sessionExpirationDuration == 0 { params.sessionExpirationDuration = defaultSessionTokenExpirationDuration } @@ -1605,6 +1636,7 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) { cache.updateEpoch(info.Epoch()) return nil }) + prm.setStatisticCallback(statisticCallback) return newWrapper(prm) }) } @@ -2545,3 +2577,9 @@ func (p *Pool) sdkClient() (*sdkClient.Client, statisticUpdater, error) { return cl, conn, nil } + +func (p *Pool) statisticMiddleware(nodeKey []byte, endpoint string, method stat.Method, duration time.Duration, err error) { + if p.statisticCallback != nil { + p.statisticCallback(nodeKey, endpoint, method, duration, err) + } +} diff --git a/stat/stat.go b/stat/stat.go new file mode 100644 index 00000000..8a9b20e2 --- /dev/null +++ b/stat/stat.go @@ -0,0 +1,110 @@ +/* +Package provides functionality for collecting/maintaining client statistics. +*/ + +package stat + +import ( + "time" +) + +// Method is an enumerator to describe [client.Client] methods. +type Method int + +// Various client methods. +const ( + MethodBalanceGet Method = iota + MethodContainerPut + MethodContainerGet + MethodContainerList + MethodContainerDelete + MethodContainerEACL + MethodContainerSetEACL + MethodEndpointInfo + MethodNetworkInfo + MethodObjectPut + MethodObjectDelete + MethodObjectGet + MethodObjectHead + MethodObjectRange + MethodSessionCreate + MethodNetMapSnapshot + MethodObjectHash + MethodObjectSearch + MethodContainerAnnounceUsedSpace + MethodAnnounceIntermediateTrust + MethodAnnounceLocalTrust + MethodObjectGetStream + MethodObjectRangeStream + MethodObjectSearchStream + MethodObjectPutStream + // MethodLast is no a valid method name, it's a system anchor for tests, etc. + MethodLast +) + +// String implements fmt.Stringer. +func (m Method) String() string { + switch m { + case MethodBalanceGet: + return "balanceGet" + case MethodContainerPut: + return "containerPut" + case MethodContainerGet: + return "containerGet" + case MethodContainerList: + return "containerList" + case MethodContainerDelete: + return "containerDelete" + case MethodContainerEACL: + return "containerEACL" + case MethodContainerSetEACL: + return "containerSetEACL" + case MethodEndpointInfo: + return "endpointInfo" + case MethodNetworkInfo: + return "networkInfo" + case MethodObjectPut: + return "objectPut" + case MethodObjectDelete: + return "objectDelete" + case MethodObjectGet: + return "objectGet" + case MethodObjectHead: + return "objectHead" + case MethodObjectRange: + return "objectRange" + case MethodSessionCreate: + return "sessionCreate" + case MethodNetMapSnapshot: + return "netMapSnapshot" + case MethodObjectHash: + return "objectHash" + case MethodObjectSearch: + return "objectSearch" + case MethodContainerAnnounceUsedSpace: + return "containerAnnounceUsedSpace" + case MethodAnnounceIntermediateTrust: + return "announceIntermediateTrust" + case MethodAnnounceLocalTrust: + return "announceLocalTrust" + case MethodObjectGetStream: + return "objectGetStream" + case MethodObjectRangeStream: + return "objectRangeStream" + case MethodObjectSearchStream: + return "objectSearchStream" + case MethodObjectPutStream: + return "objectPutStream" + case MethodLast: + return "it's a system name rather than a method" + default: + return "unknown" + } +} + +type ( + // OperationCallback describes common interface to external statistic collection. + // + // Passing zero duration means only error counting. + OperationCallback = func(nodeKey []byte, endpoint string, method Method, duration time.Duration, err error) +) From 17efd8c78e0ad1fb0c431a2c25b4fdb592286e48 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Fri, 30 Jun 2023 09:25:07 +0400 Subject: [PATCH 02/10] client: External statistic in client Signed-off-by: Evgenii Baidakov --- client/accounting.go | 20 ++++++- client/api.go | 10 +++- client/client.go | 11 +++- client/container.go | 101 +++++++++++++++++++++++++++------- client/netmap.go | 51 +++++++++++++---- client/object_delete.go | 20 +++++-- client/object_get.go | 119 +++++++++++++++++++++++++++++++++------- client/object_hash.go | 25 +++++++-- client/object_put.go | 61 +++++++++++++++++--- client/object_search.go | 40 ++++++++++++-- client/reputation.go | 42 +++++++++++--- client/session.go | 10 +++- pool/accounting.go | 10 +--- pool/container.go | 55 ++++--------------- pool/netmap.go | 19 ++----- pool/object.go | 71 +++++------------------- pool/pool.go | 9 +++ pool/session.go | 9 +-- 18 files changed, 467 insertions(+), 216 deletions(-) diff --git a/client/accounting.go b/client/accounting.go index 3b09ebd2..9165a1d4 100644 --- a/client/accounting.go +++ b/client/accounting.go @@ -8,9 +8,15 @@ import ( rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc" "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" "github.com/nspcc-dev/neofs-sdk-go/accounting" + "github.com/nspcc-dev/neofs-sdk-go/stat" "github.com/nspcc-dev/neofs-sdk-go/user" ) +var ( + // special variable for test purposes, to overwrite real RPC calls. + rpcAPIBalance = rpcapi.Balance +) + // PrmBalanceGet groups parameters of BalanceGet operation. type PrmBalanceGet struct { prmCommonMeta @@ -37,13 +43,20 @@ func (x *PrmBalanceGet) SetAccount(id user.ID) { // - [ErrMissingAccount] // - [ErrMissingSigner] func (c *Client) BalanceGet(ctx context.Context, prm PrmBalanceGet) (accounting.Decimal, error) { + var err error + defer func() { + c.sendStatistic(stat.MethodBalanceGet, err)() + }() + switch { case !prm.accountSet: - return accounting.Decimal{}, ErrMissingAccount + err = ErrMissingAccount + return accounting.Decimal{}, err } if c.prm.signer == nil { - return accounting.Decimal{}, ErrMissingSigner + err = ErrMissingSigner + return accounting.Decimal{}, err } // form request body @@ -69,7 +82,7 @@ func (c *Client) BalanceGet(ctx context.Context, prm PrmBalanceGet) (accounting. cc.meta = prm.prmCommonMeta cc.req = &req cc.call = func() (responseV2, error) { - return rpcapi.Balance(&c.c, &req, client.WithContext(ctx)) + return rpcAPIBalance(&c.c, &req, client.WithContext(ctx)) } cc.result = func(r responseV2) { resp := r.(*v2accounting.BalanceResponse) @@ -90,6 +103,7 @@ func (c *Client) BalanceGet(ctx context.Context, prm PrmBalanceGet) (accounting. // process call if !cc.processCall() { + err = cc.err return accounting.Decimal{}, cc.err } diff --git a/client/api.go b/client/api.go index dfad9ea0..88f3d313 100644 --- a/client/api.go +++ b/client/api.go @@ -10,6 +10,12 @@ import ( "github.com/nspcc-dev/neofs-api-go/v2/session" ) +var ( + // special variables for test purposes only, to overwrite real RPC calls. + rpcAPINetMapSnapshot = rpcapi.NetMapSnapshot + rpcAPICreateSession = rpcapi.CreateSession +) + // interface of NeoFS API server. Exists for test purposes only. type neoFSAPIServer interface { createSession(cli *client.Client, req *session.CreateRequest, opts ...client.CallOption) (*session.CreateResponse, error) @@ -29,7 +35,7 @@ func rpcErr(e error) error { // executes NetmapService.NetmapSnapshot RPC declared in NeoFS API protocol // using underlying client.Client. func (x *coreServer) netMapSnapshot(ctx context.Context, req v2netmap.SnapshotRequest) (*v2netmap.SnapshotResponse, error) { - resp, err := rpcapi.NetMapSnapshot((*client.Client)(x), &req, client.WithContext(ctx)) + resp, err := rpcAPINetMapSnapshot((*client.Client)(x), &req, client.WithContext(ctx)) if err != nil { return nil, rpcErr(err) } @@ -38,7 +44,7 @@ func (x *coreServer) netMapSnapshot(ctx context.Context, req v2netmap.SnapshotRe } func (x *coreServer) createSession(cli *client.Client, req *session.CreateRequest, opts ...client.CallOption) (*session.CreateResponse, error) { - resp, err := rpcapi.CreateSession(cli, req, opts...) + resp, err := rpcAPICreateSession(cli, req, opts...) if err != nil { return nil, rpcErr(err) } diff --git a/client/client.go b/client/client.go index 742e8bad..f0634c44 100644 --- a/client/client.go +++ b/client/client.go @@ -168,9 +168,14 @@ func (c *Client) Close() error { return c.c.Conn().Close() } -func (c *Client) sendStatistic(m stat.Method, elapsed time.Duration, err error) { - if c.prm.statisticCallback != nil { - c.prm.statisticCallback(c.nodeKey, c.endpoint, m, elapsed, err) +func (c *Client) sendStatistic(m stat.Method, err error) func() { + if c.prm.statisticCallback == nil { + return func() {} + } + + ts := time.Now() + return func() { + c.prm.statisticCallback(c.nodeKey, c.endpoint, m, time.Since(ts), err) } } diff --git a/client/container.go b/client/container.go index 8bd102ca..a1cbbbed 100644 --- a/client/container.go +++ b/client/container.go @@ -15,9 +15,21 @@ import ( neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" "github.com/nspcc-dev/neofs-sdk-go/eacl" "github.com/nspcc-dev/neofs-sdk-go/session" + "github.com/nspcc-dev/neofs-sdk-go/stat" "github.com/nspcc-dev/neofs-sdk-go/user" ) +var ( + // special variables for test purposes, to overwrite real RPC calls. + rpcAPIPutContainer = rpcapi.PutContainer + rpcAPIGetContainer = rpcapi.GetContainer + rpcAPIListContainers = rpcapi.ListContainers + rpcAPIDeleteContainer = rpcapi.DeleteContainer + rpcAPIGetEACL = rpcapi.GetEACL + rpcAPISetEACL = rpcapi.SetEACL + rpcAPIAnnounceUsedSpace = rpcapi.AnnounceUsedSpace +) + // PrmContainerPut groups optional parameters of ContainerPut operation. type PrmContainerPut struct { prmCommonMeta @@ -64,6 +76,11 @@ func (x *PrmContainerPut) WithinSession(s session.Container) { // Return errors: // - [ErrMissingSigner] func (c *Client) ContainerPut(ctx context.Context, cont container.Container, prm PrmContainerPut) (cid.ID, error) { + var err error + defer func() { + c.sendStatistic(stat.MethodContainerPut, err)() + }() + signer, err := c.getSigner(prm.signer) if err != nil { return cid.ID{}, err @@ -76,7 +93,8 @@ func (c *Client) ContainerPut(ctx context.Context, cont container.Container, prm var sig neofscrypto.Signature err = cont.CalculateSignature(&sig, signer) if err != nil { - return cid.ID{}, fmt.Errorf("calculate container signature: %w", err) + err = fmt.Errorf("calculate container signature: %w", err) + return cid.ID{}, err } var sigv2 refs.Signature @@ -115,7 +133,7 @@ func (c *Client) ContainerPut(ctx context.Context, cont container.Container, prm c.initCallContext(&cc) cc.req = &req cc.call = func() (responseV2, error) { - return rpcapi.PutContainer(&c.c, &req, client.WithContext(ctx)) + return rpcAPIPutContainer(&c.c, &req, client.WithContext(ctx)) } cc.result = func(r responseV2) { resp := r.(*v2container.PutResponse) @@ -136,6 +154,7 @@ func (c *Client) ContainerPut(ctx context.Context, cont container.Container, prm // process call if !cc.processCall() { + err = cc.err return cid.ID{}, cc.err } @@ -157,8 +176,14 @@ type PrmContainerGet struct { // Return errors: // - [ErrMissingSigner] func (c *Client) ContainerGet(ctx context.Context, id cid.ID, prm PrmContainerGet) (container.Container, error) { + var err error + defer func() { + c.sendStatistic(stat.MethodContainerGet, err)() + }() + if c.prm.signer == nil { - return container.Container{}, ErrMissingSigner + err = ErrMissingSigner + return container.Container{}, err } var cidV2 refs.ContainerID @@ -184,7 +209,7 @@ func (c *Client) ContainerGet(ctx context.Context, id cid.ID, prm PrmContainerGe cc.meta = prm.prmCommonMeta cc.req = &req cc.call = func() (responseV2, error) { - return rpcapi.GetContainer(&c.c, &req, client.WithContext(ctx)) + return rpcAPIGetContainer(&c.c, &req, client.WithContext(ctx)) } cc.result = func(r responseV2) { resp := r.(*v2container.GetResponse) @@ -203,6 +228,7 @@ func (c *Client) ContainerGet(ctx context.Context, id cid.ID, prm PrmContainerGe // process call if !cc.processCall() { + err = cc.err return container.Container{}, cc.err } @@ -224,8 +250,14 @@ type PrmContainerList struct { // Return errors: // - [ErrMissingSigner] func (c *Client) ContainerList(ctx context.Context, ownerID user.ID, prm PrmContainerList) ([]cid.ID, error) { + var err error + defer func() { + c.sendStatistic(stat.MethodContainerList, err)() + }() + if c.prm.signer == nil { - return nil, ErrMissingSigner + err = ErrMissingSigner + return nil, err } // form request body @@ -251,7 +283,7 @@ func (c *Client) ContainerList(ctx context.Context, ownerID user.ID, prm PrmCont cc.meta = prm.prmCommonMeta cc.req = &req cc.call = func() (responseV2, error) { - return rpcapi.ListContainers(&c.c, &req, client.WithContext(ctx)) + return rpcAPIListContainers(&c.c, &req, client.WithContext(ctx)) } cc.result = func(r responseV2) { resp := r.(*v2container.ListResponse) @@ -269,6 +301,7 @@ func (c *Client) ContainerList(ctx context.Context, ownerID user.ID, prm PrmCont // process call if !cc.processCall() { + err = cc.err return nil, cc.err } @@ -320,13 +353,19 @@ func (x *PrmContainerDelete) WithinSession(tok session.Container) { // - [ErrMissingSigner] // - [neofscrypto.ErrIncorrectSigner] func (c *Client) ContainerDelete(ctx context.Context, id cid.ID, prm PrmContainerDelete) error { + var err error + defer func() { + c.sendStatistic(stat.MethodContainerDelete, err)() + }() + signer, err := c.getSigner(prm.signer) if err != nil { return err } if signer.Scheme() != neofscrypto.ECDSA_DETERMINISTIC_SHA256 { - return errNonNeoSigner + err = errNonNeoSigner + return err } // sign container ID @@ -340,7 +379,8 @@ func (c *Client) ContainerDelete(ctx context.Context, id cid.ID, prm PrmContaine var sig neofscrypto.Signature err = sig.Calculate(signer, data) if err != nil { - return fmt.Errorf("calculate signature: %w", err) + err = fmt.Errorf("calculate signature: %w", err) + return err } var sigv2 refs.Signature @@ -378,11 +418,12 @@ func (c *Client) ContainerDelete(ctx context.Context, id cid.ID, prm PrmContaine c.initCallContext(&cc) cc.req = &req cc.call = func() (responseV2, error) { - return rpcapi.DeleteContainer(&c.c, &req, client.WithContext(ctx)) + return rpcAPIDeleteContainer(&c.c, &req, client.WithContext(ctx)) } // process call if !cc.processCall() { + err = cc.err return cc.err } @@ -404,8 +445,14 @@ type PrmContainerEACL struct { // Return errors: // - [ErrMissingSigner] func (c *Client) ContainerEACL(ctx context.Context, id cid.ID, prm PrmContainerEACL) (eacl.Table, error) { + var err error + defer func() { + c.sendStatistic(stat.MethodContainerEACL, err)() + }() + if c.prm.signer == nil { - return eacl.Table{}, ErrMissingSigner + err = ErrMissingSigner + return eacl.Table{}, err } var cidV2 refs.ContainerID @@ -431,7 +478,7 @@ func (c *Client) ContainerEACL(ctx context.Context, id cid.ID, prm PrmContainerE cc.meta = prm.prmCommonMeta cc.req = &req cc.call = func() (responseV2, error) { - return rpcapi.GetEACL(&c.c, &req, client.WithContext(ctx)) + return rpcAPIGetEACL(&c.c, &req, client.WithContext(ctx)) } cc.result = func(r responseV2) { resp := r.(*v2container.GetExtendedACLResponse) @@ -447,6 +494,7 @@ func (c *Client) ContainerEACL(ctx context.Context, id cid.ID, prm PrmContainerE // process call if !cc.processCall() { + err = cc.err return eacl.Table{}, cc.err } @@ -503,18 +551,25 @@ func (x *PrmContainerSetEACL) WithinSession(s session.Container) { // // Context is required and must not be nil. It is used for network communication. func (c *Client) ContainerSetEACL(ctx context.Context, table eacl.Table, prm PrmContainerSetEACL) error { + var err error + defer func() { + c.sendStatistic(stat.MethodContainerSetEACL, err)() + }() + signer, err := c.getSigner(prm.signer) if err != nil { return err } if signer.Scheme() != neofscrypto.ECDSA_DETERMINISTIC_SHA256 { - return errNonNeoSigner + err = errNonNeoSigner + return err } _, isCIDSet := table.CID() if !isCIDSet { - return ErrMissingEACLContainer + err = ErrMissingEACLContainer + return err } // sign the eACL table @@ -523,7 +578,8 @@ func (c *Client) ContainerSetEACL(ctx context.Context, table eacl.Table, prm Prm var sig neofscrypto.Signature err = sig.CalculateMarshalled(signer, eaclV2) if err != nil { - return fmt.Errorf("calculate signature: %w", err) + err = fmt.Errorf("calculate signature: %w", err) + return err } var sigv2 refs.Signature @@ -561,11 +617,12 @@ func (c *Client) ContainerSetEACL(ctx context.Context, table eacl.Table, prm Prm c.initCallContext(&cc) cc.req = &req cc.call = func() (responseV2, error) { - return rpcapi.SetEACL(&c.c, &req, client.WithContext(ctx)) + return rpcAPISetEACL(&c.c, &req, client.WithContext(ctx)) } // process call if !cc.processCall() { + err = cc.err return cc.err } @@ -595,14 +652,19 @@ type PrmAnnounceSpace struct { // - [ErrMissingAnnouncements] // - [ErrMissingSigner] func (c *Client) ContainerAnnounceUsedSpace(ctx context.Context, announcements []container.SizeEstimation, prm PrmAnnounceSpace) error { - // check parameters + var err error + defer func() { + c.sendStatistic(stat.MethodContainerAnnounceUsedSpace, err)() + }() if len(announcements) == 0 { - return ErrMissingAnnouncements + err = ErrMissingAnnouncements + return err } if c.prm.signer == nil { - return ErrMissingSigner + err = ErrMissingSigner + return err } // convert list of SDK announcement structures into NeoFS-API v2 list @@ -630,11 +692,12 @@ func (c *Client) ContainerAnnounceUsedSpace(ctx context.Context, announcements [ cc.meta = prm.prmCommonMeta cc.req = &req cc.call = func() (responseV2, error) { - return rpcapi.AnnounceUsedSpace(&c.c, &req, client.WithContext(ctx)) + return rpcAPIAnnounceUsedSpace(&c.c, &req, client.WithContext(ctx)) } // process call if !cc.processCall() { + err = cc.err return cc.err } diff --git a/client/netmap.go b/client/netmap.go index 171418cb..c55f2b2a 100644 --- a/client/netmap.go +++ b/client/netmap.go @@ -9,9 +9,16 @@ import ( "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" v2session "github.com/nspcc-dev/neofs-api-go/v2/session" "github.com/nspcc-dev/neofs-sdk-go/netmap" + "github.com/nspcc-dev/neofs-sdk-go/stat" "github.com/nspcc-dev/neofs-sdk-go/version" ) +var ( + // special variables for test purposes only, to overwrite real RPC calls. + rpcAPINetworkInfo = rpcapi.NetworkInfo + rpcAPILocalNodeInfo = rpcapi.LocalNodeInfo +) + // PrmEndpointInfo groups parameters of EndpointInfo operation. type PrmEndpointInfo struct { prmCommonMeta @@ -49,8 +56,14 @@ func (x ResEndpointInfo) NodeInfo() netmap.NodeInfo { // Returns errors: // - [ErrMissingSigner] func (c *Client) EndpointInfo(ctx context.Context, prm PrmEndpointInfo) (*ResEndpointInfo, error) { + var err error + defer func() { + c.sendStatistic(stat.MethodEndpointInfo, err)() + }() + if c.prm.signer == nil { - return nil, ErrMissingSigner + err = ErrMissingSigner + return nil, err } // form request @@ -67,7 +80,7 @@ func (c *Client) EndpointInfo(ctx context.Context, prm PrmEndpointInfo) (*ResEnd cc.meta = prm.prmCommonMeta cc.req = &req cc.call = func() (responseV2, error) { - return rpcapi.LocalNodeInfo(&c.c, &req, client.WithContext(ctx)) + return rpcAPILocalNodeInfo(&c.c, &req, client.WithContext(ctx)) } cc.result = func(r responseV2) { resp := r.(*v2netmap.LocalNodeInfoResponse) @@ -105,7 +118,8 @@ func (c *Client) EndpointInfo(ctx context.Context, prm PrmEndpointInfo) (*ResEnd // process call if !cc.processCall() { - return nil, cc.err + err = cc.err + return nil, err } return &res, nil @@ -125,6 +139,11 @@ type PrmNetworkInfo struct { // // Reflects all internal errors in second return value (transport problems, response processing, etc.). func (c *Client) NetworkInfo(ctx context.Context, prm PrmNetworkInfo) (netmap.NetworkInfo, error) { + var err error + defer func() { + c.sendStatistic(stat.MethodNetworkInfo, err)() + }() + // form request var req v2netmap.NetworkInfoRequest @@ -139,7 +158,7 @@ func (c *Client) NetworkInfo(ctx context.Context, prm PrmNetworkInfo) (netmap.Ne cc.meta = prm.prmCommonMeta cc.req = &req cc.call = func() (responseV2, error) { - return rpcapi.NetworkInfo(&c.c, &req, client.WithContext(ctx)) + return rpcAPINetworkInfo(&c.c, &req, client.WithContext(ctx)) } cc.result = func(r responseV2) { resp := r.(*v2netmap.NetworkInfoResponse) @@ -161,6 +180,7 @@ func (c *Client) NetworkInfo(ctx context.Context, prm PrmNetworkInfo) (netmap.Ne // process call if !cc.processCall() { + err = cc.err return netmap.NetworkInfo{}, cc.err } @@ -183,8 +203,14 @@ type PrmNetMapSnapshot struct { // Returns errors: // - [ErrMissingSigner] func (c *Client) NetMapSnapshot(ctx context.Context, _ PrmNetMapSnapshot) (netmap.NetMap, error) { + var err error + defer func() { + c.sendStatistic(stat.MethodNetMapSnapshot, err)() + }() + if c.prm.signer == nil { - return netmap.NetMap{}, ErrMissingSigner + err = ErrMissingSigner + return netmap.NetMap{}, err } // form request body var body v2netmap.SnapshotRequestBody @@ -197,12 +223,15 @@ func (c *Client) NetMapSnapshot(ctx context.Context, _ PrmNetMapSnapshot) (netma req.SetBody(&body) c.prepareRequest(&req, &meta) - err := signServiceMessage(c.prm.signer, &req) + err = signServiceMessage(c.prm.signer, &req) if err != nil { - return netmap.NetMap{}, fmt.Errorf("sign request: %w", err) + err = fmt.Errorf("sign request: %w", err) + return netmap.NetMap{}, err } - resp, err := c.server.netMapSnapshot(ctx, req) + var resp *v2netmap.SnapshotResponse + + resp, err = c.server.netMapSnapshot(ctx, req) if err != nil { return netmap.NetMap{}, err } @@ -216,12 +245,14 @@ func (c *Client) NetMapSnapshot(ctx context.Context, _ PrmNetMapSnapshot) (netma netMapV2 := resp.GetBody().NetMap() if netMapV2 == nil { - return netmap.NetMap{}, newErrMissingResponseField(fieldNetMap) + err = newErrMissingResponseField(fieldNetMap) + return netmap.NetMap{}, err } err = res.ReadFromV2(*netMapV2) if err != nil { - return netmap.NetMap{}, newErrInvalidResponseField(fieldNetMap, err) + err = newErrInvalidResponseField(fieldNetMap, err) + return netmap.NetMap{}, err } return res, nil diff --git a/client/object_delete.go b/client/object_delete.go index 69b67463..9a60e7e8 100644 --- a/client/object_delete.go +++ b/client/object_delete.go @@ -16,9 +16,13 @@ import ( neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/session" + "github.com/nspcc-dev/neofs-sdk-go/stat" ) var ( + // special variable for test purposes only, to overwrite real RPC calls. + rpcAPIDeleteObject = rpcapi.DeleteObject + // ErrNoSession indicates that session wasn't set in some Prm* structure. ErrNoSession = errors.New("session is not set") ) @@ -115,8 +119,13 @@ func (c *Client) ObjectDelete(ctx context.Context, containerID cid.ID, objectID cidV2 v2refs.ContainerID oidV2 v2refs.ObjectID body v2object.DeleteRequestBody + err error ) + defer func() { + c.sendStatistic(stat.MethodObjectDelete, err)() + }() + containerID.WriteToV2(&cidV2) addr.SetContainerID(&cidV2) @@ -138,10 +147,11 @@ func (c *Client) ObjectDelete(ctx context.Context, containerID cid.ID, objectID err = signServiceMessage(signer, &req) if err != nil { - return oid.ID{}, fmt.Errorf("sign request: %w", err) + err = fmt.Errorf("sign request: %w", err) + return oid.ID{}, err } - resp, err := rpcapi.DeleteObject(&c.c, &req, client.WithContext(ctx)) + resp, err := rpcAPIDeleteObject(&c.c, &req, client.WithContext(ctx)) if err != nil { return oid.ID{}, err } @@ -155,12 +165,14 @@ func (c *Client) ObjectDelete(ctx context.Context, containerID cid.ID, objectID idTombV2 := resp.GetBody().GetTombstone().GetObjectID() if idTombV2 == nil { - return oid.ID{}, newErrMissingResponseField(fieldTombstone) + err = newErrMissingResponseField(fieldTombstone) + return oid.ID{}, err } err = res.ReadFromV2(*idTombV2) if err != nil { - return oid.ID{}, newErrInvalidResponseField(fieldTombstone, err) + err = newErrInvalidResponseField(fieldTombstone, err) + return oid.ID{}, err } return res, nil diff --git a/client/object_get.go b/client/object_get.go index 87f70547..9e159ead 100644 --- a/client/object_get.go +++ b/client/object_get.go @@ -18,6 +18,14 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/session" + "github.com/nspcc-dev/neofs-sdk-go/stat" +) + +var ( + // special variables for test purposes only, to overwrite real RPC calls. + rpcAPIGetObject = rpcapi.GetObject + rpcAPIHeadObject = rpcapi.HeadObject + rpcAPIGetObjectRange = rpcapi.GetObjectRange ) // shared parameters of GET/HEAD/RANGE. @@ -109,6 +117,8 @@ type ObjectReader struct { tailPayload []byte remainingPayloadLen int + + statisticCallback shortStatisticCallback } // UseSigner specifies private signer to sign the requests. @@ -125,6 +135,12 @@ func (x *PrmObjectGet) Signer() neofscrypto.Signer { // readHeader reads header of the object. Result means success. // Failure reason can be received via Close. func (x *ObjectReader) readHeader(dst *object.Object) bool { + if x.statisticCallback != nil { + defer func() { + x.statisticCallback(x.err) + }() + } + var resp v2object.GetResponse x.err = x.stream.Read(&resp) if x.err != nil { @@ -163,6 +179,12 @@ func (x *ObjectReader) readHeader(dst *object.Object) bool { } func (x *ObjectReader) readChunk(buf []byte) (int, bool) { + if x.statisticCallback != nil { + defer func() { + x.statisticCallback(x.err) + }() + } + var read int // read remaining tail @@ -225,17 +247,27 @@ func (x *ObjectReader) ReadChunk(buf []byte) (int, bool) { } func (x *ObjectReader) close(ignoreEOF bool) error { + var err error + if x.statisticCallback != nil { + defer func() { + x.statisticCallback(err) + }() + } + defer x.cancelCtxStream() if x.err != nil { if !errors.Is(x.err, io.EOF) { - return x.err + err = x.err + return err } else if !ignoreEOF { if x.remainingPayloadLen > 0 { - return io.ErrUnexpectedEOF + err = io.ErrUnexpectedEOF + return err } - return io.EOF + err = io.EOF + return err } } @@ -298,8 +330,13 @@ func (c *Client) ObjectGetInit(ctx context.Context, containerID cid.ID, objectID oidV2 v2refs.ObjectID body v2object.GetRequestBody hdr object.Object + err error ) + defer func() { + c.sendStatistic(stat.MethodObjectGet, err)() + }() + signer, err := c.getSigner(prm.signer) if err != nil { return hdr, nil, err @@ -322,24 +359,30 @@ func (c *Client) ObjectGetInit(ctx context.Context, containerID cid.ID, objectID err = signServiceMessage(signer, &req) if err != nil { - return hdr, nil, fmt.Errorf("sign request: %w", err) + err = fmt.Errorf("sign request: %w", err) + return hdr, nil, err } ctx, cancel := context.WithCancel(ctx) - stream, err := rpcapi.GetObject(&c.c, &req, client.WithContext(ctx)) + stream, err := rpcAPIGetObject(&c.c, &req, client.WithContext(ctx)) if err != nil { cancel() - return hdr, nil, fmt.Errorf("open stream: %w", err) + err = fmt.Errorf("open stream: %w", err) + return hdr, nil, err } var r ObjectReader r.cancelCtxStream = cancel r.stream = stream r.client = c + r.statisticCallback = func(err error) { + c.sendStatistic(stat.MethodObjectGet, err) + } if !r.readHeader(&hdr) { - return hdr, nil, fmt.Errorf("header: %w", r.Close()) + err = fmt.Errorf("header: %w", r.Close()) + return hdr, nil, err } return hdr, &r, nil @@ -414,8 +457,13 @@ func (c *Client) ObjectHead(ctx context.Context, containerID cid.ID, objectID oi cidV2 v2refs.ContainerID oidV2 v2refs.ObjectID body v2object.HeadRequestBody + err error ) + defer func() { + c.sendStatistic(stat.MethodObjectHead, err)() + }() + signer, err := c.getSigner(prm.signer) if err != nil { return nil, err @@ -437,12 +485,14 @@ func (c *Client) ObjectHead(ctx context.Context, containerID cid.ID, objectID oi // sign the request err = signServiceMessage(signer, &req) if err != nil { - return nil, fmt.Errorf("sign request: %w", err) + err = fmt.Errorf("sign request: %w", err) + return nil, err } - resp, err := rpcapi.HeadObject(&c.c, &req, client.WithContext(ctx)) + resp, err := rpcAPIHeadObject(&c.c, &req, client.WithContext(ctx)) if err != nil { - return nil, fmt.Errorf("write request: %w", err) + err = fmt.Errorf("write request: %w", err) + return nil, err } var res ResObjectHead @@ -454,9 +504,11 @@ func (c *Client) ObjectHead(ctx context.Context, containerID cid.ID, objectID oi switch v := resp.GetBody().GetHeaderPart().(type) { default: - return nil, fmt.Errorf("unexpected header type %T", v) + err = fmt.Errorf("unexpected header type %T", v) + return nil, err case *v2object.SplitInfo: - return nil, object.NewSplitInfoError(object.NewSplitInfoFromV2(v)) + err = object.NewSplitInfoError(object.NewSplitInfoFromV2(v)) + return nil, err case *v2object.HeaderWithSignature: res.hdr = v } @@ -501,9 +553,17 @@ type ObjectRangeReader struct { tailPayload []byte remainingPayloadLen int + + statisticCallback shortStatisticCallback } func (x *ObjectRangeReader) readChunk(buf []byte) (int, bool) { + if x.statisticCallback != nil { + defer func() { + x.statisticCallback(x.err) + }() + } + var read int // read remaining tail @@ -571,17 +631,27 @@ func (x *ObjectRangeReader) ReadChunk(buf []byte) (int, bool) { } func (x *ObjectRangeReader) close(ignoreEOF bool) error { + var err error + if x.statisticCallback != nil { + defer func() { + x.statisticCallback(err) + }() + } + defer x.cancelCtxStream() if x.err != nil { if !errors.Is(x.err, io.EOF) { - return x.err + err = x.err + return err } else if !ignoreEOF { if x.remainingPayloadLen > 0 { - return io.ErrUnexpectedEOF + err = io.ErrUnexpectedEOF + return err } - return io.EOF + err = io.EOF + return err } } @@ -648,10 +718,16 @@ func (c *Client) ObjectRangeInit(ctx context.Context, containerID cid.ID, object oidV2 v2refs.ObjectID rngV2 v2object.Range body v2object.GetRangeRequestBody + err error ) + defer func() { + c.sendStatistic(stat.MethodObjectRange, err)() + }() + if length == 0 { - return nil, ErrZeroRangeLength + err = ErrZeroRangeLength + return nil, err } signer, err := c.getSigner(prm.signer) @@ -681,15 +757,17 @@ func (c *Client) ObjectRangeInit(ctx context.Context, containerID cid.ID, object err = signServiceMessage(signer, &req) if err != nil { - return nil, fmt.Errorf("sign request: %w", err) + err = fmt.Errorf("sign request: %w", err) + return nil, err } ctx, cancel := context.WithCancel(ctx) - stream, err := rpcapi.GetObjectRange(&c.c, &req, client.WithContext(ctx)) + stream, err := rpcAPIGetObjectRange(&c.c, &req, client.WithContext(ctx)) if err != nil { cancel() - return nil, fmt.Errorf("open stream: %w", err) + err = fmt.Errorf("open stream: %w", err) + return nil, err } var r ObjectRangeReader @@ -697,6 +775,9 @@ func (c *Client) ObjectRangeInit(ctx context.Context, containerID cid.ID, object r.cancelCtxStream = cancel r.stream = stream r.client = c + r.statisticCallback = func(err error) { + c.sendStatistic(stat.MethodObjectRangeStream, err)() + } return &r, nil } diff --git a/client/object_hash.go b/client/object_hash.go index ca73396b..ecdbc456 100644 --- a/client/object_hash.go +++ b/client/object_hash.go @@ -15,6 +15,12 @@ import ( neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/session" + "github.com/nspcc-dev/neofs-sdk-go/stat" +) + +var ( + // special variable for test purposes only, to overwrite real RPC calls. + rpcAPIHashObjectRange = rpcapi.HashObjectRange ) // PrmObjectHash groups parameters of ObjectHash operation. @@ -148,10 +154,16 @@ func (c *Client) ObjectHash(ctx context.Context, containerID cid.ID, objectID oi addr v2refs.Address cidV2 v2refs.ContainerID oidV2 v2refs.ObjectID + err error ) + defer func() { + c.sendStatistic(stat.MethodObjectHash, err)() + }() + if len(prm.body.GetRanges()) == 0 { - return nil, ErrMissingRanges + err = ErrMissingRanges + return nil, err } containerID.WriteToV2(&cidV2) @@ -178,12 +190,14 @@ func (c *Client) ObjectHash(ctx context.Context, containerID cid.ID, objectID oi err = signServiceMessage(signer, &req) if err != nil { - return nil, fmt.Errorf("sign request: %w", err) + err = fmt.Errorf("sign request: %w", err) + return nil, err } - resp, err := rpcapi.HashObjectRange(&c.c, &req, client.WithContext(ctx)) + resp, err := rpcAPIHashObjectRange(&c.c, &req, client.WithContext(ctx)) if err != nil { - return nil, fmt.Errorf("write request: %w", err) + err = fmt.Errorf("write request: %w", err) + return nil, err } var res [][]byte @@ -193,7 +207,8 @@ func (c *Client) ObjectHash(ctx context.Context, containerID cid.ID, objectID oi res = resp.GetBody().GetHashList() if len(res) == 0 { - return nil, newErrMissingResponseField("hash list") + err = newErrMissingResponseField("hash list") + return nil, err } return res, nil diff --git a/client/object_put.go b/client/object_put.go index 1259fc31..a89d06c0 100644 --- a/client/object_put.go +++ b/client/object_put.go @@ -18,9 +18,20 @@ import ( oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/object/slicer" "github.com/nspcc-dev/neofs-sdk-go/session" + "github.com/nspcc-dev/neofs-sdk-go/stat" "github.com/nspcc-dev/neofs-sdk-go/user" ) +var ( + // special variable for test purposes only, to overwrite real RPC calls. + rpcAPIPutObject = rpcapi.PutObject +) + +// shortStatisticCallback is a shorter version of [stat.OperationCallback] which is calling from [client.Client]. +// The difference is the client already know some info about itself. Despite it the client doesn't know +// duration and error from writer/reader. +type shortStatisticCallback func(err error) + // PrmObjectPutInit groups parameters of ObjectPutInit operation. type PrmObjectPutInit struct { copyNum uint32 @@ -66,6 +77,8 @@ type ObjectWriter struct { req v2object.PutRequest partInit v2object.PutObjectPartInit partChunk v2object.PutObjectPartChunk + + statisticCallback shortStatisticCallback } // UseSigner specifies private signer to sign the requests. @@ -129,6 +142,12 @@ func (x *PrmObjectPutInit) WithXHeaders(hs ...string) { // writeHeader writes header of the object. Result means success. // Failure reason can be received via Close. func (x *ObjectWriter) writeHeader(hdr object.Object) error { + if x.statisticCallback != nil { + defer func() { + x.statisticCallback(x.err) + }() + } + v2Hdr := hdr.ToV2() x.partInit.SetObjectID(v2Hdr.GetObjectID()) @@ -151,6 +170,12 @@ func (x *ObjectWriter) writeHeader(hdr object.Object) error { // WritePayloadChunk writes chunk of the object payload. Result means success. // Failure reason can be received via Close. func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool { + if x.statisticCallback != nil { + defer func() { + x.statisticCallback(x.err) + }() + } + if !x.chunkCalled { x.chunkCalled = true x.req.GetBody().SetObjectPart(&x.partChunk) @@ -214,33 +239,45 @@ func (x *ObjectWriter) WritePayloadChunk(chunk []byte) bool { // - [apistatus.ErrSessionTokenNotFound] // - [apistatus.ErrSessionTokenExpired] func (x *ObjectWriter) Close() (*ResObjectPut, error) { + var err error + if x.statisticCallback != nil { + defer func() { + x.statisticCallback(err) + }() + } + defer x.cancelCtxStream() // Ignore io.EOF error, because it is expected error for client-side // stream termination by the server. E.g. when stream contains invalid // message. Server returns an error in response message (in status). if x.err != nil && !errors.Is(x.err, io.EOF) { - return nil, x.err + err = x.err + return nil, err } if x.err = x.stream.Close(); x.err != nil { - return nil, x.err + err = x.err + return nil, err } if x.err = x.client.processResponse(&x.respV2); x.err != nil { - return nil, x.err + err = x.err + return nil, err } const fieldID = "ID" idV2 := x.respV2.GetBody().GetObjectID() if idV2 == nil { - return nil, newErrMissingResponseField(fieldID) + err = newErrMissingResponseField(fieldID) + return nil, err } x.err = x.res.obj.ReadFromV2(*idV2) if x.err != nil { x.err = newErrInvalidResponseField(fieldID, x.err) + err = x.err } return &x.res, nil @@ -256,7 +293,15 @@ func (x *ObjectWriter) Close() (*ResObjectPut, error) { // Returns errors: // - [ErrMissingSigner] func (c *Client) ObjectPutInit(ctx context.Context, hdr object.Object, prm PrmObjectPutInit) (*ObjectWriter, error) { + var err error + defer func() { + c.sendStatistic(stat.MethodObjectPut, err)() + }() + var w ObjectWriter + w.statisticCallback = func(err error) { + c.sendStatistic(stat.MethodObjectPutStream, err)() + } signer, err := c.getSigner(prm.signer) if err != nil { @@ -264,10 +309,11 @@ func (c *Client) ObjectPutInit(ctx context.Context, hdr object.Object, prm PrmOb } ctx, cancel := context.WithCancel(ctx) - stream, err := rpcapi.PutObject(&c.c, &w.respV2, client.WithContext(ctx)) + stream, err := rpcAPIPutObject(&c.c, &w.respV2, client.WithContext(ctx)) if err != nil { cancel() - return nil, fmt.Errorf("open stream: %w", err) + err = fmt.Errorf("open stream: %w", err) + return nil, err } w.signer = signer @@ -280,7 +326,8 @@ func (c *Client) ObjectPutInit(ctx context.Context, hdr object.Object, prm PrmOb if err = w.writeHeader(hdr); err != nil { _, _ = w.Close() - return nil, fmt.Errorf("header write: %w", err) + err = fmt.Errorf("header write: %w", err) + return nil, err } return &w, nil diff --git a/client/object_search.go b/client/object_search.go index 73afaef5..1754dc34 100644 --- a/client/object_search.go +++ b/client/object_search.go @@ -18,6 +18,12 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/session" + "github.com/nspcc-dev/neofs-sdk-go/stat" +) + +var ( + // special variable for test purposes only, to overwrite real RPC calls. + rpcAPISearchObjects = rpcapi.SearchObjects ) // PrmObjectSearch groups optional parameters of ObjectSearch operation. @@ -110,6 +116,8 @@ type ObjectListReader struct { Read(resp *v2object.SearchResponse) error } tail []v2refs.ObjectID + + statisticCallback shortStatisticCallback } // Read reads another list of the object identifiers. Works similar to @@ -123,6 +131,12 @@ func (x *ObjectListReader) Read(buf []oid.ID) (int, bool) { panic("empty buffer in ObjectListReader.ReadList") } + if x.statisticCallback != nil { + defer func() { + x.statisticCallback(x.err) + }() + } + read := copyIDBuffers(buf, x.tail) x.tail = x.tail[read:] @@ -202,10 +216,18 @@ func (x *ObjectListReader) Iterate(f func(oid.ID) bool) error { // - [apistatus.ErrObjectAccessDenied] // - [apistatus.ErrSessionTokenExpired] func (x *ObjectListReader) Close() error { + var err error + if x.statisticCallback != nil { + defer func() { + x.statisticCallback(err) + }() + } + defer x.cancelCtxStream() if x.err != nil && !errors.Is(x.err, io.EOF) { - return x.err + err = x.err + return err } return nil @@ -222,6 +244,11 @@ func (x *ObjectListReader) Close() error { // Return errors: // - [ErrMissingSigner] func (c *Client) ObjectSearchInit(ctx context.Context, containerID cid.ID, prm PrmObjectSearch) (*ObjectListReader, error) { + var err error + defer func() { + c.sendStatistic(stat.MethodObjectSearch, err)() + }() + signer, err := c.getSigner(prm.signer) if err != nil { return nil, err @@ -242,17 +269,22 @@ func (c *Client) ObjectSearchInit(ctx context.Context, containerID cid.ID, prm P err = signServiceMessage(signer, &req) if err != nil { - return nil, fmt.Errorf("sign request: %w", err) + err = fmt.Errorf("sign request: %w", err) + return nil, err } var r ObjectListReader ctx, r.cancelCtxStream = context.WithCancel(ctx) - r.stream, err = rpcapi.SearchObjects(&c.c, &req, client.WithContext(ctx)) + r.stream, err = rpcAPISearchObjects(&c.c, &req, client.WithContext(ctx)) if err != nil { - return nil, fmt.Errorf("open stream: %w", err) + err = fmt.Errorf("open stream: %w", err) + return nil, err } r.client = c + r.statisticCallback = func(err error) { + c.sendStatistic(stat.MethodObjectSearch, err)() + } return &r, nil } diff --git a/client/reputation.go b/client/reputation.go index ba21b786..99300b69 100644 --- a/client/reputation.go +++ b/client/reputation.go @@ -7,6 +7,13 @@ import ( rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc" "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" "github.com/nspcc-dev/neofs-sdk-go/reputation" + "github.com/nspcc-dev/neofs-sdk-go/stat" +) + +var ( + // special variables for test purposes only, to overwrite real RPC calls. + rpcAPIAnnounceIntermediateResult = rpcapi.AnnounceIntermediateResult + rpcAPIAnnounceLocalTrust = rpcapi.AnnounceLocalTrust ) // PrmAnnounceLocalTrust groups optional parameters of AnnounceLocalTrust operation. @@ -29,16 +36,24 @@ type PrmAnnounceLocalTrust struct { // Parameter epoch must not be zero. // Parameter trusts must not be empty. func (c *Client) AnnounceLocalTrust(ctx context.Context, epoch uint64, trusts []reputation.Trust, prm PrmAnnounceLocalTrust) error { + var err error + defer func() { + c.sendStatistic(stat.MethodAnnounceLocalTrust, err)() + }() + // check parameters switch { case epoch == 0: - return ErrZeroEpoch + err = ErrZeroEpoch + return err case len(trusts) == 0: - return ErrMissingTrusts + err = ErrMissingTrusts + return err } if c.prm.signer == nil { - return ErrMissingSigner + err = ErrMissingSigner + return err } // form request body @@ -68,12 +83,13 @@ func (c *Client) AnnounceLocalTrust(ctx context.Context, epoch uint64, trusts [] cc.meta = prm.prmCommonMeta cc.req = &req cc.call = func() (responseV2, error) { - return rpcapi.AnnounceLocalTrust(&c.c, &req, client.WithContext(ctx)) + return rpcAPIAnnounceLocalTrust(&c.c, &req, client.WithContext(ctx)) } // process call if !cc.processCall() { - return cc.err + err = cc.err + return err } return nil @@ -106,12 +122,19 @@ func (x *PrmAnnounceIntermediateTrust) SetIteration(iter uint32) { // // Parameter epoch must not be zero. func (c *Client) AnnounceIntermediateTrust(ctx context.Context, epoch uint64, trust reputation.PeerToPeerTrust, prm PrmAnnounceIntermediateTrust) error { + var err error + defer func() { + c.sendStatistic(stat.MethodAnnounceIntermediateTrust, err)() + }() + if epoch == 0 { - return ErrZeroEpoch + err = ErrZeroEpoch + return err } if c.prm.signer == nil { - return ErrMissingSigner + err = ErrMissingSigner + return err } var v2Trust v2reputation.PeerToPeerTrust @@ -138,12 +161,13 @@ func (c *Client) AnnounceIntermediateTrust(ctx context.Context, epoch uint64, tr cc.meta = prm.prmCommonMeta cc.req = &req cc.call = func() (responseV2, error) { - return rpcapi.AnnounceIntermediateResult(&c.c, &req, client.WithContext(ctx)) + return rpcAPIAnnounceIntermediateResult(&c.c, &req, client.WithContext(ctx)) } // process call if !cc.processCall() { - return cc.err + err = cc.err + return err } return nil diff --git a/client/session.go b/client/session.go index ac36814b..6d6d5e8b 100644 --- a/client/session.go +++ b/client/session.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" v2session "github.com/nspcc-dev/neofs-api-go/v2/session" neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" + "github.com/nspcc-dev/neofs-sdk-go/stat" "github.com/nspcc-dev/neofs-sdk-go/user" ) @@ -70,6 +71,11 @@ func (x ResSessionCreate) PublicKey() []byte { // Return errors: // - [ErrMissingSigner] func (c *Client) SessionCreate(ctx context.Context, prm PrmSessionCreate) (*ResSessionCreate, error) { + var err error + defer func() { + c.sendStatistic(stat.MethodSessionCreate, err)() + }() + signer, err := c.getSigner(prm.signer) if err != nil { return nil, err @@ -77,7 +83,8 @@ func (c *Client) SessionCreate(ctx context.Context, prm PrmSessionCreate) (*ResS var ownerID user.ID if err = user.IDFromSigner(&ownerID, signer); err != nil { - return nil, fmt.Errorf("IDFromSigner: %w", err) + err = fmt.Errorf("IDFromSigner: %w", err) + return nil, err } var ownerIDV2 refs.OwnerID @@ -128,6 +135,7 @@ func (c *Client) SessionCreate(ctx context.Context, prm PrmSessionCreate) (*ResS // process call if !cc.processCall() { + err = cc.err return nil, cc.err } diff --git a/pool/accounting.go b/pool/accounting.go index 96db58fe..9ff246ce 100644 --- a/pool/accounting.go +++ b/pool/accounting.go @@ -2,7 +2,6 @@ package pool import ( "context" - "time" "github.com/nspcc-dev/neofs-sdk-go/accounting" "github.com/nspcc-dev/neofs-sdk-go/client" @@ -12,15 +11,10 @@ import ( // // See details in [client.Client.BalanceGet]. func (p *Pool) BalanceGet(ctx context.Context, prm client.PrmBalanceGet) (accounting.Decimal, error) { - c, statUpdater, err := p.sdkClient() + c, _, err := p.sdkClient() if err != nil { return accounting.Decimal{}, err } - start := time.Now() - acc, err := c.BalanceGet(ctx, prm) - statUpdater.incRequests(time.Since(start), methodBalanceGet) - statUpdater.updateErrorRate(err) - - return acc, err + return c.BalanceGet(ctx, prm) } diff --git a/pool/container.go b/pool/container.go index 9ec32e2f..d858a290 100644 --- a/pool/container.go +++ b/pool/container.go @@ -2,7 +2,6 @@ package pool import ( "context" - "time" "github.com/nspcc-dev/neofs-sdk-go/client" "github.com/nspcc-dev/neofs-sdk-go/container" @@ -15,100 +14,70 @@ import ( // // See details in [client.Client.ContainerPut]. func (p *Pool) ContainerPut(ctx context.Context, cont container.Container, prm client.PrmContainerPut) (cid.ID, error) { - c, statUpdater, err := p.sdkClient() + c, _, err := p.sdkClient() if err != nil { return cid.ID{}, err } - start := time.Now() - id, err := c.ContainerPut(ctx, cont, prm) - statUpdater.incRequests(time.Since(start), methodContainerPut) - statUpdater.updateErrorRate(err) - - return id, err + return c.ContainerPut(ctx, cont, prm) } // ContainerGet reads NeoFS container by ID. // // See details in [client.Client.ContainerGet]. func (p *Pool) ContainerGet(ctx context.Context, id cid.ID, prm client.PrmContainerGet) (container.Container, error) { - c, statUpdater, err := p.sdkClient() + c, _, err := p.sdkClient() if err != nil { return container.Container{}, err } - start := time.Now() - cnr, err := c.ContainerGet(ctx, id, prm) - statUpdater.incRequests(time.Since(start), methodContainerGet) - statUpdater.updateErrorRate(err) - - return cnr, err + return c.ContainerGet(ctx, id, prm) } // ContainerList requests identifiers of the account-owned containers. // // See details in [client.Client.ContainerList]. func (p *Pool) ContainerList(ctx context.Context, ownerID user.ID, prm client.PrmContainerList) ([]cid.ID, error) { - c, statUpdater, err := p.sdkClient() + c, _, err := p.sdkClient() if err != nil { return []cid.ID{}, err } - start := time.Now() - ids, err := c.ContainerList(ctx, ownerID, prm) - statUpdater.incRequests(time.Since(start), methodContainerList) - statUpdater.updateErrorRate(err) - - return ids, err + return c.ContainerList(ctx, ownerID, prm) } // ContainerDelete sends request to remove the NeoFS container. // // See details in [client.Client.ContainerDelete]. func (p *Pool) ContainerDelete(ctx context.Context, id cid.ID, prm client.PrmContainerDelete) error { - c, statUpdater, err := p.sdkClient() + c, _, err := p.sdkClient() if err != nil { return err } - start := time.Now() - err = c.ContainerDelete(ctx, id, prm) - statUpdater.incRequests(time.Since(start), methodContainerDelete) - statUpdater.updateErrorRate(err) - - return err + return c.ContainerDelete(ctx, id, prm) } // ContainerEACL reads eACL table of the NeoFS container. // // See details in [client.Client.ContainerEACL]. func (p *Pool) ContainerEACL(ctx context.Context, id cid.ID, prm client.PrmContainerEACL) (eacl.Table, error) { - c, statUpdater, err := p.sdkClient() + c, _, err := p.sdkClient() if err != nil { return eacl.Table{}, err } - start := time.Now() - table, err := c.ContainerEACL(ctx, id, prm) - statUpdater.incRequests(time.Since(start), methodContainerEACL) - statUpdater.updateErrorRate(err) - - return table, err + return c.ContainerEACL(ctx, id, prm) } // ContainerSetEACL sends request to update eACL table of the NeoFS container. // // See details in [client.Client.ContainerSetEACL]. func (p *Pool) ContainerSetEACL(ctx context.Context, table eacl.Table, prm client.PrmContainerSetEACL) error { - c, statUpdater, err := p.sdkClient() + c, _, err := p.sdkClient() if err != nil { return err } - start := time.Now() - err = c.ContainerSetEACL(ctx, table, prm) - statUpdater.incRequests(time.Since(start), methodContainerSetEACL) - statUpdater.updateErrorRate(err) - - return err + return c.ContainerSetEACL(ctx, table, prm) } diff --git a/pool/netmap.go b/pool/netmap.go index fe5147dc..fe773a36 100644 --- a/pool/netmap.go +++ b/pool/netmap.go @@ -2,7 +2,6 @@ package pool import ( "context" - "time" "github.com/nspcc-dev/neofs-sdk-go/client" "github.com/nspcc-dev/neofs-sdk-go/netmap" @@ -12,32 +11,22 @@ import ( // // See details in [client.Client.NetworkInfo]. func (p *Pool) NetworkInfo(ctx context.Context, prm client.PrmNetworkInfo) (netmap.NetworkInfo, error) { - c, statUpdater, err := p.sdkClient() + c, _, err := p.sdkClient() if err != nil { return netmap.NetworkInfo{}, err } - start := time.Now() - info, err := c.NetworkInfo(ctx, prm) - statUpdater.incRequests(time.Since(start), methodNetworkInfo) - statUpdater.updateErrorRate(err) - - return info, err + return c.NetworkInfo(ctx, prm) } // NetMapSnapshot requests current network view of the remote server. // // See details in [client.Client.NetMapSnapshot]. func (p *Pool) NetMapSnapshot(ctx context.Context, prm client.PrmNetMapSnapshot) (netmap.NetMap, error) { - c, statUpdater, err := p.sdkClient() + c, _, err := p.sdkClient() if err != nil { return netmap.NetMap{}, err } - start := time.Now() - netMap, err := c.NetMapSnapshot(ctx, prm) - statUpdater.incRequests(time.Since(start), methodNetMapSnapshot) - statUpdater.updateErrorRate(err) - - return netMap, err + return c.NetMapSnapshot(ctx, prm) } diff --git a/pool/object.go b/pool/object.go index 54c9ca0b..6a9d7fdf 100644 --- a/pool/object.go +++ b/pool/object.go @@ -3,7 +3,6 @@ package pool import ( "context" "fmt" - "time" "github.com/nspcc-dev/neofs-sdk-go/client" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" @@ -30,7 +29,7 @@ func (p *Pool) actualSigner(signer neofscrypto.Signer) neofscrypto.Signer { // // See details in [client.Client.ObjectPutInit]. func (p *Pool) ObjectPutInit(ctx context.Context, hdr object.Object, prm client.PrmObjectPutInit) (*client.ObjectWriter, error) { - c, statUpdater, err := p.sdkClient() + c, _, err := p.sdkClient() if err != nil { return nil, err } @@ -47,17 +46,11 @@ func (p *Pool) ObjectPutInit(ctx context.Context, hdr object.Object, prm client. p.actualSigner(prm.Signer()), session.VerbObjectPut, &prm, - statUpdater, ); err != nil { return nil, fmt.Errorf("session: %w", err) } - start := time.Now() - w, err := c.ObjectPutInit(ctx, hdr, prm) - statUpdater.incRequests(time.Since(start), methodObjectPut) - statUpdater.updateErrorRate(err) - - return w, err + return c.ObjectPutInit(ctx, hdr, prm) } // ObjectGetInit initiates reading an object through a remote server using NeoFS API protocol. @@ -65,7 +58,7 @@ func (p *Pool) ObjectPutInit(ctx context.Context, hdr object.Object, prm client. // See details in [client.Client.ObjectGetInit]. func (p *Pool) ObjectGetInit(ctx context.Context, containerID cid.ID, objectID oid.ID, prm client.PrmObjectGet) (object.Object, *client.ObjectReader, error) { var hdr object.Object - c, statUpdater, err := p.sdkClient() + c, _, err := p.sdkClient() if err != nil { return hdr, nil, err } @@ -76,24 +69,18 @@ func (p *Pool) ObjectGetInit(ctx context.Context, containerID cid.ID, objectID o p.actualSigner(prm.Signer()), session.VerbObjectGet, &prm, - statUpdater, ); err != nil { return hdr, nil, fmt.Errorf("session: %w", err) } - start := time.Now() - obj, reader, err := c.ObjectGetInit(ctx, containerID, objectID, prm) - statUpdater.incRequests(time.Since(start), methodObjectGet) - statUpdater.updateErrorRate(err) - - return obj, reader, err + return c.ObjectGetInit(ctx, containerID, objectID, prm) } // ObjectHead reads object header through a remote server using NeoFS API protocol. // // See details in [client.Client.ObjectHead]. func (p *Pool) ObjectHead(ctx context.Context, containerID cid.ID, objectID oid.ID, prm client.PrmObjectHead) (*client.ResObjectHead, error) { - c, statUpdater, err := p.sdkClient() + c, _, err := p.sdkClient() if err != nil { return nil, err } @@ -104,24 +91,18 @@ func (p *Pool) ObjectHead(ctx context.Context, containerID cid.ID, objectID oid. p.actualSigner(prm.Signer()), session.VerbObjectHead, &prm, - statUpdater, ); err != nil { return nil, fmt.Errorf("session: %w", err) } - start := time.Now() - head, err := c.ObjectHead(ctx, containerID, objectID, prm) - statUpdater.incRequests(time.Since(start), methodObjectHead) - statUpdater.updateErrorRate(err) - - return head, err + return c.ObjectHead(ctx, containerID, objectID, prm) } // ObjectRangeInit initiates reading an object's payload range through a remote // // See details in [client.Client.ObjectRangeInit]. func (p *Pool) ObjectRangeInit(ctx context.Context, containerID cid.ID, objectID oid.ID, offset, length uint64, prm client.PrmObjectRange) (*client.ObjectRangeReader, error) { - c, statUpdater, err := p.sdkClient() + c, _, err := p.sdkClient() if err != nil { return nil, err } @@ -132,24 +113,18 @@ func (p *Pool) ObjectRangeInit(ctx context.Context, containerID cid.ID, objectID p.actualSigner(prm.Signer()), session.VerbObjectRange, &prm, - statUpdater, ); err != nil { return nil, fmt.Errorf("session: %w", err) } - start := time.Now() - reader, err := c.ObjectRangeInit(ctx, containerID, objectID, offset, length, prm) - statUpdater.incRequests(time.Since(start), methodObjectRange) - statUpdater.updateErrorRate(err) - - return reader, err + return c.ObjectRangeInit(ctx, containerID, objectID, offset, length, prm) } // ObjectDelete marks an object for deletion from the container using NeoFS API protocol. // // See details in [client.Client.ObjectDelete]. func (p *Pool) ObjectDelete(ctx context.Context, containerID cid.ID, objectID oid.ID, prm client.PrmObjectDelete) (oid.ID, error) { - c, statUpdater, err := p.sdkClient() + c, _, err := p.sdkClient() if err != nil { return oid.ID{}, err } @@ -160,24 +135,18 @@ func (p *Pool) ObjectDelete(ctx context.Context, containerID cid.ID, objectID oi p.actualSigner(prm.Signer()), session.VerbObjectDelete, &prm, - statUpdater, ); err != nil { return oid.ID{}, fmt.Errorf("session: %w", err) } - start := time.Now() - id, err := c.ObjectDelete(ctx, containerID, objectID, prm) - statUpdater.incRequests(time.Since(start), methodObjectDelete) - statUpdater.updateErrorRate(err) - - return id, err + return c.ObjectDelete(ctx, containerID, objectID, prm) } // ObjectHash requests checksum of the range list of the object payload using // // See details in [client.Client.ObjectHash]. func (p *Pool) ObjectHash(ctx context.Context, containerID cid.ID, objectID oid.ID, prm client.PrmObjectHash) ([][]byte, error) { - c, statUpdater, err := p.sdkClient() + c, _, err := p.sdkClient() if err != nil { return [][]byte{}, err } @@ -188,24 +157,18 @@ func (p *Pool) ObjectHash(ctx context.Context, containerID cid.ID, objectID oid. p.actualSigner(prm.Signer()), session.VerbObjectRangeHash, &prm, - statUpdater, ); err != nil { return [][]byte{}, fmt.Errorf("session: %w", err) } - start := time.Now() - hashes, err := c.ObjectHash(ctx, containerID, objectID, prm) - statUpdater.incRequests(time.Since(start), methodObjectHash) - statUpdater.updateErrorRate(err) - - return hashes, err + return c.ObjectHash(ctx, containerID, objectID, prm) } // ObjectSearchInit initiates object selection through a remote server using NeoFS API protocol. // // See details in [client.Client.ObjectSearchInit]. func (p *Pool) ObjectSearchInit(ctx context.Context, containerID cid.ID, prm client.PrmObjectSearch) (*client.ObjectListReader, error) { - c, statUpdater, err := p.sdkClient() + c, _, err := p.sdkClient() if err != nil { return nil, err } @@ -216,15 +179,9 @@ func (p *Pool) ObjectSearchInit(ctx context.Context, containerID cid.ID, prm cli p.actualSigner(prm.Signer()), session.VerbObjectSearch, &prm, - statUpdater, ); err != nil { return nil, fmt.Errorf("session: %w", err) } - start := time.Now() - reader, err := c.ObjectSearchInit(ctx, containerID, prm) - statUpdater.incRequests(time.Since(start), methodObjectSearch) - statUpdater.updateErrorRate(err) - - return reader, err + return c.ObjectSearchInit(ctx, containerID, prm) } diff --git a/pool/pool.go b/pool/pool.go index fbe07755..77d3c8de 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -154,6 +154,9 @@ const ( methodNetMapSnapshot methodObjectHash methodObjectSearch + methodContainerAnnounceUsedSpace + methodAnnounceIntermediateTrust + methodAnnounceLocalTrust methodLast ) @@ -196,6 +199,12 @@ func (m MethodIndex) String() string { return "objectHash" case methodObjectSearch: return "objectSearch" + case methodContainerAnnounceUsedSpace: + return "containerAnnounceUsedSpace" + case methodAnnounceIntermediateTrust: + return "announceIntermediateTrust" + case methodAnnounceLocalTrust: + return "announceLocalTrust" case methodLast: return "it's a system name rather than a method" default: diff --git a/pool/session.go b/pool/session.go index b0706184..97f86562 100644 --- a/pool/session.go +++ b/pool/session.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "math" - "time" "github.com/google/uuid" "github.com/nspcc-dev/neofs-sdk-go/client" @@ -19,7 +18,7 @@ var ( errContainerRequired = errors.New("container required") ) -func initSession(ctx context.Context, dst *session.Object, c *client.Client, dur uint64, signer neofscrypto.Signer, statUpdater statisticUpdater) error { +func initSession(ctx context.Context, dst *session.Object, c *client.Client, dur uint64, signer neofscrypto.Signer) error { ni, err := c.NetworkInfo(ctx, client.PrmNetworkInfo{}) if err != nil { return err @@ -38,10 +37,7 @@ func initSession(ctx context.Context, dst *session.Object, c *client.Client, dur prm.SetExp(exp) prm.UseSigner(signer) - start := time.Now() res, err := c.SessionCreate(ctx, prm) - statUpdater.incRequests(time.Since(start), methodSessionCreate) - statUpdater.updateErrorRate(err) if err != nil { return err @@ -71,7 +67,6 @@ func (p *Pool) withinContainerSession( signer neofscrypto.Signer, verb session.ObjectVerb, params containerSessionParams, - statUpdate statisticUpdater, ) error { // empty error means the session was set. if _, err := params.GetSession(); err == nil { @@ -83,7 +78,7 @@ func (p *Pool) withinContainerSession( tok, ok := p.cache.Get(cacheKey) if !ok { // init new session - err := initSession(ctx, &tok, c, p.stokenDuration, signer, statUpdate) + err := initSession(ctx, &tok, c, p.stokenDuration, signer) if err != nil { return fmt.Errorf("init session: %w", err) } From 8cf10a2e9b60fdfc705860ab5f5300ee45652939 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Fri, 30 Jun 2023 09:25:55 +0400 Subject: [PATCH 03/10] client: External statistic tests Signed-off-by: Evgenii Baidakov --- client/container_statistic_test.go | 927 +++++++++++++++++++++++++++++ 1 file changed, 927 insertions(+) create mode 100644 client/container_statistic_test.go diff --git a/client/container_statistic_test.go b/client/container_statistic_test.go new file mode 100644 index 00000000..aae6ae65 --- /dev/null +++ b/client/container_statistic_test.go @@ -0,0 +1,927 @@ +package client + +import ( + "context" + "crypto/rand" + "crypto/sha256" + "fmt" + mathRand "math/rand" + "strconv" + "testing" + "time" + + "github.com/google/uuid" + "github.com/nspcc-dev/neofs-api-go/v2/accounting" + v2acl "github.com/nspcc-dev/neofs-api-go/v2/acl" + v2container "github.com/nspcc-dev/neofs-api-go/v2/container" + netmapv2 "github.com/nspcc-dev/neofs-api-go/v2/netmap" + v2object "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/nspcc-dev/neofs-api-go/v2/refs" + "github.com/nspcc-dev/neofs-api-go/v2/reputation" + rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc" + "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" + "github.com/nspcc-dev/neofs-api-go/v2/session" + "github.com/nspcc-dev/neofs-sdk-go/container" + "github.com/nspcc-dev/neofs-sdk-go/container/acl" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" + "github.com/nspcc-dev/neofs-sdk-go/crypto/test" + "github.com/nspcc-dev/neofs-sdk-go/eacl" + "github.com/nspcc-dev/neofs-sdk-go/netmap" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + reputation2 "github.com/nspcc-dev/neofs-sdk-go/reputation" + session2 "github.com/nspcc-dev/neofs-sdk-go/session" + "github.com/nspcc-dev/neofs-sdk-go/stat" + "github.com/nspcc-dev/neofs-sdk-go/user" + "github.com/stretchr/testify/require" +) + +type ( + methodStatistic struct { + requests int + errors int + duration time.Duration + } + + testStatCollector struct { + methods map[stat.Method]*methodStatistic + } +) + +func newCollector() *testStatCollector { + c := testStatCollector{ + methods: make(map[stat.Method]*methodStatistic), + } + + for i := stat.MethodBalanceGet; i < stat.MethodLast; i++ { + c.methods[i] = &methodStatistic{} + } + + return &c +} + +func (c *testStatCollector) Collect(_ []byte, _ string, method stat.Method, duration time.Duration, err error) { + data, ok := c.methods[method] + if ok { + data.duration += duration + if duration > 0 { + data.requests++ + } + + if err != nil { + data.errors++ + } + } +} + +func randBytes(l int) []byte { + r := make([]byte, l) + _, _ = rand.Read(r) + + return r +} + +func randRefsContainerID() *refs.ContainerID { + var id refs.ContainerID + id.SetValue(randBytes(sha256.Size)) + return &id +} + +func randContainerID() *cid.ID { + var refID refs.ContainerID + refID.SetValue(randBytes(sha256.Size)) + + var id cid.ID + _ = id.ReadFromV2(refID) + + return &id +} + +func randAccount(signer neofscrypto.Signer) *user.ID { + var u user.ID + _ = user.IDFromSigner(&u, signer) + + return &u +} + +func randOwner(signer neofscrypto.Signer) *refs.OwnerID { + acc := randAccount(signer) + + var u refs.OwnerID + acc.WriteToV2(&u) + + return &u +} + +func prepareContainer(accountID user.ID) container.Container { + cont := container.Container{} + cont.Init() + cont.SetOwner(accountID) + cont.SetBasicACL(acl.PublicRW) + + cont.SetName(strconv.FormatInt(time.Now().UnixNano(), 16)) + cont.SetCreationTime(time.Now().UTC()) + + var pp netmap.PlacementPolicy + var rd netmap.ReplicaDescriptor + + pp.SetContainerBackupFactor(1) + rd.SetNumberOfObjects(1) + pp.AddReplicas(rd) + cont.SetPlacementPolicy(pp) + + return cont +} + +func testEaclTable(containerID cid.ID) eacl.Table { + var table eacl.Table + table.SetCID(containerID) + + r := eacl.NewRecord() + r.SetOperation(eacl.OperationPut) + r.SetAction(eacl.ActionAllow) + + var target eacl.Target + target.SetRole(eacl.RoleOthers) + r.SetTargets(target) + table.AddRecord(r) + + return table +} + +func TestClientStatistic_AccountBalance(t *testing.T) { + signer := test.RandomSignerRFC6979(t) + ctx := context.Background() + c := newClient(t, signer, nil) + + rpcAPIBalance = func(cli *client.Client, req *accounting.BalanceRequest, opts ...client.CallOption) (*accounting.BalanceResponse, error) { + var resp accounting.BalanceResponse + var meta session.ResponseMetaHeader + var balance accounting.Decimal + var body accounting.BalanceResponseBody + + body.SetBalance(&balance) + + resp.SetBody(&body) + resp.SetMetaHeader(&meta) + + err := signServiceMessage(signer, &resp) + if err != nil { + panic(fmt.Sprintf("sign response: %v", err)) + } + + return &resp, nil + } + + collector := newCollector() + c.prm.statisticCallback = collector.Collect + + var prm PrmBalanceGet + prm.SetAccount(*randAccount(signer)) + _, err := c.BalanceGet(ctx, prm) + require.NoError(t, err) + + require.Equal(t, 1, collector.methods[stat.MethodBalanceGet].requests) +} + +func TestClientStatistic_ContainerPut(t *testing.T) { + signer := test.RandomSignerRFC6979(t) + ctx := context.Background() + c := newClient(t, signer, nil) + + rpcAPIPutContainer = func(cli *client.Client, req *v2container.PutRequest, opts ...client.CallOption) (*v2container.PutResponse, error) { + var resp v2container.PutResponse + var meta session.ResponseMetaHeader + var body v2container.PutResponseBody + + body.SetContainerID(randRefsContainerID()) + + resp.SetBody(&body) + resp.SetMetaHeader(&meta) + + err := signServiceMessage(signer, &resp) + if err != nil { + panic(fmt.Sprintf("sign response: %v", err)) + } + + return &resp, nil + } + + cont := prepareContainer(*randAccount(signer)) + + collector := newCollector() + c.prm.statisticCallback = collector.Collect + + var prm PrmContainerPut + _, err := c.ContainerPut(ctx, cont, prm) + require.NoError(t, err) + + require.Equal(t, 1, collector.methods[stat.MethodContainerPut].requests) +} + +func TestClientStatistic_ContainerGet(t *testing.T) { + signer := test.RandomSignerRFC6979(t) + ctx := context.Background() + c := newClient(t, signer, nil) + + rpcAPIGetContainer = func(cli *client.Client, req *v2container.GetRequest, opts ...client.CallOption) (*v2container.GetResponse, error) { + var cont v2container.Container + var ver refs.Version + var placementPolicyV2 netmapv2.PlacementPolicy + var replicas []netmapv2.Replica + var resp v2container.GetResponse + var meta session.ResponseMetaHeader + + cont.SetOwnerID(randOwner(signer)) + cont.SetVersion(&ver) + + nonce, err := uuid.New().MarshalBinary() + require.NoError(t, err) + cont.SetNonce(nonce) + + replica := netmapv2.Replica{} + replica.SetCount(1) + replicas = append(replicas, replica) + placementPolicyV2.SetReplicas(replicas) + cont.SetPlacementPolicy(&placementPolicyV2) + + body := v2container.GetResponseBody{} + body.SetContainer(&cont) + + resp.SetBody(&body) + resp.SetMetaHeader(&meta) + + if err = signServiceMessage(signer, &resp); err != nil { + panic(fmt.Sprintf("sign response: %v", err)) + } + + return &resp, nil + } + + collector := newCollector() + c.prm.statisticCallback = collector.Collect + + var prm PrmContainerGet + _, err := c.ContainerGet(ctx, cid.ID{}, prm) + require.NoError(t, err) + + require.Equal(t, 1, collector.methods[stat.MethodContainerGet].requests) +} + +func TestClientStatistic_ContainerList(t *testing.T) { + signer := test.RandomSignerRFC6979(t) + ctx := context.Background() + c := newClient(t, signer, nil) + + rpcAPIListContainers = func(cli *client.Client, req *v2container.ListRequest, opts ...client.CallOption) (*v2container.ListResponse, error) { + var resp v2container.ListResponse + var meta session.ResponseMetaHeader + var body v2container.ListResponseBody + + resp.SetBody(&body) + resp.SetMetaHeader(&meta) + + if err := signServiceMessage(signer, &resp); err != nil { + panic(fmt.Sprintf("sign response: %v", err)) + } + + return &resp, nil + } + + collector := newCollector() + c.prm.statisticCallback = collector.Collect + + var prm PrmContainerList + _, err := c.ContainerList(ctx, *randAccount(signer), prm) + require.NoError(t, err) + + require.Equal(t, 1, collector.methods[stat.MethodContainerList].requests) +} + +func TestClientStatistic_ContainerDelete(t *testing.T) { + signer := test.RandomSignerRFC6979(t) + ctx := context.Background() + c := newClient(t, signer, nil) + + rpcAPIDeleteContainer = func(cli *client.Client, req *v2container.DeleteRequest, opts ...client.CallOption) (*v2container.PutResponse, error) { + var resp v2container.PutResponse + var meta session.ResponseMetaHeader + var body v2container.PutResponseBody + + resp.SetBody(&body) + resp.SetMetaHeader(&meta) + + if err := signServiceMessage(signer, &resp); err != nil { + panic(fmt.Sprintf("sign response: %v", err)) + } + + return &resp, nil + } + + collector := newCollector() + c.prm.statisticCallback = collector.Collect + + var prm PrmContainerDelete + err := c.ContainerDelete(ctx, cid.ID{}, prm) + require.NoError(t, err) + + require.Equal(t, 1, collector.methods[stat.MethodContainerDelete].requests) +} + +func TestClientStatistic_ContainerEacl(t *testing.T) { + signer := test.RandomSignerRFC6979(t) + ctx := context.Background() + c := newClient(t, signer, nil) + + rpcAPIGetEACL = func(cli *client.Client, req *v2container.GetExtendedACLRequest, opts ...client.CallOption) (*v2container.GetExtendedACLResponse, error) { + var resp v2container.GetExtendedACLResponse + var meta session.ResponseMetaHeader + var aclTable v2acl.Table + var body v2container.GetExtendedACLResponseBody + + body.SetEACL(&aclTable) + + resp.SetBody(&body) + resp.SetMetaHeader(&meta) + + if err := signServiceMessage(signer, &resp); err != nil { + panic(fmt.Sprintf("sign response: %v", err)) + } + + return &resp, nil + } + + collector := newCollector() + c.prm.statisticCallback = collector.Collect + + var prm PrmContainerEACL + _, err := c.ContainerEACL(ctx, cid.ID{}, prm) + require.NoError(t, err) + + require.Equal(t, 1, collector.methods[stat.MethodContainerEACL].requests) +} + +func TestClientStatistic_ContainerSetEacl(t *testing.T) { + signer := test.RandomSignerRFC6979(t) + ctx := context.Background() + c := newClient(t, signer, nil) + + rpcAPISetEACL = func(cli *client.Client, req *v2container.SetExtendedACLRequest, opts ...client.CallOption) (*v2container.PutResponse, error) { + var resp v2container.PutResponse + var meta session.ResponseMetaHeader + var body v2container.PutResponseBody + + resp.SetBody(&body) + resp.SetMetaHeader(&meta) + + if err := signServiceMessage(signer, &resp); err != nil { + panic(fmt.Sprintf("sign response: %v", err)) + } + + return &resp, nil + } + + collector := newCollector() + c.prm.statisticCallback = collector.Collect + + var prm PrmContainerSetEACL + table := testEaclTable(cid.ID{}) + err := c.ContainerSetEACL(ctx, table, prm) + require.NoError(t, err) + + require.Equal(t, 1, collector.methods[stat.MethodContainerSetEACL].requests) +} + +func TestClientStatistic_ContainerAnnounceUsedSpace(t *testing.T) { + signer := test.RandomSignerRFC6979(t) + ctx := context.Background() + c := newClient(t, signer, nil) + + rpcAPIAnnounceUsedSpace = func(cli *client.Client, req *v2container.AnnounceUsedSpaceRequest, opts ...client.CallOption) (*v2container.PutResponse, error) { + var resp v2container.PutResponse + var meta session.ResponseMetaHeader + var body v2container.PutResponseBody + + resp.SetBody(&body) + resp.SetMetaHeader(&meta) + + if err := signServiceMessage(signer, &resp); err != nil { + panic(fmt.Sprintf("sign response: %v", err)) + } + + return &resp, nil + } + + collector := newCollector() + c.prm.statisticCallback = collector.Collect + + estimation := container.SizeEstimation{} + estimation.SetContainer(*randContainerID()) + estimation.SetValue(mathRand.Uint64()) + estimation.SetEpoch(mathRand.Uint64()) + + var prm PrmAnnounceSpace + err := c.ContainerAnnounceUsedSpace(ctx, []container.SizeEstimation{estimation}, prm) + require.NoError(t, err) + + require.Equal(t, 1, collector.methods[stat.MethodContainerAnnounceUsedSpace].requests) +} + +func TestClientStatistic_ContainerSyncContainerWithNetwork(t *testing.T) { + signer := test.RandomSignerRFC6979(t) + ctx := context.Background() + c := newClient(t, signer, nil) + + rpcAPINetworkInfo = func(cli *client.Client, req *netmapv2.NetworkInfoRequest, opts ...client.CallOption) (*netmapv2.NetworkInfoResponse, error) { + var resp netmapv2.NetworkInfoResponse + var meta session.ResponseMetaHeader + var netInfo netmapv2.NetworkInfo + var netConfig netmapv2.NetworkConfig + var p1 netmapv2.NetworkParameter + + p1.SetKey(randBytes(10)) + p1.SetValue(randBytes(10)) + + netConfig.SetParameters(p1) + netInfo.SetNetworkConfig(&netConfig) + + body := netmapv2.NetworkInfoResponseBody{} + body.SetNetworkInfo(&netInfo) + + resp.SetBody(&body) + resp.SetMetaHeader(&meta) + + if err := signServiceMessage(signer, &resp); err != nil { + panic(fmt.Sprintf("sign response: %v", err)) + } + + return &resp, nil + } + + collector := newCollector() + c.prm.statisticCallback = collector.Collect + + cont := prepareContainer(*randAccount(signer)) + + err := SyncContainerWithNetwork(ctx, &cont, c) + require.NoError(t, err) + + require.Equal(t, 1, collector.methods[stat.MethodNetworkInfo].requests) +} + +func TestClientStatistic_ContainerEndpointInfo(t *testing.T) { + signer := test.RandomSignerRFC6979(t) + ctx := context.Background() + c := newClient(t, signer, nil) + + rpcAPILocalNodeInfo = func(cli *client.Client, req *netmapv2.LocalNodeInfoRequest, opts ...client.CallOption) (*netmapv2.LocalNodeInfoResponse, error) { + var resp netmapv2.LocalNodeInfoResponse + var meta session.ResponseMetaHeader + var ver refs.Version + var nodeInfo netmapv2.NodeInfo + + b := make([]byte, signer.Public().MaxEncodedSize()) + signer.Public().Encode(b) + + nodeInfo.SetPublicKey(b) + nodeInfo.SetAddresses("https://some-endpont.com") + + body := netmapv2.LocalNodeInfoResponseBody{} + body.SetVersion(&ver) + body.SetNodeInfo(&nodeInfo) + + resp.SetBody(&body) + resp.SetMetaHeader(&meta) + + if err := signServiceMessage(signer, &resp); err != nil { + panic(fmt.Sprintf("sign response: %v", err)) + } + + return &resp, nil + } + + collector := newCollector() + c.prm.statisticCallback = collector.Collect + + _, err := c.EndpointInfo(ctx, PrmEndpointInfo{}) + require.NoError(t, err) + + require.Equal(t, 1, collector.methods[stat.MethodEndpointInfo].requests) +} + +func TestClientStatistic_ContainerNetMapSnapshot(t *testing.T) { + signer := test.RandomSignerRFC6979(t) + ctx := context.Background() + c := newClient(t, signer, nil) + + rpcAPINetMapSnapshot = func(cli *client.Client, req *netmapv2.SnapshotRequest, opts ...client.CallOption) (*netmapv2.SnapshotResponse, error) { + var resp netmapv2.SnapshotResponse + var meta session.ResponseMetaHeader + var netMap netmapv2.NetMap + + body := netmapv2.SnapshotResponseBody{} + body.SetNetMap(&netMap) + + resp.SetBody(&body) + resp.SetMetaHeader(&meta) + + if err := signServiceMessage(signer, &resp); err != nil { + panic(fmt.Sprintf("sign response: %v", err)) + } + + return &resp, nil + } + + collector := newCollector() + c.prm.statisticCallback = collector.Collect + c.setNeoFSAPIServer((*coreServer)(&c.c)) + + _, err := c.NetMapSnapshot(ctx, PrmNetMapSnapshot{}) + require.NoError(t, err) + + require.Equal(t, 1, collector.methods[stat.MethodNetMapSnapshot].requests) +} + +func TestClientStatistic_CreateSession(t *testing.T) { + signer := test.RandomSignerRFC6979(t) + ctx := context.Background() + c := newClient(t, signer, nil) + + rpcAPICreateSession = func(cli *client.Client, req *session.CreateRequest, opts ...client.CallOption) (*session.CreateResponse, error) { + var resp session.CreateResponse + var meta session.ResponseMetaHeader + + body := session.CreateResponseBody{} + body.SetID(randBytes(10)) + + b := make([]byte, signer.Public().MaxEncodedSize()) + signer.Public().Encode(b) + body.SetSessionKey(b) + + resp.SetBody(&body) + resp.SetMetaHeader(&meta) + + if err := signServiceMessage(signer, &resp); err != nil { + panic(fmt.Sprintf("sign response: %v", err)) + } + + return &resp, nil + } + + collector := newCollector() + c.prm.statisticCallback = collector.Collect + c.setNeoFSAPIServer((*coreServer)(&c.c)) + + var prm PrmSessionCreate + prm.UseSigner(signer) + + _, err := c.SessionCreate(ctx, prm) + require.NoError(t, err) + + require.Equal(t, 1, collector.methods[stat.MethodSessionCreate].requests) +} + +func TestClientStatistic_ObjectPut(t *testing.T) { + t.Skip("need changes to api-go, to set `wc client.MessageWriterCloser` in rpcapi.PutRequestWriter") + + signer := test.RandomSignerRFC6979(t) + ctx := context.Background() + c := newClient(t, signer, nil) + + rpcAPIPutObject = func(cli *client.Client, pResp *v2object.PutResponse, opts ...client.CallOption) (*rpcapi.PutRequestWriter, error) { + var resp rpcapi.PutRequestWriter + + return &resp, nil + } + + containerID := *randContainerID() + account := randAccount(signer) + + collector := newCollector() + c.prm.statisticCallback = collector.Collect + c.setNeoFSAPIServer((*coreServer)(&c.c)) + + var tokenSession session2.Object + tokenSession.SetID(uuid.New()) + tokenSession.SetExp(1) + tokenSession.BindContainer(containerID) + tokenSession.ForVerb(session2.VerbObjectPut) + tokenSession.SetAuthKey(signer.Public()) + tokenSession.SetIssuer(*account) + + err := tokenSession.Sign(signer) + require.NoError(t, err) + + var prm PrmObjectPutInit + prm.UseSigner(signer) + prm.WithinSession(tokenSession) + + var hdr object.Object + hdr.SetOwnerID(account) + hdr.SetContainerID(containerID) + + writer, err := c.ObjectPutInit(ctx, hdr, prm) + require.NoError(t, err) + + require.True(t, writer.WritePayloadChunk(randBytes(10))) + + _, err = writer.Close() + require.NoError(t, err) + + require.Equal(t, 2, collector.methods[stat.MethodObjectPut].requests) +} + +func TestClientStatistic_ObjectDelete(t *testing.T) { + signer := test.RandomSignerRFC6979(t) + ctx := context.Background() + c := newClient(t, signer, nil) + + rpcAPIDeleteObject = func(cli *client.Client, req *v2object.DeleteRequest, opts ...client.CallOption) (*v2object.DeleteResponse, error) { + var resp v2object.DeleteResponse + var meta session.ResponseMetaHeader + var body v2object.DeleteResponseBody + var addr refs.Address + var objID refs.ObjectID + var contID = randRefsContainerID() + + objID.SetValue(randBytes(32)) + + addr.SetContainerID(contID) + addr.SetObjectID(&objID) + + body.SetTombstone(&addr) + + resp.SetBody(&body) + resp.SetMetaHeader(&meta) + + if err := signServiceMessage(signer, &resp); err != nil { + panic(fmt.Sprintf("sign response: %v", err)) + } + + return &resp, nil + } + + containerID := *randContainerID() + objectID := oid.ID{} + + collector := newCollector() + c.prm.statisticCallback = collector.Collect + + var prm PrmObjectDelete + prm.UseSigner(signer) + + _, err := c.ObjectDelete(ctx, containerID, objectID, prm) + require.NoError(t, err) + + require.Equal(t, 1, collector.methods[stat.MethodObjectDelete].requests) +} + +func TestClientStatistic_ObjectGet(t *testing.T) { + t.Skip("need changes to api-go, to set `r client.MessageReader` in rpcapi.GetResponseReader") + + signer := test.RandomSignerRFC6979(t) + ctx := context.Background() + c := newClient(t, signer, nil) + + rpcAPIGetObject = func(cli *client.Client, req *v2object.GetRequest, opts ...client.CallOption) (*rpcapi.GetResponseReader, error) { + var resp rpcapi.GetResponseReader + + // todo: fill + + return &resp, nil + } + + containerID := *randContainerID() + objectID := oid.ID{} + + collector := newCollector() + c.prm.statisticCallback = collector.Collect + + var prm PrmObjectGet + prm.UseSigner(signer) + + _, reader, err := c.ObjectGetInit(ctx, containerID, objectID, prm) + require.NoError(t, err) + + buff := make([]byte, 32) + _, isOk := reader.ReadChunk(buff) + require.True(t, isOk) + + require.Equal(t, 2, collector.methods[stat.MethodObjectGet].requests) +} + +func TestClientStatistic_ObjectHead(t *testing.T) { + signer := test.RandomSignerRFC6979(t) + ctx := context.Background() + c := newClient(t, signer, nil) + + rpcAPIHeadObject = func(cli *client.Client, req *v2object.HeadRequest, opts ...client.CallOption) (*v2object.HeadResponse, error) { + var resp v2object.HeadResponse + var meta session.ResponseMetaHeader + var body v2object.HeadResponseBody + var headerPart v2object.HeaderWithSignature + + body.SetHeaderPart(&headerPart) + + resp.SetBody(&body) + resp.SetMetaHeader(&meta) + + if err := signServiceMessage(signer, &resp); err != nil { + panic(fmt.Sprintf("sign response: %v", err)) + } + + return &resp, nil + } + + containerID := *randContainerID() + objectID := oid.ID{} + + collector := newCollector() + c.prm.statisticCallback = collector.Collect + + var prm PrmObjectHead + prm.UseSigner(signer) + + _, err := c.ObjectHead(ctx, containerID, objectID, prm) + require.NoError(t, err) + + require.Equal(t, 1, collector.methods[stat.MethodObjectHead].requests) +} + +func TestClientStatistic_ObjectRange(t *testing.T) { + t.Skip("need changes to api-go, to set `r client.MessageReader` in rpcapi.ObjectRangeResponseReader") + + signer := test.RandomSignerRFC6979(t) + ctx := context.Background() + c := newClient(t, signer, nil) + + rpcAPIGetObjectRange = func(cli *client.Client, req *v2object.GetRangeRequest, opts ...client.CallOption) (*rpcapi.ObjectRangeResponseReader, error) { + var resp rpcapi.ObjectRangeResponseReader + + // todo: fill + + return &resp, nil + } + + containerID := *randContainerID() + objectID := oid.ID{} + + collector := newCollector() + c.prm.statisticCallback = collector.Collect + + var prm PrmObjectRange + prm.UseSigner(signer) + + reader, err := c.ObjectRangeInit(ctx, containerID, objectID, 0, 1, prm) + require.NoError(t, err) + + buff := make([]byte, 32) + _, isOk := reader.ReadChunk(buff) + require.True(t, isOk) + + require.Equal(t, 2, collector.methods[stat.MethodObjectRange].requests) +} + +func TestClientStatistic_ObjectHash(t *testing.T) { + signer := test.RandomSignerRFC6979(t) + ctx := context.Background() + c := newClient(t, signer, nil) + + rpcAPIHashObjectRange = func(cli *client.Client, req *v2object.GetRangeHashRequest, opts ...client.CallOption) (*v2object.GetRangeHashResponse, error) { + var resp v2object.GetRangeHashResponse + var meta session.ResponseMetaHeader + var body v2object.GetRangeHashResponseBody + + body.SetHashList([][]byte{ + randBytes(4), + }) + + resp.SetBody(&body) + resp.SetMetaHeader(&meta) + + if err := signServiceMessage(signer, &resp); err != nil { + panic(fmt.Sprintf("sign response: %v", err)) + } + + return &resp, nil + } + + containerID := *randContainerID() + objectID := oid.ID{} + + collector := newCollector() + c.prm.statisticCallback = collector.Collect + + var prm PrmObjectHash + prm.SetRangeList(0, 2) + prm.UseSigner(signer) + + _, err := c.ObjectHash(ctx, containerID, objectID, prm) + require.NoError(t, err) + + require.Equal(t, 1, collector.methods[stat.MethodObjectHash].requests) +} + +func TestClientStatistic_ObjectSearch(t *testing.T) { + t.Skip("need changes to api-go, to set `r client.MessageReader` in rpcapi.SearchResponseReader") + + signer := test.RandomSignerRFC6979(t) + ctx := context.Background() + c := newClient(t, signer, nil) + + rpcAPISearchObjects = func(cli *client.Client, req *v2object.SearchRequest, opts ...client.CallOption) (*rpcapi.SearchResponseReader, error) { + var resp rpcapi.SearchResponseReader + + // todo: fill + + return &resp, nil + } + + containerID := *randContainerID() + + collector := newCollector() + c.prm.statisticCallback = collector.Collect + + var prm PrmObjectSearch + prm.UseSigner(signer) + + reader, err := c.ObjectSearchInit(ctx, containerID, prm) + require.NoError(t, err) + + iterator := func(oid.ID) bool { + return false + } + + err = reader.Iterate(iterator) + require.NoError(t, err) + + require.Equal(t, 2, collector.methods[stat.MethodObjectSearch].requests) +} + +func TestClientStatistic_AnnounceIntermediateTrust(t *testing.T) { + signer := test.RandomSignerRFC6979(t) + ctx := context.Background() + c := newClient(t, signer, nil) + + rpcAPIAnnounceIntermediateResult = func(cli *client.Client, req *reputation.AnnounceIntermediateResultRequest, opts ...client.CallOption) (*reputation.AnnounceIntermediateResultResponse, error) { + var resp reputation.AnnounceIntermediateResultResponse + var meta session.ResponseMetaHeader + var body reputation.AnnounceIntermediateResultResponseBody + + resp.SetBody(&body) + resp.SetMetaHeader(&meta) + + if err := signServiceMessage(signer, &resp); err != nil { + panic(fmt.Sprintf("sign response: %v", err)) + } + + return &resp, nil + } + + collector := newCollector() + c.prm.statisticCallback = collector.Collect + + var trust reputation2.PeerToPeerTrust + var prm PrmAnnounceIntermediateTrust + + err := c.AnnounceIntermediateTrust(ctx, 1, trust, prm) + require.NoError(t, err) + + require.Equal(t, 1, collector.methods[stat.MethodAnnounceIntermediateTrust].requests) +} + +func TestClientStatistic_MethodAnnounceLocalTrust(t *testing.T) { + signer := test.RandomSignerRFC6979(t) + ctx := context.Background() + c := newClient(t, signer, nil) + + rpcAPIAnnounceLocalTrust = func(cli *client.Client, req *reputation.AnnounceLocalTrustRequest, opts ...client.CallOption) (*reputation.AnnounceLocalTrustResponse, error) { + var resp reputation.AnnounceLocalTrustResponse + var meta session.ResponseMetaHeader + var body reputation.AnnounceLocalTrustResponseBody + + resp.SetBody(&body) + resp.SetMetaHeader(&meta) + + if err := signServiceMessage(signer, &resp); err != nil { + panic(fmt.Sprintf("sign response: %v", err)) + } + + return &resp, nil + } + + collector := newCollector() + c.prm.statisticCallback = collector.Collect + + var peer reputation2.PeerID + var trust reputation2.Trust + trust.SetPeer(peer) + + var prm PrmAnnounceLocalTrust + + err := c.AnnounceLocalTrust(ctx, 1, []reputation2.Trust{trust}, prm) + require.NoError(t, err) + + require.Equal(t, 1, collector.methods[stat.MethodAnnounceLocalTrust].requests) +} From 549a23d48adebdf84721dd9d2bbfb0a3982aa6ec Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Fri, 30 Jun 2023 09:28:06 +0400 Subject: [PATCH 04/10] pool: Remove unused statisticUpdater Signed-off-by: Evgenii Baidakov --- pool/accounting.go | 2 +- pool/container.go | 12 ++++++------ pool/netmap.go | 4 ++-- pool/object.go | 14 +++++++------- pool/pool.go | 8 ++++---- pool/pool_aio_test.go | 8 ++++---- 6 files changed, 24 insertions(+), 24 deletions(-) diff --git a/pool/accounting.go b/pool/accounting.go index 9ff246ce..f6346d92 100644 --- a/pool/accounting.go +++ b/pool/accounting.go @@ -11,7 +11,7 @@ import ( // // See details in [client.Client.BalanceGet]. func (p *Pool) BalanceGet(ctx context.Context, prm client.PrmBalanceGet) (accounting.Decimal, error) { - c, _, err := p.sdkClient() + c, err := p.sdkClient() if err != nil { return accounting.Decimal{}, err } diff --git a/pool/container.go b/pool/container.go index d858a290..956d98de 100644 --- a/pool/container.go +++ b/pool/container.go @@ -14,7 +14,7 @@ import ( // // See details in [client.Client.ContainerPut]. func (p *Pool) ContainerPut(ctx context.Context, cont container.Container, prm client.PrmContainerPut) (cid.ID, error) { - c, _, err := p.sdkClient() + c, err := p.sdkClient() if err != nil { return cid.ID{}, err } @@ -26,7 +26,7 @@ func (p *Pool) ContainerPut(ctx context.Context, cont container.Container, prm c // // See details in [client.Client.ContainerGet]. func (p *Pool) ContainerGet(ctx context.Context, id cid.ID, prm client.PrmContainerGet) (container.Container, error) { - c, _, err := p.sdkClient() + c, err := p.sdkClient() if err != nil { return container.Container{}, err } @@ -38,7 +38,7 @@ func (p *Pool) ContainerGet(ctx context.Context, id cid.ID, prm client.PrmContai // // See details in [client.Client.ContainerList]. func (p *Pool) ContainerList(ctx context.Context, ownerID user.ID, prm client.PrmContainerList) ([]cid.ID, error) { - c, _, err := p.sdkClient() + c, err := p.sdkClient() if err != nil { return []cid.ID{}, err } @@ -50,7 +50,7 @@ func (p *Pool) ContainerList(ctx context.Context, ownerID user.ID, prm client.Pr // // See details in [client.Client.ContainerDelete]. func (p *Pool) ContainerDelete(ctx context.Context, id cid.ID, prm client.PrmContainerDelete) error { - c, _, err := p.sdkClient() + c, err := p.sdkClient() if err != nil { return err } @@ -62,7 +62,7 @@ func (p *Pool) ContainerDelete(ctx context.Context, id cid.ID, prm client.PrmCon // // See details in [client.Client.ContainerEACL]. func (p *Pool) ContainerEACL(ctx context.Context, id cid.ID, prm client.PrmContainerEACL) (eacl.Table, error) { - c, _, err := p.sdkClient() + c, err := p.sdkClient() if err != nil { return eacl.Table{}, err } @@ -74,7 +74,7 @@ func (p *Pool) ContainerEACL(ctx context.Context, id cid.ID, prm client.PrmConta // // See details in [client.Client.ContainerSetEACL]. func (p *Pool) ContainerSetEACL(ctx context.Context, table eacl.Table, prm client.PrmContainerSetEACL) error { - c, _, err := p.sdkClient() + c, err := p.sdkClient() if err != nil { return err } diff --git a/pool/netmap.go b/pool/netmap.go index fe773a36..6c2699c5 100644 --- a/pool/netmap.go +++ b/pool/netmap.go @@ -11,7 +11,7 @@ import ( // // See details in [client.Client.NetworkInfo]. func (p *Pool) NetworkInfo(ctx context.Context, prm client.PrmNetworkInfo) (netmap.NetworkInfo, error) { - c, _, err := p.sdkClient() + c, err := p.sdkClient() if err != nil { return netmap.NetworkInfo{}, err } @@ -23,7 +23,7 @@ func (p *Pool) NetworkInfo(ctx context.Context, prm client.PrmNetworkInfo) (netm // // See details in [client.Client.NetMapSnapshot]. func (p *Pool) NetMapSnapshot(ctx context.Context, prm client.PrmNetMapSnapshot) (netmap.NetMap, error) { - c, _, err := p.sdkClient() + c, err := p.sdkClient() if err != nil { return netmap.NetMap{}, err } diff --git a/pool/object.go b/pool/object.go index 6a9d7fdf..bed31bec 100644 --- a/pool/object.go +++ b/pool/object.go @@ -29,7 +29,7 @@ func (p *Pool) actualSigner(signer neofscrypto.Signer) neofscrypto.Signer { // // See details in [client.Client.ObjectPutInit]. func (p *Pool) ObjectPutInit(ctx context.Context, hdr object.Object, prm client.PrmObjectPutInit) (*client.ObjectWriter, error) { - c, _, err := p.sdkClient() + c, err := p.sdkClient() if err != nil { return nil, err } @@ -58,7 +58,7 @@ func (p *Pool) ObjectPutInit(ctx context.Context, hdr object.Object, prm client. // See details in [client.Client.ObjectGetInit]. func (p *Pool) ObjectGetInit(ctx context.Context, containerID cid.ID, objectID oid.ID, prm client.PrmObjectGet) (object.Object, *client.ObjectReader, error) { var hdr object.Object - c, _, err := p.sdkClient() + c, err := p.sdkClient() if err != nil { return hdr, nil, err } @@ -80,7 +80,7 @@ func (p *Pool) ObjectGetInit(ctx context.Context, containerID cid.ID, objectID o // // See details in [client.Client.ObjectHead]. func (p *Pool) ObjectHead(ctx context.Context, containerID cid.ID, objectID oid.ID, prm client.PrmObjectHead) (*client.ResObjectHead, error) { - c, _, err := p.sdkClient() + c, err := p.sdkClient() if err != nil { return nil, err } @@ -102,7 +102,7 @@ func (p *Pool) ObjectHead(ctx context.Context, containerID cid.ID, objectID oid. // // See details in [client.Client.ObjectRangeInit]. func (p *Pool) ObjectRangeInit(ctx context.Context, containerID cid.ID, objectID oid.ID, offset, length uint64, prm client.PrmObjectRange) (*client.ObjectRangeReader, error) { - c, _, err := p.sdkClient() + c, err := p.sdkClient() if err != nil { return nil, err } @@ -124,7 +124,7 @@ func (p *Pool) ObjectRangeInit(ctx context.Context, containerID cid.ID, objectID // // See details in [client.Client.ObjectDelete]. func (p *Pool) ObjectDelete(ctx context.Context, containerID cid.ID, objectID oid.ID, prm client.PrmObjectDelete) (oid.ID, error) { - c, _, err := p.sdkClient() + c, err := p.sdkClient() if err != nil { return oid.ID{}, err } @@ -146,7 +146,7 @@ func (p *Pool) ObjectDelete(ctx context.Context, containerID cid.ID, objectID oi // // See details in [client.Client.ObjectHash]. func (p *Pool) ObjectHash(ctx context.Context, containerID cid.ID, objectID oid.ID, prm client.PrmObjectHash) ([][]byte, error) { - c, _, err := p.sdkClient() + c, err := p.sdkClient() if err != nil { return [][]byte{}, err } @@ -168,7 +168,7 @@ func (p *Pool) ObjectHash(ctx context.Context, containerID cid.ID, objectID oid. // // See details in [client.Client.ObjectSearchInit]. func (p *Pool) ObjectSearchInit(ctx context.Context, containerID cid.ID, prm client.PrmObjectSearch) (*client.ObjectListReader, error) { - c, _, err := p.sdkClient() + c, err := p.sdkClient() if err != nil { return nil, err } diff --git a/pool/pool.go b/pool/pool.go index 77d3c8de..521bc22b 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -2573,18 +2573,18 @@ func (p *Pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oi return res, nil } -func (p *Pool) sdkClient() (*sdkClient.Client, statisticUpdater, error) { +func (p *Pool) sdkClient() (*sdkClient.Client, error) { conn, err := p.connection() if err != nil { - return nil, nil, fmt.Errorf("connection: %w", err) + return nil, fmt.Errorf("connection: %w", err) } cl, err := conn.getClient() if err != nil { - return nil, nil, fmt.Errorf("get client: %w", err) + return nil, fmt.Errorf("get client: %w", err) } - return cl, conn, nil + return cl, nil } func (p *Pool) statisticMiddleware(nodeKey []byte, endpoint string, method stat.Method, duration time.Duration, err error) { diff --git a/pool/pool_aio_test.go b/pool/pool_aio_test.go index 09b977af..d7759312 100644 --- a/pool/pool_aio_test.go +++ b/pool/pool_aio_test.go @@ -162,7 +162,7 @@ func TestPoolInterfaceWithAIO(t *testing.T) { defer cancel() containerID = testCreateContainer(t, ctxTimeout, signer, cont, pool) - cl, _, err := pool.sdkClient() + cl, err := pool.sdkClient() require.NoError(t, err) require.NoError(t, isBucketCreated(ctxTimeout, cl, containerID)) @@ -175,7 +175,7 @@ func TestPoolInterfaceWithAIO(t *testing.T) { defer cancel() table := testSetEacl(t, ctxTimeout, signer, eaclTable, pool) - cl, _, err := pool.sdkClient() + cl, err := pool.sdkClient() require.NoError(t, err) require.NoError(t, isEACLCreated(ctxTimeout, cl, containerID, table)) @@ -218,7 +218,7 @@ func TestPoolInterfaceWithAIO(t *testing.T) { defer cancel() testDeleteObject(t, ctxTimeout, signer, containerID, objectID, pool) - cl, _, err := pool.sdkClient() + cl, err := pool.sdkClient() require.NoError(t, err) require.NoError(t, isObjectDeleted(ctxTimeout, cl, containerID, objectID)) @@ -229,7 +229,7 @@ func TestPoolInterfaceWithAIO(t *testing.T) { defer cancel() testDeleteContainer(t, ctxTimeout, signer, containerID, pool) - cl, _, err := pool.sdkClient() + cl, err := pool.sdkClient() require.NoError(t, err) require.NoError(t, isBucketDeleted(ctxTimeout, cl, containerID)) From a2709185045a24fb4b8b73d6aee3e2e55046e36a Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Fri, 30 Jun 2023 11:23:14 +0400 Subject: [PATCH 05/10] stat: Separate exec commands stats with stream operation in these commands Signed-off-by: Evgenii Baidakov --- client/object_get.go | 2 +- client/object_search.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/client/object_get.go b/client/object_get.go index 9e159ead..0db1e0e1 100644 --- a/client/object_get.go +++ b/client/object_get.go @@ -377,7 +377,7 @@ func (c *Client) ObjectGetInit(ctx context.Context, containerID cid.ID, objectID r.stream = stream r.client = c r.statisticCallback = func(err error) { - c.sendStatistic(stat.MethodObjectGet, err) + c.sendStatistic(stat.MethodObjectGetStream, err) } if !r.readHeader(&hdr) { diff --git a/client/object_search.go b/client/object_search.go index 1754dc34..a3ddbf7c 100644 --- a/client/object_search.go +++ b/client/object_search.go @@ -283,7 +283,7 @@ func (c *Client) ObjectSearchInit(ctx context.Context, containerID cid.ID, prm P } r.client = c r.statisticCallback = func(err error) { - c.sendStatistic(stat.MethodObjectSearch, err)() + c.sendStatistic(stat.MethodObjectSearchStream, err)() } return &r, nil From 95e6055c576af6c4beb95f6dd9872606f55ee2d1 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Fri, 30 Jun 2023 14:46:18 +0400 Subject: [PATCH 06/10] stat: Extract methods statistic from pool Signed-off-by: Evgenii Baidakov --- pool/mock_test.go | 4 -- pool/pool.go | 68 ------------------------------ pool/pool_aio_test.go | 37 ++++++++++------ stat/monitor.go | 81 +++++++++++++++++++++++++++++++++++ stat/pool.go | 84 +++++++++++++++++++++++++++++++++++++ stat/stat.go | 5 +++ {pool => stat}/statistic.go | 53 ++++++++++++----------- 7 files changed, 222 insertions(+), 110 deletions(-) create mode 100644 stat/monitor.go create mode 100644 stat/pool.go rename {pool => stat}/statistic.go (78%) diff --git a/pool/mock_test.go b/pool/mock_test.go index d5d4ee46..b0a1e6f8 100644 --- a/pool/mock_test.go +++ b/pool/mock_test.go @@ -3,7 +3,6 @@ package pool import ( "context" "errors" - "time" "github.com/google/uuid" sessionv2 "github.com/nspcc-dev/neofs-api-go/v2/session" @@ -200,6 +199,3 @@ func (m *mockClient) restartIfUnhealthy(ctx context.Context) (healthy bool, chan func (m *mockClient) getClient() (*sdkClient.Client, error) { return nil, errors.New("now supported to return sdkClient from mockClient") } - -func (m *mockClient) incRequests(_ time.Duration, _ MethodIndex) { -} diff --git a/pool/pool.go b/pool/pool.go index 521bc22b..cd631342 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -83,7 +83,6 @@ type internalClient interface { type statisticUpdater interface { updateErrorRate(err error) - incRequests(elapsed time.Duration, method MethodIndex) } // clientStatus provide access to some metrics for connection. @@ -100,8 +99,6 @@ type clientStatus interface { currentErrorRate() uint32 // overallErrorRate returns the number of all happened errors. overallErrorRate() uint64 - // methodsStatus returns statistic for all used methods. - methodsStatus() []statusSnapshot } // errPoolClientUnhealthy is an error to indicate that client in pool is unhealthy. @@ -116,20 +113,6 @@ type clientStatusMonitor struct { mu sync.RWMutex // protect counters currentErrorCount uint32 overallErrorCount uint64 - methods []*methodStatus -} - -// methodStatus provide statistic for specific method. -type methodStatus struct { - name string - mu sync.RWMutex // protect counters - statusSnapshot -} - -// statusSnapshot is statistic for specific method. -type statusSnapshot struct { - allTime uint64 - allRequests uint64 } // MethodIndex index of method in list of statuses in clientStatusMonitor. @@ -213,32 +196,13 @@ func (m MethodIndex) String() string { } func newClientStatusMonitor(addr string, errorThreshold uint32) clientStatusMonitor { - methods := make([]*methodStatus, methodLast) - for i := methodBalanceGet; i < methodLast; i++ { - methods[i] = &methodStatus{name: i.String()} - } - return clientStatusMonitor{ addr: addr, healthy: atomic.NewBool(true), errorThreshold: errorThreshold, - methods: methods, } } -func (m *methodStatus) snapshot() statusSnapshot { - m.mu.RLock() - defer m.mu.RUnlock() - return m.statusSnapshot -} - -func (m *methodStatus) incRequests(elapsed time.Duration) { - m.mu.Lock() - defer m.mu.Unlock() - m.allTime += uint64(elapsed) - m.allRequests++ -} - // clientWrapper is used by default, alternative implementations are intended for testing purposes only. type clientWrapper struct { clientMutex sync.RWMutex @@ -942,18 +906,7 @@ func (c *clientStatusMonitor) overallErrorRate() uint64 { return c.overallErrorCount } -func (c *clientStatusMonitor) methodsStatus() []statusSnapshot { - result := make([]statusSnapshot, len(c.methods)) - for i, val := range c.methods { - result[i] = val.snapshot() - } - - return result -} - func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) { - methodStat := c.methods[method] - methodStat.incRequests(elapsed) if c.prm.poolRequestInfoCallback != nil { c.prm.poolRequestInfoCallback(RequestInfo{ Address: c.prm.address, @@ -2367,27 +2320,6 @@ func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (accounting.Decim return cp.balanceGet(ctx, prm) } -// Statistic returns connection statistics. -func (p Pool) Statistic() Statistic { - stat := Statistic{} - for _, inner := range p.innerPools { - inner.lock.RLock() - for _, cl := range inner.clients { - node := NodeStatistic{ - address: cl.address(), - methods: cl.methodsStatus(), - overallErrors: cl.overallErrorRate(), - currentErrors: cl.currentErrorRate(), - } - stat.nodes = append(stat.nodes, node) - stat.overallErrors += node.overallErrors - } - inner.lock.RUnlock() - } - - return stat -} - // waitForContainerPresence waits until the container is found on the NeoFS network. func waitForContainerPresence(ctx context.Context, cli internalClient, cnrID cid.ID, waitParams *WaitParams) error { return waitFor(ctx, waitParams, func(ctx context.Context) bool { diff --git a/pool/pool_aio_test.go b/pool/pool_aio_test.go index d7759312..034b4167 100644 --- a/pool/pool_aio_test.go +++ b/pool/pool_aio_test.go @@ -27,6 +27,7 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/session" + "github.com/nspcc-dev/neofs-sdk-go/stat" "github.com/nspcc-dev/neofs-sdk-go/user" "github.com/nspcc-dev/neofs-sdk-go/waiter" "github.com/stretchr/testify/require" @@ -115,7 +116,11 @@ func TestPoolInterfaceWithAIO(t *testing.T) { nodeAddr := "grpc://localhost:8080" - pool, err := New(NewFlatNodeParams([]string{nodeAddr}), signer, DefaultOptions()) + poolStat := stat.NewPoolStatistic() + opts := DefaultOptions() + opts.SetStatisticCallback(poolStat.OperationCallback) + + pool, err := New(NewFlatNodeParams([]string{nodeAddr}), signer, opts) require.NoError(t, err) require.NoError(t, pool.Dial(ctx)) @@ -132,12 +137,15 @@ func TestPoolInterfaceWithAIO(t *testing.T) { _, err = pool.BalanceGet(ctx, cmd) require.NoError(t, err) - stat := pool.Statistic() - nodeStat, err := stat.Node(nodeAddr) + st := poolStat.Statistic() + nodeStat, err := st.Node(nodeAddr) + require.NoError(t, err) + + snap, err := nodeStat.Snapshot(stat.MethodBalanceGet) require.NoError(t, err) - require.Equal(t, uint64(1), nodeStat.methods[methodBalanceGet].allRequests) - require.Greater(t, nodeStat.methods[methodBalanceGet].allTime, uint64(0)) + require.Equal(t, uint64(1), snap.AllRequests()) + require.Greater(t, snap.AllTime(), uint64(0)) }) t.Run("balance err", func(t *testing.T) { @@ -148,13 +156,16 @@ func TestPoolInterfaceWithAIO(t *testing.T) { _, err = pool.BalanceGet(ctx, cmd) require.Error(t, err) - stat := pool.Statistic() - nodeStat, err := stat.Node(nodeAddr) + st := poolStat.Statistic() + nodeStat, err := st.Node(nodeAddr) + require.NoError(t, err) + + snap, err := nodeStat.Snapshot(stat.MethodBalanceGet) require.NoError(t, err) - require.Equal(t, uint32(1), nodeStat.currentErrors) - require.Equal(t, uint64(2), nodeStat.methods[methodBalanceGet].allRequests) - require.Greater(t, nodeStat.methods[methodBalanceGet].allTime, uint64(0)) + require.Equal(t, uint64(1), nodeStat.OverallErrors()) + require.Equal(t, uint64(2), snap.AllRequests()) + require.Greater(t, snap.AllTime(), uint64(0)) }) t.Run("create container", func(t *testing.T) { @@ -297,7 +308,7 @@ func TestPoolWaiterWithAIO(t *testing.T) { } var hdr object.Object - object.InitCreation(&hdr, rf) + hdr.InitCreation(rf) var prm client.PrmObjectPutInit prm.UseSigner(signer) @@ -455,7 +466,7 @@ func TestClientWaiterWithAIO(t *testing.T) { } var hdr object.Object - object.InitCreation(&hdr, rf) + hdr.InitCreation(rf) var prm client.PrmObjectPutInit prm.UseSigner(signer) @@ -529,7 +540,7 @@ func testObjectPutInit(t *testing.T, ctx context.Context, account user.ID, conta } var hdr object.Object - object.InitCreation(&hdr, rf) + hdr.InitCreation(rf) var prm client.PrmObjectPutInit prm.UseSigner(signer) diff --git a/stat/monitor.go b/stat/monitor.go new file mode 100644 index 00000000..a5f5b604 --- /dev/null +++ b/stat/monitor.go @@ -0,0 +1,81 @@ +package stat + +import ( + "sync" + "time" +) + +// methodStatus provide statistic for specific method. +type methodStatus struct { + name string + mu sync.RWMutex // protect counters + Snapshot +} + +func (m *methodStatus) snapshot() Snapshot { + m.mu.RLock() + defer m.mu.RUnlock() + return m.Snapshot +} + +func (m *methodStatus) incRequests(elapsed time.Duration) { + m.mu.Lock() + defer m.mu.Unlock() + m.allTime += uint64(elapsed) + m.allRequests++ +} + +// Snapshot represents statistic for specific method. +type Snapshot struct { + allTime uint64 + allRequests uint64 +} + +// AllTime returns sum of time, spent to specific request. Use with [time.Duration] to get human-readable value. +func (s Snapshot) AllTime() uint64 { + return s.allTime +} + +// AllRequests returns amount of requests to node. +func (s Snapshot) AllRequests() uint64 { + return s.allRequests +} + +type nodeMonitor struct { + pubKey []byte + addr string + errorThreshold uint32 + methods []*methodStatus + + mu sync.RWMutex // protect counters + overallErrorCount uint64 +} + +func (c *nodeMonitor) incErrorRate() { + c.mu.Lock() + defer c.mu.Unlock() + c.overallErrorCount++ +} + +func (c *nodeMonitor) overallErrorRate() uint64 { + c.mu.RLock() + defer c.mu.RUnlock() + return c.overallErrorCount +} + +func (c *nodeMonitor) methodsStatus() []Snapshot { + result := make([]Snapshot, len(c.methods)) + for i, val := range c.methods { + result[i] = val.snapshot() + } + + return result +} + +func (c *nodeMonitor) address() string { + return c.addr +} + +func (c *nodeMonitor) publicKey() []byte { + return c.pubKey +} diff --git a/stat/pool.go b/stat/pool.go new file mode 100644 index 00000000..58b4ebfc --- /dev/null +++ b/stat/pool.go @@ -0,0 +1,84 @@ +package stat + +import ( + "encoding/hex" + "sync" + "time" +) + +// PoolStat is an external statistic for pool connections. +type PoolStat struct { + errorThreshold uint32 + + mu sync.RWMutex // protects nodeMonitor's map + monitors map[string]*nodeMonitor +} + +// NewPoolStatistic is a constructor for [PoolStat]. +func NewPoolStatistic() *PoolStat { + return &PoolStat{ + mu: sync.RWMutex{}, + monitors: make(map[string]*nodeMonitor), + } +} + +// OperationCallback implements [stat.OperationCallback]. +func (s *PoolStat) OperationCallback(nodeKey []byte, endpoint string, method Method, duration time.Duration, err error) { + if len(nodeKey) == 0 { + // situation when we initialize the client connection and make first EndpointInfo call. + return + } + + if !IsMethodValid(method) { + return + } + + k := hex.EncodeToString(nodeKey) + + s.mu.Lock() + mon, ok := s.monitors[k] + if !ok { + methods := make([]*methodStatus, MethodLast) + for i := MethodBalanceGet; i < MethodLast; i++ { + methods[i] = &methodStatus{name: i.String()} + } + + mon = &nodeMonitor{ + addr: endpoint, + mu: sync.RWMutex{}, + methods: methods, + errorThreshold: s.errorThreshold, + } + + s.monitors[k] = mon + } + s.mu.Unlock() + + if duration > 0 { + mon.methods[method].incRequests(duration) + } + + if err != nil { + mon.incErrorRate() + } +} + +// Statistic returns connection statistics. +func (s *PoolStat) Statistic() Statistic { + stat := Statistic{} + + s.mu.RLock() + for _, mon := range s.monitors { + node := NodeStatistic{ + publicKey: mon.publicKey(), + address: mon.address(), + methods: mon.methodsStatus(), + overallErrors: mon.overallErrorRate(), + } + stat.nodes = append(stat.nodes, node) + stat.overallErrors += node.overallErrors + } + s.mu.RUnlock() + + return stat +} diff --git a/stat/stat.go b/stat/stat.go index 8a9b20e2..aa9df980 100644 --- a/stat/stat.go +++ b/stat/stat.go @@ -102,6 +102,11 @@ func (m Method) String() string { } } +// IsMethodValid check range of passed Method variable. +func IsMethodValid(m Method) bool { + return m >= 0 && m < MethodLast +} + type ( // OperationCallback describes common interface to external statistic collection. // diff --git a/pool/statistic.go b/stat/statistic.go similarity index 78% rename from pool/statistic.go rename to stat/statistic.go index 3a4f4242..28296573 100644 --- a/pool/statistic.go +++ b/stat/statistic.go @@ -1,4 +1,4 @@ -package pool +package stat import ( "errors" @@ -38,10 +38,19 @@ func (s Statistic) Node(address string) (*NodeStatistic, error) { // NodeStatistic is metrics of certain connections. type NodeStatistic struct { + publicKey []byte address string - methods []statusSnapshot + methods []Snapshot overallErrors uint64 - currentErrors uint32 +} + +// Snapshot returns snapshot statistic for method. +func (n NodeStatistic) Snapshot(method Method) (Snapshot, error) { + if !IsMethodValid(method) { + return Snapshot{}, errors.New("invalid method") + } + + return n.methods[method], nil } // OverallErrors returns all errors on current node. @@ -50,12 +59,6 @@ func (n NodeStatistic) OverallErrors() uint64 { return n.overallErrors } -// CurrentErrors returns errors on current node. -// This value is always less than 'errorThreshold' from InitParameters. -func (n NodeStatistic) CurrentErrors() uint32 { - return n.currentErrors -} - // Requests returns number of requests. func (n NodeStatistic) Requests() (requests uint64) { for _, val := range n.methods { @@ -71,80 +74,80 @@ func (n NodeStatistic) Address() string { // AverageGetBalance returns average time to perform BalanceGet request. func (n NodeStatistic) AverageGetBalance() time.Duration { - return n.averageTime(methodBalanceGet) + return n.averageTime(MethodBalanceGet) } // AveragePutContainer returns average time to perform ContainerPut request. func (n NodeStatistic) AveragePutContainer() time.Duration { - return n.averageTime(methodContainerPut) + return n.averageTime(MethodContainerPut) } // AverageGetContainer returns average time to perform ContainerGet request. func (n NodeStatistic) AverageGetContainer() time.Duration { - return n.averageTime(methodContainerGet) + return n.averageTime(MethodContainerGet) } // AverageListContainer returns average time to perform ContainerList request. func (n NodeStatistic) AverageListContainer() time.Duration { - return n.averageTime(methodContainerList) + return n.averageTime(MethodContainerList) } // AverageDeleteContainer returns average time to perform ContainerDelete request. func (n NodeStatistic) AverageDeleteContainer() time.Duration { - return n.averageTime(methodContainerDelete) + return n.averageTime(MethodContainerDelete) } // AverageGetContainerEACL returns average time to perform ContainerEACL request. func (n NodeStatistic) AverageGetContainerEACL() time.Duration { - return n.averageTime(methodContainerEACL) + return n.averageTime(MethodContainerEACL) } // AverageSetContainerEACL returns average time to perform ContainerSetEACL request. func (n NodeStatistic) AverageSetContainerEACL() time.Duration { - return n.averageTime(methodContainerSetEACL) + return n.averageTime(MethodContainerSetEACL) } // AverageEndpointInfo returns average time to perform EndpointInfo request. func (n NodeStatistic) AverageEndpointInfo() time.Duration { - return n.averageTime(methodEndpointInfo) + return n.averageTime(MethodEndpointInfo) } // AverageNetworkInfo returns average time to perform NetworkInfo request. func (n NodeStatistic) AverageNetworkInfo() time.Duration { - return n.averageTime(methodNetworkInfo) + return n.averageTime(MethodNetworkInfo) } // AveragePutObject returns average time to perform ObjectPut request. func (n NodeStatistic) AveragePutObject() time.Duration { - return n.averageTime(methodObjectPut) + return n.averageTime(MethodObjectPut) } // AverageDeleteObject returns average time to perform ObjectDelete request. func (n NodeStatistic) AverageDeleteObject() time.Duration { - return n.averageTime(methodObjectDelete) + return n.averageTime(MethodObjectDelete) } // AverageGetObject returns average time to perform ObjectGet request. func (n NodeStatistic) AverageGetObject() time.Duration { - return n.averageTime(methodObjectGet) + return n.averageTime(MethodObjectGet) } // AverageHeadObject returns average time to perform ObjectHead request. func (n NodeStatistic) AverageHeadObject() time.Duration { - return n.averageTime(methodObjectHead) + return n.averageTime(MethodObjectHead) } // AverageRangeObject returns average time to perform ObjectRange request. func (n NodeStatistic) AverageRangeObject() time.Duration { - return n.averageTime(methodObjectRange) + return n.averageTime(MethodObjectRange) } // AverageCreateSession returns average time to perform SessionCreate request. func (n NodeStatistic) AverageCreateSession() time.Duration { - return n.averageTime(methodSessionCreate) + return n.averageTime(MethodSessionCreate) } -func (n NodeStatistic) averageTime(method MethodIndex) time.Duration { +func (n NodeStatistic) averageTime(method Method) time.Duration { stat := n.methods[method] if stat.allRequests == 0 { return 0 From 94a129e987880b06915c2bdaf41af309c638ccf0 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Fri, 30 Jun 2023 15:47:16 +0400 Subject: [PATCH 07/10] stat: Statistic concurrency tests Signed-off-by: Evgenii Baidakov --- stat/pool_test.go | 72 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 stat/pool_test.go diff --git a/stat/pool_test.go b/stat/pool_test.go new file mode 100644 index 00000000..81d63b44 --- /dev/null +++ b/stat/pool_test.go @@ -0,0 +1,72 @@ +package stat + +import ( + "errors" + "math/rand" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/atomic" +) + +func TestStatisticConcurrency(t *testing.T) { + ps := NewPoolStatistic() + wg := sync.WaitGroup{} + + type config struct { + method Method + modeKey []byte + addr string + errors *atomic.Int64 + } + + addrNode1 := "node1" + addrNode2 := "node2" + addrNode3 := "node3" + + counterNode1 := &atomic.Int64{} + counterNode2 := &atomic.Int64{} + counterNode3 := &atomic.Int64{} + + configs := []*config{ + {method: MethodContainerDelete, modeKey: []byte{1}, addr: addrNode1, errors: counterNode1}, + {method: MethodContainerDelete, modeKey: []byte{2}, addr: addrNode2, errors: counterNode2}, + {method: MethodContainerList, modeKey: []byte{1}, addr: addrNode1, errors: counterNode1}, + {method: MethodContainerList, modeKey: []byte{2}, addr: addrNode2, errors: counterNode2}, + {method: MethodObjectRange, modeKey: []byte{3}, addr: addrNode3, errors: counterNode3}, + {method: MethodLast - 1, modeKey: []byte{3}, addr: addrNode3, errors: counterNode3}, + } + + wg.Add(len(configs)) + n := 30000 + + for _, s := range configs { + go func(c *config) { + defer wg.Done() + + for i := 0; i < n; i++ { + var err error + if rand.Int63n(2) > 0 { + err = errors.New("some err") + c.errors.Inc() + } + + duration := time.Duration(rand.Int63n(200)+1) * time.Millisecond + + ps.OperationCallback(c.modeKey, c.addr, c.method, duration, err) + } + }(s) + } + + wg.Wait() + + for _, s := range configs { + node, err := ps.Statistic().Node(s.addr) + require.NoError(t, err) + + require.Equal(t, uint64(s.errors.Load()), node.OverallErrors()) + require.Equal(t, uint64(n*2), node.Requests(), s.addr) + } +} From c29c89a68520757fdd93683fcc28c0a5fdfb1966 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Fri, 30 Jun 2023 16:23:36 +0400 Subject: [PATCH 08/10] pool: Remove poolRequestInfoCallback and incRequests func Signed-off-by: Evgenii Baidakov --- pool/pool.go | 95 +++++++++------------------------------------------- 1 file changed, 15 insertions(+), 80 deletions(-) diff --git a/pool/pool.go b/pool/pool.go index cd631342..46e6a1ce 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -215,14 +215,13 @@ type clientWrapper struct { // wrapperPrm is params to create clientWrapper. type wrapperPrm struct { - address string - signer neofscrypto.Signer - dialTimeout time.Duration - streamTimeout time.Duration - errorThreshold uint32 - responseInfoCallback func(sdkClient.ResponseMetaInfo) error - poolRequestInfoCallback func(RequestInfo) - statisticCallback stat.OperationCallback + address string + signer neofscrypto.Signer + dialTimeout time.Duration + streamTimeout time.Duration + errorThreshold uint32 + responseInfoCallback func(sdkClient.ResponseMetaInfo) error + statisticCallback stat.OperationCallback } // setAddress sets endpoint to connect in NeoFS network. @@ -251,11 +250,6 @@ func (x *wrapperPrm) setErrorThreshold(threshold uint32) { x.errorThreshold = threshold } -// setPoolRequestCallback sets callback that will be invoked after every pool response. -func (x *wrapperPrm) setPoolRequestCallback(f func(RequestInfo)) { - x.poolRequestInfoCallback = f -} - // setResponseInfoCallback sets callback that will be invoked after every response. func (x *wrapperPrm) setResponseInfoCallback(f func(sdkClient.ResponseMetaInfo) error) { x.responseInfoCallback = f @@ -296,7 +290,6 @@ func newWrapper(prm wrapperPrm) (*clientWrapper, error) { } func (c *clientWrapper) statisticMiddleware(nodeKey []byte, endpoint string, method stat.Method, duration time.Duration, err error) { - c.incRequests(duration, MethodIndex(method)) c.updateErrorRate(err) if c.statisticCallback != nil { @@ -386,9 +379,7 @@ func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (acco var cliPrm sdkClient.PrmBalanceGet cliPrm.SetAccount(prm.account) - start := time.Now() res, err := cl.BalanceGet(ctx, cliPrm) - c.incRequests(time.Since(start), methodBalanceGet) c.updateErrorRate(err) if err != nil { return accounting.Decimal{}, fmt.Errorf("balance get on client: %w", err) @@ -405,9 +396,7 @@ func (c *clientWrapper) containerPut(ctx context.Context, cont container.Contain return cid.ID{}, err } - start := time.Now() idCnr, err := cl.ContainerPut(ctx, cont, prm.prmClient) - c.incRequests(time.Since(start), methodContainerPut) c.updateErrorRate(err) if err != nil { return cid.ID{}, fmt.Errorf("container put on client: %w", err) @@ -433,9 +422,7 @@ func (c *clientWrapper) containerGet(ctx context.Context, cnrID cid.ID) (contain return container.Container{}, err } - start := time.Now() res, err := cl.ContainerGet(ctx, cnrID, sdkClient.PrmContainerGet{}) - c.incRequests(time.Since(start), methodContainerGet) c.updateErrorRate(err) if err != nil { return container.Container{}, fmt.Errorf("container get on client: %w", err) @@ -451,9 +438,7 @@ func (c *clientWrapper) containerList(ctx context.Context, ownerID user.ID) ([]c return nil, err } - start := time.Now() res, err := cl.ContainerList(ctx, ownerID, sdkClient.PrmContainerList{}) - c.incRequests(time.Since(start), methodContainerList) c.updateErrorRate(err) if err != nil { return nil, fmt.Errorf("container list on client: %w", err) @@ -474,9 +459,7 @@ func (c *clientWrapper) containerDelete(ctx context.Context, id cid.ID, prm PrmC cliPrm.WithinSession(prm.stoken) } - start := time.Now() err = cl.ContainerDelete(ctx, id, cliPrm) - c.incRequests(time.Since(start), methodContainerDelete) c.updateErrorRate(err) if err != nil { return fmt.Errorf("container delete on client: %w", err) @@ -496,9 +479,7 @@ func (c *clientWrapper) containerEACL(ctx context.Context, id cid.ID) (eacl.Tabl return eacl.Table{}, err } - start := time.Now() res, err := cl.ContainerEACL(ctx, id, sdkClient.PrmContainerEACL{}) - c.incRequests(time.Since(start), methodContainerEACL) c.updateErrorRate(err) if err != nil { return eacl.Table{}, fmt.Errorf("get eacl on client: %w", err) @@ -520,9 +501,7 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, table eacl.Table, cliPrm.WithinSession(prm.session) } - start := time.Now() err = cl.ContainerSetEACL(ctx, table, cliPrm) - c.incRequests(time.Since(start), methodContainerSetEACL) c.updateErrorRate(err) if err != nil { return fmt.Errorf("set eacl on client: %w", err) @@ -553,9 +532,7 @@ func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (ne return netmap.NodeInfo{}, err } - start := time.Now() res, err := cl.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{}) - c.incRequests(time.Since(start), methodEndpointInfo) c.updateErrorRate(err) if err != nil { return netmap.NodeInfo{}, fmt.Errorf("endpoint info on client: %w", err) @@ -571,9 +548,7 @@ func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (netm return netmap.NetworkInfo{}, err } - start := time.Now() res, err := cl.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{}) - c.incRequests(time.Since(start), methodNetworkInfo) c.updateErrorRate(err) if err != nil { return netmap.NetworkInfo{}, fmt.Errorf("network info on client: %w", err) @@ -601,9 +576,7 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID cliPrm.WithBearerToken(*prm.btoken) } - start := time.Now() wObj, err := cl.ObjectPutInit(ctx, prm.hdr, cliPrm) - c.incRequests(time.Since(start), methodObjectPut) c.updateErrorRate(err) if err != nil { return oid.ID{}, fmt.Errorf("init writing on API client: %w", err) @@ -634,9 +607,7 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID for { n, err = prm.payload.Read(buf) if n > 0 { - start = time.Now() successWrite := wObj.WritePayloadChunk(buf[:n]) - c.incRequests(time.Since(start), methodObjectPut) if !successWrite { break } @@ -682,9 +653,7 @@ func (c *clientWrapper) objectDelete(ctx context.Context, containerID cid.ID, ob cliPrm.UseSigner(prm.signer) } - start := time.Now() _, err = cl.ObjectDelete(ctx, containerID, objectID, cliPrm) - c.incRequests(time.Since(start), methodObjectDelete) c.updateErrorRate(err) if err != nil { return fmt.Errorf("delete object on client: %w", err) @@ -721,14 +690,8 @@ func (c *clientWrapper) objectGet(ctx context.Context, containerID cid.ID, objec } res.Header = hdr - start := time.Now() - c.incRequests(time.Since(start), methodObjectGet) - res.Payload = &objectReadCloser{ reader: rObj, - elapsedTimeCallback: func(elapsed time.Duration) { - c.incRequests(elapsed, methodObjectGet) - }, } return res, nil @@ -760,9 +723,7 @@ func (c *clientWrapper) objectHead(ctx context.Context, containerID cid.ID, obje var obj object.Object - start := time.Now() res, err := cl.ObjectHead(ctx, containerID, objectID, cliPrm) - c.incRequests(time.Since(start), methodObjectHead) c.updateErrorRate(err) if err != nil { return obj, fmt.Errorf("read object header via client: %w", err) @@ -795,9 +756,7 @@ func (c *clientWrapper) objectRange(ctx context.Context, containerID cid.ID, obj cliPrm.UseSigner(prm.signer) } - start := time.Now() res, err := cl.ObjectRangeInit(ctx, containerID, objectID, offset, length, cliPrm) - c.incRequests(time.Since(start), methodObjectRange) c.updateErrorRate(err) if err != nil { return ResObjectRange{}, fmt.Errorf("init payload range reading on client: %w", err) @@ -805,9 +764,6 @@ func (c *clientWrapper) objectRange(ctx context.Context, containerID cid.ID, obj return ResObjectRange{ payload: res, - elapsedTimeCallback: func(elapsed time.Duration) { - c.incRequests(elapsed, methodObjectRange) - }, }, nil } @@ -853,9 +809,7 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession) cliPrm.SetExp(prm.exp) cliPrm.UseSigner(prm.signer) - start := time.Now() res, err := cl.SessionCreate(ctx, cliPrm) - c.incRequests(time.Since(start), methodSessionCreate) c.updateErrorRate(err) if err != nil { return resCreateSession{}, fmt.Errorf("session creation on client: %w", err) @@ -906,16 +860,6 @@ func (c *clientStatusMonitor) overallErrorRate() uint64 { return c.overallErrorCount } -func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) { - if c.prm.poolRequestInfoCallback != nil { - c.prm.poolRequestInfoCallback(RequestInfo{ - Address: c.prm.address, - Method: method, - Elapsed: elapsed, - }) - } -} - func (c *clientStatusMonitor) updateErrorRate(err error) { if err == nil { return @@ -947,13 +891,6 @@ func (c *clientStatusMonitor) updateErrorRate(err error) { // clientBuilder is a type alias of client constructors. type clientBuilder = func(endpoint string) (internalClient, error) -// RequestInfo groups info about pool request. -type RequestInfo struct { - Address string - Method MethodIndex - Elapsed time.Duration -} - // InitParameters contains values used to initialize connection Pool. type InitParameters struct { signer neofscrypto.Signer @@ -965,7 +902,6 @@ type InitParameters struct { sessionExpirationDuration uint64 errorThreshold uint32 nodeParams []NodeParam - requestCallback func(RequestInfo) clientBuilder clientBuilder @@ -1018,12 +954,6 @@ func (x *InitParameters) SetErrorThreshold(threshold uint32) { x.errorThreshold = threshold } -// SetRequestCallback makes the pool client to pass RequestInfo for each -// request to f. Nil (default) means ignore RequestInfo. -func (x *InitParameters) SetRequestCallback(f func(RequestInfo)) { - x.requestCallback = f -} - // AddNode append information about the node to which you want to connect. func (x *InitParameters) AddNode(nodeParam NodeParam) { x.nodeParams = append(x.nodeParams, nodeParam) @@ -1593,7 +1523,6 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache, statisti prm.setDialTimeout(params.nodeDialTimeout) prm.setStreamTimeout(params.nodeStreamTimeout) prm.setErrorThreshold(params.errorThreshold) - prm.setPoolRequestCallback(params.requestCallback) prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error { cache.updateEpoch(info.Epoch()) return nil @@ -2036,7 +1965,10 @@ type objectReadCloser struct { func (x *objectReadCloser) Read(p []byte) (int, error) { start := time.Now() n, err := x.reader.Read(p) - x.elapsedTimeCallback(time.Since(start)) + if x.elapsedTimeCallback != nil { + x.elapsedTimeCallback(time.Since(start)) + } + return n, err } @@ -2115,7 +2047,10 @@ type ResObjectRange struct { func (x *ResObjectRange) Read(p []byte) (int, error) { start := time.Now() n, err := x.payload.Read(p) - x.elapsedTimeCallback(time.Since(start)) + if x.elapsedTimeCallback != nil { + x.elapsedTimeCallback(time.Since(start)) + } + return n, err } From d5782d65ad0971d73851ffbf8a04b31b72ee56cf Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Fri, 30 Jun 2023 16:24:17 +0400 Subject: [PATCH 09/10] pool: Remove elapsedTimeCallback Signed-off-by: Evgenii Baidakov --- pool/pool.go | 22 ++++------------------ 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/pool/pool.go b/pool/pool.go index 46e6a1ce..bab43f83 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -1957,19 +1957,12 @@ func (p *Pool) RawClient() (*sdkClient.Client, error) { } type objectReadCloser struct { - reader *sdkClient.ObjectReader - elapsedTimeCallback func(time.Duration) + reader *sdkClient.ObjectReader } // Read implements io.Reader of the object payload. func (x *objectReadCloser) Read(p []byte) (int, error) { - start := time.Now() - n, err := x.reader.Read(p) - if x.elapsedTimeCallback != nil { - x.elapsedTimeCallback(time.Since(start)) - } - - return n, err + return x.reader.Read(p) } // Close implements io.Closer of the object payload. @@ -2039,19 +2032,12 @@ func (p *Pool) HeadObject(ctx context.Context, containerID cid.ID, objectID oid. // Must be initialized using Pool.ObjectRange, any other // usage is unsafe. type ResObjectRange struct { - payload *sdkClient.ObjectRangeReader - elapsedTimeCallback func(time.Duration) + payload *sdkClient.ObjectRangeReader } // Read implements io.Reader of the object payload. func (x *ResObjectRange) Read(p []byte) (int, error) { - start := time.Now() - n, err := x.payload.Read(p) - if x.elapsedTimeCallback != nil { - x.elapsedTimeCallback(time.Since(start)) - } - - return n, err + return x.payload.Read(p) } // Close ends reading the payload range and returns the result of the operation From 41bd8e5207e63682bb92eb0eb6626f76f8231160 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Fri, 30 Jun 2023 16:25:37 +0400 Subject: [PATCH 10/10] pool: Remove old statistic method enum Signed-off-by: Evgenii Baidakov --- pool/pool.go | 80 ----------------------------------------------- pool/pool_test.go | 7 ----- 2 files changed, 87 deletions(-) diff --git a/pool/pool.go b/pool/pool.go index bab43f83..5927beb4 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -115,86 +115,6 @@ type clientStatusMonitor struct { overallErrorCount uint64 } -// MethodIndex index of method in list of statuses in clientStatusMonitor. -type MethodIndex int - -const ( - methodBalanceGet MethodIndex = iota - methodContainerPut - methodContainerGet - methodContainerList - methodContainerDelete - methodContainerEACL - methodContainerSetEACL - methodEndpointInfo - methodNetworkInfo - methodObjectPut - methodObjectDelete - methodObjectGet - methodObjectHead - methodObjectRange - methodSessionCreate - methodNetMapSnapshot - methodObjectHash - methodObjectSearch - methodContainerAnnounceUsedSpace - methodAnnounceIntermediateTrust - methodAnnounceLocalTrust - methodLast -) - -// String implements fmt.Stringer. -func (m MethodIndex) String() string { - switch m { - case methodBalanceGet: - return "balanceGet" - case methodContainerPut: - return "containerPut" - case methodContainerGet: - return "containerGet" - case methodContainerList: - return "containerList" - case methodContainerDelete: - return "containerDelete" - case methodContainerEACL: - return "containerEACL" - case methodContainerSetEACL: - return "containerSetEACL" - case methodEndpointInfo: - return "endpointInfo" - case methodNetworkInfo: - return "networkInfo" - case methodObjectPut: - return "objectPut" - case methodObjectDelete: - return "objectDelete" - case methodObjectGet: - return "objectGet" - case methodObjectHead: - return "objectHead" - case methodObjectRange: - return "objectRange" - case methodSessionCreate: - return "sessionCreate" - case methodNetMapSnapshot: - return "netMapSnapshot" - case methodObjectHash: - return "objectHash" - case methodObjectSearch: - return "objectSearch" - case methodContainerAnnounceUsedSpace: - return "containerAnnounceUsedSpace" - case methodAnnounceIntermediateTrust: - return "announceIntermediateTrust" - case methodAnnounceLocalTrust: - return "announceLocalTrust" - case methodLast: - return "it's a system name rather than a method" - default: - return "unknown" - } -} - func newClientStatusMonitor(addr string, errorThreshold uint32) clientStatusMonitor { return clientStatusMonitor{ addr: addr, diff --git a/pool/pool_test.go b/pool/pool_test.go index d9ca4a73..09892b93 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -665,10 +665,3 @@ func TestSwitchAfterErrorThreshold(t *testing.T) { _, err = conn.objectGet(ctx, cid.ID{}, oid.ID{}, PrmObjectGet{}) require.NoError(t, err) } - -func TestStatisticMethodsNames(t *testing.T) { - for i := methodBalanceGet; i < methodLast; i++ { - require.NotEqual(t, "unknown", i.String()) - require.NotEqual(t, "it's a system name rather than a method", i.String()) - } -}