Skip to content

Commit

Permalink
pool: Derive statistic and status interfaces
Browse files Browse the repository at this point in the history
Signed-off-by: Evgenii Baidakov <[email protected]>
  • Loading branch information
smallhive committed Jun 20, 2023
1 parent 2ae77bb commit d8e66c9
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 56 deletions.
8 changes: 8 additions & 0 deletions pool/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,11 @@ func (m *mockClient) getClient() (*sdkClient.Client, error) {

func (m *mockClient) incRequests(_ time.Duration, _ MethodIndex) {
}

func (m *mockClient) status() clientStatus {
return m
}

func (m *mockClient) statisticUpdater() statusUpdater {
return m
}
129 changes: 88 additions & 41 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@ import (
"go.uber.org/zap"
)

// client represents virtual connection to the single NeoFS network endpoint from which Pool is formed.
// This interface is expected to have exactly one production implementation - clientWrapper.
// Others are expected to be for test purposes only.
type internalClient interface {
type singleClient interface {
// see clientWrapper.balanceGet.
balanceGet(context.Context, PrmBalanceGet) (accounting.Decimal, error)
// see clientWrapper.containerPut.
Expand Down Expand Up @@ -68,9 +65,16 @@ type internalClient interface {
objectSearch(context.Context, cid.ID, PrmObjectSearch) (ResObjectSearch, error)
// see clientWrapper.sessionCreate.
sessionCreate(context.Context, prmCreateSession) (resCreateSession, error)
}

clientStatus
statisticUpdater
// client represents virtual connection to the single NeoFS network endpoint from which Pool is formed.
// This interface is expected to have exactly one production implementation - clientWrapper.
// Others are expected to be for test purposes only.
type internalClient interface {
singleClient

status() clientStatus
statisticUpdater() statusUpdater

// see clientWrapper.dial.
dial(ctx context.Context) error
Expand All @@ -80,7 +84,13 @@ type internalClient interface {
getClient() (*sdkClient.Client, error)
}

type statisticUpdater interface {
// ClientStatus describes pool statistic.
type ClientStatus interface {
statusUpdater
clientStatus
}

type statusUpdater interface {
updateErrorRate(err error)
incRequests(elapsed time.Duration, method MethodIndex)
}
Expand All @@ -91,6 +101,8 @@ type clientStatus interface {
isHealthy() bool
// setUnhealthy marks client as unhealthy.
setUnhealthy()
// setHealthy marks client as healthy.
setHealthy()
// address return address of endpoint.
address() string
// currentErrorRate returns current errors rate.
Expand Down Expand Up @@ -202,6 +214,12 @@ func (m MethodIndex) String() string {
}
}

// NewClientStatistic is a constructor for default statistic implementation.
func NewClientStatistic(addr string, errorThreshold uint32) ClientStatus {
statusMonitor := newClientStatusMonitor(addr, errorThreshold)
return &statusMonitor
}

func newClientStatusMonitor(addr string, errorThreshold uint32) clientStatusMonitor {
methods := make([]*methodStatus, methodLast)
for i := methodBalanceGet; i < methodLast; i++ {
Expand Down Expand Up @@ -235,7 +253,15 @@ type clientWrapper struct {
client *sdkClient.Client
prm wrapperPrm

clientStatusMonitor
clientStatus ClientStatus
}

func (c *clientWrapper) status() clientStatus {
return c.clientStatus
}

func (c *clientWrapper) statisticUpdater() statusUpdater {
return c.clientStatus
}

// wrapperPrm is params to create clientWrapper.
Expand Down Expand Up @@ -295,16 +321,16 @@ func (x *wrapperPrm) getNewClient() (*sdkClient.Client, error) {
}

// newWrapper creates a clientWrapper that implements the client interface.
func newWrapper(prm wrapperPrm) (*clientWrapper, error) {
func newWrapper(prm wrapperPrm, statistic ClientStatus) (*clientWrapper, error) {
cl, err := prm.getNewClient()
if err != nil {
return nil, err
}

res := &clientWrapper{
client: cl,
clientStatusMonitor: newClientStatusMonitor(prm.address, prm.errorThreshold),
prm: prm,
client: cl,
clientStatus: statistic,
prm: prm,
}

return res, nil
Expand All @@ -326,7 +352,7 @@ func (c *clientWrapper) dial(ctx context.Context) error {
prmDial.SetContext(ctx)

if err = cl.Dial(prmDial); err != nil {
c.setUnhealthy()
c.clientStatus.setUnhealthy()
return err
}

Expand All @@ -345,7 +371,7 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change

cl, err := c.prm.getNewClient()
if err != nil {
c.setUnhealthy()
c.clientStatus.setUnhealthy()
return false, wasHealthy
}

Expand All @@ -356,7 +382,7 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
prmDial.SetContext(ctx)

if err := cl.Dial(prmDial); err != nil {
c.setUnhealthy()
c.clientStatus.setUnhealthy()
return false, wasHealthy
}

Expand All @@ -365,18 +391,18 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
c.clientMutex.Unlock()

if _, err := cl.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{}); err != nil {
c.setUnhealthy()
c.clientStatus.setUnhealthy()
return false, wasHealthy
}

c.setHealthy()
c.clientStatus.setHealthy()
return true, !wasHealthy
}

func (c *clientWrapper) getClient() (*sdkClient.Client, error) {
c.clientMutex.RLock()
defer c.clientMutex.RUnlock()
if c.isHealthy() {
if c.clientStatus.isHealthy() {
return c.client, nil
}
return nil, errPoolClientUnhealthy
Expand Down Expand Up @@ -873,6 +899,22 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
}, nil
}

func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
c.clientStatus.incRequests(elapsed, method)

if c.prm.poolRequestInfoCallback != nil {
c.prm.poolRequestInfoCallback(RequestInfo{
Address: c.prm.address,
Method: method,
Elapsed: elapsed,
})
}
}

func (c *clientWrapper) updateErrorRate(err error) {
c.clientStatus.updateErrorRate(err)
}

func (c *clientStatusMonitor) isHealthy() bool {
return c.healthy.Load()
}
Expand Down Expand Up @@ -920,17 +962,9 @@ func (c *clientStatusMonitor) methodsStatus() []statusSnapshot {

return result
}

func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
func (c *clientStatusMonitor) incRequests(elapsed time.Duration, method MethodIndex) {
methodStat := c.methods[method]
methodStat.incRequests(elapsed)
if c.prm.poolRequestInfoCallback != nil {
c.prm.poolRequestInfoCallback(RequestInfo{
Address: c.prm.address,
Method: method,
Elapsed: elapsed,
})
}
}

func (c *clientStatusMonitor) updateErrorRate(err error) {
Expand Down Expand Up @@ -985,6 +1019,7 @@ type InitParameters struct {
requestCallback func(RequestInfo)

clientBuilder clientBuilder
statistic ClientStatus
}

// SetSigner specifies default signer to be used for the protocol communication by default.
Expand Down Expand Up @@ -1055,6 +1090,11 @@ func (x *InitParameters) isMissingClientBuilder() bool {
return x.clientBuilder == nil
}

// SetClientStatistic sets client statistic implementation for clients.
func (x *InitParameters) SetClientStatistic(statistic ClientStatus) {
x.statistic = statistic
}

type rebalanceParameters struct {
nodesParams []*nodesParam
nodeRequestTimeout time.Duration
Expand Down Expand Up @@ -1482,7 +1522,7 @@ func (p *Pool) Dial(ctx context.Context) error {
var st session.Object
err := initSessionForDuration(ctx, &st, clients[j], p.rebalanceParams.sessionExpirationDuration, p.signer)
if err != nil {
clients[j].setUnhealthy()
clients[j].status().setUnhealthy()
if p.logger != nil {
p.logger.Warn("failed to create neofs session token for client",
zap.String("address", addr), zap.Error(err))
Expand Down Expand Up @@ -1553,7 +1593,14 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
cache.updateEpoch(info.Epoch())
return nil
})
return newWrapper(prm)

var statistic ClientStatus
if params.statistic == nil {
// statistic is disabled by default.
statistic = newNoOpClientStatusMonitor(prm.address)
}

return newWrapper(prm, statistic)
})
}
}
Expand Down Expand Up @@ -1644,7 +1691,7 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights
bufferWeights[j] = options.nodesParams[i].weights[j]
} else {
bufferWeights[j] = 0
p.cache.DeleteByPrefix(cli.address())
p.cache.DeleteByPrefix(cli.status().address())
}

if changed {
Expand Down Expand Up @@ -1694,15 +1741,15 @@ func (p *innerPool) connection() (internalClient, error) {
defer p.lock.RUnlock()
if len(p.clients) == 1 {
cp := p.clients[0]
if cp.isHealthy() {
if cp.status().isHealthy() {
return cp, nil
}
return nil, errors.New("no healthy client")
}
attempts := 3 * len(p.clients)
for k := 0; k < attempts; k++ {
i := p.sampler.Next()
if cp := p.clients[i]; cp.isHealthy() {
if cp := p.clients[i]; cp.status().isHealthy() {
return cp, nil
}
}
Expand Down Expand Up @@ -1807,7 +1854,7 @@ func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContex
ctx.signer = p.signer
}

ctx.endpoint = cp.address()
ctx.endpoint = cp.status().address()
ctx.client = cp

if ctx.sessionTarget != nil && cfg.stoken != nil {
Expand Down Expand Up @@ -2281,10 +2328,10 @@ func (p Pool) Statistic() Statistic {
inner.lock.RLock()
for _, cl := range inner.clients {
node := NodeStatistic{
address: cl.address(),
methods: cl.methodsStatus(),
overallErrors: cl.overallErrorRate(),
currentErrors: cl.currentErrorRate(),
address: cl.status().address(),
methods: cl.status().methodsStatus(),
overallErrors: cl.status().overallErrorRate(),
currentErrors: cl.status().currentErrorRate(),
}
stat.nodes = append(stat.nodes, node)
stat.overallErrors += node.overallErrors
Expand All @@ -2296,7 +2343,7 @@ func (p Pool) Statistic() Statistic {
}

// waitForContainerPresence waits until the container is found on the NeoFS network.
func waitForContainerPresence(ctx context.Context, cli internalClient, cnrID cid.ID, waitParams *WaitParams) error {
func waitForContainerPresence(ctx context.Context, cli singleClient, cnrID cid.ID, waitParams *WaitParams) error {
return waitFor(ctx, waitParams, func(ctx context.Context) bool {
_, err := cli.containerGet(ctx, cnrID)
return err == nil
Expand All @@ -2315,7 +2362,7 @@ func waitForEACLPresence(ctx context.Context, cli internalClient, cnrID cid.ID,
}

// waitForContainerRemoved waits until the container is removed from the NeoFS network.
func waitForContainerRemoved(ctx context.Context, cli internalClient, cnrID cid.ID, waitParams *WaitParams) error {
func waitForContainerRemoved(ctx context.Context, cli singleClient, cnrID cid.ID, waitParams *WaitParams) error {
return waitFor(ctx, waitParams, func(ctx context.Context) bool {
_, err := cli.containerGet(ctx, cnrID)
return errors.Is(err, apistatus.ErrContainerNotFound)
Expand Down Expand Up @@ -2480,7 +2527,7 @@ func (p *Pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oi
return res, nil
}

func (p *Pool) sdkClient() (*sdkClient.Client, statisticUpdater, error) {
func (p *Pool) sdkClient() (*sdkClient.Client, statusUpdater, error) {
conn, err := p.connection()
if err != nil {
return nil, nil, fmt.Errorf("connection: %w", err)
Expand All @@ -2491,5 +2538,5 @@ func (p *Pool) sdkClient() (*sdkClient.Client, statisticUpdater, error) {
return nil, nil, fmt.Errorf("get client: %w", err)
}

return cl, conn, nil
return cl, conn.statisticUpdater(), nil
}
Loading

0 comments on commit d8e66c9

Please sign in to comment.