Skip to content

Commit

Permalink
Replaced lazy initialization of ydb clients (table, topic, etc.) to e…
Browse files Browse the repository at this point in the history
…xplicit initialization on ydb.Open step
  • Loading branch information
asmyasnikov committed Nov 7, 2023
1 parent 4b6157b commit 2fce79d
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 167 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Replaced lazy initialization of ydb clients (table, topic, etc.) to explicit initialization on `ydb.Open` step

## v3.54.1
* Fixed inconsistent labels in `metrics`

Expand Down
279 changes: 112 additions & 167 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,31 +57,24 @@ type Driver struct { //nolint:maligned
config *config.Config
options []config.Option

discoveryOnce initOnce
discovery *internalDiscovery.Client
discoveryOptions []discoveryConfig.Option

tableOnce initOnce
table *internalTable.Client
tableOptions []tableConfig.Option

scriptingOnce initOnce
scripting *internalScripting.Client
scriptingOptions []scriptingConfig.Option

schemeOnce initOnce
scheme *internalScheme.Client
schemeOptions []schemeConfig.Option

coordinationOnce initOnce
coordination *internalCoordination.Client
coordinationOptions []coordinationConfig.Option

ratelimiterOnce initOnce
ratelimiter *internalRatelimiter.Client
ratelimiterOptions []ratelimiterConfig.Option

topicOnce initOnce
topic *topicclientinternal.Client
topicOptions []topicoptions.TopicOption

Expand Down Expand Up @@ -120,12 +113,12 @@ func (d *Driver) Close(ctx context.Context) error {

closers = append(
closers,
d.ratelimiterOnce.Close,
d.coordinationOnce.Close,
d.schemeOnce.Close,
d.scriptingOnce.Close,
d.tableOnce.Close,
d.topicOnce.Close,
d.ratelimiter.Close,
d.coordination.Close,
d.scheme.Close,
d.scripting.Close,
d.table.Close,
d.topic.Close,
d.balancer.Close,
d.pool.Release,
)
Expand Down Expand Up @@ -161,150 +154,36 @@ func (d *Driver) Secure() bool {

// Table returns table client
func (d *Driver) Table() table.Client {
d.tableOnce.Init(func() closeFunc {
d.table = internalTable.New(
d.balancer,
tableConfig.New(
append(
// prepend common params from root config
[]tableConfig.Option{
tableConfig.With(d.config.Common),
},
d.tableOptions...,
)...,
),
)
return d.table.Close
})
// may be nil if driver closed early
return d.table
}

// Scheme returns scheme client
func (d *Driver) Scheme() scheme.Client {
d.schemeOnce.Init(func() closeFunc {
d.scheme = internalScheme.New(
d.balancer,
schemeConfig.New(
append(
// prepend common params from root config
[]schemeConfig.Option{
schemeConfig.WithDatabaseName(d.Name()),
schemeConfig.With(d.config.Common),
},
d.schemeOptions...,
)...,
),
)
return d.scheme.Close
})
// may be nil if driver closed early
return d.scheme
}

// Coordination returns coordination client
func (d *Driver) Coordination() coordination.Client {
d.coordinationOnce.Init(func() closeFunc {
d.coordination = internalCoordination.New(
d.balancer,
coordinationConfig.New(
append(
// prepend common params from root config
[]coordinationConfig.Option{
coordinationConfig.With(d.config.Common),
},
d.coordinationOptions...,
)...,
),
)
return d.coordination.Close
})
// may be nil if driver closed early
return d.coordination
}

// Ratelimiter returns ratelimiter client
func (d *Driver) Ratelimiter() ratelimiter.Client {
d.ratelimiterOnce.Init(func() closeFunc {
d.ratelimiter = internalRatelimiter.New(
d.balancer,
ratelimiterConfig.New(
append(
// prepend common params from root config
[]ratelimiterConfig.Option{
ratelimiterConfig.With(d.config.Common),
},
d.ratelimiterOptions...,
)...,
),
)
return d.ratelimiter.Close
})
// may be nil if driver closed early
return d.ratelimiter
}

// Discovery returns discovery client
func (d *Driver) Discovery() discovery.Client {
d.discoveryOnce.Init(func() closeFunc {
d.discovery = internalDiscovery.New(
d.pool.Get(endpoint.New(d.config.Endpoint())),
discoveryConfig.New(
append(
// prepend common params from root config
[]discoveryConfig.Option{
discoveryConfig.With(d.config.Common),
discoveryConfig.WithEndpoint(d.Endpoint()),
discoveryConfig.WithDatabase(d.Name()),
discoveryConfig.WithSecure(d.Secure()),
discoveryConfig.WithMeta(d.config.Meta()),
},
d.discoveryOptions...,
)...,
),
)
return d.discovery.Close
})
// may be nil if driver closed early
return d.discovery
}

// Scripting returns scripting client
func (d *Driver) Scripting() scripting.Client {
d.scriptingOnce.Init(func() closeFunc {
d.scripting = internalScripting.New(
d.balancer,
scriptingConfig.New(
append(
// prepend common params from root config
[]scriptingConfig.Option{
scriptingConfig.With(d.config.Common),
},
d.scriptingOptions...,
)...,
),
)
return d.scripting.Close
})
// may be nil if driver closed early
return d.scripting
}

// Topic returns topic client
func (d *Driver) Topic() topic.Client {
d.topicOnce.Init(func() closeFunc {
d.topic = topicclientinternal.New(d.balancer, d.config.Credentials(),
append(
// prepend common params from root config
[]topicoptions.TopicOption{
topicoptions.WithOperationTimeout(d.config.OperationTimeout()),
topicoptions.WithOperationCancelAfter(d.config.OperationCancelAfter()),
},
d.topicOptions...,
)...,
)
return d.topic.Close
})
return d.topic
}

Expand Down Expand Up @@ -402,42 +281,42 @@ func newConnectionFromOptions(ctx context.Context, opts ...Option) (_ *Driver, e
return d, nil
}

func connect(ctx context.Context, c *Driver) error {
func connect(ctx context.Context, d *Driver) error {
var err error

if c.config.Endpoint() == "" {
if d.config.Endpoint() == "" {
return xerrors.WithStackTrace(errors.New("configuration: empty dial address"))
}
if c.config.Database() == "" {
if d.config.Database() == "" {
return xerrors.WithStackTrace(errors.New("configuration: empty database"))
}

onDone := trace.DriverOnInit(
c.config.Trace(),
d.config.Trace(),
&ctx,
c.config.Endpoint(),
c.config.Database(),
c.config.Secure(),
d.config.Endpoint(),
d.config.Database(),
d.config.Secure(),
)
defer func() {
onDone(err)
}()

if c.userInfo != nil {
c.config = c.config.With(config.WithCredentials(
if d.userInfo != nil {
d.config = d.config.With(config.WithCredentials(
credentials.NewStaticCredentials(
c.userInfo.User, c.userInfo.Password,
c.config.Endpoint(),
credentials.WithGrpcDialOptions(c.config.GrpcDialOptions()...),
d.userInfo.User, d.userInfo.Password,
d.config.Endpoint(),
credentials.WithGrpcDialOptions(d.config.GrpcDialOptions()...),
),
))
}

if c.pool == nil {
c.pool = conn.NewPool(c.config)
if d.pool == nil {
d.pool = conn.NewPool(d.config)
}

c.balancer, err = balancer.New(ctx, c.config, c.pool, c.discoveryOptions...)
d.balancer, err = balancer.New(ctx, d.config, d.pool, d.discoveryOptions...)
if err != nil {
return xerrors.WithStackTrace(err)
}
Expand All @@ -446,15 +325,103 @@ func connect(ctx context.Context, c *Driver) error {
}

func open(ctx context.Context, opts ...Option) (_ *Driver, err error) {
c, err := newConnectionFromOptions(ctx, opts...)
d, err := newConnectionFromOptions(ctx, opts...)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
err = connect(ctx, c)
err = connect(ctx, d)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
return c, nil
d.table = internalTable.New(
d.balancer,
tableConfig.New(
append(
// prepend common params from root config
[]tableConfig.Option{
tableConfig.With(d.config.Common),
},
d.tableOptions...,
)...,
),
)
d.scheme = internalScheme.New(
d.balancer,
schemeConfig.New(
append(
// prepend common params from root config
[]schemeConfig.Option{
schemeConfig.WithDatabaseName(d.Name()),
schemeConfig.With(d.config.Common),
},
d.schemeOptions...,
)...,
),
)
d.coordination = internalCoordination.New(
d.balancer,
coordinationConfig.New(
append(
// prepend common params from root config
[]coordinationConfig.Option{
coordinationConfig.With(d.config.Common),
},
d.coordinationOptions...,
)...,
),
)
d.ratelimiter = internalRatelimiter.New(
d.balancer,
ratelimiterConfig.New(
append(
// prepend common params from root config
[]ratelimiterConfig.Option{
ratelimiterConfig.With(d.config.Common),
},
d.ratelimiterOptions...,
)...,
),
)
d.discovery = internalDiscovery.New(
d.pool.Get(endpoint.New(d.config.Endpoint())),
discoveryConfig.New(
append(
// prepend common params from root config
[]discoveryConfig.Option{
discoveryConfig.With(d.config.Common),
discoveryConfig.WithEndpoint(d.Endpoint()),
discoveryConfig.WithDatabase(d.Name()),
discoveryConfig.WithSecure(d.Secure()),
discoveryConfig.WithMeta(d.config.Meta()),
},
d.discoveryOptions...,
)...,
),
)
d.scripting = internalScripting.New(
d.balancer,
scriptingConfig.New(
append(
// prepend common params from root config
[]scriptingConfig.Option{
scriptingConfig.With(d.config.Common),
},
d.scriptingOptions...,
)...,
),
)
d.topic = topicclientinternal.New(d.balancer, d.config.Credentials(),
append(
// prepend common params from root config
[]topicoptions.TopicOption{
topicoptions.WithOperationTimeout(d.config.OperationTimeout()),
topicoptions.WithOperationCancelAfter(d.config.OperationCancelAfter()),
},
d.topicOptions...,
)...,
)

return d, nil
}

// GRPCConn casts *ydb.Driver to grpc.ClientConnInterface for executing
Expand All @@ -464,25 +431,3 @@ func open(ctx context.Context, opts ...Option) (_ *Driver, err error) {
func GRPCConn(cc *Driver) grpc.ClientConnInterface {
return conn.WithContextModifier(cc.balancer, conn.WithoutWrapping)
}

// Helper types for closing lazy clients
type closeFunc func(ctx context.Context) error

type initOnce struct {
once sync.Once
close closeFunc
}

func (lo *initOnce) Init(f func() closeFunc) {
lo.once.Do(func() {
lo.close = f()
})
}

func (lo *initOnce) Close(ctx context.Context) error {
lo.once.Do(func() {})
if lo.close == nil {
return nil
}
return lo.close(ctx)
}

0 comments on commit 2fce79d

Please sign in to comment.