Skip to content

Commit

Permalink
Merge pull request #1137 from ydb-platform/traces
Browse files Browse the repository at this point in the history
Traces
  • Loading branch information
asmyasnikov authored Mar 17, 2024
2 parents ea0d763 + 58b3cff commit 5d66281
Show file tree
Hide file tree
Showing 38 changed files with 1,396 additions and 1,743 deletions.
2 changes: 0 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
* Fixed function id stringification for generic type function calls

## v3.58.0
* Changed `List` constructor from `ydb.ParamsBuilder().List().Build().Build()` to `ydb.ParamsBuilder().BeginList().EndList().Build()`
* Changed `Set` constructor from `ydb.ParamsBuilder().Set().Build().Build()` to `ydb.ParamsBuilder().BeginSet().EndSet().Build()`
Expand Down
5 changes: 2 additions & 3 deletions examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ require (
github.com/prometheus/client_golang v1.13.0
github.com/ydb-platform/gorm-driver v0.0.5
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.2
github.com/ydb-platform/ydb-go-sdk-prometheus v0.11.10
github.com/ydb-platform/ydb-go-sdk/v3 v3.47.3
github.com/ydb-platform/ydb-go-sdk-prometheus/v2 v2.0.1
github.com/ydb-platform/ydb-go-sdk/v3 v3.54.0
github.com/ydb-platform/ydb-go-yc v0.10.1
google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54
gorm.io/driver/postgres v1.5.0
Expand Down Expand Up @@ -51,7 +51,6 @@ require (
github.com/syndtr/goleveldb v1.0.0 // indirect
github.com/yandex-cloud/go-genproto v0.0.0-20220815090733-4c139c0154e2 // indirect
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240126124512-dbb0e1720dbf // indirect
github.com/ydb-platform/ydb-go-sdk-metrics v0.16.3 // indirect
github.com/ydb-platform/ydb-go-yc-metadata v0.5.4 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/mod v0.11.0 // indirect
Expand Down
7 changes: 2 additions & 5 deletions examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1100,7 +1100,6 @@ github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeD
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY=
github.com/prometheus/client_golang v1.12.2/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY=
github.com/prometheus/client_golang v1.13.0 h1:b71QUfeo5M8gq2+evJdTPfZhYMAU0uKPkyPJ7TPsloU=
github.com/prometheus/client_golang v1.13.0/go.mod h1:vTeo+zgvILHsnnj/39Ou/1fPN5nJFOEMgftOUOmlvYQ=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
Expand Down Expand Up @@ -1202,10 +1201,8 @@ github.com/ydb-platform/ydb-go-genproto v0.0.0-20240126124512-dbb0e1720dbf h1:ck
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240126124512-dbb0e1720dbf/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.2 h1:EYSI1kulnHb0H0zt3yOw4cRj4ABMSMGwNe43D+fX7e4=
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.2/go.mod h1:Xfjce+VMU9yJVr1lj60yK2fFPWjB4jr/4cp3K7cjzi4=
github.com/ydb-platform/ydb-go-sdk-metrics v0.16.3 h1:30D5jErLAiGjchVG2D9JiCLbST5LpAiyS7DoUtHkWsU=
github.com/ydb-platform/ydb-go-sdk-metrics v0.16.3/go.mod h1:bqOjIBSt5LtA8fcTprRPGLvlQGkNlqBSRqnL+yZUJh4=
github.com/ydb-platform/ydb-go-sdk-prometheus v0.11.10 h1:eXRJ8nKGv5Dyz7qTDFraahyqlSmOf1/8JqUtlxGlA4o=
github.com/ydb-platform/ydb-go-sdk-prometheus v0.11.10/go.mod h1:7OffPa+OmsJgIP5G+2Cg5oP9+xB5UJSLm5AUpLxi5Uc=
github.com/ydb-platform/ydb-go-sdk-prometheus/v2 v2.0.1 h1:Lsir3AC2VQOTlp8UjZY9zQdCVfWvBNHT3hZn+jSGoo0=
github.com/ydb-platform/ydb-go-sdk-prometheus/v2 v2.0.1/go.mod h1:vofSH6XG0Cr04RV+V3fLp5apOhwDqj1kSoYD9/lmzmE=
github.com/ydb-platform/ydb-go-yc v0.8.3/go.mod h1:zUolAFGzJ5XG8uwiseTLr9Lapm7L7hdVdZgLSuv9FXE=
github.com/ydb-platform/ydb-go-yc v0.10.1 h1:9SBUpR94tzasEzqYSbBuuEp9mY/jV6xbwPMy3muvV7U=
github.com/ydb-platform/ydb-go-yc v0.10.1/go.mod h1:9HaZmOHUWy2MpJ4GZw9j9gR2I82/kb6H8fjsu8b2lxQ=
Expand Down
2 changes: 1 addition & 1 deletion examples/serverless/url_shortener/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
environ "github.com/ydb-platform/ydb-go-sdk-auth-environ"
ydbMetrics "github.com/ydb-platform/ydb-go-sdk-prometheus"
ydbMetrics "github.com/ydb-platform/ydb-go-sdk-prometheus/v2"
ydb "github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
Expand Down
2 changes: 1 addition & 1 deletion internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, endpoints []end
)
defer func() {
nodes, added, dropped := endpointsDiff(endpoints, previousConns)
onDone(nodes, added, dropped, localDC, nil)
onDone(nodes, added, dropped, localDC)
}()

connections := endpointsToConnections(b.pool, endpoints)
Expand Down
18 changes: 3 additions & 15 deletions internal/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func (c *conn) NewStream(
opts ...grpc.CallOption,
) (_ grpc.ClientStream, err error) {
var (
streamRecv = trace.DriverOnConnNewStream(
onDone = trace.DriverOnConnNewStream(
c.config.Trace(), &ctx,
stack.FunctionID(""),
c.endpoint.Copy(), trace.Method(method),
Expand All @@ -402,18 +402,7 @@ func (c *conn) NewStream(
)

defer func() {
if err != nil {
streamRecv(err)(err, c.GetState(), metadata.MD{})
}
}()

var cancel context.CancelFunc
ctx, cancel = xcontext.WithCancel(ctx)

defer func() {
if err != nil {
cancel()
}
onDone(err, c.GetState())
}()

cc, err = c.realConn(ctx)
Expand Down Expand Up @@ -454,15 +443,14 @@ func (c *conn) NewStream(

return &grpcClientStream{
ClientStream: s,
ctx: ctx,
c: c,
wrapping: useWrapping,
traceID: traceID,
sentMark: sentMark,
onDone: func(ctx context.Context, md metadata.MD) {
cancel()
meta.CallTrailerCallback(ctx, md)
},
recv: streamRecv,
}, nil
}

Expand Down
22 changes: 18 additions & 4 deletions internal/conn/grpc_client_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/wrap"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
Expand All @@ -17,15 +18,20 @@ import (

type grpcClientStream struct {
grpc.ClientStream
ctx context.Context
c *conn
wrapping bool
traceID string
sentMark *modificationMark
onDone func(ctx context.Context, md metadata.MD)
recv func(error) func(error, trace.ConnState, map[string][]string)
}

func (s *grpcClientStream) CloseSend() (err error) {
onDone := trace.DriverOnConnStreamCloseSend(s.c.config.Trace(), &s.ctx, stack.FunctionID(""))
defer func() {
onDone(err)
}()

err = s.ClientStream.CloseSend()

if err != nil {
Expand All @@ -46,6 +52,11 @@ func (s *grpcClientStream) CloseSend() (err error) {
}

func (s *grpcClientStream) SendMsg(m interface{}) (err error) {
onDone := trace.DriverOnConnStreamSendMsg(s.c.config.Trace(), &s.ctx, stack.FunctionID(""))
defer func() {
onDone(err)
}()

cancel := createPinger(s.c)
defer cancel()

Expand Down Expand Up @@ -77,15 +88,18 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) {
}

func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
onDone := trace.DriverOnConnStreamRecvMsg(s.c.config.Trace(), &s.ctx, stack.FunctionID(""))
defer func() {
onDone(err)
}()

cancel := createPinger(s.c)
defer cancel()

defer func() {
onDone := s.recv(xerrors.HideEOF(err))
if err != nil {
md := s.ClientStream.Trailer()
onDone(xerrors.HideEOF(err), s.c.GetState(), md)
s.onDone(s.ClientStream.Context(), md)
s.onDone(s.ctx, md)
}
}()

Expand Down
4 changes: 3 additions & 1 deletion internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ func (p *Pool[PT, T]) put(ctx context.Context, item PT) (err error) {
}

func (p *Pool[PT, T]) produce(ctx context.Context) {
ctx = xcontext.WithoutDeadline(ctx)

onDone := p.trace.OnProduce(&ProduceStartInfo{
Context: &ctx,
Call: stack.FunctionID(""),
Expand Down Expand Up @@ -197,7 +199,7 @@ func (p *Pool[PT, T]) produce(ctx context.Context) {
if msg != nil {
p.idle <- msg
} else {
item, err := p.create(context.Background())
item, err := p.create(ctx)
if err == nil {
p.idle <- item
}
Expand Down
68 changes: 35 additions & 33 deletions internal/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,43 +140,45 @@ func TestPool(t *testing.T) {
}, xtest.StopAfter(time.Second))
})
t.Run("IsAlive", func(t *testing.T) {
var (
newItems int64
deleteItems int64
expErr = xerrors.Retryable(errors.New("expected error"), xerrors.WithDeleteSession())
)
p, err := New(rootCtx,
WithMaxSize[*testItem, testItem](1),
WithCreateFunc(func(context.Context) (*testItem, error) {
atomic.AddInt64(&newItems, 1)
xtest.TestManyTimes(t, func(t testing.TB) {
var (
newItems int64
deleteItems int64
expErr = xerrors.Retryable(errors.New("expected error"), xerrors.WithDeleteSession())
)
p, err := New(rootCtx,
WithMaxSize[*testItem, testItem](1),
WithCreateFunc(func(context.Context) (*testItem, error) {
atomic.AddInt64(&newItems, 1)

v := &testItem{
onClose: func() error {
atomic.AddInt64(&deleteItems, 1)
v := &testItem{
onClose: func() error {
atomic.AddInt64(&deleteItems, 1)

return nil
},
onIsAlive: func() bool {
return atomic.LoadInt64(&newItems) >= 10
},
}
return nil
},
onIsAlive: func() bool {
return atomic.LoadInt64(&newItems) >= 10
},
}

return v, nil
}),
)
require.NoError(t, err)
err = p.With(rootCtx, func(ctx context.Context, testItem *testItem) error {
if atomic.LoadInt64(&newItems) < 10 {
return expErr
}
return v, nil
}),
)
require.NoError(t, err)
err = p.With(rootCtx, func(ctx context.Context, testItem *testItem) error {
if atomic.LoadInt64(&newItems) < 10 {
return expErr
}

return nil
})
require.NoError(t, err)
require.EqualValues(t, 10, atomic.LoadInt64(&newItems))
require.GreaterOrEqual(t, int64(9), atomic.LoadInt64(&deleteItems))
p.Close(rootCtx)
require.EqualValues(t, atomic.LoadInt64(&newItems), atomic.LoadInt64(&deleteItems))
return nil
})
require.NoError(t, err)
require.GreaterOrEqual(t, atomic.LoadInt64(&newItems), int64(9))
require.GreaterOrEqual(t, atomic.LoadInt64(&newItems), atomic.LoadInt64(&deleteItems))
p.Close(rootCtx)
require.EqualValues(t, atomic.LoadInt64(&newItems), atomic.LoadInt64(&deleteItems))
}, xtest.StopAfter(5*time.Second))
})
})
t.Run("Stress", func(t *testing.T) {
Expand Down
Loading

0 comments on commit 5d66281

Please sign in to comment.