diff --git a/client/client.go b/client/client.go index 79f4c2fb..141f8e71 100644 --- a/client/client.go +++ b/client/client.go @@ -7,10 +7,9 @@ import ( "fmt" "time" - v2accounting "github.com/nspcc-dev/neofs-api-go/v2/accounting" - "github.com/nspcc-dev/neofs-api-go/v2/rpc" "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" + "github.com/nspcc-dev/neofs-sdk-go/stat" ) // Client represents virtual connection to the NeoFS network to communicate @@ -48,6 +47,9 @@ type Client struct { c client.Client server neoFSAPIServer + + endpoint string + nodeKey []byte } var errNonNeoSigner = fmt.Errorf("%w: expected ECDSA_DETERMINISTIC_SHA256 scheme", neofscrypto.ErrIncorrectSigner) @@ -90,6 +92,7 @@ func (c *Client) Dial(prm PrmDial) error { if prm.endpoint == "" { return ErrMissingServer } + c.endpoint = prm.endpoint if prm.timeoutDialSet { if prm.timeoutDial <= 0 { @@ -120,14 +123,14 @@ func (c *Client) Dial(prm PrmDial) error { } // TODO: (neofs-api-go#382) perform generic dial stage of the client.Client - _, err := rpc.Balance(&c.c, new(v2accounting.BalanceRequest), - client.WithContext(prm.parentCtx), - ) + endpointInfo, err := c.EndpointInfo(prm.parentCtx, PrmEndpointInfo{}) // return context errors since they signal about dial problem if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return err } + c.nodeKey = endpointInfo.NodeInfo().PublicKey() + return nil } @@ -168,6 +171,12 @@ func (c *Client) Close() error { return c.c.Conn().Close() } +func (c *Client) sendStatistic(m stat.Method, elapsed time.Duration, err error) { + if c.prm.statisticCallback != nil { + c.prm.statisticCallback(c.nodeKey, c.endpoint, m, elapsed, err) + } +} + // PrmInit groups initialization parameters of Client instances. // // See also [New]. @@ -177,6 +186,8 @@ type PrmInit struct { cbRespInfo func(ResponseMetaInfo) error netMagic uint64 + + statisticCallback stat.OperationCallback } // SetDefaultSigner sets Client private signer to be used for the protocol @@ -196,6 +207,11 @@ func (x *PrmInit) SetResponseInfoCallback(f func(ResponseMetaInfo) error) { x.cbRespInfo = f } +// SetStatisticCallback makes the Client to pass [stat.OperationCallback] for the external statistic. +func (x *PrmInit) SetStatisticCallback(statisticCallback stat.OperationCallback) { + x.statisticCallback = statisticCallback +} + // PrmDial groups connection parameters for the Client. // // See also Dial. diff --git a/client/client_test.go b/client/client_test.go index e8696ecf..bc376514 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -6,6 +6,7 @@ import ( apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" + "github.com/nspcc-dev/neofs-sdk-go/crypto/test" "github.com/stretchr/testify/require" ) @@ -31,7 +32,11 @@ func newClient(t *testing.T, signer neofscrypto.Signer, server neoFSAPIServer) * } func TestClient_DialContext(t *testing.T) { - var c Client + var prmInit PrmInit + prmInit.SetDefaultSigner(test.RandomSignerRFC6979(t)) + + c, err := New(prmInit) + require.NoError(t, err) // try to connect to any host var prm PrmDial diff --git a/pool/pool.go b/pool/pool.go index 25e0e266..6a814457 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -27,6 +27,7 @@ import ( oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/object/relations" "github.com/nspcc-dev/neofs-sdk-go/session" + "github.com/nspcc-dev/neofs-sdk-go/stat" "github.com/nspcc-dev/neofs-sdk-go/user" "go.uber.org/atomic" "go.uber.org/zap" @@ -236,6 +237,7 @@ type clientWrapper struct { prm wrapperPrm clientStatusMonitor + statisticCallback stat.OperationCallback } // wrapperPrm is params to create clientWrapper. @@ -247,6 +249,7 @@ type wrapperPrm struct { errorThreshold uint32 responseInfoCallback func(sdkClient.ResponseMetaInfo) error poolRequestInfoCallback func(RequestInfo) + statisticCallback stat.OperationCallback } // setAddress sets endpoint to connect in NeoFS network. @@ -285,31 +288,49 @@ func (x *wrapperPrm) setResponseInfoCallback(f func(sdkClient.ResponseMetaInfo) x.responseInfoCallback = f } +// setStatisticCallback set callback for external statistic. +func (x *wrapperPrm) setStatisticCallback(statisticCallback stat.OperationCallback) { + x.statisticCallback = statisticCallback +} + // getNewClient returns a new [sdkClient.Client] instance using internal parameters. -func (x *wrapperPrm) getNewClient() (*sdkClient.Client, error) { +func (x *wrapperPrm) getNewClient(statisticCallback stat.OperationCallback) (*sdkClient.Client, error) { var prmInit sdkClient.PrmInit prmInit.SetDefaultSigner(x.signer) prmInit.SetResponseInfoCallback(x.responseInfoCallback) + prmInit.SetStatisticCallback(statisticCallback) return sdkClient.New(prmInit) } // newWrapper creates a clientWrapper that implements the client interface. func newWrapper(prm wrapperPrm) (*clientWrapper, error) { - cl, err := prm.getNewClient() - if err != nil { - return nil, err - } - res := &clientWrapper{ - client: cl, clientStatusMonitor: newClientStatusMonitor(prm.address, prm.errorThreshold), prm: prm, + statisticCallback: prm.statisticCallback, } + // integrate clientWrapper middleware to handle errors and wrapped client health. + cl, err := prm.getNewClient(res.statisticMiddleware) + if err != nil { + return nil, err + } + + res.client = cl + return res, nil } +func (c *clientWrapper) statisticMiddleware(nodeKey []byte, endpoint string, method stat.Method, duration time.Duration, err error) { + if c.statisticCallback != nil { + c.incRequests(duration, MethodIndex(method)) + c.updateErrorRate(err) + + c.statisticCallback(nodeKey, endpoint, method, duration, err) + } +} + // dial establishes a connection to the server from the NeoFS network. // Returns an error describing failure reason. If failed, the client // SHOULD NOT be used. @@ -343,7 +364,7 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change wasHealthy = true } - cl, err := c.prm.getNewClient() + cl, err := c.prm.getNewClient(c.statisticMiddleware) if err != nil { c.setUnhealthy() return false, wasHealthy @@ -985,6 +1006,8 @@ type InitParameters struct { requestCallback func(RequestInfo) clientBuilder clientBuilder + + statisticCallback stat.OperationCallback } // SetSigner specifies default signer to be used for the protocol communication by default. @@ -1055,6 +1078,11 @@ func (x *InitParameters) isMissingClientBuilder() bool { return x.clientBuilder == nil } +// SetStatisticCallback makes the Pool to pass [stat.OperationCallback] for external statistic. +func (x *InitParameters) SetStatisticCallback(statisticCallback stat.OperationCallback) { + x.statisticCallback = statisticCallback +} + type rebalanceParameters struct { nodesParams []*nodesParam nodeRequestTimeout time.Duration @@ -1388,6 +1416,8 @@ type Pool struct { rebalanceParams rebalanceParameters clientBuilder clientBuilder logger *zap.Logger + + statisticCallback stat.OperationCallback } type innerPool struct { @@ -1428,21 +1458,22 @@ func NewPool(options InitParameters) (*Pool, error) { return nil, fmt.Errorf("couldn't create cache: %w", err) } - fillDefaultInitParams(&options, cache) + pool := &Pool{cache: cache} - pool := &Pool{ - signer: options.signer, - cache: cache, - logger: options.logger, - stokenDuration: options.sessionExpirationDuration, - rebalanceParams: rebalanceParameters{ - nodesParams: nodesParams, - nodeRequestTimeout: options.healthcheckTimeout, - clientRebalanceInterval: options.clientRebalanceInterval, - sessionExpirationDuration: options.sessionExpirationDuration, - }, - clientBuilder: options.clientBuilder, + // we need our middleware integration in clientBuilder + fillDefaultInitParams(&options, cache, pool.statisticMiddleware) + + pool.signer = options.signer + pool.logger = options.logger + pool.stokenDuration = options.sessionExpirationDuration + pool.rebalanceParams = rebalanceParameters{ + nodesParams: nodesParams, + nodeRequestTimeout: options.healthcheckTimeout, + clientRebalanceInterval: options.clientRebalanceInterval, + sessionExpirationDuration: options.sessionExpirationDuration, } + pool.clientBuilder = options.clientBuilder + pool.statisticCallback = options.statisticCallback return pool, nil } @@ -1515,7 +1546,7 @@ func (p *Pool) Dial(ctx context.Context) error { return nil } -func fillDefaultInitParams(params *InitParameters, cache *sessionCache) { +func fillDefaultInitParams(params *InitParameters, cache *sessionCache, statisticCallback stat.OperationCallback) { if params.sessionExpirationDuration == 0 { params.sessionExpirationDuration = defaultSessionTokenExpirationDuration } @@ -1553,6 +1584,7 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) { cache.updateEpoch(info.Epoch()) return nil }) + prm.setStatisticCallback(statisticCallback) return newWrapper(prm) }) } @@ -2493,3 +2525,9 @@ func (p *Pool) sdkClient() (*sdkClient.Client, statisticUpdater, error) { return cl, conn, nil } + +func (p *Pool) statisticMiddleware(nodeKey []byte, endpoint string, method stat.Method, duration time.Duration, err error) { + if p.statisticCallback != nil { + p.statisticCallback(nodeKey, endpoint, method, duration, err) + } +} diff --git a/stat/stat.go b/stat/stat.go new file mode 100644 index 00000000..d5453d5a --- /dev/null +++ b/stat/stat.go @@ -0,0 +1,93 @@ +package stat + +import ( + "time" +) + +// Method is an enumerator to describe [client.Client] methods. +type Method int + +const ( + MethodBalanceGet Method = iota + MethodContainerPut + MethodContainerGet + MethodContainerList + MethodContainerDelete + MethodContainerEACL + MethodContainerSetEACL + MethodEndpointInfo + MethodNetworkInfo + MethodObjectPut + MethodObjectDelete + MethodObjectGet + MethodObjectHead + MethodObjectRange + MethodSessionCreate + MethodNetMapSnapshot + MethodObjectHash + MethodObjectSearch + methodLast +) + +// String implements fmt.Stringer. +func (m Method) String() string { + switch m { + case MethodBalanceGet: + return "balanceGet" + case MethodContainerPut: + return "containerPut" + case MethodContainerGet: + return "containerGet" + case MethodContainerList: + return "containerList" + case MethodContainerDelete: + return "containerDelete" + case MethodContainerEACL: + return "containerEACL" + case MethodContainerSetEACL: + return "containerSetEACL" + case MethodEndpointInfo: + return "endpointInfo" + case MethodNetworkInfo: + return "networkInfo" + case MethodObjectPut: + return "objectPut" + case MethodObjectDelete: + return "objectDelete" + case MethodObjectGet: + return "objectGet" + case MethodObjectHead: + return "objectHead" + case MethodObjectRange: + return "objectRange" + case MethodSessionCreate: + return "sessionCreate" + case MethodNetMapSnapshot: + return "netMapSnapshot" + case MethodObjectHash: + return "objectHash" + case MethodObjectSearch: + return "objectSearch" + case methodLast: + return "it's a system name rather than a method" + default: + return "unknown" + } +} + +type ( + // OperationCallback describes common interface to external statistic collection. + OperationCallback = func(nodeKey []byte, endpoint string, method Method, duration time.Duration, err error) + + // Aggregator is a default statistic collector. + Aggregator struct{} +) + +// NewAggregator constructs a new Aggregator. +func NewAggregator() Aggregator { + return Aggregator{} +} + +// Collect stores statistic from client. +func (a *Aggregator) Collect(_ []byte, _ string, _ Method, _ time.Duration, _ error) { +}