Skip to content

Commit

Permalink
Remove BulkUpsertData implementations from public
Browse files Browse the repository at this point in the history
  • Loading branch information
UgnineSirdis committed Oct 4, 2024
1 parent 5e4f7d6 commit 5700cf7
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 57 deletions.
26 changes: 16 additions & 10 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,17 +596,23 @@ type BulkUpsertData interface {
ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error
}

type BulkUpsertRows struct {
type bulkUpsertRows struct {
Rows value.Value
}

func (data BulkUpsertRows) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error {
func (data bulkUpsertRows) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error {
req.Rows = value.ToYDB(data.Rows, a)

return nil
}

type BulkUpsertCsv struct {
func NewBulkUpsertRows(rows value.Value) bulkUpsertRows {
return bulkUpsertRows{
Rows: rows,
}
}

type bulkUpsertCsv struct {
Data []byte
Options []CsvFormatOption
}
Expand All @@ -615,7 +621,7 @@ type CsvFormatOption interface {
ApplyCsvFormatOption(req *BulkUpsertRequest) (err error)
}

func (data BulkUpsertCsv) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error {
func (data bulkUpsertCsv) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error {
req.Data = data.Data

var err error
Expand All @@ -631,8 +637,8 @@ func (data BulkUpsertCsv) ApplyBulkUpsertRequest(a *allocator.Allocator, req *Bu
return err
}

func NewBulkUpsertCsv(data []byte, opts ...CsvFormatOption) BulkUpsertCsv {
return BulkUpsertCsv{
func NewBulkUpsertCsv(data []byte, opts ...CsvFormatOption) bulkUpsertCsv {
return bulkUpsertCsv{
Data: data,
Options: opts,
}
Expand Down Expand Up @@ -717,7 +723,7 @@ func WithCsvSkipRows(count uint32) CsvFormatOption {
return &csvSkipRowsOption{count}
}

type BulkUpsertArrow struct {
type bulkUpsertArrow struct {
Data []byte
Options []ArrowFormatOption
}
Expand All @@ -726,7 +732,7 @@ type ArrowFormatOption interface {
ApplyArrowFormatOption(req *BulkUpsertRequest) (err error)
}

func (data BulkUpsertArrow) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error {
func (data bulkUpsertArrow) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error {
req.Data = data.Data

var err error
Expand All @@ -742,8 +748,8 @@ func (data BulkUpsertArrow) ApplyBulkUpsertRequest(a *allocator.Allocator, req *
return err
}

func NewBulkUpsertArrow(data []byte, opts ...ArrowFormatOption) BulkUpsertArrow {
return BulkUpsertArrow{
func NewBulkUpsertArrow(data []byte, opts ...ArrowFormatOption) bulkUpsertArrow {
return bulkUpsertArrow{
Data: data,
Options: opts,
}
Expand Down
128 changes: 81 additions & 47 deletions tests/integration/table_bulk_upsert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
)

func TestTableBulkUpsert(t *testing.T) {
func TestTableBulkUpsertSession(t *testing.T) {
var (
scope = newScope(t)
driver = scope.Driver()
Expand All @@ -39,58 +39,40 @@ func TestTableBulkUpsert(t *testing.T) {
return s.BulkUpsert(ctx, tablePath, types.ListValue(rows...))
})
scope.Require.NoError(err)
}

func assertIdValueImpl(ctx context.Context, t *testing.T, tableName string, id int64, val *string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for i := int64(0); i < 10; i++ {
val := fmt.Sprintf("value for %v", i)
assertIdValue(scope.Ctx, t, tablePath, i, val)
}
}

db, err := ydb.Open(ctx,
os.Getenv("YDB_CONNECTION_STRING"),
// ydb.WithAccessTokenCredentials(os.Getenv("YDB_ACCESS_TOKEN_CREDENTIALS")),
func TestTableBulkUpsert(t *testing.T) {
var (
scope = newScope(t)
driver = scope.Driver()
tablePath = scope.TablePath()
)
require.NoError(t, err)
err = db.Table().DoTx(ctx, func(ctx context.Context, tx table.TransactionActor) (err error) {
res, err := tx.Execute(ctx, fmt.Sprintf("SELECT val FROM `%s` WHERE id = %d", tableName, id), nil)
if err != nil {
return err
}
err = res.NextResultSetErr(ctx)
if err != nil {
return err
}
require.EqualValues(t, 1, res.ResultSetCount())
if !res.NextRow() {
if err = res.Err(); err != nil {
return err
}
return fmt.Errorf("unexpected empty result set")
}
var resultVal *string
err = res.ScanNamed(
named.Optional("val", &resultVal),
)
if err != nil {
return err
}
if val != nil {
require.NotEmpty(t, resultVal)
require.EqualValues(t, *val, *resultVal)
} else {
require.Nil(t, resultVal)
}

return res.Err()
}, table.WithTxSettings(table.TxSettings(table.WithSnapshotReadOnly())), table.WithIdempotent())
require.NoError(t, err)
}
// upsert
var rows []types.Value

func assertIdValue(ctx context.Context, t *testing.T, tableName string, id int64, val string) {
assertIdValueImpl(ctx, t, tableName, id, &val)
}
for i := int64(0); i < 10; i++ {
val := fmt.Sprintf("value for %v", i)
rows = append(rows, types.StructValue(
types.StructFieldValue("id", types.Int64Value(i)),
types.StructFieldValue("val", types.TextValue(val)),
))
}

func assertIdValueNil(ctx context.Context, t *testing.T, tableName string, id int64) {
assertIdValueImpl(ctx, t, tableName, id, nil)
err := driver.Table().BulkUpsert(scope.Ctx, tablePath, table.NewBulkUpsertRows(
types.ListValue(rows...),
))
scope.Require.NoError(err)

for i := int64(0); i < 10; i++ {
val := fmt.Sprintf("value for %v", i)
assertIdValue(scope.Ctx, t, tablePath, i, val)
}
}

func TestTableCsvBulkUpsert(t *testing.T) {
Expand Down Expand Up @@ -209,3 +191,55 @@ func TestTableArrowBulkUpsert(t *testing.T) {
assertIdValue(scope.Ctx, t, tablePath, 123, "data1")
assertIdValue(scope.Ctx, t, tablePath, 234, "data2")
}

func assertIdValueImpl(ctx context.Context, t *testing.T, tableName string, id int64, val *string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

db, err := ydb.Open(ctx,
os.Getenv("YDB_CONNECTION_STRING"),
// ydb.WithAccessTokenCredentials(os.Getenv("YDB_ACCESS_TOKEN_CREDENTIALS")),
)
require.NoError(t, err)
err = db.Table().DoTx(ctx, func(ctx context.Context, tx table.TransactionActor) (err error) {
res, err := tx.Execute(ctx, fmt.Sprintf("SELECT val FROM `%s` WHERE id = %d", tableName, id), nil)
if err != nil {
return err
}
err = res.NextResultSetErr(ctx)
if err != nil {
return err
}
require.EqualValues(t, 1, res.ResultSetCount())
if !res.NextRow() {
if err = res.Err(); err != nil {
return err
}
return fmt.Errorf("unexpected empty result set")
}
var resultVal *string
err = res.ScanNamed(
named.Optional("val", &resultVal),
)
if err != nil {
return err
}
if val != nil {
require.NotEmpty(t, resultVal)
require.EqualValues(t, *val, *resultVal)
} else {
require.Nil(t, resultVal)
}

return res.Err()
}, table.WithTxSettings(table.TxSettings(table.WithSnapshotReadOnly())), table.WithIdempotent())
require.NoError(t, err)
}

func assertIdValue(ctx context.Context, t *testing.T, tableName string, id int64, val string) {
assertIdValueImpl(ctx, t, tableName, id, &val)
}

func assertIdValueNil(ctx context.Context, t *testing.T, tableName string, id int64) {
assertIdValueImpl(ctx, t, tableName, id, nil)
}

0 comments on commit 5700cf7

Please sign in to comment.