Skip to content

Commit

Permalink
stat: External statistic definition
Browse files Browse the repository at this point in the history
Signed-off-by: Evgenii Baidakov <[email protected]>
  • Loading branch information
smallhive committed Jun 22, 2023
1 parent a52a8e0 commit d2dc7fb
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 28 deletions.
30 changes: 25 additions & 5 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ import (
"fmt"
"time"

v2accounting "github.com/nspcc-dev/neofs-api-go/v2/accounting"
"github.com/nspcc-dev/neofs-api-go/v2/rpc"
"github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
"github.com/nspcc-dev/neofs-sdk-go/stat"
)

// Client represents virtual connection to the NeoFS network to communicate
Expand Down Expand Up @@ -48,6 +47,9 @@ type Client struct {
c client.Client

server neoFSAPIServer

endpoint string
nodeKey []byte
}

var errNonNeoSigner = fmt.Errorf("%w: expected ECDSA_DETERMINISTIC_SHA256 scheme", neofscrypto.ErrIncorrectSigner)
Expand Down Expand Up @@ -90,6 +92,7 @@ func (c *Client) Dial(prm PrmDial) error {
if prm.endpoint == "" {
return ErrMissingServer
}
c.endpoint = prm.endpoint

if prm.timeoutDialSet {
if prm.timeoutDial <= 0 {
Expand Down Expand Up @@ -120,14 +123,18 @@ func (c *Client) Dial(prm PrmDial) error {
}

// TODO: (neofs-api-go#382) perform generic dial stage of the client.Client
_, err := rpc.Balance(&c.c, new(v2accounting.BalanceRequest),
client.WithContext(prm.parentCtx),
)
endpointInfo, err := c.EndpointInfo(prm.parentCtx, PrmEndpointInfo{})
// return context errors since they signal about dial problem
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return err
}

if endpointInfo == nil && err != nil {
return err
}

c.nodeKey = endpointInfo.NodeInfo().PublicKey()

return nil
}

Expand Down Expand Up @@ -168,6 +175,12 @@ func (c *Client) Close() error {
return c.c.Conn().Close()
}

func (c *Client) sendStatistic(m stat.Method, elapsed time.Duration, err error) {
if c.prm.statisticCallback != nil {
c.prm.statisticCallback(c.nodeKey, c.endpoint, m, elapsed, err)
}
}

// PrmInit groups initialization parameters of Client instances.
//
// See also [New].
Expand All @@ -177,6 +190,8 @@ type PrmInit struct {
cbRespInfo func(ResponseMetaInfo) error

netMagic uint64

statisticCallback stat.OperationCallback
}

// SetDefaultSigner sets Client private signer to be used for the protocol
Expand All @@ -196,6 +211,11 @@ func (x *PrmInit) SetResponseInfoCallback(f func(ResponseMetaInfo) error) {
x.cbRespInfo = f
}

// SetStatisticCallback makes the Client to pass [stat.OperationCallback] for the external statistic.
func (x *PrmInit) SetStatisticCallback(statisticCallback stat.OperationCallback) {
x.statisticCallback = statisticCallback
}

// PrmDial groups connection parameters for the Client.
//
// See also Dial.
Expand Down
7 changes: 6 additions & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
"github.com/nspcc-dev/neofs-sdk-go/crypto/test"
"github.com/stretchr/testify/require"
)

Expand All @@ -31,7 +32,11 @@ func newClient(t *testing.T, signer neofscrypto.Signer, server neoFSAPIServer) *
}

func TestClient_DialContext(t *testing.T) {
var c Client
var prmInit PrmInit
prmInit.SetDefaultSigner(test.RandomSignerRFC6979(t))

c, err := New(prmInit)
require.NoError(t, err)

// try to connect to any host
var prm PrmDial
Expand Down
82 changes: 60 additions & 22 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/object/relations"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/nspcc-dev/neofs-sdk-go/stat"
"github.com/nspcc-dev/neofs-sdk-go/user"
"go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -236,6 +237,7 @@ type clientWrapper struct {
prm wrapperPrm

clientStatusMonitor
statisticCallback stat.OperationCallback
}

