Skip to content

Commit

Permalink
added context to internal clients constructors
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Nov 7, 2023
1 parent 2fce79d commit ab9e576
Show file tree
Hide file tree
Showing 14 changed files with 262 additions and 229 deletions.
37 changes: 30 additions & 7 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func open(ctx context.Context, opts ...Option) (_ *Driver, err error) {
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
d.table = internalTable.New(
d.table, err = internalTable.New(ctx,
d.balancer,
tableConfig.New(
append(
Expand All @@ -345,7 +345,10 @@ func open(ctx context.Context, opts ...Option) (_ *Driver, err error) {
)...,
),
)
d.scheme = internalScheme.New(
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
d.scheme, err = internalScheme.New(ctx,
d.balancer,
schemeConfig.New(
append(
Expand All @@ -358,7 +361,10 @@ func open(ctx context.Context, opts ...Option) (_ *Driver, err error) {
)...,
),
)
d.coordination = internalCoordination.New(
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
d.coordination, err = internalCoordination.New(ctx,
d.balancer,
coordinationConfig.New(
append(
Expand All @@ -370,7 +376,10 @@ func open(ctx context.Context, opts ...Option) (_ *Driver, err error) {
)...,
),
)
d.ratelimiter = internalRatelimiter.New(
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
d.ratelimiter, err = internalRatelimiter.New(ctx,
d.balancer,
ratelimiterConfig.New(
append(
Expand All @@ -382,7 +391,10 @@ func open(ctx context.Context, opts ...Option) (_ *Driver, err error) {
)...,
),
)
d.discovery = internalDiscovery.New(
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
d.discovery, err = internalDiscovery.New(ctx,
d.pool.Get(endpoint.New(d.config.Endpoint())),
discoveryConfig.New(
append(
Expand All @@ -398,7 +410,10 @@ func open(ctx context.Context, opts ...Option) (_ *Driver, err error) {
)...,
),
)
d.scripting = internalScripting.New(
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
d.scripting, err = internalScripting.New(ctx,
d.balancer,
scriptingConfig.New(
append(
Expand All @@ -410,7 +425,12 @@ func open(ctx context.Context, opts ...Option) (_ *Driver, err error) {
)...,
),
)
d.topic = topicclientinternal.New(d.balancer, d.config.Credentials(),
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
d.topic, err = topicclientinternal.New(ctx,
d.balancer,
d.config.Credentials(),
append(
// prepend common params from root config
[]topicoptions.TopicOption{
Expand All @@ -420,6 +440,9 @@ func open(ctx context.Context, opts ...Option) (_ *Driver, err error) {
d.topicOptions...,
)...,
)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

return d, nil
}
Expand Down
20 changes: 11 additions & 9 deletions internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func New(
driverConfig *config.Config,
pool *conn.Pool,
opts ...discoveryConfig.Option,
) (b *Balancer, err error) {
) (b *Balancer, finalErr error) {
var (
onDone = trace.DriverOnBalancerInit(
driverConfig.Trace(),
Expand All @@ -201,20 +201,22 @@ func New(
)...)
)
defer func() {
onDone(err)
onDone(finalErr)
}()

b = &Balancer{
driverConfig: driverConfig,
pool: pool,
localDCDetector: detectLocalDC,
discoveryClient: internalDiscovery.New(
pool.Get(
endpoint.New(driverConfig.Endpoint()),
),
discoveryConfig,
),
}
d, err := internalDiscovery.New(ctx, pool.Get(
endpoint.New(driverConfig.Endpoint()),
), discoveryConfig)
if err != nil {
return nil, err
}

b.discoveryClient = d

if config := driverConfig.Balancer(); config == nil {
b.balancerConfig = balancerConfig.Config{}
Expand All @@ -228,7 +230,7 @@ func New(
}, "")
} else {
// initialization of balancer state
if err = b.clusterDiscovery(ctx); err != nil {
if err := b.clusterDiscovery(ctx); err != nil {
return nil, xerrors.WithStackTrace(err)
}
// run background discovering
Expand Down
4 changes: 2 additions & 2 deletions internal/coordination/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ type Client struct {
service Ydb_Coordination_V1.CoordinationServiceClient
}

func New(cc grpc.ClientConnInterface, config config.Config) *Client {
func New(ctx context.Context, cc grpc.ClientConnInterface, config config.Config) (*Client, error) {
return &Client{
config: config,
service: Ydb_Coordination_V1.NewCoordinationServiceClient(cc),
}
}, nil
}

func (c *Client) CreateNode(ctx context.Context, path string, config coordination.NodeConfig) error {
Expand Down
4 changes: 2 additions & 2 deletions internal/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

func New(cc grpc.ClientConnInterface, config *config.Config) *Client {
func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config) (*Client, error) {
return &Client{
config: config,
cc: cc,
client: Ydb_Discovery_V1.NewDiscoveryServiceClient(cc),
}
}, nil
}

var _ discovery.Client = &Client{}
Expand Down
4 changes: 2 additions & 2 deletions internal/ratelimiter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ func (c *Client) Close(ctx context.Context) error {
return nil
}

func New(cc grpc.ClientConnInterface, config config.Config) *Client {
func New(ctx context.Context, cc grpc.ClientConnInterface, config config.Config) (*Client, error) {
return &Client{
config: config,
service: Ydb_RateLimiter_V1.NewRateLimiterServiceClient(cc),
}
}, nil
}

func (c *Client) CreateResource(
Expand Down
4 changes: 2 additions & 2 deletions internal/scheme/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ func (c *Client) Close(_ context.Context) error {
return nil
}

func New(cc grpc.ClientConnInterface, config config.Config) *Client {
func New(ctx context.Context, cc grpc.ClientConnInterface, config config.Config) (*Client, error) {
return &Client{
config: config,
service: Ydb_Scheme_V1.NewSchemeServiceClient(cc),
}
}, nil
}

func (c *Client) MakeDirectory(ctx context.Context, path string) (err error) {
Expand Down
8 changes: 4 additions & 4 deletions internal/scripting/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,11 @@ func (c *Client) explain(
}()
response, err = c.service.ExplainYql(ctx, request)
if err != nil {
return
return e, err
}
err = response.GetOperation().GetResult().UnmarshalTo(&result)
if err != nil {
return
return e, err
}
result.GetParametersTypes()
e = table.ScriptingYQLExplanation{
Expand Down Expand Up @@ -273,9 +273,9 @@ func (c *Client) Close(ctx context.Context) (err error) {
return nil
}

func New(cc grpc.ClientConnInterface, config config.Config) *Client {
func New(ctx context.Context, cc grpc.ClientConnInterface, config config.Config) (*Client, error) {
return &Client{
config: config,
service: Ydb_Scripting_V1.NewScriptingServiceClient(cc),
}
}, nil
}
21 changes: 11 additions & 10 deletions internal/table/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,23 @@ type balancer interface {
nodeChecker
}

func New(balancer balancer, config *config.Config) *Client {
return newClient(balancer, func(ctx context.Context) (s *session, err error) {
func New(ctx context.Context, balancer balancer, config *config.Config) (*Client, error) {
return newClient(ctx, balancer, func(ctx context.Context) (s *session, err error) {
return newSession(ctx, balancer, config)
}, config)
}

func newClient(
ctx context.Context,
balancer balancer,
builder sessionBuilder,
config *config.Config,
) *Client {
var (
ctx = context.Background()
onDone = trace.TableOnInit(config.Trace(), &ctx)
)
c := &Client{
) (c *Client, finalErr error) {
onDone := trace.TableOnInit(config.Trace(), &ctx)
defer func() {
onDone(config.SizeLimit(), finalErr)
}()
c = &Client{
clock: config.Clock(),
config: config,
cc: balancer,
Expand All @@ -70,8 +71,8 @@ func newClient(
c.wg.Add(1)
go c.internalPoolGC(ctx, idleThreshold)
}
onDone(c.limit)
return c

return c, nil
}

// Client is a set of session instances that may be reused.
Expand Down
10 changes: 7 additions & 3 deletions internal/table/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ func TestSessionPoolRacyGet(t *testing.T) {
session *session
}
create := make(chan createReq)
p := newClient(
p, err := newClient(
context.Background(),
nil,
(&StubBuilder{
Limit: 1,
Expand All @@ -422,11 +423,11 @@ func TestSessionPoolRacyGet(t *testing.T) {
config.WithIdleThreshold(-1),
),
)
require.NoError(t, err)
var (
expSession *session
done = make(chan struct{}, 2)
)
var err error
for i := 0; i < 2; i++ {
go func() {
defer func() {
Expand Down Expand Up @@ -855,7 +856,8 @@ func newClientWithStubBuilder(
stubLimit int,
options ...config.Option,
) *Client {
return newClient(
c, err := newClient(
context.Background(),
balancer,
(&StubBuilder{
T: t,
Expand All @@ -864,6 +866,8 @@ func newClientWithStubBuilder(
}).createSession,
config.New(options...),
)
require.NoError(t, err)
return c
}

func (s *StubBuilder) createSession(ctx context.Context) (session *session, err error) {
Expand Down
Loading

0 comments on commit ab9e576

Please sign in to comment.