diff --git a/driver.go b/driver.go index f35ee074f..69c96873a 100644 --- a/driver.go +++ b/driver.go @@ -93,9 +93,16 @@ type Driver struct { //nolint:maligned panicCallback func(e interface{}) } +func (d *Driver) trace() *trace.Driver { + if d.config != nil { + return d.config.Trace() + } + return &trace.Driver{} +} + // Close closes Driver and clear resources func (d *Driver) Close(ctx context.Context) (finalErr error) { - onDone := trace.DriverOnClose(d.config.Trace(), &ctx, stack.FunctionID(0)) + onDone := trace.DriverOnClose(d.trace(), &ctx, stack.FunctionID("")) defer func() { onDone(finalErr) }() @@ -201,15 +208,30 @@ func (d *Driver) Topic() topic.Client { // // See sugar.DSN helper for make dsn from endpoint and database func Open(ctx context.Context, dsn string, opts ...Option) (_ *Driver, err error) { - return open( - ctx, - append( - []Option{ - WithConnectionString(dsn), - }, - opts..., - )..., + d, err := newConnectionFromOptions(ctx, append( + []Option{ + WithConnectionString(dsn), + }, + opts..., + )...) + if err != nil { + return nil, xerrors.WithStackTrace(err) + } + + onDone := trace.DriverOnInit( + d.trace(), &ctx, + stack.FunctionID(""), + d.config.Endpoint(), d.config.Database(), d.config.Secure(), ) + defer func() { + onDone(err) + }() + + if err = d.connect(ctx); err != nil { + return nil, xerrors.WithStackTrace(err) + } + + return d, nil } func MustOpen(ctx context.Context, dsn string, opts ...Option) *Driver { @@ -224,7 +246,25 @@ func MustOpen(ctx context.Context, dsn string, opts ...Option) *Driver { // // Deprecated: use Open with required param connectionString instead func New(ctx context.Context, opts ...Option) (_ *Driver, err error) { - return open(ctx, opts...) + d, err := newConnectionFromOptions(ctx, opts...) + if err != nil { + return nil, xerrors.WithStackTrace(err) + } + + onDone := trace.DriverOnInit( + d.trace(), &ctx, + stack.FunctionID(""), + d.config.Endpoint(), d.config.Database(), d.config.Secure(), + ) + defer func() { + onDone(err) + }() + + if err = d.connect(ctx); err != nil { + return nil, xerrors.WithStackTrace(err) + } + + return d, nil } func newConnectionFromOptions(ctx context.Context, opts ...Option) (_ *Driver, err error) { @@ -287,23 +327,15 @@ func newConnectionFromOptions(ctx context.Context, opts ...Option) (_ *Driver, e return d, nil } -func connect(ctx context.Context, d *Driver) error { - var err error - +func (d *Driver) connect(ctx context.Context) (err error) { if d.config.Endpoint() == "" { return xerrors.WithStackTrace(errors.New("configuration: empty dial address")) } + if d.config.Database() == "" { return xerrors.WithStackTrace(errors.New("configuration: empty database")) } - onDone := trace.DriverOnInit( - d.config.Trace(), &ctx, stack.FunctionID(2), d.config.Endpoint(), d.config.Database(), d.config.Secure(), - ) - defer func() { - onDone(err) - }() - if d.userInfo != nil { d.config = d.config.With(config.WithCredentials( credentials.NewStaticCredentials( @@ -443,18 +475,6 @@ func connect(ctx context.Context, d *Driver) error { return nil } -func open(ctx context.Context, opts ...Option) (_ *Driver, err error) { - d, err := newConnectionFromOptions(ctx, opts...) - if err != nil { - return nil, xerrors.WithStackTrace(err) - } - err = connect(ctx, d) - if err != nil { - return nil, xerrors.WithStackTrace(err) - } - return d, nil -} - // GRPCConn casts *ydb.Driver to grpc.ClientConnInterface for executing // unary and streaming RPC over internal driver balancer. // diff --git a/internal/balancer/balancer.go b/internal/balancer/balancer.go index 3b6da7355..5f089688f 100644 --- a/internal/balancer/balancer.go +++ b/internal/balancer/balancer.go @@ -93,7 +93,9 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) { var ( address = "ydb:///" + b.driverConfig.Endpoint() onDone = trace.DriverOnBalancerClusterDiscoveryAttempt( - b.driverConfig.Trace(), &ctx, stack.FunctionID(0), address, + b.driverConfig.Trace(), &ctx, + stack.FunctionID(""), + address, ) endpoints []endpoint.Endpoint localDC string @@ -166,7 +168,9 @@ func endpointsDiff(newestEndpoints []endpoint.Endpoint, previousConns []conn.Con func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []endpoint.Endpoint, localDC string) { var ( onDone = trace.DriverOnBalancerUpdate( - b.driverConfig.Trace(), &ctx, stack.FunctionID(0), b.config.DetectLocalDC, + b.driverConfig.Trace(), &ctx, + stack.FunctionID(""), + b.config.DetectLocalDC, ) previousConns []conn.Conn ) @@ -202,7 +206,8 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []end func (b *Balancer) Close(ctx context.Context) (err error) { onDone := trace.DriverOnBalancerClose( - b.driverConfig.Trace(), &ctx, stack.FunctionID(0), + b.driverConfig.Trace(), &ctx, + stack.FunctionID(""), ) defer func() { onDone(err) @@ -227,7 +232,9 @@ func New( ) (b *Balancer, finalErr error) { var ( onDone = trace.DriverOnBalancerInit( - driverConfig.Trace(), &ctx, stack.FunctionID(0), driverConfig.Balancer().String(), + driverConfig.Trace(), &ctx, + stack.FunctionID(""), + driverConfig.Balancer().String(), ) discoveryConfig = discoveryConfig.New(append(opts, discoveryConfig.With(driverConfig.Common), @@ -358,7 +365,8 @@ func (b *Balancer) connections() *connectionsState { func (b *Balancer) getConn(ctx context.Context) (c conn.Conn, err error) { onDone := trace.DriverOnBalancerChooseEndpoint( - b.driverConfig.Trace(), &ctx, stack.FunctionID(0), + b.driverConfig.Trace(), &ctx, + stack.FunctionID(""), ) defer func() { if err == nil { diff --git a/internal/conn/conn.go b/internal/conn/conn.go index 3a2e14248..9a9c0780a 100644 --- a/internal/conn/conn.go +++ b/internal/conn/conn.go @@ -94,7 +94,9 @@ func (c *conn) IsState(states ...State) bool { func (c *conn) park(ctx context.Context) (err error) { onDone := trace.DriverOnConnPark( - c.config.Trace(), &ctx, stack.FunctionID(0), c.Endpoint(), + c.config.Trace(), &ctx, + stack.FunctionID(""), + c.Endpoint(), ) defer func() { onDone(err) @@ -141,7 +143,9 @@ func (c *conn) SetState(ctx context.Context, s State) State { func (c *conn) setState(ctx context.Context, s State) State { if state := State(c.state.Swap(uint32(s))); state != s { trace.DriverOnConnStateChange( - c.config.Trace(), &ctx, stack.FunctionID(0), c.endpoint.Copy(), state, + c.config.Trace(), &ctx, + stack.FunctionID(""), + c.endpoint.Copy(), state, )(s) } return s @@ -185,9 +189,10 @@ func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) { } onDone := trace.DriverOnConnDial( - c.config.Trace(), &ctx, stack.FunctionID(0), c.endpoint.Copy(), + c.config.Trace(), &ctx, + stack.FunctionID(""), + c.endpoint.Copy(), ) - defer func() { onDone(err) }() @@ -265,7 +270,9 @@ func (c *conn) Close(ctx context.Context) (err error) { } onDone := trace.DriverOnConnClose( - c.config.Trace(), &ctx, stack.FunctionID(0), c.Endpoint(), + c.config.Trace(), &ctx, + stack.FunctionID(""), + c.Endpoint(), ) defer func() { onDone(err) @@ -296,18 +303,16 @@ func (c *conn) Invoke( issues []trace.Issue useWrapping = UseWrapping(ctx) onDone = trace.DriverOnConnInvoke( - c.config.Trace(), &ctx, stack.FunctionID(0), c.endpoint, trace.Method(method), + c.config.Trace(), &ctx, + stack.FunctionID(""), + c.endpoint, trace.Method(method), ) cc *grpc.ClientConn md = metadata.MD{} ) - - defer func() { - onDone(err, issues, opID, c.GetState(), md) - }() - defer func() { meta.CallTrailerCallback(ctx, md) + onDone(err, issues, opID, c.GetState(), md) }() cc, err = c.realConn(ctx) @@ -378,7 +383,9 @@ func (c *conn) NewStream( ) (_ grpc.ClientStream, err error) { var ( streamRecv = trace.DriverOnConnNewStream( - c.config.Trace(), &ctx, stack.FunctionID(0), c.endpoint.Copy(), trace.Method(method), + c.config.Trace(), &ctx, + stack.FunctionID(""), + c.endpoint.Copy(), trace.Method(method), ) useWrapping = UseWrapping(ctx) cc *grpc.ClientConn diff --git a/internal/conn/pool.go b/internal/conn/pool.go index cb43de1cc..47e0082e8 100644 --- a/internal/conn/pool.go +++ b/internal/conn/pool.go @@ -90,7 +90,9 @@ func (p *Pool) Ban(ctx context.Context, cc Conn, cause error) { } trace.DriverOnConnBan( - p.config.Trace(), &ctx, stack.FunctionID(0), e, cc.GetState(), cause, + p.config.Trace(), &ctx, + stack.FunctionID(""), + e, cc.GetState(), cause, )(cc.SetState(ctx, Banned)) } @@ -110,7 +112,9 @@ func (p *Pool) Allow(ctx context.Context, cc Conn) { } trace.DriverOnConnAllow( - p.config.Trace(), &ctx, stack.FunctionID(0), e, cc.GetState(), + p.config.Trace(), &ctx, + stack.FunctionID(""), + e, cc.GetState(), )(cc.Unban(ctx)) } @@ -120,7 +124,7 @@ func (p *Pool) Take(context.Context) error { } func (p *Pool) Release(ctx context.Context) (finalErr error) { - onDone := trace.DriverOnPoolRelease(p.config.Trace(), &ctx, stack.FunctionID(0)) + onDone := trace.DriverOnPoolRelease(p.config.Trace(), &ctx, stack.FunctionID("")) defer func() { onDone(finalErr) }() @@ -201,7 +205,7 @@ func (p *Pool) collectConns() []*conn { } func NewPool(ctx context.Context, config Config) *Pool { - onDone := trace.DriverOnPoolNew(config.Trace(), &ctx, stack.FunctionID(0)) + onDone := trace.DriverOnPoolNew(config.Trace(), &ctx, stack.FunctionID("")) defer onDone() p := &Pool{ diff --git a/internal/discovery/discovery.go b/internal/discovery/discovery.go index e6b00a88e..6126c5961 100644 --- a/internal/discovery/discovery.go +++ b/internal/discovery/discovery.go @@ -39,16 +39,17 @@ type Client struct { func (c *Client) Discover(ctx context.Context) (endpoints []endpoint.Endpoint, err error) { var ( onDone = trace.DiscoveryOnDiscover( - c.config.Trace(), &ctx, stack.FunctionID(0), c.config.Endpoint(), c.config.Database(), + c.config.Trace(), &ctx, + stack.FunctionID(""), + c.config.Endpoint(), c.config.Database(), ) request = Ydb_Discovery.ListEndpointsRequest{ Database: c.config.Database(), } response *Ydb_Discovery.ListEndpointsResponse result Ydb_Discovery.ListEndpointsResult + location string ) - - var location string defer func() { nodes := make([]trace.EndpointInfo, 0, len(endpoints)) for _, e := range endpoints { @@ -100,7 +101,7 @@ func (c *Client) Discover(ctx context.Context) (endpoints []endpoint.Endpoint, e func (c *Client) WhoAmI(ctx context.Context) (whoAmI *discovery.WhoAmI, err error) { var ( - onDone = trace.DiscoveryOnWhoAmI(c.config.Trace(), &ctx, stack.FunctionID(0)) + onDone = trace.DiscoveryOnWhoAmI(c.config.Trace(), &ctx, stack.FunctionID("")) request = Ydb_Discovery.WhoAmIRequest{} response *Ydb_Discovery.WhoAmIResponse whoAmIResultResult Ydb_Discovery.WhoAmIResult diff --git a/internal/meta/meta.go b/internal/meta/meta.go index cf426c3ca..fbe4f4603 100644 --- a/internal/meta/meta.go +++ b/internal/meta/meta.go @@ -108,7 +108,7 @@ func (m *Meta) meta(ctx context.Context) (_ metadata.MD, err error) { var token string - done := trace.DriverOnGetCredentials(m.trace, &ctx, stack.FunctionID(0)) + done := trace.DriverOnGetCredentials(m.trace, &ctx, stack.FunctionID("")) defer func() { done(token, err) }() diff --git a/internal/repeater/repeater.go b/internal/repeater/repeater.go index 74f7b1c1c..bb24c2f2c 100644 --- a/internal/repeater/repeater.go +++ b/internal/repeater/repeater.go @@ -145,7 +145,10 @@ func (r *repeater) wakeUp(ctx context.Context, e Event) (err error) { ctx = WithEvent(ctx, e) - onDone := trace.DriverOnRepeaterWakeUp(r.trace, &ctx, stack.FunctionID(0), r.name, e) + onDone := trace.DriverOnRepeaterWakeUp(r.trace, &ctx, + stack.FunctionID(""), + r.name, e, + ) defer func() { onDone(err) diff --git a/internal/scheme/client.go b/internal/scheme/client.go index e6de0904d..22428fe41 100644 --- a/internal/scheme/client.go +++ b/internal/scheme/client.go @@ -45,7 +45,10 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config config.Config) } func (c *Client) MakeDirectory(ctx context.Context, path string) (finalErr error) { - onDone := trace.SchemeOnMakeDirectory(c.config.Trace(), &ctx, stack.FunctionID(0), path) + onDone := trace.SchemeOnMakeDirectory(c.config.Trace(), &ctx, + stack.FunctionID(""), + path, + ) defer func() { onDone(finalErr) }() @@ -79,7 +82,10 @@ func (c *Client) makeDirectory(ctx context.Context, path string) (err error) { } func (c *Client) RemoveDirectory(ctx context.Context, path string) (finalErr error) { - onDone := trace.SchemeOnRemoveDirectory(c.config.Trace(), &ctx, stack.FunctionID(0), path) + onDone := trace.SchemeOnRemoveDirectory(c.config.Trace(), &ctx, + stack.FunctionID(""), + path, + ) defer func() { onDone(finalErr) }() @@ -113,7 +119,7 @@ func (c *Client) removeDirectory(ctx context.Context, path string) (err error) { } func (c *Client) ListDirectory(ctx context.Context, path string) (d scheme.Directory, finalErr error) { - onDone := trace.SchemeOnListDirectory(c.config.Trace(), &ctx, stack.FunctionID(0)) + onDone := trace.SchemeOnListDirectory(c.config.Trace(), &ctx, stack.FunctionID("")) defer func() { onDone(finalErr) }() @@ -166,7 +172,10 @@ func (c *Client) listDirectory(ctx context.Context, path string) (scheme.Directo } func (c *Client) DescribePath(ctx context.Context, path string) (e scheme.Entry, finalErr error) { - onDone := trace.SchemeOnDescribePath(c.config.Trace(), &ctx, stack.FunctionID(0), path) + onDone := trace.SchemeOnDescribePath(c.config.Trace(), &ctx, + stack.FunctionID(""), + path, + ) defer func() { onDone(e.Type.String(), finalErr) }() @@ -220,7 +229,10 @@ func (c *Client) describePath(ctx context.Context, path string) (e scheme.Entry, func (c *Client) ModifyPermissions( ctx context.Context, path string, opts ...scheme.PermissionsOption, ) (finalErr error) { - onDone := trace.SchemeOnModifyPermissions(c.config.Trace(), &ctx, stack.FunctionID(0), path) + onDone := trace.SchemeOnModifyPermissions(c.config.Trace(), &ctx, + stack.FunctionID(""), + path, + ) defer func() { onDone(finalErr) }() diff --git a/internal/scripting/client.go b/internal/scripting/client.go index ade7d9143..b73f3c75f 100644 --- a/internal/scripting/client.go +++ b/internal/scripting/client.go @@ -66,7 +66,10 @@ func (c *Client) execute( params *table.QueryParameters, ) (r result.Result, err error) { var ( - onDone = trace.ScriptingOnExecute(c.config.Trace(), &ctx, stack.FunctionID(0), query, params) + onDone = trace.ScriptingOnExecute(c.config.Trace(), &ctx, + stack.FunctionID(""), + query, params, + ) a = allocator.New() request = &Ydb_Scripting.ExecuteYqlRequest{ Script: query, @@ -138,7 +141,10 @@ func (c *Client) explain( mode scripting.ExplainMode, ) (e table.ScriptingYQLExplanation, err error) { var ( - onDone = trace.ScriptingOnExplain(c.config.Trace(), &ctx, stack.FunctionID(0), query) + onDone = trace.ScriptingOnExplain(c.config.Trace(), &ctx, + stack.FunctionID(""), + query, + ) request = &Ydb_Scripting.ExplainYqlRequest{ Script: query, Mode: mode2mode(mode), @@ -205,9 +211,12 @@ func (c *Client) streamExecute( params *table.QueryParameters, ) (r result.StreamResult, err error) { var ( - onIntermediate = trace.ScriptingOnStreamExecute(c.config.Trace(), &ctx, stack.FunctionID(0), query, params) - a = allocator.New() - request = &Ydb_Scripting.ExecuteYqlRequest{ + onIntermediate = trace.ScriptingOnStreamExecute(c.config.Trace(), &ctx, + stack.FunctionID(""), + query, params, + ) + a = allocator.New() + request = &Ydb_Scripting.ExecuteYqlRequest{ Script: query, Parameters: params.Params().ToYDB(a), OperationParams: operation.Params( @@ -267,7 +276,7 @@ func (c *Client) Close(ctx context.Context) (err error) { if c == nil { return xerrors.WithStackTrace(errNilClient) } - onDone := trace.ScriptingOnClose(c.config.Trace(), &ctx, stack.FunctionID(0)) + onDone := trace.ScriptingOnClose(c.config.Trace(), &ctx, stack.FunctionID("")) defer func() { onDone(err) }() diff --git a/internal/stack/function_id.go b/internal/stack/function_id.go index ef1dce4b9..518551e5b 100644 --- a/internal/stack/function_id.go +++ b/internal/stack/function_id.go @@ -1,5 +1,20 @@ package stack -func FunctionID(depth int) call { - return Call(depth + 1) +type caller interface { + FunctionID() string +} + +var _ caller = functionID("") + +type functionID string + +func (id functionID) FunctionID() string { + return string(id) +} + +func FunctionID(id string) caller { + if id != "" { + return functionID(id) + } + return Call(1) } diff --git a/internal/stack/record.go b/internal/stack/record.go index 8e4bc21b6..c47097a5c 100644 --- a/internal/stack/record.go +++ b/internal/stack/record.go @@ -62,6 +62,8 @@ func PackagePath(b bool) recordOption { } } +var _ caller = call{} + type call struct { function uintptr file string diff --git a/internal/table/client.go b/internal/table/client.go index 28edfc235..a19662a7f 100644 --- a/internal/table/client.go +++ b/internal/table/client.go @@ -46,7 +46,7 @@ func newClient( builder sessionBuilder, config *config.Config, ) (c *Client, finalErr error) { - onDone := trace.TableOnInit(config.Trace(), &ctx, stack.FunctionID(0)) + onDone := trace.TableOnInit(config.Trace(), &ctx, stack.FunctionID("")) defer func() { onDone(config.SizeLimit(), finalErr) }() @@ -256,7 +256,7 @@ func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ tab retry.WithIdempotent(true), retry.WithTrace(&trace.Retry{ OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { - onIntermediate := trace.TableOnCreateSession(c.config.Trace(), info.Context, stack.FunctionID(0)) + onIntermediate := trace.TableOnCreateSession(c.config.Trace(), info.Context, stack.FunctionID("")) return func(info trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) { onDone := onIntermediate(info.Error) return func(info trace.RetryLoopDoneInfo) { @@ -372,7 +372,7 @@ func (c *Client) internalPoolGet(ctx context.Context, opts ...getOption) (s *ses } } - onDone := trace.TableOnPoolGet(o.t, &ctx, stack.FunctionID(0)) + onDone := trace.TableOnPoolGet(o.t, &ctx, stack.FunctionID("")) defer func() { onDone(s, i, err) }() @@ -465,7 +465,7 @@ func (c *Client) internalPoolWaitFromCh(ctx context.Context, t *trace.Table) (s el = c.waitQ.PushBack(ch) }) - waitDone := trace.TableOnPoolWait(t, &ctx, stack.FunctionID(0)) + waitDone := trace.TableOnPoolWait(t, &ctx, stack.FunctionID("")) defer func() { waitDone(s, err) @@ -522,7 +522,10 @@ func (c *Client) internalPoolWaitFromCh(ctx context.Context, t *trace.Table) (s // Get() or Take() calls. In other way it will produce unexpected behavior or // panic. func (c *Client) Put(ctx context.Context, s *session) (err error) { - onDone := trace.TableOnPoolPut(c.config.Trace(), &ctx, stack.FunctionID(0), s) + onDone := trace.TableOnPoolPut(c.config.Trace(), &ctx, + stack.FunctionID(""), + s, + ) defer func() { onDone(err) }() @@ -579,7 +582,7 @@ func (c *Client) Close(ctx context.Context) (err error) { default: close(c.done) - onDone := trace.TableOnClose(c.config.Trace(), &ctx, stack.FunctionID(0)) + onDone := trace.TableOnClose(c.config.Trace(), &ctx, stack.FunctionID("")) defer func() { onDone(err) }() @@ -613,24 +616,106 @@ func (c *Client) Close(ctx context.Context) (err error) { // - deadline was canceled or deadlined // - retry operation returned nil as error // Warning: if deadline without deadline or cancellation func Retry will be worked infinite -func (c *Client) Do(ctx context.Context, op table.Operation, opts ...table.Option) error { +func (c *Client) Do(ctx context.Context, op table.Operation, opts ...table.Option) (finalErr error) { if c == nil { return xerrors.WithStackTrace(errNilClient) } + if c.isClosed() { return xerrors.WithStackTrace(errClosedClient) } - return xerrors.WithStackTrace(do(ctx, c, c.config, op, c.retryOptions(opts...))) + + config := c.retryOptions(opts...) + + attempts, onIntermediate := 0, trace.TableOnDo(config.Trace, &ctx, + stack.FunctionID(""), + config.Label, config.Label, config.Idempotent, xcontext.IsNestedCall(ctx), + ) + defer func() { + onIntermediate(finalErr)(attempts, finalErr) + }() + + err := do(ctx, c, c.config, op, func(err error) { + attempts++ + onIntermediate(err) + }, config.RetryOptions...) + if err != nil { + return xerrors.WithStackTrace(err) + } + + return nil } -func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.Option) error { +func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.Option) (finalErr error) { if c == nil { return xerrors.WithStackTrace(errNilClient) } + if c.isClosed() { return xerrors.WithStackTrace(errClosedClient) } - return xerrors.WithStackTrace(doTx(ctx, c, c.config, op, c.retryOptions(opts...))) + + config := c.retryOptions(opts...) + + attempts, onIntermediate := 0, trace.TableOnDoTx(config.Trace, &ctx, + stack.FunctionID(""), + config.Label, config.Label, config.Idempotent, xcontext.IsNestedCall(ctx), + ) + defer func() { + onIntermediate(finalErr)(attempts, finalErr) + }() + + return retryBackoff(ctx, c, + func(ctx context.Context, s table.Session) (err error) { + attempts++ + + defer func() { + onIntermediate(err) + }() + + tx, err := s.BeginTransaction(ctx, config.TxSettings) + if err != nil { + return xerrors.WithStackTrace(err) + } + + defer func() { + if err != nil { + errRollback := tx.Rollback(ctx) + if errRollback != nil { + err = xerrors.NewWithIssues("", + xerrors.WithStackTrace(err), + xerrors.WithStackTrace(errRollback), + ) + } else { + err = xerrors.WithStackTrace(err) + } + } + }() + + err = func() error { + if panicCallback := c.config.PanicCallback(); panicCallback != nil { + defer func() { + if e := recover(); e != nil { + panicCallback(e) + } + }() + } + return op(xcontext.MarkRetryCall(ctx), tx) + }() + + if err != nil { + return xerrors.WithStackTrace(err) + } + + _, err = tx.CommitTx(ctx, config.TxCommitOptions...) + if err != nil { + return xerrors.WithStackTrace(err) + } + + return nil + }, + config.RetryOptions..., + ) } func (c *Client) internalPoolGCTick(ctx context.Context, idleThreshold time.Duration) { diff --git a/internal/table/retry.go b/internal/table/retry.go index 90ccc951c..ec21c1b8f 100644 --- a/internal/table/retry.go +++ b/internal/table/retry.go @@ -3,7 +3,6 @@ package table import ( "context" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" @@ -23,97 +22,20 @@ type SessionProvider interface { Put(context.Context, *session) (err error) } -func doTx( - ctx context.Context, - c SessionProvider, - config *config.Config, - op table.TxOperation, - opts *table.Options, -) (err error) { - if opts.Trace == nil { - opts.Trace = &trace.Table{} - } - attempts, onIntermediate := 0, trace.TableOnDoTx(opts.Trace, &ctx, stack.FunctionID(1), - opts.Label, opts.Label, opts.Idempotent, xcontext.IsNestedCall(ctx), - ) - defer func() { - onIntermediate(err)(attempts, err) - }() - return retryBackoff(ctx, c, - func(ctx context.Context, s table.Session) (err error) { - attempts++ - - defer func() { - onIntermediate(err) - }() - - tx, err := s.BeginTransaction(ctx, opts.TxSettings) - if err != nil { - return xerrors.WithStackTrace(err) - } - - defer func() { - if err != nil { - errRollback := tx.Rollback(ctx) - if errRollback != nil { - err = xerrors.NewWithIssues("", - xerrors.WithStackTrace(err), - xerrors.WithStackTrace(errRollback), - ) - } else { - err = xerrors.WithStackTrace(err) - } - } - }() - - err = func() error { - if panicCallback := config.PanicCallback(); panicCallback != nil { - defer func() { - if e := recover(); e != nil { - panicCallback(e) - } - }() - } - return op(xcontext.MarkRetryCall(ctx), tx) - }() - - if err != nil { - return xerrors.WithStackTrace(err) - } - - _, err = tx.CommitTx(ctx, opts.TxCommitOptions...) - if err != nil { - return xerrors.WithStackTrace(err) - } - - return nil - }, - opts.RetryOptions..., - ) -} - func do( ctx context.Context, c SessionProvider, config *config.Config, op table.Operation, - opts *table.Options, + onAttempt func(err error), + opts ...retry.Option, ) (err error) { - if opts.Trace == nil { - opts.Trace = &trace.Table{} - } - attempts, onIntermediate := 0, trace.TableOnDo(opts.Trace, &ctx, stack.FunctionID(1), - opts.Label, opts.Label, opts.Idempotent, xcontext.IsNestedCall(ctx), - ) - defer func() { - onIntermediate(err)(attempts, err) - }() return retryBackoff(ctx, c, func(ctx context.Context, s table.Session) (err error) { - attempts++ - defer func() { - onIntermediate(err) + if onAttempt != nil { + onAttempt(err) + } }() err = func() error { @@ -133,7 +55,7 @@ func do( return nil }, - opts.RetryOptions..., + opts..., ) } @@ -182,5 +104,8 @@ func (c *Client) retryOptions(opts ...table.Option) *table.Options { opt.ApplyTableOption(options) } } + if options.Trace == nil { + options.Trace = &trace.Table{} + } return options } diff --git a/internal/table/retry_test.go b/internal/table/retry_test.go index 3e3897b81..43a1c865e 100644 --- a/internal/table/retry_test.go +++ b/internal/table/retry_test.go @@ -41,31 +41,26 @@ func TestRetryerBackoffRetryCancelation(t *testing.T) { ctx, cancel := xcontext.WithCancel(context.Background()) results := make(chan error) go func() { - err := do( - ctx, - p, + err := do(ctx, p, config.New(), func(ctx context.Context, _ table.Session) error { return testErr }, - &table.Options{ - RetryOptions: []retry.Option{ - retry.WithFastBackoff( - testutil.BackoffFunc(func(n int) <-chan time.Time { - ch := make(chan time.Time) - backoff <- ch - return ch - }), - ), - retry.WithSlowBackoff( - testutil.BackoffFunc(func(n int) <-chan time.Time { - ch := make(chan time.Time) - backoff <- ch - return ch - }), - ), - }, - }, + nil, + retry.WithFastBackoff( + testutil.BackoffFunc(func(n int) <-chan time.Time { + ch := make(chan time.Time) + backoff <- ch + return ch + }), + ), + retry.WithSlowBackoff( + testutil.BackoffFunc(func(n int) <-chan time.Time { + ch := make(chan time.Time) + backoff <- ch + return ch + }), + ), ) results <- err }() @@ -115,7 +110,7 @@ func TestRetryerBadSession(t *testing.T) { xerrors.WithStatusCode(Ydb.StatusIds_BAD_SESSION), ) }, - &table.Options{}, + func(err error) {}, ) if !xerrors.Is(err, context.Canceled) { t.Errorf("unexpected error: %v", err) @@ -161,7 +156,7 @@ func TestRetryerSessionClosing(t *testing.T) { s.(*session).SetStatus(table.SessionClosing) return nil }, - &table.Options{}, + nil, ) if err != nil { t.Errorf("unexpected error: %v", err) @@ -212,20 +207,17 @@ func TestRetryerImmediateReturn(t *testing.T) { func(ctx context.Context, _ table.Session) error { return testErr }, - &table.Options{ - RetryOptions: []retry.Option{ - retry.WithFastBackoff( - testutil.BackoffFunc(func(n int) <-chan time.Time { - panic("this code will not be called") - }), - ), - retry.WithSlowBackoff( - testutil.BackoffFunc(func(n int) <-chan time.Time { - panic("this code will not be called") - }), - ), - }, - }, + nil, + retry.WithFastBackoff( + testutil.BackoffFunc(func(n int) <-chan time.Time { + panic("this code will not be called") + }), + ), + retry.WithSlowBackoff( + testutil.BackoffFunc(func(n int) <-chan time.Time { + panic("this code will not be called") + }), + ), ) if !xerrors.Is(err, testErr) { t.Fatalf("unexpected error: %v", err) @@ -353,7 +345,7 @@ func TestRetryContextDeadline(t *testing.T) { return errs[r.Int(len(errs))] } }, - &table.Options{}, + nil, ) }) } @@ -454,7 +446,7 @@ func TestRetryWithCustomErrors(t *testing.T) { } return nil }, - &table.Options{}, + nil, ) //nolint:nestif if test.retriable { diff --git a/internal/table/session.go b/internal/table/session.go index 9e960d5f8..045ebc00d 100644 --- a/internal/table/session.go +++ b/internal/table/session.go @@ -114,7 +114,7 @@ func (s *session) isClosing() bool { func newSession(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config) ( s *session, err error, ) { - onDone := trace.TableOnSessionNew(config.Trace(), &ctx, stack.FunctionID(0)) + onDone := trace.TableOnSessionNew(config.Trace(), &ctx, stack.FunctionID("")) defer func() { onDone(s, err) }() @@ -175,12 +175,15 @@ func (s *session) Close(ctx context.Context) (err error) { } s.closeOnce.Do(func() { + onDone := trace.TableOnSessionDelete(s.config.Trace(), &ctx, + stack.FunctionID(""), + s, + ) defer func() { s.SetStatus(table.SessionClosed) + onDone(err) }() - onDone := trace.TableOnSessionDelete(s.config.Trace(), &ctx, stack.FunctionID(0), s) - if time.Since(s.LastUsage()) < s.config.IdleThreshold() { _, err = s.tableService.DeleteSession(ctx, &Ydb_Table.DeleteSessionRequest{ @@ -197,8 +200,6 @@ func (s *session) Close(ctx context.Context) (err error) { for _, onClose := range s.onClose { onClose(s) } - - onDone(err) }) if err != nil { @@ -226,7 +227,9 @@ func (s *session) KeepAlive(ctx context.Context) (err error) { var ( result Ydb_Table.KeepAliveResult onDone = trace.TableOnSessionKeepAlive( - s.config.Trace(), &ctx, stack.FunctionID(0), s, + s.config.Trace(), &ctx, + stack.FunctionID(""), + s, ) ) defer func() { @@ -599,10 +602,11 @@ func (s *session) Explain( result Ydb_Table.ExplainQueryResult response *Ydb_Table.ExplainDataQueryResponse onDone = trace.TableOnSessionQueryExplain( - s.config.Trace(), &ctx, stack.FunctionID(0), s, query, + s.config.Trace(), &ctx, + stack.FunctionID(""), + s, query, ) ) - defer func() { if err != nil { onDone("", "", err) @@ -647,7 +651,9 @@ func (s *session) Prepare(ctx context.Context, queryText string) (_ table.Statem response *Ydb_Table.PrepareDataQueryResponse result Ydb_Table.PrepareQueryResult onDone = trace.TableOnSessionQueryPrepare( - s.config.Trace(), &ctx, stack.FunctionID(0), s, queryText, + s.config.Trace(), &ctx, + stack.FunctionID(""), + s, queryText, ) ) defer func() { @@ -728,7 +734,9 @@ func (s *session) Execute( } onDone := trace.TableOnSessionQueryExecute( - s.config.Trace(), &ctx, stack.FunctionID(0), s, q, params, + s.config.Trace(), &ctx, + stack.FunctionID(""), + s, q, params, request.QueryCachePolicy.GetKeepInCache(), ) defer func() { @@ -964,8 +972,11 @@ func (s *session) StreamReadTable( opts ...options.ReadTableOption, ) (_ result.StreamResult, err error) { var ( - onIntermediate = trace.TableOnSessionQueryStreamRead(s.config.Trace(), &ctx, stack.FunctionID(0), s) - request = Ydb_Table.ReadTableRequest{ + onIntermediate = trace.TableOnSessionQueryStreamRead(s.config.Trace(), &ctx, + stack.FunctionID(""), + s, + ) + request = Ydb_Table.ReadTableRequest{ SessionId: s.id, Path: path, } @@ -1084,7 +1095,9 @@ func (s *session) StreamExecuteScanQuery( a = allocator.New() q = queryFromText(query) onIntermediate = trace.TableOnSessionQueryStreamExecute( - s.config.Trace(), &ctx, stack.FunctionID(0), s, q, params, + s.config.Trace(), &ctx, + stack.FunctionID(""), + s, q, params, ) request = Ydb_Table.ExecuteScanQueryRequest{ Query: q.toYDB(a), @@ -1155,7 +1168,9 @@ func (s *session) BulkUpsert(ctx context.Context, table string, rows types.Value a = allocator.New() callOptions []grpc.CallOption onDone = trace.TableOnSessionBulkUpsert( - s.config.Trace(), &ctx, stack.FunctionID(0), s, + s.config.Trace(), &ctx, + stack.FunctionID(""), + s, ) ) defer func() { @@ -1197,7 +1212,9 @@ func (s *session) BeginTransaction( result Ydb_Table.BeginTransactionResult response *Ydb_Table.BeginTransactionResponse onDone = trace.TableOnSessionTransactionBegin( - s.config.Trace(), &ctx, stack.FunctionID(0), s, + s.config.Trace(), &ctx, + stack.FunctionID(""), + s, ) ) defer func() { diff --git a/internal/table/statement.go b/internal/table/statement.go index b0e2c68cb..e59f07b5a 100644 --- a/internal/table/statement.go +++ b/internal/table/statement.go @@ -60,7 +60,9 @@ func (s *statement) Execute( } onDone := trace.TableOnSessionQueryExecute( - s.session.config.Trace(), &ctx, stack.FunctionID(0), s.session, s.query, params, + s.session.config.Trace(), &ctx, + stack.FunctionID(""), + s.session, s.query, params, request.QueryCachePolicy.GetKeepInCache(), ) defer func() { diff --git a/internal/table/transaction.go b/internal/table/transaction.go index c3bc19337..180784c3a 100644 --- a/internal/table/transaction.go +++ b/internal/table/transaction.go @@ -61,7 +61,9 @@ func (tx *transaction) Execute( opts ...options.ExecuteDataQueryOption, ) (r result.Result, err error) { onDone := trace.TableOnSessionTransactionExecute( - tx.s.config.Trace(), &ctx, stack.FunctionID(0), tx.s, tx, queryFromText(query), params, + tx.s.config.Trace(), &ctx, + stack.FunctionID(""), + tx.s, tx, queryFromText(query), params, ) defer func() { onDone(r, err) @@ -99,7 +101,9 @@ func (tx *transaction) ExecuteStatement( defer a.Free() onDone := trace.TableOnSessionTransactionExecuteStatement( - tx.s.config.Trace(), &ctx, stack.FunctionID(0), tx.s, tx, stmt.(*statement).query, params, + tx.s.config.Trace(), &ctx, + stack.FunctionID(""), + tx.s, tx, stmt.(*statement).query, params, ) defer func() { onDone(r, err) @@ -130,7 +134,9 @@ func (tx *transaction) CommitTx( opts ...options.CommitTransactionOption, ) (r result.Result, err error) { onDone := trace.TableOnSessionTransactionCommit( - tx.s.config.Trace(), &ctx, stack.FunctionID(0), tx.s, tx, + tx.s.config.Trace(), &ctx, + stack.FunctionID(""), + tx.s, tx, ) defer func() { onDone(err) @@ -186,7 +192,9 @@ func (tx *transaction) CommitTx( // Rollback performs a rollback of the specified active transaction. func (tx *transaction) Rollback(ctx context.Context) (err error) { onDone := trace.TableOnSessionTransactionRollback( - tx.s.config.Trace(), &ctx, stack.FunctionID(0), tx.s, tx, + tx.s.config.Trace(), &ctx, + stack.FunctionID(""), + tx.s, tx, ) defer func() { onDone(err) diff --git a/internal/xresolver/xresolver.go b/internal/xresolver/xresolver.go index 647a27613..6ce395fa8 100644 --- a/internal/xresolver/xresolver.go +++ b/internal/xresolver/xresolver.go @@ -31,19 +31,24 @@ func (c *clientConn) Endpoint() string { } func (c *clientConn) UpdateState(state resolver.State) (err error) { - onDone := trace.DriverOnResolve(c.trace, stack.FunctionID(0), c.Endpoint(), func() (addrs []string) { - for i := range state.Addresses { - addrs = append(addrs, state.Addresses[i].Addr) - } - return - }()) + onDone := trace.DriverOnResolve(c.trace, + stack.FunctionID(""), + c.Endpoint(), func() (addrs []string) { + for i := range state.Addresses { + addrs = append(addrs, state.Addresses[i].Addr) + } + return + }(), + ) defer func() { onDone(err) }() + err = c.ClientConn.UpdateState(state) if err != nil { return xerrors.WithStackTrace(err) } + return nil } diff --git a/internal/xsql/conn.go b/internal/xsql/conn.go index 98d2cb24c..b5a52fd71 100644 --- a/internal/xsql/conn.go +++ b/internal/xsql/conn.go @@ -152,13 +152,18 @@ func (c *conn) PrepareContext(ctx context.Context, query string) (_ driver.Stmt, if c.currentTx != nil { return c.currentTx.PrepareContext(ctx, query) } - onDone := trace.DatabaseSQLOnConnPrepare(c.trace, &ctx, stack.FunctionID(0), query) + onDone := trace.DatabaseSQLOnConnPrepare(c.trace, &ctx, + stack.FunctionID(""), + query, + ) defer func() { onDone(finalErr) }() + if !c.isReady() { return nil, badconn.Map(xerrors.WithStackTrace(errNotReadyConn)) } + return &stmt{ conn: c, processor: c, @@ -190,10 +195,11 @@ func (c *conn) execContext(ctx context.Context, query string, args []driver.Name var ( m = queryModeFromContext(ctx, c.defaultQueryMode) onDone = trace.DatabaseSQLOnConnExec( - c.trace, &ctx, stack.FunctionID(0), query, m.String(), xcontext.IsIdempotent(ctx), c.sinceLastUsage(), + c.trace, &ctx, + stack.FunctionID(""), + query, m.String(), xcontext.IsIdempotent(ctx), c.sinceLastUsage(), ) ) - defer func() { onDone(finalErr) }() @@ -297,7 +303,9 @@ func (c *conn) queryContext(ctx context.Context, query string, args []driver.Nam var ( m = queryModeFromContext(ctx, c.defaultQueryMode) onDone = trace.DatabaseSQLOnConnQuery( - c.trace, &ctx, stack.FunctionID(0), query, m.String(), xcontext.IsIdempotent(ctx), c.sinceLastUsage(), + c.trace, &ctx, + stack.FunctionID(""), + query, m.String(), xcontext.IsIdempotent(ctx), c.sinceLastUsage(), ) ) defer func() { @@ -379,7 +387,7 @@ func (c *conn) queryContext(ctx context.Context, query string, args []driver.Nam } func (c *conn) Ping(ctx context.Context) (finalErr error) { - onDone := trace.DatabaseSQLOnConnPing(c.trace, &ctx, stack.FunctionID(0)) + onDone := trace.DatabaseSQLOnConnPing(c.trace, &ctx, stack.FunctionID("")) defer func() { onDone(finalErr) }() @@ -396,7 +404,8 @@ func (c *conn) Close() (finalErr error) { if c.closed.CompareAndSwap(false, true) { c.connector.detach(c) onDone := trace.DatabaseSQLOnConnClose( - c.trace, &c.openConnCtx, stack.FunctionID(0), + c.trace, &c.openConnCtx, + stack.FunctionID(""), ) defer func() { onDone(finalErr) @@ -436,7 +445,7 @@ func (c *conn) ID() string { func (c *conn) BeginTx(ctx context.Context, txOptions driver.TxOptions) (_ driver.Tx, finalErr error) { var tx currentTx - onDone := trace.DatabaseSQLOnConnBegin(c.trace, &ctx, stack.FunctionID(0)) + onDone := trace.DatabaseSQLOnConnBegin(c.trace, &ctx, stack.FunctionID("")) defer func() { onDone(tx, finalErr) }() @@ -483,7 +492,10 @@ func (c *conn) Version(_ context.Context) (_ string, _ error) { func (c *conn) IsTableExists(ctx context.Context, tableName string) (tableExists bool, finalErr error) { tableName = c.normalizePath(tableName) - onDone := trace.DatabaseSQLOnConnIsTableExists(c.trace, &ctx, stack.FunctionID(0), tableName) + onDone := trace.DatabaseSQLOnConnIsTableExists(c.trace, &ctx, + stack.FunctionID(""), + tableName, + ) defer func() { onDone(tableExists, finalErr) }() diff --git a/internal/xsql/connector.go b/internal/xsql/connector.go index e95f5926a..2812ac768 100644 --- a/internal/xsql/connector.go +++ b/internal/xsql/connector.go @@ -297,7 +297,8 @@ func (c *Connector) detach(cc *conn) { func (c *Connector) Connect(ctx context.Context) (_ driver.Conn, err error) { var ( onDone = trace.DatabaseSQLOnConnectorConnect( - c.trace, &ctx, stack.FunctionID(0), + c.trace, &ctx, + stack.FunctionID(""), ) session table.ClosableSession ) diff --git a/internal/xsql/stmt.go b/internal/xsql/stmt.go index 5660d2504..99d1d3131 100644 --- a/internal/xsql/stmt.go +++ b/internal/xsql/stmt.go @@ -30,7 +30,10 @@ var ( ) func (s *stmt) QueryContext(ctx context.Context, args []driver.NamedValue) (_ driver.Rows, finalErr error) { - onDone := trace.DatabaseSQLOnStmtQuery(s.trace, &ctx, stack.FunctionID(0), s.stmtCtx, s.query) + onDone := trace.DatabaseSQLOnStmtQuery(s.trace, &ctx, + stack.FunctionID(""), + s.stmtCtx, s.query, + ) defer func() { onDone(finalErr) }() @@ -46,7 +49,10 @@ func (s *stmt) QueryContext(ctx context.Context, args []driver.NamedValue) (_ dr } func (s *stmt) ExecContext(ctx context.Context, args []driver.NamedValue) (_ driver.Result, finalErr error) { - onDone := trace.DatabaseSQLOnStmtExec(s.trace, &ctx, stack.FunctionID(0), s.stmtCtx, s.query) + onDone := trace.DatabaseSQLOnStmtExec(s.trace, &ctx, + stack.FunctionID(""), + s.stmtCtx, s.query, + ) defer func() { onDone(finalErr) }() @@ -66,7 +72,7 @@ func (s *stmt) NumInput() int { } func (s *stmt) Close() (finalErr error) { - onDone := trace.DatabaseSQLOnStmtClose(s.trace, &s.stmtCtx, stack.FunctionID(0)) + onDone := trace.DatabaseSQLOnStmtClose(s.trace, &s.stmtCtx, stack.FunctionID("")) defer func() { onDone(finalErr) }() diff --git a/internal/xsql/tx.go b/internal/xsql/tx.go index 5d77bfd98..5a135c0c9 100644 --- a/internal/xsql/tx.go +++ b/internal/xsql/tx.go @@ -71,7 +71,10 @@ func (tx *tx) checkTxState() error { } func (tx *tx) Commit() (finalErr error) { - onDone := trace.DatabaseSQLOnTxCommit(tx.conn.trace, &tx.txCtx, stack.FunctionID(0), tx) + onDone := trace.DatabaseSQLOnTxCommit(tx.conn.trace, &tx.txCtx, + stack.FunctionID(""), + tx, + ) defer func() { onDone(finalErr) }() @@ -89,7 +92,10 @@ func (tx *tx) Commit() (finalErr error) { } func (tx *tx) Rollback() (finalErr error) { - onDone := trace.DatabaseSQLOnTxRollback(tx.conn.trace, &tx.txCtx, stack.FunctionID(0), tx) + onDone := trace.DatabaseSQLOnTxRollback(tx.conn.trace, &tx.txCtx, + stack.FunctionID(""), + tx, + ) defer func() { onDone(finalErr) }() @@ -109,7 +115,10 @@ func (tx *tx) Rollback() (finalErr error) { func (tx *tx) QueryContext(ctx context.Context, query string, args []driver.NamedValue) ( _ driver.Rows, finalErr error, ) { - onDone := trace.DatabaseSQLOnTxQuery(tx.conn.trace, &ctx, stack.FunctionID(0), tx.txCtx, tx, query, true) + onDone := trace.DatabaseSQLOnTxQuery(tx.conn.trace, &ctx, + stack.FunctionID(""), + tx.txCtx, tx, query, true, + ) defer func() { onDone(finalErr) }() @@ -147,7 +156,10 @@ func (tx *tx) QueryContext(ctx context.Context, query string, args []driver.Name func (tx *tx) ExecContext(ctx context.Context, query string, args []driver.NamedValue) ( _ driver.Result, finalErr error, ) { - onDone := trace.DatabaseSQLOnTxExec(tx.conn.trace, &ctx, stack.FunctionID(0), tx.txCtx, tx, query, true) + onDone := trace.DatabaseSQLOnTxExec(tx.conn.trace, &ctx, + stack.FunctionID(""), + tx.txCtx, tx, query, true, + ) defer func() { onDone(finalErr) }() @@ -177,7 +189,10 @@ func (tx *tx) ExecContext(ctx context.Context, query string, args []driver.Named } func (tx *tx) PrepareContext(ctx context.Context, query string) (_ driver.Stmt, finalErr error) { - onDone := trace.DatabaseSQLOnTxPrepare(tx.conn.trace, &ctx, stack.FunctionID(0), &tx.txCtx, tx, query) + onDone := trace.DatabaseSQLOnTxPrepare(tx.conn.trace, &ctx, + stack.FunctionID(""), + &tx.txCtx, tx, query, + ) defer func() { onDone(finalErr) }() diff --git a/internal/xsql/tx_fake.go b/internal/xsql/tx_fake.go index a61c8e94f..1488e00b6 100644 --- a/internal/xsql/tx_fake.go +++ b/internal/xsql/tx_fake.go @@ -19,7 +19,10 @@ type txFake struct { } func (tx *txFake) PrepareContext(ctx context.Context, query string) (_ driver.Stmt, finalErr error) { - onDone := trace.DatabaseSQLOnTxPrepare(tx.conn.trace, &ctx, stack.FunctionID(0), &tx.beginCtx, tx, query) + onDone := trace.DatabaseSQLOnTxPrepare(tx.conn.trace, &ctx, + stack.FunctionID(""), + &tx.beginCtx, tx, query, + ) defer func() { onDone(finalErr) }() @@ -54,7 +57,10 @@ func (tx *txFake) ID() string { } func (tx *txFake) Commit() (err error) { - onDone := trace.DatabaseSQLOnTxCommit(tx.conn.trace, &tx.ctx, stack.FunctionID(0), tx) + onDone := trace.DatabaseSQLOnTxCommit(tx.conn.trace, &tx.ctx, + stack.FunctionID(""), + tx, + ) defer func() { onDone(err) }() @@ -68,7 +74,10 @@ func (tx *txFake) Commit() (err error) { } func (tx *txFake) Rollback() (err error) { - onDone := trace.DatabaseSQLOnTxRollback(tx.conn.trace, &tx.ctx, stack.FunctionID(0), tx) + onDone := trace.DatabaseSQLOnTxRollback(tx.conn.trace, &tx.ctx, + stack.FunctionID(""), + tx, + ) defer func() { onDone(err) }() @@ -85,7 +94,9 @@ func (tx *txFake) QueryContext(ctx context.Context, query string, args []driver. rows driver.Rows, err error, ) { onDone := trace.DatabaseSQLOnTxQuery( - tx.conn.trace, &ctx, stack.FunctionID(0), tx.ctx, tx, query, xcontext.IsIdempotent(ctx), + tx.conn.trace, &ctx, + stack.FunctionID(""), + tx.ctx, tx, query, xcontext.IsIdempotent(ctx), ) defer func() { onDone(err) @@ -101,7 +112,9 @@ func (tx *txFake) ExecContext(ctx context.Context, query string, args []driver.N result driver.Result, err error, ) { onDone := trace.DatabaseSQLOnTxExec( - tx.conn.trace, &ctx, stack.FunctionID(0), tx.ctx, tx, query, xcontext.IsIdempotent(ctx), + tx.conn.trace, &ctx, + stack.FunctionID(""), + tx.ctx, tx, query, xcontext.IsIdempotent(ctx), ) defer func() { onDone(err) diff --git a/retry/retry.go b/retry/retry.go index 4db2b3a8f..50f9c0b99 100644 --- a/retry/retry.go +++ b/retry/retry.go @@ -232,7 +232,7 @@ func WithPanicCallback(panicCallback func(e interface{})) panicCallbackOption { // If you need to retry your op func on some logic errors - you must return RetryableError() from retryOperation func Retry(ctx context.Context, op retryOperation, opts ...Option) (finalErr error) { options := &retryOptions{ - call: stack.FunctionID(0), + call: stack.FunctionID(""), trace: &trace.Retry{}, fastBackoff: backoff.Fast, slowBackoff: backoff.Slow, diff --git a/retry/sql.go b/retry/sql.go index e82e5ffdc..c0525891c 100644 --- a/retry/sql.go +++ b/retry/sql.go @@ -42,7 +42,7 @@ func Do(ctx context.Context, db *sql.DB, op func(ctx context.Context, cc *sql.Co var ( options = doOptions{ retryOptions: []Option{ - withCaller(stack.FunctionID(0)), + withCaller(stack.FunctionID("")), }, } attempts = 0 @@ -127,7 +127,7 @@ func DoTx(ctx context.Context, db *sql.DB, op func(context.Context, *sql.Tx) err var ( options = doTxOptions{ retryOptions: []Option{ - withCaller(stack.FunctionID(0)), + withCaller(stack.FunctionID("")), }, txOptions: &sql.TxOptions{ Isolation: sql.LevelDefault, diff --git a/trace/driver.go b/trace/driver.go index 45b9f14fc..5c31cfbd5 100644 --- a/trace/driver.go +++ b/trace/driver.go @@ -17,6 +17,7 @@ type ( Driver struct { // Driver runtime events OnInit func(DriverInitStartInfo) func(DriverInitDoneInfo) + OnWith func(DriverWithStartInfo) func(DriverWithDoneInfo) OnClose func(DriverCloseStartInfo) func(DriverCloseDoneInfo) // Pool of connections @@ -416,6 +417,20 @@ type ( DriverInitDoneInfo struct { Error error } + DriverWithStartInfo struct { + // Context make available context in trace callback function. + // Pointer to context provide replacement of context in trace callback function. + // Warning: concurrent access to pointer on client side must be excluded. + // Safe replacement of context are provided only inside callback function + Context *context.Context + Call call + Endpoint string + Database string + Secure bool + } + DriverWithDoneInfo struct { + Error error + } DriverConnPoolNewStartInfo struct { // Context make available context in trace callback function. // Pointer to context provide replacement of context in trace callback function. diff --git a/trace/driver_gtrace.go b/trace/driver_gtrace.go index b51e2f07e..0fe4e4fe7 100644 --- a/trace/driver_gtrace.go +++ b/trace/driver_gtrace.go @@ -65,6 +65,41 @@ func (t *Driver) Compose(x *Driver, opts ...DriverComposeOption) *Driver { } } } + { + h1 := t.OnWith + h2 := x.OnWith + ret.OnWith = func(d DriverWithStartInfo) func(DriverWithDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + var r, r1 func(DriverWithDoneInfo) + if h1 != nil { + r = h1(d) + } + if h2 != nil { + r1 = h2(d) + } + return func(d DriverWithDoneInfo) { + if options.panicCallback != nil { + defer func() { + if e := recover(); e != nil { + options.panicCallback(e) + } + }() + } + if r != nil { + r(d) + } + if r1 != nil { + r1(d) + } + } + } + } { h1 := t.OnClose h2 := x.OnClose @@ -973,6 +1008,21 @@ func (t *Driver) onInit(d DriverInitStartInfo) func(DriverInitDoneInfo) { } return res } +func (t *Driver) onWith(d DriverWithStartInfo) func(DriverWithDoneInfo) { + fn := t.OnWith + if fn == nil { + return func(DriverWithDoneInfo) { + return + } + } + res := fn(d) + if res == nil { + return func(DriverWithDoneInfo) { + return + } + } + return res +} func (t *Driver) onClose(d DriverCloseStartInfo) func(DriverCloseDoneInfo) { fn := t.OnClose if fn == nil { @@ -1374,6 +1424,20 @@ func DriverOnInit(t *Driver, c *context.Context, call call, endpoint string, dat res(p) } } +func DriverOnWith(t *Driver, c *context.Context, call call, endpoint string, database string, secure bool) func(error) { + var p DriverWithStartInfo + p.Context = c + p.Call = call + p.Endpoint = endpoint + p.Database = database + p.Secure = secure + res := t.onWith(p) + return func(e error) { + var p DriverWithDoneInfo + p.Error = e + res(p) + } +} func DriverOnClose(t *Driver, c *context.Context, call call) func(error) { var p DriverCloseStartInfo p.Context = c diff --git a/with.go b/with.go index fefd32091..300d65cac 100644 --- a/with.go +++ b/with.go @@ -3,8 +3,10 @@ package ydb import ( "context" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/stack" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xatomic" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) var nextID xatomic.Uint64 @@ -44,8 +46,16 @@ func (d *Driver) With(ctx context.Context, opts ...Option) (*Driver, error) { return nil, xerrors.WithStackTrace(err) } - err = connect(ctx, child) - if err != nil { + onDone := trace.DriverOnWith( + d.trace(), &ctx, + stack.FunctionID(""), + d.config.Endpoint(), d.config.Database(), d.config.Secure(), + ) + defer func() { + onDone(err) + }() + + if err = child.connect(ctx); err != nil { return nil, xerrors.WithStackTrace(err) }