Skip to content

Commit

Permalink
refactoring + fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Dec 11, 2024
1 parent 3f5638b commit 162f475
Show file tree
Hide file tree
Showing 14 changed files with 308 additions and 48 deletions.
1 change: 1 addition & 0 deletions internal/tx/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ var _ Identifier = LazyID{}

const (
LazyTxID = "LAZY_TX"
FakeTxID = "FAKE_TX"
)

type (
Expand Down
21 changes: 18 additions & 3 deletions internal/xsql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ var (

type (
connWrapper struct {
cc conn.Conn
cc conn.Conn
currentTx *txWrapper

connector *Connector
lastUsage xsync.LastUsage
Expand All @@ -54,16 +55,22 @@ func (c *connWrapper) CheckNamedValue(value *driver.NamedValue) error {
}

func (c *connWrapper) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
if c.currentTx != nil {
return nil, xerrors.WithStackTrace(xerrors.AlreadyHasTx(c.currentTx.ID()))
}

tx, err := c.cc.BeginTx(ctx, opts)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

return &txWrapper{
c.currentTx = &txWrapper{
conn: c,
ctx: ctx,
tx: tx,
}, nil
}

return c.currentTx, nil
}

func (c *connWrapper) Close() error {
Expand Down Expand Up @@ -179,6 +186,10 @@ func (c *connWrapper) QueryContext(ctx context.Context, sql string, args []drive
return rowByAstPlan(ast, plan), nil
}

if c.currentTx != nil {
return c.currentTx.tx.Query(ctx, sql, params)
}

return c.cc.Query(ctx, sql, params)
}

Expand All @@ -191,6 +202,10 @@ func (c *connWrapper) ExecContext(ctx context.Context, sql string, args []driver
return nil, xerrors.WithStackTrace(err)
}

if c.currentTx != nil {
return c.currentTx.tx.Exec(ctx, sql, params)
}

return c.cc.Exec(ctx, sql, params)
}

Expand Down
5 changes: 5 additions & 0 deletions internal/xsql/conn/query/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Conn struct {
session *query.Session
onClose []func()
closed atomic.Bool
fakeTx bool
}

func (c *Conn) Exec(ctx context.Context, sql string, params *params.Params) (
Expand Down Expand Up @@ -114,6 +115,10 @@ func (c *Conn) isReady() bool {
}

func (c *Conn) beginTx(ctx context.Context, txOptions driver.TxOptions) (tx conn.Tx, finalErr error) {
if c.fakeTx {
return beginTxFake(ctx, c), nil
}

tx, err := beginTx(ctx, c, txOptions)
if err != nil {
return nil, xerrors.WithStackTrace(err)
Expand Down
6 changes: 6 additions & 0 deletions internal/xsql/conn/query/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ func WithOnClose(onClose func()) Option {
c.onClose = append(c.onClose, onClose)
}
}

func WithFakeTx() Option {
return func(c *Conn) {
c.fakeTx = true
}
}
62 changes: 62 additions & 0 deletions internal/xsql/conn/query/tx_fake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package query

import (
"context"
"database/sql/driver"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/params"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/tx"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql/conn"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql/conn/table/badconn"
)

type txFake struct {
conn *Conn
ctx context.Context //nolint:containedctx
}

func (t *txFake) Exec(ctx context.Context, sql string, params *params.Params) (driver.Result, error) {
result, err := t.conn.Exec(ctx, sql, params)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

return result, nil
}

func (t *txFake) Query(ctx context.Context, sql string, params *params.Params) (driver.RowsNextResultSet, error) {
rows, err := t.conn.Query(ctx, sql, params)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

return rows, nil
}

func (t *txFake) ID() string {
return tx.FakeTxID
}

func beginTxFake(ctx context.Context, c *Conn) conn.Tx {
return &txFake{
conn: c,
ctx: ctx,
}
}

func (t *txFake) Commit(ctx context.Context) (err error) {
if !t.conn.isReady() {
return badconn.Map(xerrors.WithStackTrace(errNotReadyConn))
}

return nil
}

func (t *txFake) Rollback(ctx context.Context) (err error) {
if !t.conn.isReady() {
return badconn.Map(xerrors.WithStackTrace(errNotReadyConn))
}

return err
}
1 change: 1 addition & 0 deletions internal/xsql/conn/table/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ var (
ErrUnsupported = driver.ErrSkip
errConnClosedEarly = xerrors.Retryable(errors.New("conn closed early"), xerrors.InvalidObject())
errNotReadyConn = xerrors.Retryable(errors.New("conn not ready"), xerrors.InvalidObject())
ErrWrongQueryMode = errors.New("wrong query mode")
)
10 changes: 1 addition & 9 deletions internal/xsql/conn/table/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,7 @@ func (tx *transaction) ID() string {
func (tx *transaction) Exec(ctx context.Context, sql string, params *params.Params) (driver.Result, error) {
m := queryModeFromContext(ctx, tx.conn.defaultQueryMode)
if m != DataQueryMode {
return nil, badconn.Map(
xerrors.WithStackTrace(
xerrors.Retryable(
fmt.Errorf("wrong query mode: %s", m.String()),
xerrors.InvalidObject(),
xerrors.WithName("WRONG_QUERY_MODE"),
),
),
)
return nil, xerrors.WithStackTrace(fmt.Errorf("%q: %w", m.String(), ErrWrongQueryMode))
}
_, err := tx.tx.Execute(ctx, sql, params, tx.conn.dataQueryOptions(ctx)...)
if err != nil {
Expand Down
27 changes: 14 additions & 13 deletions internal/xsql/conn/table/tx_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,48 +12,49 @@ import (
)

type txFake struct {
tx.Identifier

conn *Conn
ctx context.Context //nolint:containedctx
}

func (tx *txFake) Exec(ctx context.Context, sql string, params *params.Params) (driver.Result, error) {
result, err := tx.conn.Exec(ctx, sql, params)
func (t *txFake) Exec(ctx context.Context, sql string, params *params.Params) (driver.Result, error) {
result, err := t.conn.Exec(ctx, sql, params)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

return result, nil
}

func (tx *txFake) Query(ctx context.Context, sql string, params *params.Params) (driver.RowsNextResultSet, error) {
rows, err := tx.conn.Query(ctx, sql, params)
func (t *txFake) Query(ctx context.Context, sql string, params *params.Params) (driver.RowsNextResultSet, error) {
rows, err := t.conn.Query(ctx, sql, params)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

return rows, nil
}

func (t *txFake) ID() string {
return tx.FakeTxID
}

func beginTxFake(ctx context.Context, c *Conn) conn.Tx {
return &txFake{
Identifier: tx.ID("FAKE"),
conn: c,
ctx: ctx,
conn: c,
ctx: ctx,
}
}

func (tx *txFake) Commit(ctx context.Context) (err error) {
if !tx.conn.isReady() {
func (t *txFake) Commit(ctx context.Context) (err error) {
if !t.conn.isReady() {
return badconn.Map(xerrors.WithStackTrace(errNotReadyConn))
}

return nil
}

func (tx *txFake) Rollback(ctx context.Context) (err error) {
if !tx.conn.isReady() {
func (t *txFake) Rollback(ctx context.Context) (err error) {
if !t.conn.isReady() {
return badconn.Map(xerrors.WithStackTrace(errNotReadyConn))
}

Expand Down
16 changes: 8 additions & 8 deletions internal/xsql/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
query2 "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql/conn/query"
table2 "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql/conn/table"
connOverQueryServiceClient "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql/conn/query"
connOverTableServiceClient "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql/conn/table"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
"github.com/ydb-platform/ydb-go-sdk/v3/retry/budget"
"github.com/ydb-platform/ydb-go-sdk/v3/scheme"
Expand All @@ -38,8 +38,8 @@ type (

queryProcessor queryProcessor

TableOpts []table2.Option
QueryOpts []query2.Option
TableOpts []connOverTableServiceClient.Option
QueryOpts []connOverQueryServiceClient.Option
disableServerBalancer bool
onCLose []func(*Connector)

Expand Down Expand Up @@ -122,9 +122,9 @@ func (c *Connector) Connect(ctx context.Context) (driver.Conn, error) {
id := uuid.New()

conn := &connWrapper{
cc: query2.New(ctx, c, s, append(
cc: connOverQueryServiceClient.New(ctx, c, s, append(
c.QueryOpts,
query2.WithOnClose(func() {
connOverQueryServiceClient.WithOnClose(func() {
c.conns.Delete(id)
}))...,
),
Expand All @@ -145,8 +145,8 @@ func (c *Connector) Connect(ctx context.Context) (driver.Conn, error) {
id := uuid.New()

conn := &connWrapper{
cc: table2.New(ctx, c, s, append(c.TableOpts,
table2.WithOnClose(func() {
cc: connOverTableServiceClient.New(ctx, c, s, append(c.TableOpts,
connOverTableServiceClient.WithOnClose(func() {
c.conns.Delete(id)
}))...,
),
Expand Down
2 changes: 1 addition & 1 deletion internal/xsql/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ var (
errDeprecated = driver.ErrSkip
errAlreadyClosed = errors.New("already closed")
errWrongQueryProcessor = errors.New("wrong query processor")
errNotReadyConn = xerrors.Retryable(errors.New("connWrapper not ready"), xerrors.InvalidObject())
errNotReadyConn = xerrors.Retryable(errors.New("conn not ready"), xerrors.InvalidObject())
)
16 changes: 16 additions & 0 deletions internal/xsql/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,22 @@ func WithIdleThreshold(idleThreshold time.Duration) Option {
}
}

type mergedOptions []Option

func (opts mergedOptions) Apply(c *Connector) error {
for _, opt := range opts {
if err := opt.Apply(c); err != nil {
return err
}
}

return nil
}

func Merge(opts ...Option) Option {
return mergedOptions(opts)
}

func WithTableOptions(opts ...table.Option) Option {
return tableQueryOptionsOption{
tableOps: opts,
Expand Down
8 changes: 8 additions & 0 deletions internal/xsql/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ var (
)

func (tx *txWrapper) Commit() (finalErr error) {
defer func() {
tx.conn.currentTx = nil
}()

var (
ctx = tx.ctx
onDone = trace.DatabaseSQLOnTxCommit(tx.conn.connector.Trace(), &ctx,
Expand All @@ -47,6 +51,10 @@ func (tx *txWrapper) Commit() (finalErr error) {
}

func (tx *txWrapper) Rollback() (finalErr error) {
defer func() {
tx.conn.currentTx = nil
}()

var (
ctx = tx.ctx
onDone = trace.DatabaseSQLOnTxRollback(tx.conn.connector.Trace(), &ctx,
Expand Down
Loading

0 comments on commit 162f475

Please sign in to comment.