// wrapperPrm is params to create clientWrapper.
Expand All @@ -247,6 +249,7 @@ type wrapperPrm struct {
errorThreshold uint32
responseInfoCallback func(sdkClient.ResponseMetaInfo) error
poolRequestInfoCallback func(RequestInfo)
statisticCallback stat.OperationCallback
}

// setAddress sets endpoint to connect in NeoFS network.
Expand Down Expand Up @@ -285,31 +288,49 @@ func (x *wrapperPrm) setResponseInfoCallback(f func(sdkClient.ResponseMetaInfo)
x.responseInfoCallback = f
}

// setStatisticCallback set callback for external statistic.
func (x *wrapperPrm) setStatisticCallback(statisticCallback stat.OperationCallback) {
x.statisticCallback = statisticCallback
}

// getNewClient returns a new [sdkClient.Client] instance using internal parameters.
func (x *wrapperPrm) getNewClient() (*sdkClient.Client, error) {
func (x *wrapperPrm) getNewClient(statisticCallback stat.OperationCallback) (*sdkClient.Client, error) {
var prmInit sdkClient.PrmInit
prmInit.SetDefaultSigner(x.signer)
prmInit.SetResponseInfoCallback(x.responseInfoCallback)
prmInit.SetStatisticCallback(statisticCallback)

return sdkClient.New(prmInit)
}

// newWrapper creates a clientWrapper that implements the client interface.
func newWrapper(prm wrapperPrm) (*clientWrapper, error) {
cl, err := prm.getNewClient()
if err != nil {
return nil, err
}

res := &clientWrapper{
client: cl,
clientStatusMonitor: newClientStatusMonitor(prm.address, prm.errorThreshold),
prm: prm,
statisticCallback: prm.statisticCallback,
}

// integrate clientWrapper middleware to handle errors and wrapped client health.
cl, err := prm.getNewClient(res.statisticMiddleware)
if err != nil {
return nil, err
}

res.client = cl

return res, nil
}

func (c *clientWrapper) statisticMiddleware(nodeKey []byte, endpoint string, method stat.Method, duration time.Duration, err error) {
if c.statisticCallback != nil {
c.incRequests(duration, MethodIndex(method))
c.updateErrorRate(err)

c.statisticCallback(nodeKey, endpoint, method, duration, err)
}
}

// dial establishes a connection to the server from the NeoFS network.
// Returns an error describing failure reason. If failed, the client
// SHOULD NOT be used.
Expand Down Expand Up @@ -343,7 +364,7 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
wasHealthy = true
}

cl, err := c.prm.getNewClient()
cl, err := c.prm.getNewClient(c.statisticMiddleware)
if err != nil {
c.setUnhealthy()
return false, wasHealthy
Expand Down Expand Up @@ -985,6 +1006,8 @@ type InitParameters struct {
requestCallback func(RequestInfo)

clientBuilder clientBuilder

statisticCallback stat.OperationCallback
}

// SetSigner specifies default signer to be used for the protocol communication by default.
Expand Down Expand Up @@ -1055,6 +1078,11 @@ func (x *InitParameters) isMissingClientBuilder() bool {
return x.clientBuilder == nil
}

// SetStatisticCallback makes the Pool to pass [stat.OperationCallback] for external statistic.
func (x *InitParameters) SetStatisticCallback(statisticCallback stat.OperationCallback) {
x.statisticCallback = statisticCallback
}

type rebalanceParameters struct {
nodesParams []*nodesParam
nodeRequestTimeout time.Duration
Expand Down Expand Up @@ -1388,6 +1416,8 @@ type Pool struct {
rebalanceParams rebalanceParameters
clientBuilder clientBuilder
logger *zap.Logger

statisticCallback stat.OperationCallback
}

type innerPool struct {
Expand Down Expand Up @@ -1428,21 +1458,22 @@ func NewPool(options InitParameters) (*Pool, error) {
return nil, fmt.Errorf("couldn't create cache: %w", err)
}

fillDefaultInitParams(&options, cache)
pool := &Pool{cache: cache}

pool := &Pool{
signer: options.signer,
cache: cache,
logger: options.logger,
stokenDuration: options.sessionExpirationDuration,
rebalanceParams: rebalanceParameters{
nodesParams: nodesParams,
nodeRequestTimeout: options.healthcheckTimeout,
clientRebalanceInterval: options.clientRebalanceInterval,
sessionExpirationDuration: options.sessionExpirationDuration,
},
clientBuilder: options.clientBuilder,
// we need our middleware integration in clientBuilder
fillDefaultInitParams(&options, cache, pool.statisticMiddleware)

pool.signer = options.signer
pool.logger = options.logger
pool.stokenDuration = options.sessionExpirationDuration
pool.rebalanceParams = rebalanceParameters{
nodesParams: nodesParams,
nodeRequestTimeout: options.healthcheckTimeout,
clientRebalanceInterval: options.clientRebalanceInterval,
sessionExpirationDuration: options.sessionExpirationDuration,
}
pool.clientBuilder = options.clientBuilder
pool.statisticCallback = options.statisticCallback

return pool, nil
}
Expand Down Expand Up @@ -1515,7 +1546,7 @@ func (p *Pool) Dial(ctx context.Context) error {
return nil
}

func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
func fillDefaultInitParams(params *InitParameters, cache *sessionCache, statisticCallback stat.OperationCallback) {
if params.sessionExpirationDuration == 0 {
params.sessionExpirationDuration = defaultSessionTokenExpirationDuration
}
Expand Down Expand Up @@ -1553,6 +1584,7 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
cache.updateEpoch(info.Epoch())
return nil
})
prm.setStatisticCallback(statisticCallback)
return newWrapper(prm)
})
}
Expand Down Expand Up @@ -2493,3 +2525,9 @@ func (p *Pool) sdkClient() (*sdkClient.Client, statisticUpdater, error) {

return cl, conn, nil
}

