Skip to content

Commit

Permalink
feat(ydbcp): use query service instead of table service
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina committed Dec 23, 2024
1 parent 0b97117 commit 13ef21c
Show file tree
Hide file tree
Showing 3 changed files with 267 additions and 167 deletions.
134 changes: 91 additions & 43 deletions internal/connectors/db/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"time"
"ydbcp/internal/config"
"ydbcp/internal/connectors/db/yql/queries"
Expand All @@ -13,6 +14,7 @@ import (

"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/balancers"
"github.com/ydb-platform/ydb-go-sdk/v3/query"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
Expand All @@ -21,22 +23,22 @@ import (
)

var (
readTx = table.TxControl(
table.BeginTx(
table.WithOnlineReadOnly(),
readTx = query.TxControl(
query.BeginTx(
query.WithOnlineReadOnly(),
),
table.CommitTx(),
query.CommitTx(),
)
writeTx = table.TxControl(
table.BeginTx(
table.WithSerializableReadWrite(),
writeTx = query.TxControl(
query.BeginTx(
query.WithSerializableReadWrite(),
),
table.CommitTx(),
query.CommitTx(),
)
)

type DBConnector interface {
GetTableClient() table.Client
GetQueryClient() query.Client
SelectBackups(ctx context.Context, queryBuilder queries.ReadTableQuery) (
[]*types.Backup, error,
)
Expand Down Expand Up @@ -127,8 +129,8 @@ func NewYdbConnector(ctx context.Context, config config.YDBConnectionConfig) (*Y
return &YdbConnector{driver: driver}, nil
}

func (d *YdbConnector) GetTableClient() table.Client {
return d.driver.Table()
func (d *YdbConnector) GetQueryClient() query.Client {
return d.driver.Query()
}

func (d *YdbConnector) Close(ctx context.Context) {
Expand All @@ -151,31 +153,54 @@ func DoStructSelect[T any](
if err != nil {
return nil, err
}
err = d.GetTableClient().Do(
ctx, func(ctx context.Context, s table.Session) error {
err = d.GetQueryClient().Do(
ctx, func(ctx context.Context, s query.Session) error {
var (
res result.Result
res query.Result
)
_, res, err = s.Execute(
ctx,
readTx,
queryFormat.QueryText, queryFormat.QueryParams,
queryFormat.QueryText,
query.WithParameters(queryFormat.QueryParams),
query.WithTxControl(readTx),
)
if err != nil {
return err
}
defer func(res result.Result) {
err = res.Close()
defer func(res query.Result) {
err = res.Close(ctx)
if err != nil {
xlog.Error(ctx, "Error closing transaction result")
}
}(res) // result must be closed
if res.ResultSetCount() != 1 {
return errors.New("expected 1 result set")
}
for res.NextResultSet(ctx) {
for res.NextRow() {
entity, readErr := readLambda(res)

resultSetCount := 0
for {
resultSet, err := res.NextResultSet(ctx)
if err != nil {
if errors.Is(err, io.EOF) {
break
}

return err
}

resultSetCount++
if resultSetCount > 1 {
return errors.New("expected 1 result set")
}

for {
row, err := resultSet.NextRow(ctx)
if err != nil {
if errors.Is(err, io.EOF) {
break
}

return err
}

entity, readErr := readLambda(row)
if readErr != nil {
return readErr
}
Expand Down Expand Up @@ -205,31 +230,54 @@ func DoInterfaceSelect[T any](
if err != nil {
return nil, err
}
err = d.GetTableClient().Do(
ctx, func(ctx context.Context, s table.Session) error {
err = d.GetQueryClient().Do(
ctx, func(ctx context.Context, s query.Session) error {
var (
res result.Result
res query.Result
)
_, res, err = s.Execute(
ctx,
readTx,
queryFormat.QueryText, queryFormat.QueryParams,
queryFormat.QueryText,
query.WithParameters(queryFormat.QueryParams),
query.WithTxControl(readTx),
)
if err != nil {
return err
}
defer func(res result.Result) {
err = res.Close()
defer func(res query.Result) {
err = res.Close(ctx)
if err != nil {
xlog.Error(ctx, "Error closing transaction result")
}
}(res) // result must be closed
if res.ResultSetCount() != 1 {
return errors.New("expected 1 result set")
}
for res.NextResultSet(ctx) {
for res.NextRow() {
entity, readErr := readLambda(res)

resultSetCount := 0
for {
resultSet, err := res.NextResultSet(ctx)
if err != nil {
if errors.Is(err, io.EOF) {
break
}

return err
}

resultSetCount++
if resultSetCount > 1 {
return errors.New("expected 1 result set")
}

for {
row, err := resultSet.NextRow(ctx)
if err != nil {
if errors.Is(err, io.EOF) {
break
}

return err
}

entity, readErr := readLambda(row)
if readErr != nil {
return readErr
}
Expand All @@ -251,13 +299,13 @@ func (d *YdbConnector) ExecuteUpsert(ctx context.Context, queryBuilder queries.W
if err != nil {
return err
}
err = d.GetTableClient().Do(
ctx, func(ctx context.Context, s table.Session) (err error) {
err = d.GetQueryClient().Do(
ctx, func(ctx context.Context, s query.Session) (err error) {
_, _, err = s.Execute(
ctx,
writeTx,
queryFormat.QueryText,
queryFormat.QueryParams,
query.WithParameters(queryFormat.QueryParams),
query.WithTxControl(writeTx),
)
if err != nil {
return err
Expand Down Expand Up @@ -306,7 +354,7 @@ func (d *YdbConnector) SelectBackupSchedules(
ctx,
d,
queryBuilder,
func(res result.Result) (*types.BackupSchedule, error) {
func(res query.Row) (*types.BackupSchedule, error) {
return ReadBackupScheduleFromResultSet(res, false)
},
)
Expand All @@ -319,7 +367,7 @@ func (d *YdbConnector) SelectBackupSchedulesWithRPOInfo(
ctx,
d,
queryBuilder,
func(res result.Result) (*types.BackupSchedule, error) {
func(res query.Row) (*types.BackupSchedule, error) {
return ReadBackupScheduleFromResultSet(res, true)
},
)
Expand Down
4 changes: 2 additions & 2 deletions internal/connectors/db/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/types"

"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/query"
)

type MockDBConnector struct {
Expand Down Expand Up @@ -113,7 +113,7 @@ func (c *MockDBConnector) UpdateBackup(
}

func (c *MockDBConnector) Close(_ context.Context) {}
func (c *MockDBConnector) GetTableClient() table.Client {
func (c *MockDBConnector) GetQueryClient() query.Client {
return nil
}

Expand Down
Loading

0 comments on commit 13ef21c

Please sign in to comment.