Skip to content

Commit

Permalink
* Added trace.Query.OnSessionBegin event
Browse files Browse the repository at this point in the history
* Added `trace.Query.OnResult{New,NextPart,NextResultSet,Close}` events
* Added `trace.Query.OnRow{Scan,ScanNamed,ScanStruct}` events
  • Loading branch information
asmyasnikov committed Mar 17, 2024
1 parent e699ca1 commit 3400154
Show file tree
Hide file tree
Showing 19 changed files with 1,130 additions and 139 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
* Added `trace.Query.OnSessionBegin` event
* Added `trace.Query.OnResult{New,NextPart,NextResultSet,Close}` events
* Added `trace.Query.OnRow{Scan,ScanNamed,ScanStruct}` events

## v3.58.1
* Dropped all deprecated callbacks and events from traces
* Added `trace.Driver.OnConnStream{SendMsg,RecvMsg,CloseSend}` events
Expand Down
2 changes: 1 addition & 1 deletion internal/query/execute_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func execute(ctx context.Context, s *Session, c Ydb_Query_V1.QueryServiceClient,
return nil, nil, xerrors.WithStackTrace(err)
}

r, txID, err := newResult(ctx, stream)
r, txID, err := newResult(ctx, stream, s.trace)
if err != nil {
return nil, nil, xerrors.WithStackTrace(err)
}
Expand Down
44 changes: 22 additions & 22 deletions internal/query/execute_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,37 +369,37 @@ func TestExecute(t *testing.T) {
require.EqualValues(t, 0, rs.index)
{
t.Log("next (row=1)")
_, err := rs.next(ctx)
_, err := rs.nextRow(ctx)
require.NoError(t, err)
require.EqualValues(t, 0, rs.rowIndex)
}
{
t.Log("next (row=2)")
_, err := rs.next(ctx)
_, err := rs.nextRow(ctx)
require.NoError(t, err)
require.EqualValues(t, 1, rs.rowIndex)
}
{
t.Log("next (row=3)")
_, err := rs.next(ctx)
_, err := rs.nextRow(ctx)
require.NoError(t, err)
require.EqualValues(t, 2, rs.rowIndex)
}
{
t.Log("next (row=4)")
_, err := rs.next(ctx)
_, err := rs.nextRow(ctx)
require.NoError(t, err)
require.EqualValues(t, 0, rs.rowIndex)
}
{
t.Log("next (row=5)")
_, err := rs.next(ctx)
_, err := rs.nextRow(ctx)
require.NoError(t, err)
require.EqualValues(t, 1, rs.rowIndex)
}
{
t.Log("next (row=6)")
_, err := rs.next(ctx)
_, err := rs.nextRow(ctx)
require.ErrorIs(t, err, io.EOF)
}
}
Expand All @@ -416,37 +416,37 @@ func TestExecute(t *testing.T) {
require.EqualValues(t, 2, rs.index)
{
t.Log("next (row=1)")
_, err := rs.next(ctx)
_, err := rs.nextRow(ctx)
require.NoError(t, err)
require.EqualValues(t, 0, rs.rowIndex)
}
{
t.Log("next (row=2)")
_, err := rs.next(ctx)
_, err := rs.nextRow(ctx)
require.NoError(t, err)
require.EqualValues(t, 1, rs.rowIndex)
}
{
t.Log("next (row=3)")
_, err := rs.next(ctx)
_, err := rs.nextRow(ctx)
require.NoError(t, err)
require.EqualValues(t, 0, rs.rowIndex)
}
{
t.Log("next (row=4)")
_, err := rs.next(ctx)
_, err := rs.nextRow(ctx)
require.NoError(t, err)
require.EqualValues(t, 1, rs.rowIndex)
}
{
t.Log("next (row=5)")
_, err := rs.next(ctx)
_, err := rs.nextRow(ctx)
require.NoError(t, err)
require.EqualValues(t, 2, rs.rowIndex)
}
{
t.Log("next (row=6)")
_, err := rs.next(ctx)
_, err := rs.nextRow(ctx)
require.ErrorIs(t, err, io.EOF)
}
}
Expand Down Expand Up @@ -586,37 +586,37 @@ func TestExecute(t *testing.T) {
require.EqualValues(t, 0, rs.index)
{
t.Log("next (row=1)")
_, err := rs.next(ctx)
_, err := rs.nextRow(ctx)
require.NoError(t, err)
require.EqualValues(t, 0, rs.rowIndex)
}
{
t.Log("next (row=2)")
_, err := rs.next(ctx)
_, err := rs.nextRow(ctx)
require.NoError(t, err)
require.EqualValues(t, 1, rs.rowIndex)
}
{
t.Log("next (row=3)")
_, err := rs.next(ctx)
_, err := rs.nextRow(ctx)
require.NoError(t, err)
require.EqualValues(t, 2, rs.rowIndex)
}
{
t.Log("next (row=4)")
_, err := rs.next(ctx)
_, err := rs.nextRow(ctx)
require.NoError(t, err)
require.EqualValues(t, 0, rs.rowIndex)
}
{
t.Log("next (row=5)")
_, err := rs.next(ctx)
_, err := rs.nextRow(ctx)
require.NoError(t, err)
require.EqualValues(t, 1, rs.rowIndex)
}
{
t.Log("next (row=6)")
_, err := rs.next(ctx)
_, err := rs.nextRow(ctx)
require.Error(t, err)
require.True(t, xerrors.IsTransportError(err, grpcCodes.Unavailable))
}
Expand Down Expand Up @@ -726,25 +726,25 @@ func TestExecute(t *testing.T) {
require.EqualValues(t, 0, rs.index)
{
t.Log("next (row=1)")
_, err := rs.next(ctx)
_, err := rs.nextRow(ctx)
require.NoError(t, err)
require.EqualValues(t, 0, rs.rowIndex)
}
{
t.Log("next (row=2)")
_, err := rs.next(ctx)
_, err := rs.nextRow(ctx)
require.NoError(t, err)
require.EqualValues(t, 1, rs.rowIndex)
}
{
t.Log("next (row=3)")
_, err := rs.next(ctx)
_, err := rs.nextRow(ctx)
require.NoError(t, err)
require.EqualValues(t, 2, rs.rowIndex)
}
{
t.Log("next (row=4)")
_, err := rs.next(ctx)
_, err := rs.nextRow(ctx)
require.Error(t, err)
require.True(t, xerrors.IsOperationError(err, Ydb.StatusIds_UNAVAILABLE))
}
Expand Down
58 changes: 46 additions & 12 deletions internal/query/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,58 @@ import (
"context"
"fmt"
"io"
"sync"

"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
"github.com/ydb-platform/ydb-go-sdk/v3/query"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

var _ query.Result = (*result)(nil)

type result struct {
stream Ydb_Query_V1.QueryService_ExecuteQueryClient
closeOnce func()
closeOnce func(ctx context.Context) error
lastPart *Ydb_Query.ExecuteQueryResponsePart
resultSetIndex int64
errs []error
closed chan struct{}
trace *trace.Query
}

func newResult(
ctx context.Context,
stream Ydb_Query_V1.QueryService_ExecuteQueryClient,
t *trace.Query,
) (_ *result, txID string, err error) {
if t == nil {
t = &trace.Query{}
}

onDone := trace.QueryOnResultNew(t, &ctx, stack.FunctionID(""))
defer func() {
onDone(err)
}()

select {
case <-ctx.Done():
return nil, txID, xerrors.WithStackTrace(ctx.Err())
default:
part, err := nextPart(stream)
part, err := nextPart(ctx, stream, t)
if err != nil {
return nil, txID, xerrors.WithStackTrace(err)
}
var (
interrupted = make(chan struct{})
closed = make(chan struct{})
closeOnce = sync.OnceFunc(func() {
closeOnce = xsync.OnceFunc(func(ctx context.Context) error {
close(interrupted)
close(closed)
return nil

Check failure on line 58 in internal/query/result.go

View workflow job for this annotation

GitHub Actions / golangci-lint

return with no blank line before (nlreturn)
})
)

Expand All @@ -51,13 +65,25 @@ func newResult(
lastPart: part,
closed: closed,
closeOnce: closeOnce,
trace: t,
}, part.GetTxMeta().GetId(), nil
}
}

func nextPart(
ctx context.Context,
stream Ydb_Query_V1.QueryService_ExecuteQueryClient,
t *trace.Query,
) (_ *Ydb_Query.ExecuteQueryResponsePart, finalErr error) {
if t == nil {
t = &trace.Query{}
}

onDone := trace.QueryOnResultNextPart(t, &ctx, stack.FunctionID(""))
defer func() {
onDone(finalErr)
}()

part, err := stream.Recv()
if err != nil {
return nil, xerrors.WithStackTrace(err)
Expand All @@ -66,10 +92,13 @@ func nextPart(
return part, nil
}

func (r *result) Close(ctx context.Context) error {
r.closeOnce()
func (r *result) Close(ctx context.Context) (err error) {
onDone := trace.QueryOnResultClose(r.trace, &ctx, stack.FunctionID(""))
defer func() {
onDone(err)
}()

return nil
return r.closeOnce(ctx)
}

func (r *result) nextResultSet(ctx context.Context) (_ *resultSet, err error) {
Expand Down Expand Up @@ -103,10 +132,10 @@ func (r *result) nextResultSet(ctx context.Context) (_ *resultSet, err error) {
case <-r.closed:
return nil, errClosedResult
default:
part, err := nextPart(r.stream)
part, err := nextPart(ctx, r.stream, r.trace)
if err != nil {
if xerrors.Is(err, io.EOF) {
r.closeOnce()
_ = r.closeOnce(ctx)
}

return nil, xerrors.WithStackTrace(err)
Expand All @@ -121,9 +150,9 @@ func (r *result) nextResultSet(ctx context.Context) (_ *resultSet, err error) {

return part, nil
}
}, r.lastPart), nil
}, r.lastPart, r.trace), nil
}
part, err := nextPart(r.stream)
part, err := nextPart(ctx, r.stream, r.trace)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
Expand All @@ -139,7 +168,12 @@ func (r *result) nextResultSet(ctx context.Context) (_ *resultSet, err error) {
}
}

func (r *result) NextResultSet(ctx context.Context) (query.ResultSet, error) {
func (r *result) NextResultSet(ctx context.Context) (_ query.ResultSet, err error) {
onDone := trace.QueryOnResultNextResultSet(r.trace, &ctx, stack.FunctionID(""))
defer func() {
onDone(err)
}()

return r.nextResultSet(ctx)
}

Expand Down
21 changes: 17 additions & 4 deletions internal/query/result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/query"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

var _ query.ResultSet = (*resultSet)(nil)
Expand All @@ -20,6 +22,7 @@ type resultSet struct {
columns []*Ydb.Column
currentPart *Ydb_Query.ExecuteQueryResponsePart
rowIndex int
trace *trace.Query
done chan struct{}
}

Expand All @@ -28,18 +31,23 @@ func newResultSet(
*Ydb_Query.ExecuteQueryResponsePart, error,
),
part *Ydb_Query.ExecuteQueryResponsePart,
t *trace.Query,
) *resultSet {
if t == nil {
t = &trace.Query{}
}
return &resultSet{

Check failure on line 39 in internal/query/result_set.go

View workflow job for this annotation

GitHub Actions / golangci-lint

return with no blank line before (nlreturn)
index: part.GetResultSetIndex(),
recv: recv,
currentPart: part,
rowIndex: -1,
columns: part.GetResultSet().GetColumns(),
trace: t,
done: make(chan struct{}),
}
}

func (rs *resultSet) next(ctx context.Context) (*row, error) {
func (rs *resultSet) nextRow(ctx context.Context) (*row, error) {
rs.rowIndex++
select {
case <-rs.done:
Expand Down Expand Up @@ -73,10 +81,15 @@ func (rs *resultSet) next(ctx context.Context) (*row, error) {
))
}

return newRow(rs.columns, rs.currentPart.GetResultSet().GetRows()[rs.rowIndex])
return newRow(ctx, rs.columns, rs.currentPart.GetResultSet().GetRows()[rs.rowIndex], rs.trace)
}
}

func (rs *resultSet) NextRow(ctx context.Context) (query.Row, error) {
return rs.next(ctx)
func (rs *resultSet) NextRow(ctx context.Context) (_ query.Row, err error) {
onDone := trace.QueryOnResultSetNextRow(rs.trace, &ctx, stack.FunctionID(""))
defer func() {
onDone(err)
}()

return rs.nextRow(ctx)
}
Loading

0 comments on commit 3400154

Please sign in to comment.