Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/1607-bad-session-on-error' into …
Browse files Browse the repository at this point in the history
…1607-bad-session-on-error
  • Loading branch information
rekby committed Jan 15, 2025
2 parents 8fd4317 + 7626cb5 commit 4514e46
Show file tree
Hide file tree
Showing 6 changed files with 303 additions and 8 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
* Fixed drop session from pool unnecessary in query service

## v3.96.0
* Supported of list, set and struct for unmarshall using `sugar.Unmarshall...`

## v3.95.6
* Fixed panic on span reporting in `xsql/Tx`

Expand Down
75 changes: 75 additions & 0 deletions internal/value/value.go
Original file line number Diff line number Diff line change
Expand Up @@ -1284,6 +1284,33 @@ func (v *listValue) castTo(dst any) error {
case *driver.Value:
*dstValue = v

return nil
case interface{}:
ptr := reflect.ValueOf(dstValue)

inner := reflect.Indirect(ptr)
if inner.Kind() != reflect.Slice && inner.Kind() != reflect.Array {
return xerrors.WithStackTrace(fmt.Errorf(
"%w '%s(%+v)' to '%T' destination",
ErrCannotCast, v.Type().Yql(), v, dstValue,
))
}

targetType := inner.Type().Elem()
valueInner := reflect.ValueOf(v.ListItems())

newSlice := reflect.MakeSlice(reflect.SliceOf(targetType), valueInner.Len(), valueInner.Cap())
inner.Set(newSlice)

for i, item := range v.ListItems() {
if err := item.castTo(inner.Index(i).Addr().Interface()); err != nil {
return xerrors.WithStackTrace(fmt.Errorf(
"%w '%s(%+v)' to '%T' destination",
ErrCannotCast, v.Type().Yql(), v, dstValue,
))
}
}

return nil
default:
return xerrors.WithStackTrace(fmt.Errorf(
Expand Down Expand Up @@ -1391,6 +1418,33 @@ func (v *setValue) castTo(dst any) error {
case *driver.Value:
*dstValue = v

return nil
case interface{}:
ptr := reflect.ValueOf(dstValue)

inner := reflect.Indirect(ptr)
if inner.Kind() != reflect.Slice && inner.Kind() != reflect.Array {
return xerrors.WithStackTrace(fmt.Errorf(
"%w '%s(%+v)' to '%T' destination",
ErrCannotCast, v.Type().Yql(), v, dstValue,
))
}

targetType := inner.Type().Elem()
valueInner := reflect.ValueOf(v.items)

newSlice := reflect.MakeSlice(reflect.SliceOf(targetType), valueInner.Len(), valueInner.Cap())
inner.Set(newSlice)

for i, item := range v.items {
if err := item.castTo(inner.Index(i).Addr().Interface()); err != nil {
return xerrors.WithStackTrace(fmt.Errorf(
"%w '%s(%+v)' to '%T' destination",
ErrCannotCast, v.Type().Yql(), v, dstValue,
))
}
}

return nil
default:
return xerrors.WithStackTrace(fmt.Errorf(
Expand Down Expand Up @@ -1574,6 +1628,27 @@ func (v *structValue) castTo(dst any) error {
case *driver.Value:
*dstValue = v

return nil
case interface{}:
ptr := reflect.ValueOf(dst)

inner := reflect.Indirect(ptr)
if inner.Kind() != reflect.Struct {
return xerrors.WithStackTrace(fmt.Errorf(
"%w '%s(%+v)' to '%T' destination",
ErrCannotCast, v.Type().Yql(), v, dstValue,
))
}

for i, field := range v.fields {
if err := field.V.castTo(inner.Field(i).Addr().Interface()); err != nil {
return xerrors.WithStackTrace(fmt.Errorf(
"scan error on struct field name '%s': %w",
field.Name, err,
))
}
}

return nil
default:
return xerrors.WithStackTrace(fmt.Errorf(
Expand Down
139 changes: 139 additions & 0 deletions internal/value/value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1142,6 +1142,145 @@ func TestCastNumbers(t *testing.T) {
}
}

func TestCastList(t *testing.T) {
for _, tt := range []struct {
v Value
dst interface{}
result interface{}
error bool
}{
{
v: ListValue(Int32Value(12), Int32Value(21), Int32Value(56)),
dst: func(v []int64) *[]int64 { return &v }([]int64{}),
result: func(v []int64) *[]int64 { return &v }([]int64{12, 21, 56}),
error: false,
},
{
v: ListValue(Int32Value(12), Int32Value(21), Int32Value(56)),
dst: func(v []int64) *[]int64 { return &v }([]int64{17}),
result: func(v []int64) *[]int64 { return &v }([]int64{12, 21, 56}),
error: false,
},
{
v: ListValue(BytesValue([]byte("test")), BytesValue([]byte("test2"))),
dst: func(v []string) *[]string { return &v }([]string{}),
result: func(v []string) *[]string { return &v }([]string{"test", "test2"}),
error: false,
},
{
v: ListValue(BytesValue([]byte("test")), BytesValue([]byte("test2"))),
dst: func(v []string) *[]string { return &v }([]string{"list"}),
result: func(v []string) *[]string { return &v }([]string{"test", "test2"}),
error: false,
},
} {
t.Run(fmt.Sprintf("%s→%v", tt.v.Type().Yql(), reflect.ValueOf(tt.dst).Type().Elem()),
func(t *testing.T) {
if err := CastTo(tt.v, tt.dst); (err != nil) != tt.error {
t.Errorf("castTo() error = %v, want %v", err, tt.error)
} else if !reflect.DeepEqual(tt.dst, tt.result) {
t.Errorf("castTo() result = %+v, want %+v",
reflect.ValueOf(tt.dst).Elem(),
reflect.ValueOf(tt.result).Elem(),
)
}
},
)
}
}

func TestCastSet(t *testing.T) {
for _, tt := range []struct {
v Value
dst interface{}
result interface{}
error bool
}{
{
v: SetValue(Int32Value(12), Int32Value(21), Int32Value(56)),
dst: func(v []int64) *[]int64 { return &v }([]int64{}),
result: func(v []int64) *[]int64 { return &v }([]int64{12, 21, 56}),
error: false,
},
{
v: SetValue(Int32Value(12), Int32Value(21), Int32Value(56)),
dst: func(v []int64) *[]int64 { return &v }([]int64{17}),
result: func(v []int64) *[]int64 { return &v }([]int64{12, 21, 56}),
error: false,
},
{
v: SetValue(BytesValue([]byte("test")), BytesValue([]byte("test2"))),
dst: func(v []string) *[]string { return &v }([]string{}),
result: func(v []string) *[]string { return &v }([]string{"test", "test2"}),
error: false,
},
{
v: SetValue(BytesValue([]byte("test")), BytesValue([]byte("test2"))),
dst: func(v []string) *[]string { return &v }([]string{"list"}),
result: func(v []string) *[]string { return &v }([]string{"test", "test2"}),
error: false,
},
} {
t.Run(fmt.Sprintf("%s→%v", tt.v.Type().Yql(), reflect.ValueOf(tt.dst).Type().Elem()),
func(t *testing.T) {
if err := CastTo(tt.v, tt.dst); (err != nil) != tt.error {
t.Errorf("castTo() error = %v, want %v", err, tt.error)
} else if !reflect.DeepEqual(tt.dst, tt.result) {
t.Errorf("castTo() result = %+v, want %+v",
reflect.ValueOf(tt.dst).Elem(),
reflect.ValueOf(tt.result).Elem(),
)
}
},
)
}
}

func TestCastStruct(t *testing.T) {
type defaultStruct struct {
ID int32 `sql:"id"`
Str string `sql:"myStr"`
}
for _, tt := range []struct {
v Value
dst interface{}
result interface{}
error bool
}{
{
v: StructValue(
StructValueField{Name: "id", V: Int32Value(123)},
StructValueField{Name: "myStr", V: BytesValue([]byte("myStr123"))},
),
dst: func(v defaultStruct) *defaultStruct { return &v }(defaultStruct{1, "myStr1"}),
result: func(v defaultStruct) *defaultStruct { return &v }(defaultStruct{123, "myStr123"}),
error: false,
},
{
v: StructValue(
StructValueField{Name: "id", V: Int32Value(12)},
StructValueField{Name: "myStr", V: BytesValue([]byte("myStr12"))},
),
dst: func(v defaultStruct) *defaultStruct { return &v }(defaultStruct{}),
result: func(v defaultStruct) *defaultStruct { return &v }(defaultStruct{12, "myStr12"}),
error: false,
},
} {
t.Run(fmt.Sprintf("%s→%v", tt.v.Type().Yql(), reflect.ValueOf(tt.dst).Type().Elem()),
func(t *testing.T) {
if err := CastTo(tt.v, tt.dst); (err != nil) != tt.error {
t.Errorf("castTo() error = %v, want %v", err, tt.error)
} else if !reflect.DeepEqual(tt.dst, tt.result) {
t.Errorf("castTo() result = %+v, want %+v",
reflect.ValueOf(tt.dst).Elem(),
reflect.ValueOf(tt.result).Elem(),
)
}
},
)
}
}

func TestCastOtherTypes(t *testing.T) {
for _, tt := range []struct {
v Value
Expand Down
4 changes: 2 additions & 2 deletions internal/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package version

const (
Major = "3"
Minor = "95"
Patch = "6"
Minor = "96"
Patch = "0"

Package = "ydb-go-sdk"
)
Expand Down
77 changes: 77 additions & 0 deletions tests/integration/sugar_unmarhall_result_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,83 @@ func TestSugarUnmarshallResultSet(t *testing.T) {
require.EqualValues(t, 43, many[1].ID)
require.EqualValues(t, "myStr43", many[1].Str)
})
t.Run("ListField", func(t *testing.T) {
type myStruct struct {
Ids []int32 `sql:"ids"`
}

rows, err := db.Query().QueryResultSet(ctx, `SELECT AsList(42, 43) as ids`)

many, err := sugar.UnmarshallResultSet[myStruct](rows)
require.NoError(t, err)
require.Len(t, many, 1)
require.NotNil(t, many[0])
require.Len(t, many[0].Ids, 2)
require.EqualValues(t, 42, many[0].Ids[0])
require.EqualValues(t, 43, many[0].Ids[1])
})
t.Run("SetField", func(t *testing.T) {
type myStruct struct {
Ids []int32 `sql:"ids"`
}

rows, err := db.Query().QueryResultSet(ctx, `SELECT AsSet(42, 43) as ids`)

many, err := sugar.UnmarshallResultSet[myStruct](rows)
require.NoError(t, err)
require.Len(t, many, 1)
require.NotNil(t, many[0])
require.Len(t, many[0].Ids, 2)
require.EqualValues(t, 42, many[0].Ids[0])
require.EqualValues(t, 43, many[0].Ids[1])
})
t.Run("StructField", func(t *testing.T) {
type myStructField struct {
ID int32 `sql:"id"`
Str string `sql:"myStr"`
}
type myStruct struct {
ID int32 `sql:"id"`
Str string `sql:"myStr"`
StructField myStructField `sql:"structColumn"`
}

rows, err := db.Query().QueryResultSet(ctx, `
SELECT 42 as id, "myStr42" as myStr, AsStruct(22 as id, "myStr22" as myStr) as structColumn
`)

many, err := sugar.UnmarshallResultSet[myStruct](rows)
require.NoError(t, err)
require.Len(t, many, 1)
require.NotNil(t, many[0])
require.EqualValues(t, 42, many[0].ID)
require.EqualValues(t, "myStr42", many[0].Str)
require.EqualValues(t, 22, many[0].StructField.ID)
require.EqualValues(t, "myStr22", many[0].StructField.Str)
})
t.Run("ListOfStructsField", func(t *testing.T) {
type myStructField struct {
ID int32 `sql:"id"`
Str string `sql:"myStr"`
}
type myStruct struct {
Values []myStructField `sql:"values"`
}

rows, err := db.Query().QueryResultSet(ctx,
`SELECT AsList(AsStruct(22 as id, "myStr22" as myStr), AsStruct(42 as id, "myStr42" as myStr)) as values`,
)

many, err := sugar.UnmarshallResultSet[myStruct](rows)
require.NoError(t, err)
require.Len(t, many, 1)
require.NotNil(t, many[0])
require.Len(t, many[0].Values, 2)
require.EqualValues(t, 22, many[0].Values[0].ID)
require.EqualValues(t, "myStr22", many[0].Values[0].Str)
require.EqualValues(t, 42, many[0].Values[1].ID)
require.EqualValues(t, "myStr42", many[0].Values[1].Str)
})
t.Run("UnexpectedColumn", func(t *testing.T) {
type myStruct struct {
ID int32 `sql:"id"`
Expand Down
13 changes: 7 additions & 6 deletions topic/topicreader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ type MessageContentUnmarshaler = topicreadercommon.PublicMessageContentUnmarshal

// Commit receive Message, Batch of single offset
// It can be fast (by default) or sync and waite response from server
// see topicoptions.CommitMode for details
//
// for topicoptions.CommitModeSync mode sync the method can return ErrCommitToExpiredSession
// see topicoptions.CommitMode for details.
// Fast mode of commit (default) - store commit info to internal buffer only and send it to the server later.
// Close the reader for wait to send all commits to the server.
// For topicoptions.CommitModeSync mode sync the method can return ErrCommitToExpiredSession
// it means about the message/batch was not committed because connection broken or partition routed to
// other reader by server.
// Client code should continue work normally
Expand Down Expand Up @@ -160,9 +161,9 @@ type Batch = topicreadercommon.PublicBatch
// ReadBatchOption is type for options of read batch
type ReadBatchOption = topicreaderinternal.PublicReadBatchOption

// Close stop work with reader
// return when reader complete internal works, flush commit buffer, ets
// or when ctx cancelled
// Close stop work with reader.
// return when reader complete internal works, flush commit buffer. You should close the Reader after use and before
// exit from a program for prevent lost last commits.
func (r *Reader) Close(ctx context.Context) error {
// close must be non-concurrent with read and commit

Expand Down

0 comments on commit 4514e46

Please sign in to comment.