Skip to content

Commit

Permalink
Merge pull request #894 from ydb-platform/function-id
Browse files Browse the repository at this point in the history
stack.FunctionID fix for codegen in the future
  • Loading branch information
asmyasnikov authored Nov 15, 2023
2 parents 66f5d69 + 79f4d13 commit 3c08fa3
Show file tree
Hide file tree
Showing 28 changed files with 509 additions and 258 deletions.
84 changes: 52 additions & 32 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,16 @@ type Driver struct { //nolint:maligned
panicCallback func(e interface{})
}

func (d *Driver) trace() *trace.Driver {
if d.config != nil {
return d.config.Trace()
}
return &trace.Driver{}
}

// Close closes Driver and clear resources
func (d *Driver) Close(ctx context.Context) (finalErr error) {
onDone := trace.DriverOnClose(d.config.Trace(), &ctx, stack.FunctionID(0))
onDone := trace.DriverOnClose(d.trace(), &ctx, stack.FunctionID(""))
defer func() {
onDone(finalErr)
}()
Expand Down Expand Up @@ -201,15 +208,30 @@ func (d *Driver) Topic() topic.Client {
//
// See sugar.DSN helper for make dsn from endpoint and database
func Open(ctx context.Context, dsn string, opts ...Option) (_ *Driver, err error) {
return open(
ctx,
append(
[]Option{
WithConnectionString(dsn),
},
opts...,
)...,
d, err := newConnectionFromOptions(ctx, append(
[]Option{
WithConnectionString(dsn),
},
opts...,
)...)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

onDone := trace.DriverOnInit(
d.trace(), &ctx,
stack.FunctionID(""),
d.config.Endpoint(), d.config.Database(), d.config.Secure(),
)
defer func() {
onDone(err)
}()

if err = d.connect(ctx); err != nil {
return nil, xerrors.WithStackTrace(err)
}

return d, nil
}

func MustOpen(ctx context.Context, dsn string, opts ...Option) *Driver {
Expand All @@ -224,7 +246,25 @@ func MustOpen(ctx context.Context, dsn string, opts ...Option) *Driver {
//
// Deprecated: use Open with required param connectionString instead
func New(ctx context.Context, opts ...Option) (_ *Driver, err error) {
return open(ctx, opts...)
d, err := newConnectionFromOptions(ctx, opts...)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

onDone := trace.DriverOnInit(
d.trace(), &ctx,
stack.FunctionID(""),
d.config.Endpoint(), d.config.Database(), d.config.Secure(),
)
defer func() {
onDone(err)
}()

if err = d.connect(ctx); err != nil {
return nil, xerrors.WithStackTrace(err)
}

return d, nil
}

func newConnectionFromOptions(ctx context.Context, opts ...Option) (_ *Driver, err error) {
Expand Down Expand Up @@ -287,23 +327,15 @@ func newConnectionFromOptions(ctx context.Context, opts ...Option) (_ *Driver, e
return d, nil
}

func connect(ctx context.Context, d *Driver) error {
var err error

func (d *Driver) connect(ctx context.Context) (err error) {
if d.config.Endpoint() == "" {
return xerrors.WithStackTrace(errors.New("configuration: empty dial address"))
}

if d.config.Database() == "" {
return xerrors.WithStackTrace(errors.New("configuration: empty database"))
}

onDone := trace.DriverOnInit(
d.config.Trace(), &ctx, stack.FunctionID(2), d.config.Endpoint(), d.config.Database(), d.config.Secure(),
)
defer func() {
onDone(err)
}()

if d.userInfo != nil {
d.config = d.config.With(config.WithCredentials(
credentials.NewStaticCredentials(
Expand Down Expand Up @@ -443,18 +475,6 @@ func connect(ctx context.Context, d *Driver) error {
return nil
}

func open(ctx context.Context, opts ...Option) (_ *Driver, err error) {
d, err := newConnectionFromOptions(ctx, opts...)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
err = connect(ctx, d)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
return d, nil
}

// GRPCConn casts *ydb.Driver to grpc.ClientConnInterface for executing
// unary and streaming RPC over internal driver balancer.
//
Expand Down
18 changes: 13 additions & 5 deletions internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) {
var (
address = "ydb:///" + b.driverConfig.Endpoint()
onDone = trace.DriverOnBalancerClusterDiscoveryAttempt(
b.driverConfig.Trace(), &ctx, stack.FunctionID(0), address,
b.driverConfig.Trace(), &ctx,
stack.FunctionID(""),
address,
)
endpoints []endpoint.Endpoint
localDC string
Expand Down Expand Up @@ -166,7 +168,9 @@ func endpointsDiff(newestEndpoints []endpoint.Endpoint, previousConns []conn.Con
func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []endpoint.Endpoint, localDC string) {
var (
onDone = trace.DriverOnBalancerUpdate(
b.driverConfig.Trace(), &ctx, stack.FunctionID(0), b.config.DetectLocalDC,
b.driverConfig.Trace(), &ctx,
stack.FunctionID(""),
b.config.DetectLocalDC,
)
previousConns []conn.Conn
)
Expand Down Expand Up @@ -202,7 +206,8 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []end

func (b *Balancer) Close(ctx context.Context) (err error) {
onDone := trace.DriverOnBalancerClose(
b.driverConfig.Trace(), &ctx, stack.FunctionID(0),
b.driverConfig.Trace(), &ctx,
stack.FunctionID(""),
)
defer func() {
onDone(err)
Expand All @@ -227,7 +232,9 @@ func New(
) (b *Balancer, finalErr error) {
var (
onDone = trace.DriverOnBalancerInit(
driverConfig.Trace(), &ctx, stack.FunctionID(0), driverConfig.Balancer().String(),
driverConfig.Trace(), &ctx,
stack.FunctionID(""),
driverConfig.Balancer().String(),
)
discoveryConfig = discoveryConfig.New(append(opts,
discoveryConfig.With(driverConfig.Common),
Expand Down Expand Up @@ -358,7 +365,8 @@ func (b *Balancer) connections() *connectionsState {

func (b *Balancer) getConn(ctx context.Context) (c conn.Conn, err error) {
onDone := trace.DriverOnBalancerChooseEndpoint(
b.driverConfig.Trace(), &ctx, stack.FunctionID(0),
b.driverConfig.Trace(), &ctx,
stack.FunctionID(""),
)
defer func() {
if err == nil {
Expand Down
31 changes: 19 additions & 12 deletions internal/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ func (c *conn) IsState(states ...State) bool {

func (c *conn) park(ctx context.Context) (err error) {
onDone := trace.DriverOnConnPark(
c.config.Trace(), &ctx, stack.FunctionID(0), c.Endpoint(),
c.config.Trace(), &ctx,
stack.FunctionID(""),
c.Endpoint(),
)
defer func() {
onDone(err)
Expand Down Expand Up @@ -141,7 +143,9 @@ func (c *conn) SetState(ctx context.Context, s State) State {
func (c *conn) setState(ctx context.Context, s State) State {
if state := State(c.state.Swap(uint32(s))); state != s {
trace.DriverOnConnStateChange(
c.config.Trace(), &ctx, stack.FunctionID(0), c.endpoint.Copy(), state,
c.config.Trace(), &ctx,
stack.FunctionID(""),
c.endpoint.Copy(), state,
)(s)
}
return s
Expand Down Expand Up @@ -185,9 +189,10 @@ func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) {
}

onDone := trace.DriverOnConnDial(
c.config.Trace(), &ctx, stack.FunctionID(0), c.endpoint.Copy(),
c.config.Trace(), &ctx,
stack.FunctionID(""),
c.endpoint.Copy(),
)

defer func() {
onDone(err)
}()
Expand Down Expand Up @@ -265,7 +270,9 @@ func (c *conn) Close(ctx context.Context) (err error) {
}

onDone := trace.DriverOnConnClose(
c.config.Trace(), &ctx, stack.FunctionID(0), c.Endpoint(),
c.config.Trace(), &ctx,
stack.FunctionID(""),
c.Endpoint(),
)
defer func() {
onDone(err)
Expand Down Expand Up @@ -296,18 +303,16 @@ func (c *conn) Invoke(
issues []trace.Issue
useWrapping = UseWrapping(ctx)
onDone = trace.DriverOnConnInvoke(
c.config.Trace(), &ctx, stack.FunctionID(0), c.endpoint, trace.Method(method),
c.config.Trace(), &ctx,
stack.FunctionID(""),
c.endpoint, trace.Method(method),
)
cc *grpc.ClientConn
md = metadata.MD{}
)

defer func() {
onDone(err, issues, opID, c.GetState(), md)
}()

defer func() {
meta.CallTrailerCallback(ctx, md)
onDone(err, issues, opID, c.GetState(), md)
}()

cc, err = c.realConn(ctx)
Expand Down Expand Up @@ -378,7 +383,9 @@ func (c *conn) NewStream(
) (_ grpc.ClientStream, err error) {
var (
streamRecv = trace.DriverOnConnNewStream(
c.config.Trace(), &ctx, stack.FunctionID(0), c.endpoint.Copy(), trace.Method(method),
c.config.Trace(), &ctx,
stack.FunctionID(""),
c.endpoint.Copy(), trace.Method(method),
)
useWrapping = UseWrapping(ctx)
cc *grpc.ClientConn
Expand Down
12 changes: 8 additions & 4 deletions internal/conn/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ func (p *Pool) Ban(ctx context.Context, cc Conn, cause error) {
}

trace.DriverOnConnBan(
p.config.Trace(), &ctx, stack.FunctionID(0), e, cc.GetState(), cause,
p.config.Trace(), &ctx,
stack.FunctionID(""),
e, cc.GetState(), cause,
)(cc.SetState(ctx, Banned))
}

Expand All @@ -110,7 +112,9 @@ func (p *Pool) Allow(ctx context.Context, cc Conn) {
}

trace.DriverOnConnAllow(
p.config.Trace(), &ctx, stack.FunctionID(0), e, cc.GetState(),
p.config.Trace(), &ctx,
stack.FunctionID(""),
e, cc.GetState(),
)(cc.Unban(ctx))
}

Expand All @@ -120,7 +124,7 @@ func (p *Pool) Take(context.Context) error {
}

func (p *Pool) Release(ctx context.Context) (finalErr error) {
onDone := trace.DriverOnPoolRelease(p.config.Trace(), &ctx, stack.FunctionID(0))
onDone := trace.DriverOnPoolRelease(p.config.Trace(), &ctx, stack.FunctionID(""))
defer func() {
onDone(finalErr)
}()
Expand Down Expand Up @@ -201,7 +205,7 @@ func (p *Pool) collectConns() []*conn {
}

func NewPool(ctx context.Context, config Config) *Pool {
onDone := trace.DriverOnPoolNew(config.Trace(), &ctx, stack.FunctionID(0))
onDone := trace.DriverOnPoolNew(config.Trace(), &ctx, stack.FunctionID(""))
defer onDone()

p := &Pool{
Expand Down
9 changes: 5 additions & 4 deletions internal/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,17 @@ type Client struct {
func (c *Client) Discover(ctx context.Context) (endpoints []endpoint.Endpoint, err error) {
var (
onDone = trace.DiscoveryOnDiscover(
c.config.Trace(), &ctx, stack.FunctionID(0), c.config.Endpoint(), c.config.Database(),
c.config.Trace(), &ctx,
stack.FunctionID(""),
c.config.Endpoint(), c.config.Database(),
)
request = Ydb_Discovery.ListEndpointsRequest{
Database: c.config.Database(),
}
response *Ydb_Discovery.ListEndpointsResponse
result Ydb_Discovery.ListEndpointsResult
location string
)

var location string
defer func() {
nodes := make([]trace.EndpointInfo, 0, len(endpoints))
for _, e := range endpoints {
Expand Down Expand Up @@ -100,7 +101,7 @@ func (c *Client) Discover(ctx context.Context) (endpoints []endpoint.Endpoint, e

func (c *Client) WhoAmI(ctx context.Context) (whoAmI *discovery.WhoAmI, err error) {
var (
onDone = trace.DiscoveryOnWhoAmI(c.config.Trace(), &ctx, stack.FunctionID(0))
onDone = trace.DiscoveryOnWhoAmI(c.config.Trace(), &ctx, stack.FunctionID(""))
request = Ydb_Discovery.WhoAmIRequest{}
response *Ydb_Discovery.WhoAmIResponse
whoAmIResultResult Ydb_Discovery.WhoAmIResult
Expand Down
2 changes: 1 addition & 1 deletion internal/meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (m *Meta) meta(ctx context.Context) (_ metadata.MD, err error) {

var token string

done := trace.DriverOnGetCredentials(m.trace, &ctx, stack.FunctionID(0))
done := trace.DriverOnGetCredentials(m.trace, &ctx, stack.FunctionID(""))
defer func() {
done(token, err)
}()
Expand Down
5 changes: 4 additions & 1 deletion internal/repeater/repeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,10 @@ func (r *repeater) wakeUp(ctx context.Context, e Event) (err error) {

ctx = WithEvent(ctx, e)

onDone := trace.DriverOnRepeaterWakeUp(r.trace, &ctx, stack.FunctionID(0), r.name, e)
onDone := trace.DriverOnRepeaterWakeUp(r.trace, &ctx,
stack.FunctionID(""),
r.name, e,
)
defer func() {
onDone(err)

Expand Down
Loading

0 comments on commit 3c08fa3

Please sign in to comment.