Skip to content

Commit

Permalink
Merge pull request #1142 from ydb-platform/trace-query-transaction
Browse files Browse the repository at this point in the history
Query service fixes
  • Loading branch information
asmyasnikov authored Mar 19, 2024
2 parents fa39a0b + 82f75c0 commit 836562e
Show file tree
Hide file tree
Showing 60 changed files with 919 additions and 1,404 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
* Added support of `TzDate`,`TzDateTime`,`TzTimestamp` types in `ydb.ParamsBuilder()`
* Added `trace.Query.OnTransactionExecute` event
* Added query pool metrics
* Fixed logic of query session pool
* Changed initialization of internal driver clients to lazy
* Disabled the logic of background grpc-connection parking

## v3.58.2
* Added `trace.Query.OnSessionBegin` event
Expand Down
14 changes: 0 additions & 14 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ type Config struct {

trace *trace.Driver
dialTimeout time.Duration
connectionTTL time.Duration
balancerConfig *balancerConfig.Config
secure bool
endpoint string
Expand Down Expand Up @@ -57,13 +56,6 @@ func (c *Config) Meta() *meta.Meta {
return c.meta
}

// ConnectionTTL defines interval for parking grpc connections.
//
// If ConnectionTTL is zero - connections are not park.
func (c *Config) ConnectionTTL() time.Duration {
return c.connectionTTL
}

// Secure is a flag for secure connection
func (c *Config) Secure() bool {
return c.secure
Expand Down Expand Up @@ -177,12 +169,6 @@ func WithUserAgent(userAgent string) Option {
}
}

func WithConnectionTTL(ttl time.Duration) Option {
return func(c *Config) {
c.connectionTTL = ttl
}
}

func WithCredentials(credentials credentials.Credentials) Option {
return func(c *Config) {
c.credentials = credentials
Expand Down
265 changes: 130 additions & 135 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,28 +65,28 @@ type Driver struct {
config *config.Config
options []config.Option

discovery *internalDiscovery.Client
discovery *xsync.Once[*internalDiscovery.Client]
discoveryOptions []discoveryConfig.Option

table *internalTable.Client
table *xsync.Once[*internalTable.Client]
tableOptions []tableConfig.Option

query *internalQuery.Client
query *xsync.Once[*internalQuery.Client]
queryOptions []queryConfig.Option

scripting *internalScripting.Client
scripting *xsync.Once[*internalScripting.Client]
scriptingOptions []scriptingConfig.Option

scheme *internalScheme.Client
scheme *xsync.Once[*internalScheme.Client]
schemeOptions []schemeConfig.Option

coordination *internalCoordination.Client
coordination *xsync.Once[*internalCoordination.Client]
coordinationOptions []coordinationConfig.Option

ratelimiter *internalRatelimiter.Client
ratelimiter *xsync.Once[*internalRatelimiter.Client]
ratelimiterOptions []ratelimiterConfig.Option

topic *topicclientinternal.Client
topic *xsync.Once[*topicclientinternal.Client]
topicOptions []topicoptions.TopicOption

databaseSQLOptions []xsql.ConnectorOption
Expand Down Expand Up @@ -184,7 +184,7 @@ func (d *Driver) Secure() bool {

// Table returns table client
func (d *Driver) Table() table.Client {
return d.table
return d.table.Get()
}

// Query returns query client
Expand All @@ -193,37 +193,37 @@ func (d *Driver) Table() table.Client {
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
func (d *Driver) Query() query.Client {
return d.query
return d.query.Get()
}

// Scheme returns scheme client
func (d *Driver) Scheme() scheme.Client {
return d.scheme
return d.scheme.Get()
}

// Coordination returns coordination client
func (d *Driver) Coordination() coordination.Client {
return d.coordination
return d.coordination.Get()
}

// Ratelimiter returns ratelimiter client
func (d *Driver) Ratelimiter() ratelimiter.Client {
return d.ratelimiter
return d.ratelimiter.Get()
}

// Discovery returns discovery client
func (d *Driver) Discovery() discovery.Client {
return d.discovery
return d.discovery.Get()
}

// Scripting returns scripting client
func (d *Driver) Scripting() scripting.Client {
return d.scripting
return d.scripting.Get()
}

// Topic returns topic client
func (d *Driver) Topic() topic.Client {
return d.topic
return d.topic.Get()
}

// Open connects to database by DSN and return driver runtime holder
Expand Down Expand Up @@ -400,138 +400,133 @@ func (d *Driver) connect(ctx context.Context) (err error) {
return xerrors.WithStackTrace(err)
}

d.table, err = internalTable.New(ctx,
d.balancer,
tableConfig.New(
append(
// prepend common params from root config
[]tableConfig.Option{
tableConfig.With(d.config.Common),
},
d.tableOptions...,
)...,
),
)
if err != nil {
return xerrors.WithStackTrace(err)
}
d.table = xsync.OnceValue(func() *internalTable.Client {
return internalTable.New(xcontext.WithoutDeadline(ctx),
d.balancer,
tableConfig.New(
append(
// prepend common params from root config
[]tableConfig.Option{
tableConfig.With(d.config.Common),
},
d.tableOptions...,
)...,
),
)
})

d.query, err = internalQuery.New(ctx,
d.balancer,
queryConfig.New(
append(
// prepend common params from root config
[]queryConfig.Option{
queryConfig.With(d.config.Common),
},
d.queryOptions...,
)...,
),
)
d.query = xsync.OnceValue(func() *internalQuery.Client {
return internalQuery.New(xcontext.WithoutDeadline(ctx),
d.balancer,
queryConfig.New(
append(
// prepend common params from root config
[]queryConfig.Option{
queryConfig.With(d.config.Common),
},
d.queryOptions...,
)...,
),
)
})
if err != nil {
return xerrors.WithStackTrace(err)
}

d.scheme, err = internalScheme.New(ctx,
d.balancer,
schemeConfig.New(
append(
// prepend common params from root config
[]schemeConfig.Option{
schemeConfig.WithDatabaseName(d.Name()),
schemeConfig.With(d.config.Common),
},
d.schemeOptions...,
)...,
),
)
if err != nil {
return xerrors.WithStackTrace(err)
}
d.scheme = xsync.OnceValue(func() *internalScheme.Client {
return internalScheme.New(xcontext.WithoutDeadline(ctx),
d.balancer,
schemeConfig.New(
append(
// prepend common params from root config
[]schemeConfig.Option{
schemeConfig.WithDatabaseName(d.Name()),
schemeConfig.With(d.config.Common),
},
d.schemeOptions...,
)...,
),
)
})

d.coordination, err = internalCoordination.New(ctx,
d.balancer,
coordinationConfig.New(
append(
// prepend common params from root config
[]coordinationConfig.Option{
coordinationConfig.With(d.config.Common),
},
d.coordinationOptions...,
)...,
),
)
if err != nil {
return xerrors.WithStackTrace(err)
}
d.coordination = xsync.OnceValue(func() *internalCoordination.Client {
return internalCoordination.New(xcontext.WithoutDeadline(ctx),
d.balancer,
coordinationConfig.New(
append(
// prepend common params from root config
[]coordinationConfig.Option{
coordinationConfig.With(d.config.Common),
},
d.coordinationOptions...,
)...,
),
)
})

d.ratelimiter, err = internalRatelimiter.New(ctx,
d.balancer,
ratelimiterConfig.New(
append(
// prepend common params from root config
[]ratelimiterConfig.Option{
ratelimiterConfig.With(d.config.Common),
},
d.ratelimiterOptions...,
)...,
),
)
if err != nil {
return xerrors.WithStackTrace(err)
}
d.ratelimiter = xsync.OnceValue(func() *internalRatelimiter.Client {
return internalRatelimiter.New(xcontext.WithoutDeadline(ctx),
d.balancer,
ratelimiterConfig.New(
append(
// prepend common params from root config
[]ratelimiterConfig.Option{
ratelimiterConfig.With(d.config.Common),
},
d.ratelimiterOptions...,
)...,
),
)
})

d.discovery, err = internalDiscovery.New(ctx,
d.pool.Get(endpoint.New(d.config.Endpoint())),
discoveryConfig.New(
append(
// prepend common params from root config
[]discoveryConfig.Option{
discoveryConfig.With(d.config.Common),
discoveryConfig.WithEndpoint(d.Endpoint()),
discoveryConfig.WithDatabase(d.Name()),
discoveryConfig.WithSecure(d.Secure()),
discoveryConfig.WithMeta(d.config.Meta()),
},
d.discoveryOptions...,
)...,
),
)
if err != nil {
return xerrors.WithStackTrace(err)
}
d.discovery = xsync.OnceValue(func() *internalDiscovery.Client {
return internalDiscovery.New(xcontext.WithoutDeadline(ctx),
d.pool.Get(endpoint.New(d.config.Endpoint())),
discoveryConfig.New(
append(
// prepend common params from root config
[]discoveryConfig.Option{
discoveryConfig.With(d.config.Common),
discoveryConfig.WithEndpoint(d.Endpoint()),
discoveryConfig.WithDatabase(d.Name()),
discoveryConfig.WithSecure(d.Secure()),
discoveryConfig.WithMeta(d.config.Meta()),
},
d.discoveryOptions...,
)...,
),
)
})

d.scripting = xsync.OnceValue(func() *internalScripting.Client {
return internalScripting.New(xcontext.WithoutDeadline(ctx),
d.balancer,
scriptingConfig.New(
append(
// prepend common params from root config
[]scriptingConfig.Option{
scriptingConfig.With(d.config.Common),
},
d.scriptingOptions...,
)...,
),
)
})

d.scripting, err = internalScripting.New(ctx,
d.balancer,
scriptingConfig.New(
d.topic = xsync.OnceValue(func() *topicclientinternal.Client {
return topicclientinternal.New(xcontext.WithoutDeadline(ctx),
d.balancer,
d.config.Credentials(),
append(
// prepend common params from root config
[]scriptingConfig.Option{
scriptingConfig.With(d.config.Common),
[]topicoptions.TopicOption{
topicoptions.WithOperationTimeout(d.config.OperationTimeout()),
topicoptions.WithOperationCancelAfter(d.config.OperationCancelAfter()),
},
d.scriptingOptions...,
d.topicOptions...,
)...,
),
)
if err != nil {
return xerrors.WithStackTrace(err)
}

d.topic, err = topicclientinternal.New(ctx,
d.balancer,
d.config.Credentials(),
append(
// prepend common params from root config
[]topicoptions.TopicOption{
topicoptions.WithOperationTimeout(d.config.OperationTimeout()),
topicoptions.WithOperationCancelAfter(d.config.OperationCancelAfter()),
},
d.topicOptions...,
)...,
)
if err != nil {
return xerrors.WithStackTrace(err)
}
)
})

return nil
}
Expand Down
Loading

0 comments on commit 836562e

Please sign in to comment.