From 82f75c0819c43158fe271314caf00fa9e974e6e3 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Mon, 18 Mar 2024 16:13:04 +0300 Subject: [PATCH] * Added `trace.Query.OnTransactionExecute` event * Added query pool metrics * Refactored query session pool * Changed initialization of internal driver clients to lazy * Disabled the logic of background grpc-connection parking --- CHANGELOG.md | 5 + config/config.go | 14 - driver.go | 265 +++++++------- internal/balancer/balancer.go | 5 +- internal/conn/config.go | 1 - internal/conn/conn.go | 30 -- internal/conn/grpc_client_stream.go | 29 -- internal/conn/pool.go | 38 --- internal/coordination/client.go | 4 +- internal/discovery/discovery.go | 4 +- internal/pool/defaults.go | 19 +- internal/pool/errors.go | 8 +- internal/pool/pool.go | 322 ++++++++---------- internal/pool/pool_test.go | 78 ++--- internal/pool/stats/stats.go | 8 + internal/pool/trace.go | 65 +--- internal/query/client.go | 84 +++-- internal/query/client_test.go | 19 +- internal/query/config/config.go | 40 +-- internal/query/config/options.go | 26 +- internal/query/errors.go | 1 + internal/query/execute_query.go | 13 +- internal/query/options/execute.go | 40 ++- internal/query/result.go | 6 + internal/query/result_test.go | 6 +- internal/query/session.go | 46 +-- internal/query/session_test.go | 6 +- internal/query/transaction.go | 26 +- internal/ratelimiter/client.go | 4 +- internal/scheme/client.go | 4 +- internal/scripting/client.go | 4 +- internal/table/client.go | 17 +- internal/table/client_test.go | 7 +- internal/table/session_test.go | 15 +- internal/topic/topicclientinternal/client.go | 4 +- internal/xcontext/context_with_done.go | 48 --- internal/xcontext/context_with_done_test.go | 48 --- internal/xsync/locked.go | 30 ++ internal/xsync/locked_test.go | 16 + internal/xsync/once.go | 52 ++- internal/xsync/once_test.go | 79 +++-- internal/xtest/grpclogger.go | 2 +- log/driver.go | 27 -- log/query.go | 147 +++----- metrics/query.go | 70 +++- metrics/traces.go | 2 +- options.go | 42 +-- query/session.go | 12 +- query/stats.go | 18 + tests/integration/discovery_test.go | 23 +- tests/integration/helpers_test.go | 4 +- tests/integration/query_execute_test.go | 2 +- .../topic_grpc_stopper_helper_test.go | 4 +- tests/slo/native/query/storage.go | 4 +- trace/driver.go | 13 - trace/driver_gtrace.go | 62 ---- trace/query.go | 87 ++--- trace/query_gtrace.go | 262 +++++--------- trace/table.go | 1 - trace/table_gtrace.go | 5 +- 60 files changed, 919 insertions(+), 1404 deletions(-) create mode 100644 internal/pool/stats/stats.go delete mode 100644 internal/xcontext/context_with_done.go delete mode 100644 internal/xcontext/context_with_done_test.go create mode 100644 internal/xsync/locked.go create mode 100644 internal/xsync/locked_test.go create mode 100644 query/stats.go diff --git a/CHANGELOG.md b/CHANGELOG.md index d9869bc91..d36d2f2dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,9 @@ * Added support of `TzDate`,`TzDateTime`,`TzTimestamp` types in `ydb.ParamsBuilder()` +* Added `trace.Query.OnTransactionExecute` event +* Added query pool metrics +* Fixed logic of query session pool +* Changed initialization of internal driver clients to lazy +* Disabled the logic of background grpc-connection parking ## v3.58.2 * Added `trace.Query.OnSessionBegin` event diff --git a/config/config.go b/config/config.go index 96756e9ec..47d3b8cc1 100644 --- a/config/config.go +++ b/config/config.go @@ -21,7 +21,6 @@ type Config struct { trace *trace.Driver dialTimeout time.Duration - connectionTTL time.Duration balancerConfig *balancerConfig.Config secure bool endpoint string @@ -57,13 +56,6 @@ func (c *Config) Meta() *meta.Meta { return c.meta } -// ConnectionTTL defines interval for parking grpc connections. -// -// If ConnectionTTL is zero - connections are not park. -func (c *Config) ConnectionTTL() time.Duration { - return c.connectionTTL -} - // Secure is a flag for secure connection func (c *Config) Secure() bool { return c.secure @@ -177,12 +169,6 @@ func WithUserAgent(userAgent string) Option { } } -func WithConnectionTTL(ttl time.Duration) Option { - return func(c *Config) { - c.connectionTTL = ttl - } -} - func WithCredentials(credentials credentials.Credentials) Option { return func(c *Config) { c.credentials = credentials diff --git a/driver.go b/driver.go index 9e48b4894..190254dce 100644 --- a/driver.go +++ b/driver.go @@ -65,28 +65,28 @@ type Driver struct { config *config.Config options []config.Option - discovery *internalDiscovery.Client + discovery *xsync.Once[*internalDiscovery.Client] discoveryOptions []discoveryConfig.Option - table *internalTable.Client + table *xsync.Once[*internalTable.Client] tableOptions []tableConfig.Option - query *internalQuery.Client + query *xsync.Once[*internalQuery.Client] queryOptions []queryConfig.Option - scripting *internalScripting.Client + scripting *xsync.Once[*internalScripting.Client] scriptingOptions []scriptingConfig.Option - scheme *internalScheme.Client + scheme *xsync.Once[*internalScheme.Client] schemeOptions []schemeConfig.Option - coordination *internalCoordination.Client + coordination *xsync.Once[*internalCoordination.Client] coordinationOptions []coordinationConfig.Option - ratelimiter *internalRatelimiter.Client + ratelimiter *xsync.Once[*internalRatelimiter.Client] ratelimiterOptions []ratelimiterConfig.Option - topic *topicclientinternal.Client + topic *xsync.Once[*topicclientinternal.Client] topicOptions []topicoptions.TopicOption databaseSQLOptions []xsql.ConnectorOption @@ -184,7 +184,7 @@ func (d *Driver) Secure() bool { // Table returns table client func (d *Driver) Table() table.Client { - return d.table + return d.table.Get() } // Query returns query client @@ -193,37 +193,37 @@ func (d *Driver) Table() table.Client { // // Notice: This API is EXPERIMENTAL and may be changed or removed in a later release. func (d *Driver) Query() query.Client { - return d.query + return d.query.Get() } // Scheme returns scheme client func (d *Driver) Scheme() scheme.Client { - return d.scheme + return d.scheme.Get() } // Coordination returns coordination client func (d *Driver) Coordination() coordination.Client { - return d.coordination + return d.coordination.Get() } // Ratelimiter returns ratelimiter client func (d *Driver) Ratelimiter() ratelimiter.Client { - return d.ratelimiter + return d.ratelimiter.Get() } // Discovery returns discovery client func (d *Driver) Discovery() discovery.Client { - return d.discovery + return d.discovery.Get() } // Scripting returns scripting client func (d *Driver) Scripting() scripting.Client { - return d.scripting + return d.scripting.Get() } // Topic returns topic client func (d *Driver) Topic() topic.Client { - return d.topic + return d.topic.Get() } // Open connects to database by DSN and return driver runtime holder @@ -400,138 +400,133 @@ func (d *Driver) connect(ctx context.Context) (err error) { return xerrors.WithStackTrace(err) } - d.table, err = internalTable.New(ctx, - d.balancer, - tableConfig.New( - append( - // prepend common params from root config - []tableConfig.Option{ - tableConfig.With(d.config.Common), - }, - d.tableOptions..., - )..., - ), - ) - if err != nil { - return xerrors.WithStackTrace(err) - } + d.table = xsync.OnceValue(func() *internalTable.Client { + return internalTable.New(xcontext.WithoutDeadline(ctx), + d.balancer, + tableConfig.New( + append( + // prepend common params from root config + []tableConfig.Option{ + tableConfig.With(d.config.Common), + }, + d.tableOptions..., + )..., + ), + ) + }) - d.query, err = internalQuery.New(ctx, - d.balancer, - queryConfig.New( - append( - // prepend common params from root config - []queryConfig.Option{ - queryConfig.With(d.config.Common), - }, - d.queryOptions..., - )..., - ), - ) + d.query = xsync.OnceValue(func() *internalQuery.Client { + return internalQuery.New(xcontext.WithoutDeadline(ctx), + d.balancer, + queryConfig.New( + append( + // prepend common params from root config + []queryConfig.Option{ + queryConfig.With(d.config.Common), + }, + d.queryOptions..., + )..., + ), + ) + }) if err != nil { return xerrors.WithStackTrace(err) } - d.scheme, err = internalScheme.New(ctx, - d.balancer, - schemeConfig.New( - append( - // prepend common params from root config - []schemeConfig.Option{ - schemeConfig.WithDatabaseName(d.Name()), - schemeConfig.With(d.config.Common), - }, - d.schemeOptions..., - )..., - ), - ) - if err != nil { - return xerrors.WithStackTrace(err) - } + d.scheme = xsync.OnceValue(func() *internalScheme.Client { + return internalScheme.New(xcontext.WithoutDeadline(ctx), + d.balancer, + schemeConfig.New( + append( + // prepend common params from root config + []schemeConfig.Option{ + schemeConfig.WithDatabaseName(d.Name()), + schemeConfig.With(d.config.Common), + }, + d.schemeOptions..., + )..., + ), + ) + }) - d.coordination, err = internalCoordination.New(ctx, - d.balancer, - coordinationConfig.New( - append( - // prepend common params from root config - []coordinationConfig.Option{ - coordinationConfig.With(d.config.Common), - }, - d.coordinationOptions..., - )..., - ), - ) - if err != nil { - return xerrors.WithStackTrace(err) - } + d.coordination = xsync.OnceValue(func() *internalCoordination.Client { + return internalCoordination.New(xcontext.WithoutDeadline(ctx), + d.balancer, + coordinationConfig.New( + append( + // prepend common params from root config + []coordinationConfig.Option{ + coordinationConfig.With(d.config.Common), + }, + d.coordinationOptions..., + )..., + ), + ) + }) - d.ratelimiter, err = internalRatelimiter.New(ctx, - d.balancer, - ratelimiterConfig.New( - append( - // prepend common params from root config - []ratelimiterConfig.Option{ - ratelimiterConfig.With(d.config.Common), - }, - d.ratelimiterOptions..., - )..., - ), - ) - if err != nil { - return xerrors.WithStackTrace(err) - } + d.ratelimiter = xsync.OnceValue(func() *internalRatelimiter.Client { + return internalRatelimiter.New(xcontext.WithoutDeadline(ctx), + d.balancer, + ratelimiterConfig.New( + append( + // prepend common params from root config + []ratelimiterConfig.Option{ + ratelimiterConfig.With(d.config.Common), + }, + d.ratelimiterOptions..., + )..., + ), + ) + }) - d.discovery, err = internalDiscovery.New(ctx, - d.pool.Get(endpoint.New(d.config.Endpoint())), - discoveryConfig.New( - append( - // prepend common params from root config - []discoveryConfig.Option{ - discoveryConfig.With(d.config.Common), - discoveryConfig.WithEndpoint(d.Endpoint()), - discoveryConfig.WithDatabase(d.Name()), - discoveryConfig.WithSecure(d.Secure()), - discoveryConfig.WithMeta(d.config.Meta()), - }, - d.discoveryOptions..., - )..., - ), - ) - if err != nil { - return xerrors.WithStackTrace(err) - } + d.discovery = xsync.OnceValue(func() *internalDiscovery.Client { + return internalDiscovery.New(xcontext.WithoutDeadline(ctx), + d.pool.Get(endpoint.New(d.config.Endpoint())), + discoveryConfig.New( + append( + // prepend common params from root config + []discoveryConfig.Option{ + discoveryConfig.With(d.config.Common), + discoveryConfig.WithEndpoint(d.Endpoint()), + discoveryConfig.WithDatabase(d.Name()), + discoveryConfig.WithSecure(d.Secure()), + discoveryConfig.WithMeta(d.config.Meta()), + }, + d.discoveryOptions..., + )..., + ), + ) + }) + + d.scripting = xsync.OnceValue(func() *internalScripting.Client { + return internalScripting.New(xcontext.WithoutDeadline(ctx), + d.balancer, + scriptingConfig.New( + append( + // prepend common params from root config + []scriptingConfig.Option{ + scriptingConfig.With(d.config.Common), + }, + d.scriptingOptions..., + )..., + ), + ) + }) - d.scripting, err = internalScripting.New(ctx, - d.balancer, - scriptingConfig.New( + d.topic = xsync.OnceValue(func() *topicclientinternal.Client { + return topicclientinternal.New(xcontext.WithoutDeadline(ctx), + d.balancer, + d.config.Credentials(), append( // prepend common params from root config - []scriptingConfig.Option{ - scriptingConfig.With(d.config.Common), + []topicoptions.TopicOption{ + topicoptions.WithOperationTimeout(d.config.OperationTimeout()), + topicoptions.WithOperationCancelAfter(d.config.OperationCancelAfter()), }, - d.scriptingOptions..., + d.topicOptions..., )..., - ), - ) - if err != nil { - return xerrors.WithStackTrace(err) - } - - d.topic, err = topicclientinternal.New(ctx, - d.balancer, - d.config.Credentials(), - append( - // prepend common params from root config - []topicoptions.TopicOption{ - topicoptions.WithOperationTimeout(d.config.OperationTimeout()), - topicoptions.WithOperationCancelAfter(d.config.OperationCancelAfter()), - }, - d.topicOptions..., - )..., - ) - if err != nil { - return xerrors.WithStackTrace(err) - } + ) + }) return nil } diff --git a/internal/balancer/balancer.go b/internal/balancer/balancer.go index 4d600134b..9bd58b451 100644 --- a/internal/balancer/balancer.go +++ b/internal/balancer/balancer.go @@ -257,12 +257,9 @@ func New( pool: pool, localDCDetector: detectLocalDC, } - d, err := internalDiscovery.New(ctx, pool.Get( + d := internalDiscovery.New(ctx, pool.Get( endpoint.New(driverConfig.Endpoint()), ), discoveryConfig) - if err != nil { - return nil, err - } b.discoveryClient = d diff --git a/internal/conn/config.go b/internal/conn/config.go index 305fc8c71..27a0603a1 100644 --- a/internal/conn/config.go +++ b/internal/conn/config.go @@ -11,6 +11,5 @@ import ( type Config interface { DialTimeout() time.Duration Trace() *trace.Driver - ConnectionTTL() time.Duration GrpcDialOptions() []grpc.DialOption } diff --git a/internal/conn/conn.go b/internal/conn/conn.go index 67f665488..a754923bd 100644 --- a/internal/conn/conn.go +++ b/internal/conn/conn.go @@ -94,36 +94,6 @@ func (c *conn) IsState(states ...State) bool { return false } -func (c *conn) park(ctx context.Context) (err error) { - onDone := trace.DriverOnConnPark( - c.config.Trace(), &ctx, - stack.FunctionID(""), - c.Endpoint(), - ) - defer func() { - onDone(err) - }() - - c.mtx.Lock() - defer c.mtx.Unlock() - - if c.closed { - return nil - } - - if c.cc == nil { - return nil - } - - err = c.close(ctx) - - if err != nil { - return c.wrapError(err) - } - - return nil -} - func (c *conn) NodeID() uint32 { if c != nil { return c.endpoint.NodeID() diff --git a/internal/conn/grpc_client_stream.go b/internal/conn/grpc_client_stream.go index fdc824fd9..0ea1cae86 100644 --- a/internal/conn/grpc_client_stream.go +++ b/internal/conn/grpc_client_stream.go @@ -3,7 +3,6 @@ package conn import ( "context" "io" - "time" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "google.golang.org/grpc" @@ -11,7 +10,6 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/wrap" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) @@ -57,9 +55,6 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) { onDone(err) }() - cancel := createPinger(s.c) - defer cancel() - err = s.ClientStream.SendMsg(m) if err != nil { @@ -93,9 +88,6 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) { onDone(err) }() - cancel := createPinger(s.c) - defer cancel() - defer func() { if err != nil { md := s.ClientStream.Trailer() @@ -154,24 +146,3 @@ func (s *grpcClientStream) wrapError(err error) error { xerrors.WithSkipDepth(1), ) } - -func createPinger(c *conn) context.CancelFunc { - c.touchLastUsage() - ctx, cancel := xcontext.WithCancel(context.Background()) - go func() { - ticker := time.NewTicker(time.Second) - ctxDone := ctx.Done() - for { - select { - case <-ctxDone: - ticker.Stop() - - return - case <-ticker.C: - c.touchLastUsage() - } - } - }() - - return cancel -} diff --git a/internal/conn/pool.go b/internal/conn/pool.go index 9cf697a05..02072d67c 100644 --- a/internal/conn/pool.go +++ b/internal/conn/pool.go @@ -4,7 +4,6 @@ import ( "context" "sync" "sync/atomic" - "time" "google.golang.org/grpc" grpcCodes "google.golang.org/grpc/codes" @@ -12,7 +11,6 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/closer" "github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync" "github.com/ydb-platform/ydb-go-sdk/v3/trace" @@ -183,39 +181,6 @@ func (p *Pool) Release(ctx context.Context) (finalErr error) { return nil } -func (p *Pool) connParker(ctx context.Context, ttl, interval time.Duration) { - ticker := time.NewTicker(interval) - defer ticker.Stop() - for { - select { - case <-p.done: - return - case <-ticker.C: - for _, c := range p.collectConns() { - if time.Since(c.LastUsage()) > ttl { - switch c.GetState() { - case Online, Banned: - _ = c.park(ctx) - default: - // nop - } - } - } - } - } -} - -func (p *Pool) collectConns() []*conn { - p.mtx.RLock() - defer p.mtx.RUnlock() - conns := make([]*conn, 0, len(p.conns)) - for _, c := range p.conns { - conns = append(conns, c) - } - - return conns -} - func NewPool(ctx context.Context, config Config) *Pool { onDone := trace.DriverOnPoolNew(config.Trace(), &ctx, stack.FunctionID("")) defer onDone() @@ -227,9 +192,6 @@ func NewPool(ctx context.Context, config Config) *Pool { conns: make(map[connsKey]*conn), done: make(chan struct{}), } - if ttl := config.ConnectionTTL(); ttl > 0 { - go p.connParker(xcontext.WithoutDeadline(ctx), ttl, ttl/2) - } return p } diff --git a/internal/coordination/client.go b/internal/coordination/client.go index c6ceede63..fb9331414 100644 --- a/internal/coordination/client.go +++ b/internal/coordination/client.go @@ -27,11 +27,11 @@ type Client struct { service Ydb_Coordination_V1.CoordinationServiceClient } -func New(ctx context.Context, cc grpc.ClientConnInterface, config config.Config) (*Client, error) { +func New(ctx context.Context, cc grpc.ClientConnInterface, config config.Config) *Client { return &Client{ config: config, service: Ydb_Coordination_V1.NewCoordinationServiceClient(cc), - }, nil + } } func (c *Client) CreateNode(ctx context.Context, path string, config coordination.NodeConfig) error { diff --git a/internal/discovery/discovery.go b/internal/discovery/discovery.go index 1d711ab69..65237078a 100644 --- a/internal/discovery/discovery.go +++ b/internal/discovery/discovery.go @@ -19,12 +19,12 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) -func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config) (*Client, error) { +func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config) *Client { return &Client{ config: config, cc: cc, client: Ydb_Discovery_V1.NewDiscoveryServiceClient(cc), - }, nil + } } var _ discovery.Client = &Client{} diff --git a/internal/pool/defaults.go b/internal/pool/defaults.go index 4f7eb64a0..2591e8438 100644 --- a/internal/pool/defaults.go +++ b/internal/pool/defaults.go @@ -1,10 +1,6 @@ package pool -const ( - DefaultMaxSize = 50 - DefaultMinSize = 0 - DefaultProducersCount = 1 -) +const DefaultLimit = 50 var defaultTrace = &Trace{ OnNew: func(info *NewStartInfo) func(info *NewDoneInfo) { @@ -15,10 +11,6 @@ var defaultTrace = &Trace{ return func(info *CloseDoneInfo) { } }, - OnProduce: func(info *ProduceStartInfo) func(info *ProduceDoneInfo) { - return func(info *ProduceDoneInfo) { - } - }, OnTry: func(info *TryStartInfo) func(info *TryDoneInfo) { return func(info *TryDoneInfo) { } @@ -35,12 +27,5 @@ var defaultTrace = &Trace{ return func(info *GetDoneInfo) { } }, - OnSpawn: func(info *SpawnStartInfo) func(info *SpawnDoneInfo) { - return func(info *SpawnDoneInfo) { - } - }, - OnWant: func(info *WantStartInfo) func(info *WantDoneInfo) { - return func(info *WantDoneInfo) { - } - }, + OnChange: func(info ChangeInfo) {}, } diff --git a/internal/pool/errors.go b/internal/pool/errors.go index 24a37f31e..8e2eb6f21 100644 --- a/internal/pool/errors.go +++ b/internal/pool/errors.go @@ -2,6 +2,12 @@ package pool import ( "errors" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" ) -var errClosedPool = errors.New("closed pool") +var ( + errClosedPool = errors.New("closed pool") + errPoolOverflow = xerrors.Retryable(errors.New("pool overflow")) + errItemIsNotAlive = errors.New("item is not alive") +) diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 90cd0b734..82b2905ef 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -2,11 +2,12 @@ package pool import ( "context" - "sync" + "sync/atomic" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/pool/stats" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync" "github.com/ydb-platform/ydb-go-sdk/v3/retry" "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) @@ -17,42 +18,44 @@ type ( IsAlive() bool Close(ctx context.Context) error } + Stats struct { + locked *xsync.Locked[stats.Stats] + onChange func(stats.Stats) + } Pool[PT Item[T], T any] struct { - trace *Trace - maxSize int - minSize int - producersCount int + trace *Trace + limit int create func(ctx context.Context) (PT, error) - idle chan PT - spawn chan PT - stop chan struct{} + mu xsync.Mutex + idle []PT + index map[PT]struct{} + + done atomic.Bool + + stats *Stats } option[PT Item[T], T any] func(p *Pool[PT, T]) ) -func WithCreateFunc[PT Item[T], T any](f func(ctx context.Context) (PT, error)) option[PT, T] { - return func(p *Pool[PT, T]) { - p.create = f - } +func (stats *Stats) Change(f func(s stats.Stats) stats.Stats) { + stats.onChange(stats.locked.Change(f)) } -func WithMinSize[PT Item[T], T any](size int) option[PT, T] { - return func(p *Pool[PT, T]) { - p.minSize = size - } +func (stats *Stats) Get() stats.Stats { + return stats.locked.Get() } -func WithMaxSize[PT Item[T], T any](size int) option[PT, T] { +func WithCreateFunc[PT Item[T], T any](f func(ctx context.Context) (PT, error)) option[PT, T] { return func(p *Pool[PT, T]) { - p.maxSize = size + p.create = f } } -func WithProducersCount[PT Item[T], T any](count int) option[PT, T] { +func WithLimit[PT Item[T], T any](size int) option[PT, T] { return func(p *Pool[PT, T]) { - p.producersCount = count + p.limit = size } } @@ -65,18 +68,15 @@ func WithTrace[PT Item[T], T any](t *Trace) option[PT, T] { func New[PT Item[T], T any]( ctx context.Context, opts ...option[PT, T], -) (p *Pool[PT, T], finalErr error) { - p = &Pool[PT, T]{ - trace: defaultTrace, - maxSize: DefaultMaxSize, - minSize: DefaultMinSize, - producersCount: DefaultProducersCount, +) *Pool[PT, T] { + p := &Pool[PT, T]{ + trace: defaultTrace, + limit: DefaultLimit, create: func(ctx context.Context) (PT, error) { var item T return &item, nil }, - stop: make(chan struct{}), } for _, opt := range opts { @@ -86,142 +86,144 @@ func New[PT Item[T], T any]( } onDone := p.trace.OnNew(&NewStartInfo{ - Context: &ctx, - Call: stack.FunctionID(""), - MinSize: p.minSize, - MaxSize: p.maxSize, - ProducersCount: p.producersCount, + Context: &ctx, + Call: stack.FunctionID(""), }) defer func() { onDone(&NewDoneInfo{ - Error: finalErr, - MinSize: p.minSize, - MaxSize: p.maxSize, - ProducersCount: p.producersCount, + Limit: p.limit, }) }() - if p.minSize > p.maxSize { - p.minSize = p.maxSize / 10 + p.idle = make([]PT, 0, p.limit) + p.index = make(map[PT]struct{}, p.limit) + p.stats = &Stats{ + locked: xsync.NewLocked[stats.Stats](stats.Stats{ + Limit: p.limit, + }), + onChange: p.trace.OnChange, } - if p.producersCount > p.maxSize { - p.producersCount = p.maxSize - } - - if p.producersCount <= 0 { - p.producersCount = 1 - } - - p.idle = make(chan PT, p.maxSize) - - p.produce(ctx) + return p +} - return p, nil +func (p *Pool[PT, T]) Stats() stats.Stats { + return p.stats.Get() } -func (p *Pool[PT, T]) want(ctx context.Context) (err error) { - onDone := p.trace.OnWant(&WantStartInfo{ +func (p *Pool[PT, T]) get(ctx context.Context) (_ PT, finalErr error) { + onDone := p.trace.OnGet(&GetStartInfo{ Context: &ctx, Call: stack.FunctionID(""), }) defer func() { - onDone(&WantDoneInfo{ - Error: err, + onDone(&GetDoneInfo{ + Error: finalErr, }) }() - select { - case <-ctx.Done(): - return xerrors.WithStackTrace(ctx.Err()) - case p.spawn <- nil: - return nil + if err := ctx.Err(); err != nil { + return nil, xerrors.WithStackTrace(err) } + + if p.done.Load() { + return nil, xerrors.WithStackTrace(errClosedPool) + } + + var item PT + p.mu.WithLock(func() { + if len(p.idle) > 0 { + item, p.idle = p.idle[0], p.idle[1:] + p.stats.Change(func(v stats.Stats) stats.Stats { + v.Idle-- + + return v + }) + } + }) + + if item != nil { + if item.IsAlive() { + return item, nil + } + _ = item.Close(ctx) + p.mu.WithLock(func() { + delete(p.index, item) + }) + p.stats.Change(func(v stats.Stats) stats.Stats { + v.Index-- + + return v + }) + } + + p.mu.Lock() + defer p.mu.Unlock() + + if len(p.index) == p.limit { + return nil, xerrors.WithStackTrace(errPoolOverflow) + } + + item, err := p.create(ctx) + if err != nil { + return nil, xerrors.WithStackTrace(err) + } + + p.index[item] = struct{}{} + p.stats.Change(func(v stats.Stats) stats.Stats { + v.Index++ + + return v + }) + + return item, nil } -func (p *Pool[PT, T]) put(ctx context.Context, item PT) (err error) { +func (p *Pool[PT, T]) put(ctx context.Context, item PT) (finalErr error) { onDone := p.trace.OnPut(&PutStartInfo{ Context: &ctx, Call: stack.FunctionID(""), }) defer func() { onDone(&PutDoneInfo{ - Error: err, + Error: finalErr, }) }() - select { - case <-ctx.Done(): - // context is done - return xerrors.WithStackTrace(ctx.Err()) - case <-p.stop: - // pool closed early - return item.Close(ctx) - case p.spawn <- item: - // return item into pool - return nil - default: - // not enough space in pool - return item.Close(ctx) + if err := ctx.Err(); err != nil { + return xerrors.WithStackTrace(err) } -} -func (p *Pool[PT, T]) produce(ctx context.Context) { - ctx = xcontext.WithoutDeadline(ctx) + if p.done.Load() { + return xerrors.WithStackTrace(errClosedPool) + } - onDone := p.trace.OnProduce(&ProduceStartInfo{ - Context: &ctx, - Call: stack.FunctionID(""), - Concurrency: p.producersCount, - }) - defer func() { - onDone(&ProduceDoneInfo{}) - }() + if !item.IsAlive() { + _ = item.Close(ctx) - p.spawn = make(chan PT, p.maxSize) - - var wg, started sync.WaitGroup - wg.Add(p.producersCount) - started.Add(p.producersCount + 1) - - for range make([]struct{}, p.producersCount) { - go func() { - defer wg.Done() - started.Done() - - for { - select { - case <-p.stop: - return - - case msg := <-p.spawn: - if msg != nil { - p.idle <- msg - } else { - item, err := p.create(ctx) - if err == nil { - p.idle <- item - } - } - } - } - }() - } + p.mu.WithLock(func() { + delete(p.index, item) + }) + p.stats.Change(func(v stats.Stats) stats.Stats { + v.Index-- + + return v + }) - for i := 0; i < p.maxSize && len(p.idle) < p.minSize; i++ { - _ = p.want(ctx) + return xerrors.WithStackTrace(errItemIsNotAlive) } - go func() { - started.Done() - defer func() { - close(p.idle) - }() - wg.Wait() - }() + p.mu.WithLock(func() { + p.idle = append(p.idle, item) + }) + p.stats.Change(func(v stats.Stats) stats.Stats { + v.Idle++ + + return v + }) - started.Wait() + return nil } func (p *Pool[PT, T]) try(ctx context.Context, f func(ctx context.Context, item PT) error) (finalErr error) { @@ -241,11 +243,20 @@ func (p *Pool[PT, T]) try(ctx context.Context, f func(ctx context.Context, item } defer func() { - if !item.IsAlive() { - _ = item.Close(ctx) - } else { - _ = p.put(ctx, item) - } + _ = p.put(ctx, item) + }() + + p.stats.Change(func(v stats.Stats) stats.Stats { + v.InUse++ + + return v + }) + defer func() { + p.stats.Change(func(v stats.Stats) stats.Stats { + v.InUse-- + + return v + }) }() err = f(ctx, item) @@ -275,10 +286,7 @@ func (p *Pool[PT, T]) With( }) }() - retryCtx, cancelRetry := xcontext.WithDone(ctx, p.stop) - defer cancelRetry() - - err := retry.Retry(retryCtx, func(ctx context.Context) error { + err := retry.Retry(ctx, func(ctx context.Context) error { err := p.try(ctx, f) if err != nil { return xerrors.WithStackTrace(err) @@ -301,35 +309,6 @@ func (p *Pool[PT, T]) With( return nil } -func (p *Pool[PT, T]) get(ctx context.Context) (_ PT, finalErr error) { - onDone := p.trace.OnGet(&GetStartInfo{ - Context: &ctx, - Call: stack.FunctionID(""), - }) - defer func() { - onDone(&GetDoneInfo{ - Error: finalErr, - }) - }() - - for { - select { - case <-p.stop: - return nil, xerrors.WithStackTrace(errClosedPool) - case item, has := <-p.idle: - if !has { - return nil, xerrors.WithStackTrace(errClosedPool) - } - if item.IsAlive() { - return item, nil - } - _ = item.Close(ctx) - case p.spawn <- nil: - continue - } - } -} - func (p *Pool[PT, T]) Close(ctx context.Context) (finalErr error) { onDone := p.trace.OnClose(&CloseStartInfo{ Context: &ctx, @@ -341,24 +320,19 @@ func (p *Pool[PT, T]) Close(ctx context.Context) (finalErr error) { }) }() - close(p.stop) + p.done.Store(true) + + p.mu.Lock() + defer p.mu.Unlock() - errs := make([]error, 0, len(p.idle)+len(p.spawn)) + errs := make([]error, 0, len(p.index)) - for item := range p.idle { + for item := range p.index { if err := item.Close(ctx); err != nil { errs = append(errs, err) } } - for len(p.spawn) > 0 { - if msg := <-p.spawn; msg != nil { - if err := msg.Close(ctx); err != nil { - errs = append(errs, err) - } - } - } - switch len(errs) { case 0: return nil diff --git a/internal/pool/pool_test.go b/internal/pool/pool_test.go index 8f1878d49..f1f220804 100644 --- a/internal/pool/pool_test.go +++ b/internal/pool/pool_test.go @@ -29,6 +29,10 @@ func (t testItem) IsAlive() bool { return true } +func (t testItem) ID() string { + return "" +} + func (t testItem) Close(context.Context) error { if t.onClose != nil { return t.onClose() @@ -41,35 +45,20 @@ func TestPool(t *testing.T) { rootCtx := xtest.Context(t) t.Run("New", func(t *testing.T) { t.Run("Default", func(t *testing.T) { - p, err := New[*testItem, testItem](rootCtx) - require.NoError(t, err) - err = p.With(rootCtx, func(ctx context.Context, testItem *testItem) error { + p := New[*testItem, testItem](rootCtx) + err := p.With(rootCtx, func(ctx context.Context, testItem *testItem) error { return nil }) require.NoError(t, err) }) - t.Run("WithMaxSize", func(t *testing.T) { - p, err := New[*testItem, testItem](rootCtx, WithMaxSize[*testItem, testItem](1)) - require.NoError(t, err) - require.EqualValues(t, 1, p.maxSize) - }) - t.Run("WithMinSize", func(t *testing.T) { - t.Run("LessOrEqualMaxSize", func(t *testing.T) { - p, err := New[*testItem, testItem](rootCtx, - WithMinSize[*testItem, testItem](1), - ) - require.NoError(t, err) - require.EqualValues(t, p.minSize, 1) - }) - t.Run("GreatThenMaxSize", func(t *testing.T) { - p, err := New[*testItem, testItem](rootCtx, WithMinSize[*testItem, testItem](100)) - require.NoError(t, err) - require.EqualValues(t, DefaultMaxSize/10, p.minSize) - }) + t.Run("WithLimit", func(t *testing.T) { + p := New[*testItem, testItem](rootCtx, WithLimit[*testItem, testItem](1)) + require.EqualValues(t, 1, p.limit) }) t.Run("WithCreateFunc", func(t *testing.T) { var newCounter int64 - p, err := New(rootCtx, + p := New(rootCtx, + WithLimit[*testItem, testItem](1), WithCreateFunc(func(context.Context) (*testItem, error) { atomic.AddInt64(&newCounter, 1) var v testItem @@ -77,18 +66,20 @@ func TestPool(t *testing.T) { return &v, nil }), ) + err := p.With(rootCtx, func(ctx context.Context, item *testItem) error { + return nil + }) require.NoError(t, err) - require.EqualValues(t, p.minSize, atomic.LoadInt64(&newCounter)) + require.EqualValues(t, p.limit, atomic.LoadInt64(&newCounter)) }) }) - t.Run("With", func(t *testing.T) { + t.Run("Change", func(t *testing.T) { t.Run("Context", func(t *testing.T) { t.Run("Canceled", func(t *testing.T) { ctx, cancel := context.WithCancel(rootCtx) cancel() - p, err := New[*testItem, testItem](ctx, WithMaxSize[*testItem, testItem](1)) - require.NoError(t, err) - err = p.With(ctx, func(ctx context.Context, testItem *testItem) error { + p := New[*testItem, testItem](ctx, WithLimit[*testItem, testItem](1)) + err := p.With(ctx, func(ctx context.Context, testItem *testItem) error { return nil }) require.ErrorIs(t, err, context.Canceled) @@ -96,9 +87,8 @@ func TestPool(t *testing.T) { t.Run("DeadlineExceeded", func(t *testing.T) { ctx, cancel := context.WithTimeout(rootCtx, 0) cancel() - p, err := New[*testItem, testItem](ctx, WithMaxSize[*testItem, testItem](1)) - require.NoError(t, err) - err = p.With(ctx, func(ctx context.Context, testItem *testItem) error { + p := New[*testItem, testItem](ctx, WithLimit[*testItem, testItem](1)) + err := p.With(ctx, func(ctx context.Context, testItem *testItem) error { return nil }) require.ErrorIs(t, err, context.DeadlineExceeded) @@ -112,9 +102,8 @@ func TestPool(t *testing.T) { createCounter int64 closeCounter int64 ) - p, err := New(rootCtx, - WithMinSize[*testItem, testItem](0), - WithMaxSize[*testItem, testItem](1), + p := New(rootCtx, + WithLimit[*testItem, testItem](1), WithCreateFunc(func(context.Context) (*testItem, error) { atomic.AddInt64(&createCounter, 1) @@ -129,13 +118,13 @@ func TestPool(t *testing.T) { return v, nil }), ) - require.NoError(t, err) - err = p.With(rootCtx, func(ctx context.Context, testItem *testItem) error { + err := p.With(rootCtx, func(ctx context.Context, testItem *testItem) error { return nil }) require.NoError(t, err) require.GreaterOrEqual(t, atomic.LoadInt64(&createCounter), atomic.LoadInt64(&closeCounter)) - p.Close(rootCtx) + err = p.Close(rootCtx) + require.NoError(t, err) require.EqualValues(t, atomic.LoadInt64(&createCounter), atomic.LoadInt64(&closeCounter)) }, xtest.StopAfter(time.Second)) }) @@ -146,8 +135,8 @@ func TestPool(t *testing.T) { deleteItems int64 expErr = xerrors.Retryable(errors.New("expected error"), xerrors.WithDeleteSession()) ) - p, err := New(rootCtx, - WithMaxSize[*testItem, testItem](1), + p := New(rootCtx, + WithLimit[*testItem, testItem](1), WithCreateFunc(func(context.Context) (*testItem, error) { atomic.AddInt64(&newItems, 1) @@ -165,8 +154,7 @@ func TestPool(t *testing.T) { return v, nil }), ) - require.NoError(t, err) - err = p.With(rootCtx, func(ctx context.Context, testItem *testItem) error { + err := p.With(rootCtx, func(ctx context.Context, testItem *testItem) error { if atomic.LoadInt64(&newItems) < 10 { return expErr } @@ -176,18 +164,18 @@ func TestPool(t *testing.T) { require.NoError(t, err) require.GreaterOrEqual(t, atomic.LoadInt64(&newItems), int64(9)) require.GreaterOrEqual(t, atomic.LoadInt64(&newItems), atomic.LoadInt64(&deleteItems)) - p.Close(rootCtx) + err = p.Close(rootCtx) + require.NoError(t, err) require.EqualValues(t, atomic.LoadInt64(&newItems), atomic.LoadInt64(&deleteItems)) }, xtest.StopAfter(5*time.Second)) }) }) t.Run("Stress", func(t *testing.T) { xtest.TestManyTimes(t, func(t testing.TB) { - p, err := New[*testItem, testItem](rootCtx, WithMinSize[*testItem, testItem](DefaultMaxSize/2)) - require.NoError(t, err) + p := New[*testItem, testItem](rootCtx) var wg sync.WaitGroup - wg.Add(DefaultMaxSize*2 + 1) - for range make([]struct{}, DefaultMaxSize*2) { + wg.Add(DefaultLimit*2 + 1) + for range make([]struct{}, DefaultLimit*2) { go func() { defer wg.Done() err := p.With(rootCtx, func(ctx context.Context, testItem *testItem) error { diff --git a/internal/pool/stats/stats.go b/internal/pool/stats/stats.go new file mode 100644 index 000000000..dff03eaeb --- /dev/null +++ b/internal/pool/stats/stats.go @@ -0,0 +1,8 @@ +package stats + +type Stats struct { + Limit int + Index int + Idle int + InUse int +} diff --git a/internal/pool/trace.go b/internal/pool/trace.go index b60d63aa5..40adef256 100644 --- a/internal/pool/trace.go +++ b/internal/pool/trace.go @@ -3,20 +3,19 @@ package pool import ( "context" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/pool/stats" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" ) type ( Trace struct { - OnNew func(*NewStartInfo) func(*NewDoneInfo) - OnClose func(*CloseStartInfo) func(*CloseDoneInfo) - OnProduce func(*ProduceStartInfo) func(*ProduceDoneInfo) - OnTry func(*TryStartInfo) func(*TryDoneInfo) - OnWith func(*WithStartInfo) func(*WithDoneInfo) - OnPut func(*PutStartInfo) func(*PutDoneInfo) - OnGet func(*GetStartInfo) func(*GetDoneInfo) - OnSpawn func(*SpawnStartInfo) func(*SpawnDoneInfo) - OnWant func(*WantStartInfo) func(*WantDoneInfo) + OnNew func(*NewStartInfo) func(*NewDoneInfo) + OnClose func(*CloseStartInfo) func(*CloseDoneInfo) + OnTry func(*TryStartInfo) func(*TryDoneInfo) + OnWith func(*WithStartInfo) func(*WithDoneInfo) + OnPut func(*PutStartInfo) func(*PutDoneInfo) + OnGet func(*GetStartInfo) func(*GetDoneInfo) + OnChange func(ChangeInfo) } NewStartInfo struct { // Context make available context in trace stack.Callerback function. @@ -25,19 +24,9 @@ type ( // Safe replacement of context are provided only inside stack.Callerback function Context *context.Context Call stack.Caller - - // input settings - MinSize int - MaxSize int - ProducersCount int } NewDoneInfo struct { - Error error - - // actual settings - MinSize int - MaxSize int - ProducersCount int + Limit int } CloseStartInfo struct { // Context make available context in trace stack.Callerback function. @@ -50,18 +39,7 @@ type ( CloseDoneInfo struct { Error error } - ProduceStartInfo struct { - // Context make available context in trace stack.Callerback function. - // Pointer to context provide replacement of context in trace stack.Callerback function. - // Warning: concurrent access to pointer on client side must be excluded. - // Safe replacement of context are provided only inside stack.Callerback function - Context *context.Context - Call stack.Caller - - Concurrency int - } - ProduceDoneInfo struct{} - TryStartInfo struct { + TryStartInfo struct { // Context make available context in trace stack.Callerback function. // Pointer to context provide replacement of context in trace stack.Callerback function. // Warning: concurrent access to pointer on client side must be excluded. @@ -107,26 +85,5 @@ type ( GetDoneInfo struct { Error error } - SpawnStartInfo struct { - // Context make available context in trace stack.Callerback function. - // Pointer to context provide replacement of context in trace stack.Callerback function. - // Warning: concurrent access to pointer on client side must be excluded. - // Safe replacement of context are provided only inside stack.Callerback function - Context *context.Context - Call stack.Caller - } - SpawnDoneInfo struct { - Error error - } - WantStartInfo struct { - // Context make available context in trace stack.Callerback function. - // Pointer to context provide replacement of context in trace stack.Callerback function. - // Warning: concurrent access to pointer on client side must be excluded. - // Safe replacement of context are provided only inside stack.Callerback function - Context *context.Context - Call stack.Caller - } - WantDoneInfo struct { - Error error - } + ChangeInfo = stats.Stats ) diff --git a/internal/query/client.go b/internal/query/client.go index 61670f415..a8bfa8ad5 100644 --- a/internal/query/client.go +++ b/internal/query/client.go @@ -7,6 +7,7 @@ import ( "google.golang.org/grpc" "github.com/ydb-platform/ydb-go-sdk/v3/internal/pool" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/pool/stats" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options" "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" @@ -34,9 +35,19 @@ type Client struct { config *config.Config grpcClient Ydb_Query_V1.QueryServiceClient pool *pool.Pool[*Session, Session] + + done chan struct{} +} + +func (c *Client) Stats() *stats.Stats { + s := c.pool.Stats() + + return &s } func (c *Client) Close(ctx context.Context) error { + close(c.done) + err := c.pool.Close(ctx) if err != nil { return xerrors.WithStackTrace(err) @@ -92,11 +103,16 @@ func do( } func (c *Client) Do(ctx context.Context, op query.Operation, opts ...options.DoOption) error { - onDone := trace.QueryOnDo(c.config.Trace(), &ctx, stack.FunctionID("")) - attempts, err := do(ctx, c.pool, op, c.config.Trace(), opts...) - onDone(attempts, err) - - return err + select { + case <-c.done: + return xerrors.WithStackTrace(errClosedClient) + default: + onDone := trace.QueryOnDo(c.config.Trace(), &ctx, stack.FunctionID("")) + attempts, err := do(ctx, c.pool, op, c.config.Trace(), opts...) + onDone(attempts, err) + + return err + } } func doTx( @@ -142,27 +158,30 @@ func doTx( } func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options.DoTxOption) error { - onDone := trace.QueryOnDoTx(c.config.Trace(), &ctx, stack.FunctionID("")) - attempts, err := doTx(ctx, c.pool, op, c.config.Trace(), opts...) - onDone(attempts, err) - - return err + select { + case <-c.done: + return xerrors.WithStackTrace(errClosedClient) + default: + onDone := trace.QueryOnDoTx(c.config.Trace(), &ctx, stack.FunctionID("")) + attempts, err := doTx(ctx, c.pool, op, c.config.Trace(), opts...) + onDone(attempts, err) + + return err + } } -func New(ctx context.Context, balancer balancer, cfg *config.Config) (_ *Client, err error) { +func New(ctx context.Context, balancer balancer, cfg *config.Config) *Client { onDone := trace.QueryOnNew(cfg.Trace(), &ctx, stack.FunctionID("")) - defer func() { - onDone(err) - }() + defer onDone() client := &Client{ config: cfg, grpcClient: Ydb_Query_V1.NewQueryServiceClient(balancer), + done: make(chan struct{}), } - client.pool, err = pool.New(ctx, - pool.WithMaxSize[*Session, Session](cfg.PoolMaxSize()), - pool.WithProducersCount[*Session, Session](cfg.PoolProducersCount()), + client.pool = pool.New(ctx, + pool.WithLimit[*Session, Session](cfg.PoolLimit()), pool.WithTrace[*Session, Session](poolTrace(cfg.Trace())), pool.WithCreateFunc(func(ctx context.Context) (_ *Session, err error) { var cancel context.CancelFunc @@ -187,20 +206,17 @@ func New(ctx context.Context, balancer balancer, cfg *config.Config) (_ *Client, return s, nil }), ) - if err != nil { - return nil, xerrors.WithStackTrace(err) - } - return client, xerrors.WithStackTrace(ctx.Err()) + return client } func poolTrace(t *trace.Query) *pool.Trace { return &pool.Trace{ OnNew: func(info *pool.NewStartInfo) func(*pool.NewDoneInfo) { - onDone := trace.QueryOnPoolNew(t, info.Context, info.Call, info.MinSize, info.MaxSize, info.ProducersCount) + onDone := trace.QueryOnPoolNew(t, info.Context, info.Call) return func(info *pool.NewDoneInfo) { - onDone(info.Error, info.MinSize, info.MaxSize, info.ProducersCount) + onDone(info.Limit) } }, OnClose: func(info *pool.CloseStartInfo) func(*pool.CloseDoneInfo) { @@ -210,13 +226,6 @@ func poolTrace(t *trace.Query) *pool.Trace { onDone(info.Error) } }, - OnProduce: func(info *pool.ProduceStartInfo) func(*pool.ProduceDoneInfo) { - onDone := trace.QueryOnPoolProduce(t, info.Context, info.Call, info.Concurrency) - - return func(info *pool.ProduceDoneInfo) { - onDone() - } - }, OnTry: func(info *pool.TryStartInfo) func(*pool.TryDoneInfo) { onDone := trace.QueryOnPoolTry(t, info.Context, info.Call) @@ -245,19 +254,8 @@ func poolTrace(t *trace.Query) *pool.Trace { onDone(info.Error) } }, - OnSpawn: func(info *pool.SpawnStartInfo) func(*pool.SpawnDoneInfo) { - onDone := trace.QueryOnPoolSpawn(t, info.Context, info.Call) - - return func(info *pool.SpawnDoneInfo) { - onDone(info.Error) - } - }, - OnWant: func(info *pool.WantStartInfo) func(*pool.WantDoneInfo) { - onDone := trace.QueryOnPoolWant(t, info.Context, info.Call) - - return func(info *pool.WantDoneInfo) { - onDone(info.Error) - } + OnChange: func(info pool.ChangeInfo) { + trace.QueryOnPoolChange(t, info.Limit, info.Index, info.Idle, info.InUse) }, } } diff --git a/internal/query/client_test.go b/internal/query/client_test.go index ece41f161..7c49b22e7 100644 --- a/internal/query/client_test.go +++ b/internal/query/client_test.go @@ -173,25 +173,20 @@ func newTestSessionWithClient(client Ydb_Query_V1.QueryServiceClient) (*Session, }, nil } -func mustTestPool( +func testPool( ctx context.Context, createSession func(ctx context.Context) (*Session, error), ) *pool.Pool[*Session, Session] { - p, err := pool.New[*Session, Session](ctx, - pool.WithMaxSize[*Session, Session](1), + return pool.New[*Session, Session](ctx, + pool.WithLimit[*Session, Session](1), pool.WithCreateFunc(createSession), ) - if err != nil { - panic(err) - } - - return p } func TestDo(t *testing.T) { ctx := xtest.Context(t) t.Run("HappyWay", func(t *testing.T) { - attempts, err := do(ctx, mustTestPool(ctx, func(ctx context.Context) (*Session, error) { + attempts, err := do(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) { return newTestSession() }), func(ctx context.Context, s query.Session) error { return nil @@ -201,7 +196,7 @@ func TestDo(t *testing.T) { }) t.Run("RetryableError", func(t *testing.T) { counter := 0 - attempts, err := do(ctx, mustTestPool(ctx, func(ctx context.Context) (*Session, error) { + attempts, err := do(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) { return newTestSession() }), func(ctx context.Context, s query.Session) error { counter++ @@ -228,7 +223,7 @@ func TestDoTx(t *testing.T) { client.EXPECT().CommitTransaction(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CommitTransactionResponse{ Status: Ydb.StatusIds_SUCCESS, }, nil) - attempts, err := doTx(ctx, mustTestPool(ctx, func(ctx context.Context) (*Session, error) { + attempts, err := doTx(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) { return newTestSessionWithClient(client) }), func(ctx context.Context, tx query.TxActor) error { return nil @@ -249,7 +244,7 @@ func TestDoTx(t *testing.T) { client.EXPECT().CommitTransaction(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CommitTransactionResponse{ Status: Ydb.StatusIds_SUCCESS, }, nil).AnyTimes() - attempts, err := doTx(ctx, mustTestPool(ctx, func(ctx context.Context) (*Session, error) { + attempts, err := doTx(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) { return newTestSessionWithClient(client) }), func(ctx context.Context, tx query.TxActor) error { counter++ diff --git a/internal/query/config/config.go b/internal/query/config/config.go index a6d3c4edb..392f2ec4d 100644 --- a/internal/query/config/config.go +++ b/internal/query/config/config.go @@ -3,8 +3,6 @@ package config import ( "time" - "github.com/jonboulle/clockwork" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/pool" "github.com/ydb-platform/ydb-go-sdk/v3/trace" @@ -13,24 +11,18 @@ import ( const ( DefaultSessionDeleteTimeout = 500 * time.Millisecond DefaultSessionCreateTimeout = 5 * time.Second - DefaultPoolMinSize = pool.DefaultMinSize - DefaultPoolMaxSize = pool.DefaultMaxSize - DefaultPoolProducersCount = pool.DefaultProducersCount + DefaultPoolMaxSize = pool.DefaultLimit ) type Config struct { config.Common - minSize int - maxSize int - producersCount int + poolLimit int sessionCreateTimeout time.Duration sessionDeleteTimeout time.Duration trace *trace.Query - - clock clockwork.Clock } func New(opts ...Option) *Config { @@ -46,12 +38,9 @@ func New(opts ...Option) *Config { func defaults() *Config { return &Config{ - minSize: DefaultPoolMinSize, - maxSize: DefaultPoolMaxSize, - producersCount: DefaultPoolProducersCount, + poolLimit: DefaultPoolMaxSize, sessionCreateTimeout: DefaultSessionCreateTimeout, sessionDeleteTimeout: DefaultSessionDeleteTimeout, - clock: clockwork.NewRealClock(), trace: &trace.Query{}, } } @@ -61,24 +50,11 @@ func (c *Config) Trace() *trace.Query { return c.trace } -// Clock defines clock -func (c *Config) Clock() clockwork.Clock { - return c.clock -} - -func (c *Config) PoolMinSize() int { - return c.minSize -} - -// PoolMaxSize is an upper bound of pooled sessions. -// If PoolMaxSize is less than or equal to zero then the -// DefaultPoolMaxSize variable is used as a limit. -func (c *Config) PoolMaxSize() int { - return c.maxSize -} - -func (c *Config) PoolProducersCount() int { - return c.producersCount +// PoolLimit is an upper bound of pooled sessions. +// If PoolLimit is less than or equal to zero then the +// DefaultPoolMaxSize variable is used as a pool limit. +func (c *Config) PoolLimit() int { + return c.poolLimit } // SessionCreateTimeout limits maximum time spent on Create session request diff --git a/internal/query/config/options.go b/internal/query/config/options.go index 3919e71f4..2b30c5be2 100644 --- a/internal/query/config/options.go +++ b/internal/query/config/options.go @@ -23,29 +23,13 @@ func WithTrace(trace *trace.Query, opts ...trace.QueryComposeOption) Option { } } -// WithPoolMaxSize defines upper bound of pooled sessions. -// If maxSize is less than or equal to zero then the -// DefaultPoolMaxSize variable is used as a limit. -func WithPoolMaxSize(size int) Option { +// WithPoolLimit defines upper bound of pooled sessions. +// If poolLimit is less than or equal to zero then the +// DefaultPoolMaxSize variable is used as a poolLimit. +func WithPoolLimit(size int) Option { return func(c *Config) { if size > 0 { - c.maxSize = size - } - } -} - -func WithPoolMinSize(size int) Option { - return func(c *Config) { - if size > 0 { - c.minSize = size - } - } -} - -func WithPoolProducersCount(count int) Option { - return func(c *Config) { - if count > 0 { - c.producersCount = count + c.poolLimit = size } } } diff --git a/internal/query/errors.go b/internal/query/errors.go index 5c5070456..923ef8ed8 100644 --- a/internal/query/errors.go +++ b/internal/query/errors.go @@ -8,5 +8,6 @@ var ( ErrNotImplemented = errors.New("not implemented yet") errWrongNextResultSetIndex = errors.New("wrong result set index") errClosedResult = errors.New("result closed early") + errClosedClient = errors.New("query client closed early") errWrongResultSetIndex = errors.New("critical violation of the logic - wrong result set index") ) diff --git a/internal/query/execute_query.go b/internal/query/execute_query.go index 713d26c69..182e75c0b 100644 --- a/internal/query/execute_query.go +++ b/internal/query/execute_query.go @@ -60,13 +60,17 @@ func execute(ctx context.Context, s *Session, c Ydb_Query_V1.QueryServiceClient, request, callOptions := executeQueryRequest(a, s.id, q, cfg) - stream, err := c.ExecuteQuery(xcontext.WithoutDeadline(ctx), request, callOptions...) + executeCtx, cancelExecute := xcontext.WithCancel(xcontext.WithoutDeadline(ctx)) + + stream, err := c.ExecuteQuery(executeCtx, request, callOptions...) if err != nil { return nil, nil, xerrors.WithStackTrace(err) } - r, txID, err := newResult(ctx, stream, s.trace) + r, txID, err := newResult(ctx, stream, s.trace, cancelExecute) if err != nil { + cancelExecute() + return nil, nil, xerrors.WithStackTrace(err) } @@ -74,8 +78,5 @@ func execute(ctx context.Context, s *Session, c Ydb_Query_V1.QueryServiceClient, return nil, r, nil } - return &transaction{ - id: txID, - s: s, - }, r, nil + return newTransaction(txID, s, s.trace), r, nil } diff --git a/internal/query/options/execute.go b/internal/query/options/execute.go index b742e1717..6a306c26d 100644 --- a/internal/query/options/execute.go +++ b/internal/query/options/execute.go @@ -12,7 +12,7 @@ type ( Syntax Ydb_Query.Syntax ExecMode Ydb_Query.ExecMode StatsMode Ydb_Query.StatsMode - callOptions []grpc.CallOption + CallOptions []grpc.CallOption commonExecuteSettings struct { syntax Syntax params params.Parameters @@ -37,13 +37,13 @@ type ( applyTxExecuteOption(s *txExecuteSettings) } txCommitOption struct{} - parametersOption params.Parameters - txControlOption struct { + ParametersOption params.Parameters + TxControlOption struct { txControl *tx.Control } ) -func (opt txControlOption) applyExecuteOption(s *Execute) { +func (opt TxControlOption) applyExecuteOption(s *Execute) { s.txControl = opt.txControl } @@ -64,19 +64,19 @@ const ( SyntaxPostgreSQL = Syntax(Ydb_Query.Syntax_SYNTAX_PG) ) -func (params parametersOption) applyTxExecuteOption(s *txExecuteSettings) { +func (params ParametersOption) applyTxExecuteOption(s *txExecuteSettings) { params.applyExecuteOption(s.ExecuteSettings) } -func (params parametersOption) applyExecuteOption(s *Execute) { +func (params ParametersOption) applyExecuteOption(s *Execute) { s.params = append(s.params, params...) } -func (opts callOptions) applyExecuteOption(s *Execute) { +func (opts CallOptions) applyExecuteOption(s *Execute) { s.callOptions = append(s.callOptions, opts...) } -func (opts callOptions) applyTxExecuteOption(s *txExecuteSettings) { +func (opts CallOptions) applyTxExecuteOption(s *txExecuteSettings) { opts.applyExecuteOption(s.ExecuteSettings) } @@ -178,12 +178,10 @@ func TxExecuteSettings(id string, opts ...TxExecuteOption) (settings *txExecuteS return settings } -var _ ExecuteOption = (*parametersOption)(nil) +var _ ExecuteOption = ParametersOption{} -func WithParameters(parameters *params.Parameters) *parametersOption { - params := parametersOption(*parameters) - - return ¶ms +func WithParameters(parameters *params.Parameters) ParametersOption { + return ParametersOption(*parameters) } var ( @@ -192,29 +190,35 @@ var ( _ TxExecuteOption = ExecMode(0) _ TxExecuteOption = StatsMode(0) _ TxExecuteOption = txCommitOption{} - _ ExecuteOption = txControlOption{} + _ ExecuteOption = TxControlOption{} ) func WithCommit() txCommitOption { return txCommitOption{} } +type ExecModeOption = ExecMode + func WithExecMode(mode ExecMode) ExecMode { return mode } -func WithSyntax(syntax Syntax) Syntax { +type SyntaxOption = Syntax + +func WithSyntax(syntax Syntax) SyntaxOption { return syntax } +type StatsModeOption = StatsMode + func WithStatsMode(mode StatsMode) StatsMode { return mode } -func WithCallOptions(opts ...grpc.CallOption) callOptions { +func WithCallOptions(opts ...grpc.CallOption) CallOptions { return opts } -func WithTxControl(txControl *tx.Control) txControlOption { - return txControlOption{txControl} +func WithTxControl(txControl *tx.Control) TxControlOption { + return TxControlOption{txControl} } diff --git a/internal/query/result.go b/internal/query/result.go index 554d924f3..9a25da8e7 100644 --- a/internal/query/result.go +++ b/internal/query/result.go @@ -31,10 +31,14 @@ func newResult( ctx context.Context, stream Ydb_Query_V1.QueryService_ExecuteQueryClient, t *trace.Query, + closeResult context.CancelFunc, ) (_ *result, txID string, err error) { if t == nil { t = &trace.Query{} } + if closeResult == nil { + closeResult = func() {} + } onDone := trace.QueryOnResultNew(t, &ctx, stack.FunctionID("")) defer func() { @@ -53,6 +57,8 @@ func newResult( interrupted = make(chan struct{}) closed = make(chan struct{}) closeOnce = xsync.OnceFunc(func(ctx context.Context) error { + closeResult() + close(interrupted) close(closed) diff --git a/internal/query/result_test.go b/internal/query/result_test.go index f7361740c..10d408f03 100644 --- a/internal/query/result_test.go +++ b/internal/query/result_test.go @@ -345,7 +345,7 @@ func TestResultNextResultSet(t *testing.T) { }, }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) - r, _, err := newResult(ctx, stream, nil) + r, _, err := newResult(ctx, stream, nil, nil) require.NoError(t, err) defer r.Close(ctx) { @@ -514,7 +514,7 @@ func TestResultNextResultSet(t *testing.T) { }, }, }, nil) - r, _, err := newResult(ctx, stream, nil) + r, _, err := newResult(ctx, stream, nil, nil) require.NoError(t, err) defer r.Close(ctx) { @@ -833,7 +833,7 @@ func TestResultNextResultSet(t *testing.T) { }, }, }, nil) - r, _, err := newResult(ctx, stream, nil) + r, _, err := newResult(ctx, stream, nil, nil) require.NoError(t, err) defer r.Close(ctx) { diff --git a/internal/query/session.go b/internal/query/session.go index e4030bf64..42a482339 100644 --- a/internal/query/session.go +++ b/internal/query/session.go @@ -63,16 +63,6 @@ func createSession( } }, }, - closeOnce: xsync.OnceFunc(func(ctx context.Context) (err error) { - s.setStatus(statusClosing) - defer s.setStatus(statusClosed) - - if err = deleteSession(ctx, s.grpcClient, s.id); err != nil { - return xerrors.WithStackTrace(err) - } - - return nil - }), } defer func() { if finalErr != nil && s != nil { @@ -129,7 +119,9 @@ func (s *Session) attach(ctx context.Context) (finalErr error) { onDone(finalErr) }() - attach, err := s.grpcClient.AttachSession(xcontext.WithoutDeadline(ctx), &Ydb_Query.AttachSessionRequest{ + attachCtx, cancelAttach := xcontext.WithCancel(xcontext.WithoutDeadline(ctx)) + + attach, err := s.grpcClient.AttachSession(attachCtx, &Ydb_Query.AttachSessionRequest{ SessionId: s.id, }) if err != nil { @@ -137,15 +129,33 @@ func (s *Session) attach(ctx context.Context) (finalErr error) { xerrors.Transport(err), ) } + state, err := attach.Recv() if err != nil { + cancelAttach() + return xerrors.WithStackTrace(xerrors.Transport(err)) } if state.GetStatus() != Ydb.StatusIds_SUCCESS { + cancelAttach() + return xerrors.WithStackTrace(xerrors.FromOperation(state)) } + s.closeOnce = xsync.OnceFunc(func(ctx context.Context) (err error) { + cancelAttach() + + s.setStatus(statusClosing) + defer s.setStatus(statusClosed) + + if err = deleteSession(ctx, s.grpcClient, s.id); err != nil { + return xerrors.WithStackTrace(err) + } + + return nil + }) + go func() { defer func() { _ = s.closeOnce(ctx) @@ -218,16 +228,14 @@ func (s *Session) Close(ctx context.Context) (err error) { func begin( ctx context.Context, client Ydb_Query_V1.QueryServiceClient, - sessionID string, + s *Session, txSettings query.TransactionSettings, -) ( - *transaction, error, -) { +) (*transaction, error) { a := allocator.New() defer a.Free() response, err := client.BeginTransaction(ctx, &Ydb_Query.BeginTransactionRequest{ - SessionId: sessionID, + SessionId: s.id, TxSettings: txSettings.ToYDB(a), }, ) @@ -238,9 +246,7 @@ func begin( return nil, xerrors.WithStackTrace(xerrors.FromOperation(response)) } - return &transaction{ - id: response.GetTxMeta().GetId(), - }, nil + return newTransaction(response.GetTxMeta().GetId(), s, s.trace), nil } func (s *Session) Begin( @@ -256,7 +262,7 @@ func (s *Session) Begin( onDone(err, tx) }() - tx, err = begin(ctx, s.grpcClient, s.id, txSettings) + tx, err = begin(ctx, s.grpcClient, s, txSettings) if err != nil { return nil, xerrors.WithStackTrace(err) } diff --git a/internal/query/session_test.go b/internal/query/session_test.go index db6b41fc7..c96989855 100644 --- a/internal/query/session_test.go +++ b/internal/query/session_test.go @@ -27,7 +27,7 @@ func TestBegin(t *testing.T) { }, }, nil) t.Log("begin") - tx, err := begin(ctx, service, "123", query.TxSettings()) + tx, err := begin(ctx, service, &Session{id: "123"}, query.TxSettings()) require.NoError(t, err) require.Equal(t, "123", tx.id) }) @@ -37,7 +37,7 @@ func TestBegin(t *testing.T) { service := NewMockQueryServiceClient(ctrl) service.EXPECT().BeginTransaction(gomock.Any(), gomock.Any()).Return(nil, grpcStatus.Error(grpcCodes.Unavailable, "")) t.Log("begin") - _, err := begin(ctx, service, "123", query.TxSettings()) + _, err := begin(ctx, service, &Session{id: "123"}, query.TxSettings()) require.Error(t, err) require.True(t, xerrors.IsTransportError(err, grpcCodes.Unavailable)) }) @@ -49,7 +49,7 @@ func TestBegin(t *testing.T) { Status: Ydb.StatusIds_UNAVAILABLE, }, nil) t.Log("begin") - _, err := begin(ctx, service, "123", query.TxSettings()) + _, err := begin(ctx, service, &Session{id: "123"}, query.TxSettings()) require.Error(t, err) require.True(t, xerrors.IsOperationError(err, Ydb.StatusIds_UNAVAILABLE)) }) diff --git a/internal/query/transaction.go b/internal/query/transaction.go index d3f7cfe02..e62778890 100644 --- a/internal/query/transaction.go +++ b/internal/query/transaction.go @@ -8,15 +8,30 @@ import ( "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/query" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) var _ query.Transaction = (*transaction)(nil) type transaction struct { - id string - s *Session + id string + s *Session + trace *trace.Query +} + +func newTransaction(id string, s *Session, t *trace.Query) *transaction { + if t == nil { + t = &trace.Query{} + } + + return &transaction{ + id: id, + s: s, + trace: t, + } } func (tx transaction) ID() string { @@ -24,8 +39,13 @@ func (tx transaction) ID() string { } func (tx transaction) Execute(ctx context.Context, q string, opts ...options.TxExecuteOption) ( - r query.Result, err error, + r query.Result, finalErr error, ) { + onDone := trace.QueryOnTxExecute(tx.trace, &ctx, stack.FunctionID(""), tx.s, tx, q) + defer func() { + onDone(finalErr) + }() + _, res, err := execute(ctx, tx.s, tx.s.grpcClient, q, options.TxExecuteSettings(tx.id, opts...).ExecuteSettings) if err != nil { return nil, xerrors.WithStackTrace(err) diff --git a/internal/ratelimiter/client.go b/internal/ratelimiter/client.go index 206cbf5f9..eec07b118 100644 --- a/internal/ratelimiter/client.go +++ b/internal/ratelimiter/client.go @@ -37,11 +37,11 @@ func (c *Client) Close(ctx context.Context) error { return nil } -func New(ctx context.Context, cc grpc.ClientConnInterface, config config.Config) (*Client, error) { +func New(ctx context.Context, cc grpc.ClientConnInterface, config config.Config) *Client { return &Client{ config: config, service: Ydb_RateLimiter_V1.NewRateLimiterServiceClient(cc), - }, nil + } } func (c *Client) CreateResource( diff --git a/internal/scheme/client.go b/internal/scheme/client.go index 57dc9b16b..9d0cf10e0 100644 --- a/internal/scheme/client.go +++ b/internal/scheme/client.go @@ -38,11 +38,11 @@ func (c *Client) Close(_ context.Context) error { return nil } -func New(ctx context.Context, cc grpc.ClientConnInterface, config config.Config) (*Client, error) { +func New(ctx context.Context, cc grpc.ClientConnInterface, config config.Config) *Client { return &Client{ config: config, service: Ydb_Scheme_V1.NewSchemeServiceClient(cc), - }, nil + } } func (c *Client) MakeDirectory(ctx context.Context, path string) (finalErr error) { diff --git a/internal/scripting/client.go b/internal/scripting/client.go index c411b1ea9..60b85a248 100644 --- a/internal/scripting/client.go +++ b/internal/scripting/client.go @@ -300,9 +300,9 @@ func (c *Client) Close(ctx context.Context) (err error) { return nil } -func New(ctx context.Context, cc grpc.ClientConnInterface, config config.Config) (*Client, error) { +func New(ctx context.Context, cc grpc.ClientConnInterface, config config.Config) *Client { return &Client{ config: config, service: Ydb_Scripting_V1.NewScriptingServiceClient(cc), - }, nil + } } diff --git a/internal/table/client.go b/internal/table/client.go index 47cf29f34..da0f84534 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -34,7 +34,12 @@ type balancer interface { nodeChecker } -func New(ctx context.Context, balancer balancer, config *config.Config) (*Client, error) { +func New(ctx context.Context, balancer balancer, config *config.Config) *Client { + onDone := trace.TableOnInit(config.Trace(), &ctx, stack.FunctionID("")) + defer func() { + onDone(config.SizeLimit()) + }() + return newClient(ctx, balancer, func(ctx context.Context) (s *session, err error) { return newSession(ctx, balancer, config) }, config) @@ -45,12 +50,8 @@ func newClient( balancer balancer, builder sessionBuilder, config *config.Config, -) (c *Client, finalErr error) { - onDone := trace.TableOnInit(config.Trace(), &ctx, stack.FunctionID("")) - defer func() { - onDone(config.SizeLimit(), finalErr) - }() - c = &Client{ +) *Client { + c := &Client{ clock: config.Clock(), config: config, cc: balancer, @@ -74,7 +75,7 @@ func newClient( go c.internalPoolGC(ctx, idleThreshold) } - return c, nil + return c } // Client is a set of session instances that may be reused. diff --git a/internal/table/client_test.go b/internal/table/client_test.go index f1f1cc2f9..82b7eb6af 100644 --- a/internal/table/client_test.go +++ b/internal/table/client_test.go @@ -408,7 +408,7 @@ func TestSessionPoolRacyGet(t *testing.T) { session *session } create := make(chan createReq) - p, err := newClient( + p := newClient( context.Background(), nil, (&StubBuilder{ @@ -429,10 +429,10 @@ func TestSessionPoolRacyGet(t *testing.T) { config.WithIdleThreshold(-1), ), ) - require.NoError(t, err) var ( expSession *session done = make(chan struct{}, 2) + err error ) for i := 0; i < 2; i++ { go func() { @@ -872,7 +872,7 @@ func newClientWithStubBuilder( stubLimit int, options ...config.Option, ) *Client { - c, err := newClient( + c := newClient( context.Background(), balancer, (&StubBuilder{ @@ -882,7 +882,6 @@ func newClientWithStubBuilder( }).createSession, config.New(options...), ) - require.NoError(t, err) return c } diff --git a/internal/table/session_test.go b/internal/table/session_test.go index f45fc271a..006086369 100644 --- a/internal/table/session_test.go +++ b/internal/table/session_test.go @@ -339,7 +339,7 @@ func TestSessionOperationModeOnExecuteDataQuery(t *testing.T) { func(t *testing.T) { for _, srcDst := range fromTo { t.Run(srcDst.srcMode.String()+"->"+srcDst.dstMode.String(), func(t *testing.T) { - client, err := New(context.Background(), testutil.NewBalancer( + client := New(context.Background(), testutil.NewBalancer( testutil.WithInvokeHandlers( testutil.InvokeHandlers{ testutil.TableExecuteDataQuery: func(interface{}) (proto.Message, error) { @@ -382,7 +382,6 @@ func TestSessionOperationModeOnExecuteDataQuery(t *testing.T) { }, ), ), config.New()) - require.NoError(t, err) ctx, cancel := xcontext.WithTimeout( context.Background(), time.Second, @@ -397,7 +396,7 @@ func TestSessionOperationModeOnExecuteDataQuery(t *testing.T) { } func TestCreateTableRegression(t *testing.T) { - client, err := New(context.Background(), testutil.NewBalancer( + client := New(context.Background(), testutil.NewBalancer( testutil.WithInvokeHandlers( testutil.InvokeHandlers{ testutil.TableCreateSession: func(request interface{}) (proto.Message, error) { @@ -474,15 +473,13 @@ func TestCreateTableRegression(t *testing.T) { ), ), config.New()) - require.NoError(t, err) - ctx, cancel := xcontext.WithTimeout( context.Background(), time.Second, ) defer cancel() - err = client.Do(ctx, func(ctx context.Context, s table.Session) error { + err := client.Do(ctx, func(ctx context.Context, s table.Session) error { return s.CreateTable(ctx, "episodes", options.WithColumn("series_id", types.NewOptional(types.Uint64)), options.WithColumn("season_id", types.NewOptional(types.Uint64)), @@ -498,7 +495,7 @@ func TestCreateTableRegression(t *testing.T) { } func TestDescribeTableRegression(t *testing.T) { - client, err := New(context.Background(), testutil.NewBalancer( + client := New(context.Background(), testutil.NewBalancer( testutil.WithInvokeHandlers( testutil.InvokeHandlers{ testutil.TableCreateSession: func(request interface{}) (proto.Message, error) { @@ -567,8 +564,6 @@ func TestDescribeTableRegression(t *testing.T) { ), ), config.New()) - require.NoError(t, err) - ctx, cancel := xcontext.WithTimeout( context.Background(), time.Second, @@ -577,7 +572,7 @@ func TestDescribeTableRegression(t *testing.T) { var act options.Description - err = client.Do(ctx, func(ctx context.Context, s table.Session) (err error) { + err := client.Do(ctx, func(ctx context.Context, s table.Session) (err error) { act, err = s.DescribeTable(ctx, "episodes") return err diff --git a/internal/topic/topicclientinternal/client.go b/internal/topic/topicclientinternal/client.go index 976dca32d..2318d97b1 100644 --- a/internal/topic/topicclientinternal/client.go +++ b/internal/topic/topicclientinternal/client.go @@ -32,7 +32,7 @@ func New( conn grpc.ClientConnInterface, cred credentials.Credentials, opts ...topicoptions.TopicOption, -) (*Client, error) { +) *Client { rawClient := rawtopic.NewClient(Ydb_Topic_V1.NewTopicServiceClient(conn)) cfg := newTopicConfig(opts...) @@ -45,7 +45,7 @@ func New( cred: cred, defaultOperationParams: defaultOperationParams, rawClient: rawClient, - }, nil + } } func newTopicConfig(opts ...topicoptions.TopicOption) topic.Config { diff --git a/internal/xcontext/context_with_done.go b/internal/xcontext/context_with_done.go deleted file mode 100644 index d7bdb4fc1..000000000 --- a/internal/xcontext/context_with_done.go +++ /dev/null @@ -1,48 +0,0 @@ -package xcontext - -import ( - "context" -) - -type ( - withDoneOpts struct { - onStart func() - onDone func() - } - withDoneOpt func(t *withDoneOpts) -) - -func withDone( - parent context.Context, - done <-chan struct{}, - opts ...withDoneOpt, -) (context.Context, context.CancelFunc) { - cfg := &withDoneOpts{} - for _, opt := range opts { - opt(cfg) - } - ctx, cancel := context.WithCancel(parent) - go func() { - if cfg.onStart != nil { - cfg.onStart() - } - defer func() { - if cfg.onDone != nil { - cfg.onDone() - } - }() - - select { - case <-ctx.Done(): - case <-done: - } - - cancel() - }() - - return ctx, cancel -} - -func WithDone(parent context.Context, done <-chan struct{}) (context.Context, context.CancelFunc) { - return withDone(parent, done) -} diff --git a/internal/xcontext/context_with_done_test.go b/internal/xcontext/context_with_done_test.go deleted file mode 100644 index f5c016dad..000000000 --- a/internal/xcontext/context_with_done_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package xcontext - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestWithDone(t *testing.T) { - require.NotPanics(t, func() { - for range make([]struct{}, 100000) { - ctx1, cancel1 := context.WithCancel(context.Background()) - done := make(chan struct{}) - goroutines := make(chan struct{}, 1) - ctx2, cancel2 := withDone(ctx1, done, - func(t *withDoneOpts) { - t.onStart = func() { - goroutines <- struct{}{} - } - }, - func(t *withDoneOpts) { - t.onDone = func() { - goroutines <- struct{}{} - } - }, - ) - go func() { - cancel1() - }() - go func() { - cancel2() - }() - go func() { - close(done) - }() - <-goroutines - <-ctx2.Done() - select { - case <-time.After(time.Second): - t.Failed() - case <-goroutines: - } - close(goroutines) - } - }) -} diff --git a/internal/xsync/locked.go b/internal/xsync/locked.go new file mode 100644 index 000000000..24639f439 --- /dev/null +++ b/internal/xsync/locked.go @@ -0,0 +1,30 @@ +package xsync + +import "sync" + +type Locked[T any] struct { + v T + mu sync.RWMutex +} + +func NewLocked[T any](v T) *Locked[T] { + return &Locked[T]{ + v: v, + } +} + +func (l *Locked[T]) Get() T { + l.mu.RLock() + defer l.mu.RUnlock() + + return l.v +} + +func (l *Locked[T]) Change(f func(prev T) T) T { + l.mu.Lock() + defer l.mu.Unlock() + + l.v = f(l.v) + + return l.v +} diff --git a/internal/xsync/locked_test.go b/internal/xsync/locked_test.go new file mode 100644 index 000000000..b0e5aa6e9 --- /dev/null +++ b/internal/xsync/locked_test.go @@ -0,0 +1,16 @@ +package xsync + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestLocked(t *testing.T) { + l := NewLocked[int](1) + require.Equal(t, 1, l.Get()) + require.Equal(t, 2, l.Change(func(v int) int { + return 2 + })) + require.Equal(t, 2, l.Get()) +} diff --git a/internal/xsync/once.go b/internal/xsync/once.go index a14354b85..35f5ed0aa 100644 --- a/internal/xsync/once.go +++ b/internal/xsync/once.go @@ -3,6 +3,8 @@ package xsync import ( "context" "sync" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/closer" ) func OnceFunc(f func(ctx context.Context) error) func(ctx context.Context) error { @@ -17,25 +19,43 @@ func OnceFunc(f func(ctx context.Context) error) func(ctx context.Context) error } } -func OnceValue[T any](f func(ctx context.Context) (T, error)) func(ctx context.Context) (T, error) { - var ( - once sync.Once - mutex sync.RWMutex - t T - err error - ) +type Once[T closer.Closer] struct { + f func() T + once sync.Once + mutex sync.RWMutex + t T +} - return func(ctx context.Context) (T, error) { - once.Do(func() { - mutex.Lock() - defer mutex.Unlock() +func OnceValue[T closer.Closer](f func() T) *Once[T] { + return &Once[T]{f: f} +} - t, err = f(ctx) - }) +func (v *Once[T]) Close(ctx context.Context) (err error) { + has := true + v.once.Do(func() { + has = false + }) - mutex.RLock() - defer mutex.RUnlock() + if has { + v.mutex.RLock() + defer v.mutex.RUnlock() - return t, err + return v.t.Close(ctx) } + + return nil +} + +func (v *Once[T]) Get() T { + v.once.Do(func() { + v.mutex.Lock() + defer v.mutex.Unlock() + + v.t = v.f() + }) + + v.mutex.RLock() + defer v.mutex.RUnlock() + + return v.t } diff --git a/internal/xsync/once_test.go b/internal/xsync/once_test.go index 8990fb9a0..003e9a7e7 100644 --- a/internal/xsync/once_test.go +++ b/internal/xsync/once_test.go @@ -2,6 +2,7 @@ package xsync import ( "context" + "errors" "sync" "testing" @@ -27,26 +28,66 @@ func TestOnceFunc(t *testing.T) { require.Equal(t, 1, cnt) } +type testCloser struct { + value int + inited bool + closed bool + closeErr error +} + +func (c *testCloser) Close(ctx context.Context) error { + c.closed = true + + return c.closeErr +} + func TestOnceValue(t *testing.T) { - var ( - ctx = xtest.Context(t) - cnt = 0 - ) - f := OnceValue(func(ctx context.Context) (int, error) { - cnt++ + ctx := xtest.Context(t) + t.Run("Race", func(t *testing.T) { + counter := 0 + once := OnceValue(func() *testCloser { + counter++ - return cnt, nil + return &testCloser{value: counter} + }) + var wg sync.WaitGroup + wg.Add(1000) + for range make([]struct{}, 1000) { + go func() { + defer wg.Done() + v := once.Get() + require.Equal(t, 1, v.value) + }() + } + wg.Wait() + }) + t.Run("GetBeforeClose", func(t *testing.T) { + constCloseErr := errors.New("") + once := OnceValue(func() *testCloser { + return &testCloser{ + inited: true, + closeErr: constCloseErr, + } + }) + v := once.Get() + require.True(t, v.inited) + require.False(t, v.closed) + err := once.Close(ctx) + require.ErrorIs(t, err, constCloseErr) + require.True(t, v.inited) + require.True(t, v.closed) + }) + t.Run("CloseBeforeGet", func(t *testing.T) { + constCloseErr := errors.New("") + once := OnceValue(func() *testCloser { + return &testCloser{ + inited: true, + closeErr: constCloseErr, + } + }) + err := once.Close(ctx) + require.NoError(t, err) + v := once.Get() + require.Nil(t, v) }) - require.Equal(t, 0, cnt) - var wg sync.WaitGroup - wg.Add(1000) - for range make([]struct{}, 1000) { - go func() { - defer wg.Done() - v, err := f(ctx) - require.NoError(t, err) - require.Equal(t, 1, v) - }() - } - wg.Wait() } diff --git a/internal/xtest/grpclogger.go b/internal/xtest/grpclogger.go index b36c05483..3506d4ccb 100644 --- a/internal/xtest/grpclogger.go +++ b/internal/xtest/grpclogger.go @@ -16,7 +16,7 @@ var globalLastStreamID = int64(0) // // db, err := ydb.Open(context.Background(), connectionString, // ... -// ydb.With(config.WithGrpcOptions(grpc.WithChainUnaryInterceptor(xtest.NewGrpcLogger(t).UnaryClientInterceptor))), +// ydb.Change(config.WithGrpcOptions(grpc.WithChainUnaryInterceptor(xtest.NewGrpcLogger(t).UnaryClientInterceptor))), // ) type GrpcLogger struct { t testing.TB diff --git a/log/driver.go b/log/driver.go index a35154805..fb9c8b1a1 100644 --- a/log/driver.go +++ b/log/driver.go @@ -151,33 +151,6 @@ func internalDriver(l Logger, d trace.Detailer) trace.Driver { //nolint:gocyclo ) } }, - OnConnPark: func(info trace.DriverConnParkStartInfo) func(trace.DriverConnParkDoneInfo) { - if d.Details()&trace.DriverConnEvents == 0 { - return nil - } - ctx := with(*info.Context, TRACE, "ydb", "driver", "conn", "park") - endpoint := info.Endpoint - l.Log(ctx, "start", - Stringer("endpoint", endpoint), - ) - start := time.Now() - - return func(info trace.DriverConnParkDoneInfo) { - if info.Error == nil { - l.Log(ctx, "done", - Stringer("endpoint", endpoint), - latencyField(start), - ) - } else { - l.Log(WithLevel(ctx, WARN), "failed", - Error(info.Error), - Stringer("endpoint", endpoint), - latencyField(start), - versionField(), - ) - } - } - }, OnConnClose: func(info trace.DriverConnCloseStartInfo) func(trace.DriverConnCloseDoneInfo) { if d.Details()&trace.DriverConnEvents == 0 { return nil diff --git a/log/query.go b/log/query.go index bd38f0847..e44e9e3ac 100644 --- a/log/query.go +++ b/log/query.go @@ -27,21 +27,9 @@ func internalQuery( start := time.Now() return func(info trace.QueryNewDoneInfo) { - if info.Error == nil { - l.Log(WithLevel(ctx, INFO), "done", - latencyField(start), - ) - } else { - lvl := FATAL - if !xerrors.IsYdb(info.Error) { - lvl = ERROR - } - l.Log(WithLevel(ctx, lvl), "failed", - latencyField(start), - Error(info.Error), - versionField(), - ) - } + l.Log(WithLevel(ctx, INFO), "done", + latencyField(start), + ) } }, OnClose: func(info trace.QueryCloseStartInfo) func(info trace.QueryCloseDoneInfo) { @@ -75,35 +63,14 @@ func internalQuery( return nil } ctx := with(*info.Context, TRACE, "ydb", "query", "pool", "new") - l.Log(ctx, "start", - Int("MinSize", info.MinSize), - Int("MaxSize", info.MaxSize), - Int("ProducersCount", info.ProducersCount), - ) + l.Log(ctx, "start") start := time.Now() return func(info trace.QueryPoolNewDoneInfo) { - if info.Error == nil { - l.Log(WithLevel(ctx, INFO), "done", - latencyField(start), - Int("MinSize", info.MinSize), - Int("MaxSize", info.MaxSize), - Int("ProducersCount", info.ProducersCount), - ) - } else { - lvl := FATAL - if !xerrors.IsYdb(info.Error) { - lvl = ERROR - } - l.Log(WithLevel(ctx, lvl), "failed", - latencyField(start), - Error(info.Error), - Int("MinSize", info.MinSize), - Int("MaxSize", info.MaxSize), - Int("ProducersCount", info.ProducersCount), - versionField(), - ) - } + l.Log(WithLevel(ctx, INFO), "done", + latencyField(start), + Int("Limit", info.Limit), + ) } }, OnPoolClose: func(info trace.QueryPoolCloseStartInfo) func(trace.QueryPoolCloseDoneInfo) { @@ -132,22 +99,6 @@ func internalQuery( } } }, - OnPoolProduce: func(info trace.QueryPoolProduceStartInfo) func(trace.QueryPoolProduceDoneInfo) { - if d.Details()&trace.QueryPoolEvents == 0 { - return nil - } - ctx := with(*info.Context, TRACE, "ydb", "query", "pool", "produce") - l.Log(ctx, "start", - Int("Concurrency", info.Concurrency), - ) - start := time.Now() - - return func(info trace.QueryPoolProduceDoneInfo) { - l.Log(ctx, "done", - latencyField(start), - ) - } - }, OnPoolTry: func(info trace.QueryPoolTryStartInfo) func(trace.QueryPoolTryDoneInfo) { if d.Details()&trace.QueryPoolEvents == 0 { return nil @@ -254,58 +205,6 @@ func internalQuery( } } }, - OnPoolSpawn: func(info trace.QueryPoolSpawnStartInfo) func(trace.QueryPoolSpawnDoneInfo) { - if d.Details()&trace.QueryPoolEvents == 0 { - return nil - } - ctx := with(*info.Context, TRACE, "ydb", "query", "pool", "spawn") - l.Log(ctx, "start") - start := time.Now() - - return func(info trace.QueryPoolSpawnDoneInfo) { - if info.Error == nil { - l.Log(ctx, "done", - latencyField(start), - ) - } else { - lvl := WARN - if !xerrors.IsYdb(info.Error) { - lvl = DEBUG - } - l.Log(WithLevel(ctx, lvl), "failed", - latencyField(start), - Error(info.Error), - versionField(), - ) - } - } - }, - OnPoolWant: func(info trace.QueryPoolWantStartInfo) func(trace.QueryPoolWantDoneInfo) { - if d.Details()&trace.QueryPoolEvents == 0 { - return nil - } - ctx := with(*info.Context, TRACE, "ydb", "query", "pool", "want") - l.Log(ctx, "start") - start := time.Now() - - return func(info trace.QueryPoolWantDoneInfo) { - if info.Error == nil { - l.Log(ctx, "done", - latencyField(start), - ) - } else { - lvl := WARN - if !xerrors.IsYdb(info.Error) { - lvl = DEBUG - } - l.Log(WithLevel(ctx, lvl), "failed", - latencyField(start), - Error(info.Error), - versionField(), - ) - } - } - }, OnDo: func(info trace.QueryDoStartInfo) func(trace.QueryDoDoneInfo) { if d.Details()&trace.QueryEvents == 0 { return nil @@ -508,6 +407,36 @@ func internalQuery( } } }, + OnTxExecute: func(info trace.QueryTxExecuteStartInfo) func(info trace.QueryTxExecuteDoneInfo) { + if d.Details()&trace.QueryTransactionEvents == 0 { + return nil + } + ctx := with(*info.Context, TRACE, "ydb", "query", "transaction", "execute") + l.Log(ctx, "start", + String("SessionID", info.Session.ID()), + String("TransactionID", info.Tx.ID()), + String("SessionStatus", info.Session.Status()), + ) + start := time.Now() + + return func(info trace.QueryTxExecuteDoneInfo) { + if info.Error == nil { + l.Log(WithLevel(ctx, DEBUG), "done", + latencyField(start), + ) + } else { + lvl := WARN + if !xerrors.IsYdb(info.Error) { + lvl = DEBUG + } + l.Log(WithLevel(ctx, lvl), "failed", + latencyField(start), + Error(info.Error), + versionField(), + ) + } + } + }, OnResultNew: func(info trace.QueryResultNewStartInfo) func(info trace.QueryResultNewDoneInfo) { if d.Details()&trace.QueryResultEvents == 0 { return nil diff --git a/metrics/query.go b/metrics/query.go index 186e07874..2d48cfd52 100644 --- a/metrics/query.go +++ b/metrics/query.go @@ -20,19 +20,39 @@ func query(config Config) (t trace.Query) { ) func( info trace.QueryPoolWithDoneInfo, ) { + if withConfig.Details()&trace.QueryPoolEvents == 0 { + return nil + } + start := time.Now() return func(info trace.QueryPoolWithDoneInfo) { - if withConfig.Details()&trace.QueryPoolEvents != 0 { - attempts.With(nil).Record(float64(info.Attempts)) - if info.Error != nil { - errs.With(map[string]string{ - "status": errorBrief(info.Error), - }).Inc() - } - latency.With(nil).Record(time.Since(start)) + attempts.With(nil).Record(float64(info.Attempts)) + if info.Error != nil { + errs.With(map[string]string{ + "status": errorBrief(info.Error), + }).Inc() } + latency.With(nil).Record(time.Since(start)) + } + } + } + { + sizeConfig := poolConfig.WithSystem("size") + limit := sizeConfig.GaugeVec("limit") + idle := sizeConfig.GaugeVec("idle") + index := sizeConfig.GaugeVec("index") + inUse := sizeConfig.WithSystem("in").GaugeVec("use") + + t.OnPoolChange = func(stats trace.QueryPoolChange) { + if sizeConfig.Details()&trace.QueryPoolEvents == 0 { + return } + + limit.With(nil).Set(float64(stats.Limit)) + idle.With(nil).Set(float64(stats.Idle)) + inUse.With(nil).Set(float64(stats.InUse)) + index.With(nil).Set(float64(stats.Index)) } } } @@ -86,6 +106,7 @@ func query(config Config) (t trace.Query) { } { sessionConfig := queryConfig.WithSystem("session") + count := sessionConfig.GaugeVec("count") { createConfig := sessionConfig.WithSystem("create") errs := createConfig.CounterVec("errs", "status") @@ -99,6 +120,9 @@ func query(config Config) (t trace.Query) { return func(info trace.QuerySessionCreateDoneInfo) { if createConfig.Details()&trace.QuerySessionEvents != 0 { + if info.Error == nil { + count.With(nil).Add(1) + } errs.With(map[string]string{ "status": errorBrief(info.Error), }).Inc() @@ -111,14 +135,12 @@ func query(config Config) (t trace.Query) { deleteConfig := sessionConfig.WithSystem("delete") errs := deleteConfig.CounterVec("errs", "status") latency := deleteConfig.TimerVec("latency") - t.OnSessionCreate = func( - info trace.QuerySessionCreateStartInfo, - ) func( - info trace.QuerySessionCreateDoneInfo, - ) { + t.OnSessionDelete = func(info trace.QuerySessionDeleteStartInfo) func(info trace.QuerySessionDeleteDoneInfo) { + count.With(nil).Add(-1) + start := time.Now() - return func(info trace.QuerySessionCreateDoneInfo) { + return func(info trace.QuerySessionDeleteDoneInfo) { if deleteConfig.Details()&trace.QuerySessionEvents != 0 { errs.With(map[string]string{ "status": errorBrief(info.Error), @@ -163,6 +185,26 @@ func query(config Config) (t trace.Query) { } } } + { + txConfig := queryConfig.WithSystem("tx") + { + executeConfig := txConfig.WithSystem("execute") + errs := executeConfig.CounterVec("errs", "status") + latency := executeConfig.TimerVec("latency") + t.OnTxExecute = func(info trace.QueryTxExecuteStartInfo) func(info trace.QueryTxExecuteDoneInfo) { + start := time.Now() + + return func(info trace.QueryTxExecuteDoneInfo) { + if executeConfig.Details()&trace.QuerySessionEvents != 0 { + errs.With(map[string]string{ + "status": errorBrief(info.Error), + }).Inc() + latency.With(nil).Record(time.Since(start)) + } + } + } + } + } return t } diff --git a/metrics/traces.go b/metrics/traces.go index 8be34a6b2..7744ebbcb 100644 --- a/metrics/traces.go +++ b/metrics/traces.go @@ -13,6 +13,7 @@ func WithTraces(config Config) ydb.Option { return ydb.MergeOptions( ydb.WithTraceDriver(driver(config)), ydb.WithTraceTable(table(config)), + ydb.WithTraceQuery(query(config)), ydb.WithTraceScripting(scripting(config)), ydb.WithTraceScheme(scheme(config)), ydb.WithTraceCoordination(coordination(config)), @@ -20,6 +21,5 @@ func WithTraces(config Config) ydb.Option { ydb.WithTraceDiscovery(discovery(config)), ydb.WithTraceDatabaseSQL(databaseSQL(config)), ydb.WithTraceRetry(retry(config)), - ydb.WithTraceQuery(query(config)), ) } diff --git a/options.go b/options.go index 1682fb447..13bcabc70 100644 --- a/options.go +++ b/options.go @@ -108,10 +108,10 @@ func WithConnectionString(connectionString string) Option { } // WithConnectionTTL defines duration for parking idle connections -func WithConnectionTTL(ttl time.Duration) Option { +// +// Deprecated: background connection parking not available +func WithConnectionTTL(time.Duration) Option { return func(ctx context.Context, c *Driver) error { - c.options = append(c.options, config.WithConnectionTTL(ttl)) - return nil } } @@ -398,40 +398,15 @@ func WithSessionPoolSizeLimit(sizeLimit int) Option { } } -// WithSessionPoolMinSize set min size of internal sessions pool in query.Client -func WithSessionPoolMinSize(size int) Option { - return func(ctx context.Context, c *Driver) error { - c.queryOptions = append(c.queryOptions, queryConfig.WithPoolMinSize(size)) - - return nil - } -} - -// WithSessionPoolMaxSize set min size of internal sessions pool in query.Client -func WithSessionPoolMaxSize(size int) Option { +// WithSessionPoolLimit set min size of internal sessions pool in query.Client +func WithSessionPoolLimit(size int) Option { return func(ctx context.Context, c *Driver) error { - c.queryOptions = append(c.queryOptions, queryConfig.WithPoolMaxSize(size)) + c.queryOptions = append(c.queryOptions, queryConfig.WithPoolLimit(size)) return nil } } -// WithSessionPoolProducersCount set min size of internal sessions pool in query.Client -func WithSessionPoolProducersCount(count int) Option { - return func(ctx context.Context, c *Driver) error { - c.queryOptions = append(c.queryOptions, queryConfig.WithPoolProducersCount(count)) - - return nil - } -} - -// WithSessionPoolKeepAliveMinSize set minimum sessions should be keeped alive in table.Client -// -// Deprecated: table client do not supports background session keep-aliving now -func WithSessionPoolKeepAliveMinSize(keepAliveMinSize int) Option { - return func(ctx context.Context, c *Driver) error { return nil } -} - // WithSessionPoolIdleThreshold defines interval for idle sessions func WithSessionPoolIdleThreshold(idleThreshold time.Duration) Option { return func(ctx context.Context, c *Driver) error { @@ -445,11 +420,6 @@ func WithSessionPoolIdleThreshold(idleThreshold time.Duration) Option { } } -// WithSessionPoolKeepAliveTimeout set timeout of keep alive requests for session in table.Client -func WithSessionPoolKeepAliveTimeout(keepAliveTimeout time.Duration) Option { - return func(ctx context.Context, c *Driver) error { return nil } -} - // WithSessionPoolCreateSessionTimeout set timeout for new session creation process in table.Client func WithSessionPoolCreateSessionTimeout(createSessionTimeout time.Duration) Option { return func(ctx context.Context, c *Driver) error { diff --git a/query/session.go b/query/session.go index 8c2cbaf34..9a92449eb 100644 --- a/query/session.go +++ b/query/session.go @@ -50,11 +50,11 @@ const ( StatsModeProfile = options.StatsModeProfile ) -func WithParameters(parameters *params.Parameters) options.ExecuteOption { +func WithParameters(parameters *params.Parameters) options.ParametersOption { return options.WithParameters(parameters) } -func WithTxControl(txControl *tx.Control) options.ExecuteOption { +func WithTxControl(txControl *tx.Control) options.TxControlOption { return options.WithTxControl(txControl) } @@ -66,18 +66,18 @@ func WithCommit() options.TxExecuteOption { return options.WithCommit() } -func WithExecMode(mode options.ExecMode) options.ExecuteOption { +func WithExecMode(mode options.ExecMode) options.ExecModeOption { return options.WithExecMode(mode) } -func WithSyntax(syntax options.Syntax) options.ExecuteOption { +func WithSyntax(syntax options.Syntax) options.SyntaxOption { return options.WithSyntax(syntax) } -func WithStatsMode(mode options.StatsMode) options.ExecuteOption { +func WithStatsMode(mode options.StatsMode) options.StatsModeOption { return options.WithStatsMode(mode) } -func WithCallOptions(opts ...grpc.CallOption) options.ExecuteOption { +func WithCallOptions(opts ...grpc.CallOption) options.CallOptions { return options.WithCallOptions(opts...) } diff --git a/query/stats.go b/query/stats.go new file mode 100644 index 000000000..f93b8867c --- /dev/null +++ b/query/stats.go @@ -0,0 +1,18 @@ +package query + +import ( + "fmt" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/pool/stats" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" +) + +func Stats(client Client) (*stats.Stats, error) { + if c, has := client.(interface { + Stats() *stats.Stats + }); has { + return c.Stats(), nil + } + + return nil, xerrors.WithStackTrace(fmt.Errorf("client %T not supported stats", client)) +} diff --git a/tests/integration/discovery_test.go b/tests/integration/discovery_test.go index 511043ebd..ebd5a054e 100644 --- a/tests/integration/discovery_test.go +++ b/tests/integration/discovery_test.go @@ -46,8 +46,7 @@ func TestDiscovery(t *testing.T) { t.Fatalf("unknown request type: %s", requestTypes[0]) } } - parking = make(chan struct{}) - ctx = xtest.Context(t) + ctx = xtest.Context(t) ) db, err := ydb.Open(ctx, @@ -60,7 +59,6 @@ func TestDiscovery(t *testing.T) { config.WithOperationCancelAfter(time.Second*2), ), ydb.WithBalancer(balancers.SingleConn()), - ydb.WithConnectionTTL(time.Second*1), ydb.WithMinTLSVersion(tls.VersionTLS10), ydb.WithLogger( newLoggerWithMinLevel(t, log.WARN), @@ -94,13 +92,6 @@ func TestDiscovery(t *testing.T) { }), ), ), - ydb.WithTraceDriver(trace.Driver{ - OnConnPark: func(info trace.DriverConnParkStartInfo) func(trace.DriverConnParkDoneInfo) { - return func(info trace.DriverConnParkDoneInfo) { - parking <- struct{}{} - } - }, - }), ) if err != nil { t.Fatal(err) @@ -117,17 +108,5 @@ func TestDiscovery(t *testing.T) { } else { t.Log(endpoints) } - t.Run("wait", func(t *testing.T) { - t.Run("parking", func(t *testing.T) { - <-parking // wait for parking conn - t.Run("re-discover", func(t *testing.T) { - if endpoints, err := db.Discovery().Discover(ctx); err != nil { - t.Fatal(err) - } else { - t.Log(endpoints) - } - }) - }) - }) }) } diff --git a/tests/integration/helpers_test.go b/tests/integration/helpers_test.go index 73363250d..83fd412f6 100644 --- a/tests/integration/helpers_test.go +++ b/tests/integration/helpers_test.go @@ -85,9 +85,9 @@ func (scope *scopeT) Driver(opts ...ydb.Option) *ydb.Driver { token := scope.AuthToken() if token == "" { - scope.Logf("With empty auth token") + scope.Logf("Change empty auth token") } else { - scope.Logf("With auth token") + scope.Logf("Change auth token") } connectionContext, cancel := context.WithTimeout(scope.Ctx, time.Second*10) diff --git a/tests/integration/query_execute_test.go b/tests/integration/query_execute_test.go index bf36d542e..b3d6c0339 100644 --- a/tests/integration/query_execute_test.go +++ b/tests/integration/query_execute_test.go @@ -182,7 +182,7 @@ func TestQueryExecute(t *testing.T) { require.EqualValues(t, time.Duration(100500000000), data.P3) require.Nil(t, data.P4) }) - t.Run("Transaction", func(t *testing.T) { + t.Run("Tx", func(t *testing.T) { t.Run("Explicit", func(t *testing.T) { err = db.Query().Do(ctx, func(ctx context.Context, s query.Session) (err error) { tx, err := s.Begin(ctx, query.TxSettings(query.WithSerializableReadWrite())) diff --git a/tests/integration/topic_grpc_stopper_helper_test.go b/tests/integration/topic_grpc_stopper_helper_test.go index d9b521bed..f08930a83 100644 --- a/tests/integration/topic_grpc_stopper_helper_test.go +++ b/tests/integration/topic_grpc_stopper_helper_test.go @@ -19,8 +19,8 @@ import ( // // db, err := ydb.Open(context.Background(), connectionString, // ... -// ydb.With(config.WithGrpcOptions(grpc.WithChainUnaryInterceptor(grpcStopper.UnaryClientInterceptor)), -// ydb.With(config.WithGrpcOptions(grpc.WithStreamInterceptor(grpcStopper.StreamClientInterceptor)), +// ydb.Change(config.WithGrpcOptions(grpc.WithChainUnaryInterceptor(grpcStopper.UnaryClientInterceptor)), +// ydb.Change(config.WithGrpcOptions(grpc.WithStreamInterceptor(grpcStopper.StreamClientInterceptor)), // ), // // grpcStopper.Stop(errors.New("test error")) diff --git a/tests/slo/native/query/storage.go b/tests/slo/native/query/storage.go index 8437a94be..b5e08a309 100755 --- a/tests/slo/native/query/storage.go +++ b/tests/slo/native/query/storage.go @@ -69,9 +69,7 @@ func NewStorage(ctx context.Context, cfg *config.Config, poolSize int) (*Storage db, err := ydb.Open(ctx, cfg.Endpoint+cfg.DB, - ydb.WithSessionPoolMaxSize(poolSize), - ydb.WithSessionPoolMinSize(poolSize/10), - ydb.WithSessionPoolProducersCount(poolSize/10), + ydb.WithSessionPoolLimit(poolSize), ) if err != nil { return nil, err diff --git a/trace/driver.go b/trace/driver.go index e73b67912..e9ece8712 100644 --- a/trace/driver.go +++ b/trace/driver.go @@ -35,7 +35,6 @@ type ( OnConnStreamSendMsg func(DriverConnStreamSendMsgStartInfo) func(DriverConnStreamSendMsgDoneInfo) OnConnStreamCloseSend func(DriverConnStreamCloseSendStartInfo) func(DriverConnStreamCloseSendDoneInfo) OnConnDial func(DriverConnDialStartInfo) func(DriverConnDialDoneInfo) - OnConnPark func(DriverConnParkStartInfo) func(DriverConnParkDoneInfo) OnConnBan func(DriverConnBanStartInfo) func(DriverConnBanDoneInfo) OnConnAllow func(DriverConnAllowStartInfo) func(DriverConnAllowDoneInfo) OnConnClose func(DriverConnCloseStartInfo) func(DriverConnCloseDoneInfo) @@ -226,18 +225,6 @@ type ( DriverConnDialDoneInfo struct { Error error } - DriverConnParkStartInfo struct { - // Context make available context in trace callback function. - // Pointer to context provide replacement of context in trace callback function. - // Warning: concurrent access to pointer on client side must be excluded. - // Safe replacement of context are provided only inside callback function - Context *context.Context - Call call - Endpoint EndpointInfo - } - DriverConnParkDoneInfo struct { - Error error - } DriverConnCloseStartInfo struct { // Context make available context in trace callback function. // Pointer to context provide replacement of context in trace callback function. diff --git a/trace/driver_gtrace.go b/trace/driver_gtrace.go index a299166ba..bb2e951c0 100644 --- a/trace/driver_gtrace.go +++ b/trace/driver_gtrace.go @@ -485,41 +485,6 @@ func (t *Driver) Compose(x *Driver, opts ...DriverComposeOption) *Driver { } } } - { - h1 := t.OnConnPark - h2 := x.OnConnPark - ret.OnConnPark = func(d DriverConnParkStartInfo) func(DriverConnParkDoneInfo) { - if options.panicCallback != nil { - defer func() { - if e := recover(); e != nil { - options.panicCallback(e) - } - }() - } - var r, r1 func(DriverConnParkDoneInfo) - if h1 != nil { - r = h1(d) - } - if h2 != nil { - r1 = h2(d) - } - return func(d DriverConnParkDoneInfo) { - if options.panicCallback != nil { - defer func() { - if e := recover(); e != nil { - options.panicCallback(e) - } - }() - } - if r != nil { - r(d) - } - if r1 != nil { - r1(d) - } - } - } - } { h1 := t.OnConnBan h2 := x.OnConnBan @@ -1067,21 +1032,6 @@ func (t *Driver) onConnDial(d DriverConnDialStartInfo) func(DriverConnDialDoneIn } return res } -func (t *Driver) onConnPark(d DriverConnParkStartInfo) func(DriverConnParkDoneInfo) { - fn := t.OnConnPark - if fn == nil { - return func(DriverConnParkDoneInfo) { - return - } - } - res := fn(d) - if res == nil { - return func(DriverConnParkDoneInfo) { - return - } - } - return res -} func (t *Driver) onConnBan(d DriverConnBanStartInfo) func(DriverConnBanDoneInfo) { fn := t.OnConnBan if fn == nil { @@ -1393,18 +1343,6 @@ func DriverOnConnDial(t *Driver, c *context.Context, call call, endpoint Endpoin res(p) } } -func DriverOnConnPark(t *Driver, c *context.Context, call call, endpoint EndpointInfo) func(error) { - var p DriverConnParkStartInfo - p.Context = c - p.Call = call - p.Endpoint = endpoint - res := t.onConnPark(p) - return func(e error) { - var p DriverConnParkDoneInfo - p.Error = e - res(p) - } -} func DriverOnConnBan(t *Driver, c *context.Context, call call, endpoint EndpointInfo, state ConnState, cause error) func(state ConnState) { var p DriverConnBanStartInfo p.Context = c diff --git a/trace/query.go b/trace/query.go index 47d4d1385..aefb54ddd 100644 --- a/trace/query.go +++ b/trace/query.go @@ -24,15 +24,13 @@ type ( OnNew func(QueryNewStartInfo) func(info QueryNewDoneInfo) OnClose func(QueryCloseStartInfo) func(info QueryCloseDoneInfo) - OnPoolNew func(QueryPoolNewStartInfo) func(QueryPoolNewDoneInfo) - OnPoolClose func(QueryPoolCloseStartInfo) func(QueryPoolCloseDoneInfo) - OnPoolProduce func(QueryPoolProduceStartInfo) func(QueryPoolProduceDoneInfo) - OnPoolTry func(QueryPoolTryStartInfo) func(QueryPoolTryDoneInfo) - OnPoolWith func(QueryPoolWithStartInfo) func(QueryPoolWithDoneInfo) - OnPoolPut func(QueryPoolPutStartInfo) func(QueryPoolPutDoneInfo) - OnPoolGet func(QueryPoolGetStartInfo) func(QueryPoolGetDoneInfo) - OnPoolSpawn func(QueryPoolSpawnStartInfo) func(QueryPoolSpawnDoneInfo) - OnPoolWant func(QueryPoolWantStartInfo) func(QueryPoolWantDoneInfo) + OnPoolNew func(QueryPoolNewStartInfo) func(QueryPoolNewDoneInfo) + OnPoolClose func(QueryPoolCloseStartInfo) func(QueryPoolCloseDoneInfo) + OnPoolTry func(QueryPoolTryStartInfo) func(QueryPoolTryDoneInfo) + OnPoolWith func(QueryPoolWithStartInfo) func(QueryPoolWithDoneInfo) + OnPoolPut func(QueryPoolPutStartInfo) func(QueryPoolPutDoneInfo) + OnPoolGet func(QueryPoolGetStartInfo) func(QueryPoolGetDoneInfo) + OnPoolChange func(QueryPoolChange) OnDo func(QueryDoStartInfo) func(QueryDoDoneInfo) OnDoTx func(QueryDoTxStartInfo) func(QueryDoTxDoneInfo) @@ -42,6 +40,7 @@ type ( OnSessionDelete func(QuerySessionDeleteStartInfo) func(info QuerySessionDeleteDoneInfo) OnSessionExecute func(QuerySessionExecuteStartInfo) func(info QuerySessionExecuteDoneInfo) OnSessionBegin func(QuerySessionBeginStartInfo) func(info QuerySessionBeginDoneInfo) + OnTxExecute func(QueryTxExecuteStartInfo) func(info QueryTxExecuteDoneInfo) OnResultNew func(QueryResultNewStartInfo) func(info QueryResultNewDoneInfo) OnResultNextPart func(QueryResultNextPartStartInfo) func(info QueryResultNextPartDoneInfo) OnResultNextResultSet func(QueryResultNextResultSetStartInfo) func(info QueryResultNextResultSetDoneInfo) @@ -102,6 +101,21 @@ type ( QuerySessionExecuteDoneInfo struct { Error error } + QueryTxExecuteStartInfo struct { + // Context make available context in trace callback function. + // Pointer to context provide replacement of context in trace callback function. + // Warning: concurrent access to pointer on client side must be excluded. + // Safe replacement of context are provided only inside callback function + Context *context.Context + Call call + + Session querySessionInfo + Tx queryTransactionInfo + Query string + } + QueryTxExecuteDoneInfo struct { + Error error + } QuerySessionAttachStartInfo struct { // Context make available context in trace callback function. // Pointer to context provide replacement of context in trace callback function. @@ -235,9 +249,7 @@ type ( Context *context.Context Call call } - QueryNewDoneInfo struct { - Error error - } + QueryNewDoneInfo struct{} QueryCloseStartInfo struct { // Context make available context in trace callback function. // Pointer to context provide replacement of context in trace callback function. @@ -256,19 +268,9 @@ type ( // Safe replacement of context are provided only inside callback function Context *context.Context Call call - - // input settings - MinSize int - MaxSize int - ProducersCount int } QueryPoolNewDoneInfo struct { - Error error - - // actual settings - MinSize int - MaxSize int - ProducersCount int + Limit int } QueryPoolCloseStartInfo struct { // Context make available context in trace callback function. @@ -281,18 +283,7 @@ type ( QueryPoolCloseDoneInfo struct { Error error } - QueryPoolProduceStartInfo struct { - // Context make available context in trace callback function. - // Pointer to context provide replacement of context in trace callback function. - // Warning: concurrent access to pointer on client side must be excluded. - // Safe replacement of context are provided only inside callback function - Context *context.Context - Call call - - Concurrency int - } - QueryPoolProduceDoneInfo struct{} - QueryPoolTryStartInfo struct { + QueryPoolTryStartInfo struct { // Context make available context in trace callback function. // Pointer to context provide replacement of context in trace callback function. // Warning: concurrent access to pointer on client side must be excluded. @@ -338,26 +329,10 @@ type ( QueryPoolGetDoneInfo struct { Error error } - QueryPoolSpawnStartInfo struct { - // Context make available context in trace callback function. - // Pointer to context provide replacement of context in trace callback function. - // Warning: concurrent access to pointer on client side must be excluded. - // Safe replacement of context are provided only inside callback function - Context *context.Context - Call call - } - QueryPoolSpawnDoneInfo struct { - Error error - } - QueryPoolWantStartInfo struct { - // Context make available context in trace callback function. - // Pointer to context provide replacement of context in trace callback function. - // Warning: concurrent access to pointer on client side must be excluded. - // Safe replacement of context are provided only inside callback function - Context *context.Context - Call call - } - QueryPoolWantDoneInfo struct { - Error error + QueryPoolChange struct { + Limit int + Index int + Idle int + InUse int } ) diff --git a/trace/query_gtrace.go b/trace/query_gtrace.go index 16f03d5f1..2353b77b4 100644 --- a/trace/query_gtrace.go +++ b/trace/query_gtrace.go @@ -170,41 +170,6 @@ func (t *Query) Compose(x *Query, opts ...QueryComposeOption) *Query { } } } - { - h1 := t.OnPoolProduce - h2 := x.OnPoolProduce - ret.OnPoolProduce = func(q QueryPoolProduceStartInfo) func(QueryPoolProduceDoneInfo) { - if options.panicCallback != nil { - defer func() { - if e := recover(); e != nil { - options.panicCallback(e) - } - }() - } - var r, r1 func(QueryPoolProduceDoneInfo) - if h1 != nil { - r = h1(q) - } - if h2 != nil { - r1 = h2(q) - } - return func(q QueryPoolProduceDoneInfo) { - if options.panicCallback != nil { - defer func() { - if e := recover(); e != nil { - options.panicCallback(e) - } - }() - } - if r != nil { - r(q) - } - if r1 != nil { - r1(q) - } - } - } - } { h1 := t.OnPoolTry h2 := x.OnPoolTry @@ -346,44 +311,9 @@ func (t *Query) Compose(x *Query, opts ...QueryComposeOption) *Query { } } { - h1 := t.OnPoolSpawn - h2 := x.OnPoolSpawn - ret.OnPoolSpawn = func(q QueryPoolSpawnStartInfo) func(QueryPoolSpawnDoneInfo) { - if options.panicCallback != nil { - defer func() { - if e := recover(); e != nil { - options.panicCallback(e) - } - }() - } - var r, r1 func(QueryPoolSpawnDoneInfo) - if h1 != nil { - r = h1(q) - } - if h2 != nil { - r1 = h2(q) - } - return func(q QueryPoolSpawnDoneInfo) { - if options.panicCallback != nil { - defer func() { - if e := recover(); e != nil { - options.panicCallback(e) - } - }() - } - if r != nil { - r(q) - } - if r1 != nil { - r1(q) - } - } - } - } - { - h1 := t.OnPoolWant - h2 := x.OnPoolWant - ret.OnPoolWant = func(q QueryPoolWantStartInfo) func(QueryPoolWantDoneInfo) { + h1 := t.OnPoolChange + h2 := x.OnPoolChange + ret.OnPoolChange = func(q QueryPoolChange) { if options.panicCallback != nil { defer func() { if e := recover(); e != nil { @@ -391,27 +321,11 @@ func (t *Query) Compose(x *Query, opts ...QueryComposeOption) *Query { } }() } - var r, r1 func(QueryPoolWantDoneInfo) if h1 != nil { - r = h1(q) + h1(q) } if h2 != nil { - r1 = h2(q) - } - return func(q QueryPoolWantDoneInfo) { - if options.panicCallback != nil { - defer func() { - if e := recover(); e != nil { - options.panicCallback(e) - } - }() - } - if r != nil { - r(q) - } - if r1 != nil { - r1(q) - } + h2(q) } } } @@ -660,6 +574,41 @@ func (t *Query) Compose(x *Query, opts ...QueryComposeOption) *Query { } } } + { + h1 := t.OnTxExecute + h2 := x.OnTxExecute + ret.OnTxExecute = func(q QueryTxExecuteStartInfo) func(QueryTxExecuteDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + var r, r1 func(QueryTxExecuteDoneInfo) + if h1 != nil { + r = h1(q) + } + if h2 != nil { + r1 = h2(q) + } + return func(info QueryTxExecuteDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if r != nil { + r(info) + } + if r1 != nil { + r1(info) + } + } + } + } { h1 := t.OnResultNew h2 := x.OnResultNew @@ -1002,21 +951,6 @@ func (t *Query) onPoolClose(q QueryPoolCloseStartInfo) func(QueryPoolCloseDoneIn } return res } -func (t *Query) onPoolProduce(q QueryPoolProduceStartInfo) func(QueryPoolProduceDoneInfo) { - fn := t.OnPoolProduce - if fn == nil { - return func(QueryPoolProduceDoneInfo) { - return - } - } - res := fn(q) - if res == nil { - return func(QueryPoolProduceDoneInfo) { - return - } - } - return res -} func (t *Query) onPoolTry(q QueryPoolTryStartInfo) func(QueryPoolTryDoneInfo) { fn := t.OnPoolTry if fn == nil { @@ -1077,35 +1011,12 @@ func (t *Query) onPoolGet(q QueryPoolGetStartInfo) func(QueryPoolGetDoneInfo) { } return res } -func (t *Query) onPoolSpawn(q QueryPoolSpawnStartInfo) func(QueryPoolSpawnDoneInfo) { - fn := t.OnPoolSpawn +func (t *Query) onPoolChange(q QueryPoolChange) { + fn := t.OnPoolChange if fn == nil { - return func(QueryPoolSpawnDoneInfo) { - return - } - } - res := fn(q) - if res == nil { - return func(QueryPoolSpawnDoneInfo) { - return - } + return } - return res -} -func (t *Query) onPoolWant(q QueryPoolWantStartInfo) func(QueryPoolWantDoneInfo) { - fn := t.OnPoolWant - if fn == nil { - return func(QueryPoolWantDoneInfo) { - return - } - } - res := fn(q) - if res == nil { - return func(QueryPoolWantDoneInfo) { - return - } - } - return res + fn(q) } func (t *Query) onDo(q QueryDoStartInfo) func(QueryDoDoneInfo) { fn := t.OnDo @@ -1212,6 +1123,21 @@ func (t *Query) onSessionBegin(q QuerySessionBeginStartInfo) func(info QuerySess } return res } +func (t *Query) onTxExecute(q QueryTxExecuteStartInfo) func(info QueryTxExecuteDoneInfo) { + fn := t.OnTxExecute + if fn == nil { + return func(QueryTxExecuteDoneInfo) { + return + } + } + res := fn(q) + if res == nil { + return func(QueryTxExecuteDoneInfo) { + return + } + } + return res +} func (t *Query) onResultNew(q QueryResultNewStartInfo) func(info QueryResultNewDoneInfo) { fn := t.OnResultNew if fn == nil { @@ -1332,14 +1258,13 @@ func (t *Query) onRowScanStruct(q QueryRowScanStructStartInfo) func(info QueryRo } return res } -func QueryOnNew(t *Query, c *context.Context, call call) func(error) { +func QueryOnNew(t *Query, c *context.Context, call call) func() { var p QueryNewStartInfo p.Context = c p.Call = call res := t.onNew(p) - return func(e error) { + return func() { var p QueryNewDoneInfo - p.Error = e res(p) } } @@ -1354,20 +1279,14 @@ func QueryOnClose(t *Query, c *context.Context, call call) func(error) { res(p) } } -func QueryOnPoolNew(t *Query, c *context.Context, call call, minSize int, maxSize int, producersCount int) func(_ error, minSize int, maxSize int, producersCount int) { +func QueryOnPoolNew(t *Query, c *context.Context, call call) func(limit int) { var p QueryPoolNewStartInfo p.Context = c p.Call = call - p.MinSize = minSize - p.MaxSize = maxSize - p.ProducersCount = producersCount res := t.onPoolNew(p) - return func(e error, minSize int, maxSize int, producersCount int) { + return func(limit int) { var p QueryPoolNewDoneInfo - p.Error = e - p.MinSize = minSize - p.MaxSize = maxSize - p.ProducersCount = producersCount + p.Limit = limit res(p) } } @@ -1382,17 +1301,6 @@ func QueryOnPoolClose(t *Query, c *context.Context, call call) func(error) { res(p) } } -func QueryOnPoolProduce(t *Query, c *context.Context, call call, concurrency int) func() { - var p QueryPoolProduceStartInfo - p.Context = c - p.Call = call - p.Concurrency = concurrency - res := t.onPoolProduce(p) - return func() { - var p QueryPoolProduceDoneInfo - res(p) - } -} func QueryOnPoolTry(t *Query, c *context.Context, call call) func(error) { var p QueryPoolTryStartInfo p.Context = c @@ -1438,27 +1346,13 @@ func QueryOnPoolGet(t *Query, c *context.Context, call call) func(error) { res(p) } } -func QueryOnPoolSpawn(t *Query, c *context.Context, call call) func(error) { - var p QueryPoolSpawnStartInfo - p.Context = c - p.Call = call - res := t.onPoolSpawn(p) - return func(e error) { - var p QueryPoolSpawnDoneInfo - p.Error = e - res(p) - } -} -func QueryOnPoolWant(t *Query, c *context.Context, call call) func(error) { - var p QueryPoolWantStartInfo - p.Context = c - p.Call = call - res := t.onPoolWant(p) - return func(e error) { - var p QueryPoolWantDoneInfo - p.Error = e - res(p) - } +func QueryOnPoolChange(t *Query, limit int, index int, idle int, inUse int) { + var p QueryPoolChange + p.Limit = limit + p.Index = index + p.Idle = idle + p.InUse = inUse + t.onPoolChange(p) } func QueryOnDo(t *Query, c *context.Context, call call) func(attempts int, _ error) { var p QueryDoStartInfo @@ -1546,6 +1440,20 @@ func QueryOnSessionBegin(t *Query, c *context.Context, call call, session queryS res(p) } } +func QueryOnTxExecute(t *Query, c *context.Context, call call, session querySessionInfo, tx queryTransactionInfo, query string) func(error) { + var p QueryTxExecuteStartInfo + p.Context = c + p.Call = call + p.Session = session + p.Tx = tx + p.Query = query + res := t.onTxExecute(p) + return func(e error) { + var p QueryTxExecuteDoneInfo + p.Error = e + res(p) + } +} func QueryOnResultNew(t *Query, c *context.Context, call call) func(error) { var p QueryResultNewStartInfo p.Context = c diff --git a/trace/table.go b/trace/table.go index 425cb675e..59d4f2791 100644 --- a/trace/table.go +++ b/trace/table.go @@ -312,7 +312,6 @@ type ( } TableInitDoneInfo struct { Limit int - Error error } TablePoolStateChangeInfo struct { Size int diff --git a/trace/table_gtrace.go b/trace/table_gtrace.go index 5ecbb1a6c..3d50e4c40 100644 --- a/trace/table_gtrace.go +++ b/trace/table_gtrace.go @@ -1350,15 +1350,14 @@ func (t *Table) onPoolWait(t1 TablePoolWaitStartInfo) func(TablePoolWaitDoneInfo } return res } -func TableOnInit(t *Table, c *context.Context, call call) func(limit int, _ error) { +func TableOnInit(t *Table, c *context.Context, call call) func(limit int) { var p TableInitStartInfo p.Context = c p.Call = call res := t.onInit(p) - return func(limit int, e error) { + return func(limit int) { var p TableInitDoneInfo p.Limit = limit - p.Error = e res(p) } }