func (p *Pool) statisticMiddleware(nodeKey []byte, endpoint string, method stat.Method, duration time.Duration, err error) {
if p.statisticCallback != nil {
p.statisticCallback(nodeKey, endpoint, method, duration, err)
}
}
93 changes: 93 additions & 0 deletions stat/stat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package stat

import (
"time"
)

// Method is an enumerator to describe [client.Client] methods.
type Method int

const (
MethodBalanceGet Method = iota
MethodContainerPut
MethodContainerGet
MethodContainerList
MethodContainerDelete
MethodContainerEACL
MethodContainerSetEACL
MethodEndpointInfo
MethodNetworkInfo
MethodObjectPut
MethodObjectDelete
MethodObjectGet
MethodObjectHead
MethodObjectRange
MethodSessionCreate
MethodNetMapSnapshot
MethodObjectHash
MethodObjectSearch
methodLast
)

// String implements fmt.Stringer.
func (m Method) String() string {
switch m {
case MethodBalanceGet:
return "balanceGet"
case MethodContainerPut:
return "containerPut"
case MethodContainerGet:
return "containerGet"
case MethodContainerList:
return "containerList"
case MethodContainerDelete:
return "containerDelete"
case MethodContainerEACL:
return "containerEACL"
case MethodContainerSetEACL:
return "containerSetEACL"
case MethodEndpointInfo:
return "endpointInfo"
case MethodNetworkInfo:
return "networkInfo"
case MethodObjectPut:
return "objectPut"
case MethodObjectDelete:
return "objectDelete"
case MethodObjectGet:
return "objectGet"
case MethodObjectHead:
return "objectHead"
case MethodObjectRange:
return "objectRange"
case MethodSessionCreate:
return "sessionCreate"
case MethodNetMapSnapshot:
return "netMapSnapshot"
case MethodObjectHash:
return "objectHash"
case MethodObjectSearch:
return "objectSearch"
case methodLast:
return "it's a system name rather than a method"
default:
return "unknown"
}
}

type (
// OperationCallback describes common interface to external statistic collection.
OperationCallback = func(nodeKey []byte, endpoint string, method Method, duration time.Duration, err error)

// Aggregator is a default statistic collector.
Aggregator struct{}
)

// NewAggregator constructs a new Aggregator.
func NewAggregator() Aggregator {
return Aggregator{}
}

// Collect stores statistic from client.
func (a *Aggregator) Collect(_ []byte, _ string, _ Method, _ time.Duration, _ error) {
}

0 comments on commit d2dc7fb

Please sign in to comment.