diff --git a/CHANGELOG.md b/CHANGELOG.md index 73b84fa7d..d3f0f0832 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ * Fixed drop session from pool unnecessary in query service +## v3.96.0 +* Supported of list, set and struct for unmarshall using `sugar.Unmarshall...` + ## v3.95.6 * Fixed panic on span reporting in `xsql/Tx` diff --git a/internal/value/value.go b/internal/value/value.go index bd2683d50..0b8331874 100644 --- a/internal/value/value.go +++ b/internal/value/value.go @@ -1284,6 +1284,33 @@ func (v *listValue) castTo(dst any) error { case *driver.Value: *dstValue = v + return nil + case interface{}: + ptr := reflect.ValueOf(dstValue) + + inner := reflect.Indirect(ptr) + if inner.Kind() != reflect.Slice && inner.Kind() != reflect.Array { + return xerrors.WithStackTrace(fmt.Errorf( + "%w '%s(%+v)' to '%T' destination", + ErrCannotCast, v.Type().Yql(), v, dstValue, + )) + } + + targetType := inner.Type().Elem() + valueInner := reflect.ValueOf(v.ListItems()) + + newSlice := reflect.MakeSlice(reflect.SliceOf(targetType), valueInner.Len(), valueInner.Cap()) + inner.Set(newSlice) + + for i, item := range v.ListItems() { + if err := item.castTo(inner.Index(i).Addr().Interface()); err != nil { + return xerrors.WithStackTrace(fmt.Errorf( + "%w '%s(%+v)' to '%T' destination", + ErrCannotCast, v.Type().Yql(), v, dstValue, + )) + } + } + return nil default: return xerrors.WithStackTrace(fmt.Errorf( @@ -1391,6 +1418,33 @@ func (v *setValue) castTo(dst any) error { case *driver.Value: *dstValue = v + return nil + case interface{}: + ptr := reflect.ValueOf(dstValue) + + inner := reflect.Indirect(ptr) + if inner.Kind() != reflect.Slice && inner.Kind() != reflect.Array { + return xerrors.WithStackTrace(fmt.Errorf( + "%w '%s(%+v)' to '%T' destination", + ErrCannotCast, v.Type().Yql(), v, dstValue, + )) + } + + targetType := inner.Type().Elem() + valueInner := reflect.ValueOf(v.items) + + newSlice := reflect.MakeSlice(reflect.SliceOf(targetType), valueInner.Len(), valueInner.Cap()) + inner.Set(newSlice) + + for i, item := range v.items { + if err := item.castTo(inner.Index(i).Addr().Interface()); err != nil { + return xerrors.WithStackTrace(fmt.Errorf( + "%w '%s(%+v)' to '%T' destination", + ErrCannotCast, v.Type().Yql(), v, dstValue, + )) + } + } + return nil default: return xerrors.WithStackTrace(fmt.Errorf( @@ -1574,6 +1628,27 @@ func (v *structValue) castTo(dst any) error { case *driver.Value: *dstValue = v + return nil + case interface{}: + ptr := reflect.ValueOf(dst) + + inner := reflect.Indirect(ptr) + if inner.Kind() != reflect.Struct { + return xerrors.WithStackTrace(fmt.Errorf( + "%w '%s(%+v)' to '%T' destination", + ErrCannotCast, v.Type().Yql(), v, dstValue, + )) + } + + for i, field := range v.fields { + if err := field.V.castTo(inner.Field(i).Addr().Interface()); err != nil { + return xerrors.WithStackTrace(fmt.Errorf( + "scan error on struct field name '%s': %w", + field.Name, err, + )) + } + } + return nil default: return xerrors.WithStackTrace(fmt.Errorf( diff --git a/internal/value/value_test.go b/internal/value/value_test.go index 2e9537c45..6191b1b9c 100644 --- a/internal/value/value_test.go +++ b/internal/value/value_test.go @@ -1142,6 +1142,145 @@ func TestCastNumbers(t *testing.T) { } } +func TestCastList(t *testing.T) { + for _, tt := range []struct { + v Value + dst interface{} + result interface{} + error bool + }{ + { + v: ListValue(Int32Value(12), Int32Value(21), Int32Value(56)), + dst: func(v []int64) *[]int64 { return &v }([]int64{}), + result: func(v []int64) *[]int64 { return &v }([]int64{12, 21, 56}), + error: false, + }, + { + v: ListValue(Int32Value(12), Int32Value(21), Int32Value(56)), + dst: func(v []int64) *[]int64 { return &v }([]int64{17}), + result: func(v []int64) *[]int64 { return &v }([]int64{12, 21, 56}), + error: false, + }, + { + v: ListValue(BytesValue([]byte("test")), BytesValue([]byte("test2"))), + dst: func(v []string) *[]string { return &v }([]string{}), + result: func(v []string) *[]string { return &v }([]string{"test", "test2"}), + error: false, + }, + { + v: ListValue(BytesValue([]byte("test")), BytesValue([]byte("test2"))), + dst: func(v []string) *[]string { return &v }([]string{"list"}), + result: func(v []string) *[]string { return &v }([]string{"test", "test2"}), + error: false, + }, + } { + t.Run(fmt.Sprintf("%s→%v", tt.v.Type().Yql(), reflect.ValueOf(tt.dst).Type().Elem()), + func(t *testing.T) { + if err := CastTo(tt.v, tt.dst); (err != nil) != tt.error { + t.Errorf("castTo() error = %v, want %v", err, tt.error) + } else if !reflect.DeepEqual(tt.dst, tt.result) { + t.Errorf("castTo() result = %+v, want %+v", + reflect.ValueOf(tt.dst).Elem(), + reflect.ValueOf(tt.result).Elem(), + ) + } + }, + ) + } +} + +func TestCastSet(t *testing.T) { + for _, tt := range []struct { + v Value + dst interface{} + result interface{} + error bool + }{ + { + v: SetValue(Int32Value(12), Int32Value(21), Int32Value(56)), + dst: func(v []int64) *[]int64 { return &v }([]int64{}), + result: func(v []int64) *[]int64 { return &v }([]int64{12, 21, 56}), + error: false, + }, + { + v: SetValue(Int32Value(12), Int32Value(21), Int32Value(56)), + dst: func(v []int64) *[]int64 { return &v }([]int64{17}), + result: func(v []int64) *[]int64 { return &v }([]int64{12, 21, 56}), + error: false, + }, + { + v: SetValue(BytesValue([]byte("test")), BytesValue([]byte("test2"))), + dst: func(v []string) *[]string { return &v }([]string{}), + result: func(v []string) *[]string { return &v }([]string{"test", "test2"}), + error: false, + }, + { + v: SetValue(BytesValue([]byte("test")), BytesValue([]byte("test2"))), + dst: func(v []string) *[]string { return &v }([]string{"list"}), + result: func(v []string) *[]string { return &v }([]string{"test", "test2"}), + error: false, + }, + } { + t.Run(fmt.Sprintf("%s→%v", tt.v.Type().Yql(), reflect.ValueOf(tt.dst).Type().Elem()), + func(t *testing.T) { + if err := CastTo(tt.v, tt.dst); (err != nil) != tt.error { + t.Errorf("castTo() error = %v, want %v", err, tt.error) + } else if !reflect.DeepEqual(tt.dst, tt.result) { + t.Errorf("castTo() result = %+v, want %+v", + reflect.ValueOf(tt.dst).Elem(), + reflect.ValueOf(tt.result).Elem(), + ) + } + }, + ) + } +} + +func TestCastStruct(t *testing.T) { + type defaultStruct struct { + ID int32 `sql:"id"` + Str string `sql:"myStr"` + } + for _, tt := range []struct { + v Value + dst interface{} + result interface{} + error bool + }{ + { + v: StructValue( + StructValueField{Name: "id", V: Int32Value(123)}, + StructValueField{Name: "myStr", V: BytesValue([]byte("myStr123"))}, + ), + dst: func(v defaultStruct) *defaultStruct { return &v }(defaultStruct{1, "myStr1"}), + result: func(v defaultStruct) *defaultStruct { return &v }(defaultStruct{123, "myStr123"}), + error: false, + }, + { + v: StructValue( + StructValueField{Name: "id", V: Int32Value(12)}, + StructValueField{Name: "myStr", V: BytesValue([]byte("myStr12"))}, + ), + dst: func(v defaultStruct) *defaultStruct { return &v }(defaultStruct{}), + result: func(v defaultStruct) *defaultStruct { return &v }(defaultStruct{12, "myStr12"}), + error: false, + }, + } { + t.Run(fmt.Sprintf("%s→%v", tt.v.Type().Yql(), reflect.ValueOf(tt.dst).Type().Elem()), + func(t *testing.T) { + if err := CastTo(tt.v, tt.dst); (err != nil) != tt.error { + t.Errorf("castTo() error = %v, want %v", err, tt.error) + } else if !reflect.DeepEqual(tt.dst, tt.result) { + t.Errorf("castTo() result = %+v, want %+v", + reflect.ValueOf(tt.dst).Elem(), + reflect.ValueOf(tt.result).Elem(), + ) + } + }, + ) + } +} + func TestCastOtherTypes(t *testing.T) { for _, tt := range []struct { v Value diff --git a/internal/version/version.go b/internal/version/version.go index 47ed15c21..53840c6a9 100644 --- a/internal/version/version.go +++ b/internal/version/version.go @@ -2,8 +2,8 @@ package version const ( Major = "3" - Minor = "95" - Patch = "6" + Minor = "96" + Patch = "0" Package = "ydb-go-sdk" ) diff --git a/tests/integration/sugar_unmarhall_result_set_test.go b/tests/integration/sugar_unmarhall_result_set_test.go index 26ef254a4..db1df8239 100644 --- a/tests/integration/sugar_unmarhall_result_set_test.go +++ b/tests/integration/sugar_unmarhall_result_set_test.go @@ -47,6 +47,83 @@ func TestSugarUnmarshallResultSet(t *testing.T) { require.EqualValues(t, 43, many[1].ID) require.EqualValues(t, "myStr43", many[1].Str) }) + t.Run("ListField", func(t *testing.T) { + type myStruct struct { + Ids []int32 `sql:"ids"` + } + + rows, err := db.Query().QueryResultSet(ctx, `SELECT AsList(42, 43) as ids`) + + many, err := sugar.UnmarshallResultSet[myStruct](rows) + require.NoError(t, err) + require.Len(t, many, 1) + require.NotNil(t, many[0]) + require.Len(t, many[0].Ids, 2) + require.EqualValues(t, 42, many[0].Ids[0]) + require.EqualValues(t, 43, many[0].Ids[1]) + }) + t.Run("SetField", func(t *testing.T) { + type myStruct struct { + Ids []int32 `sql:"ids"` + } + + rows, err := db.Query().QueryResultSet(ctx, `SELECT AsSet(42, 43) as ids`) + + many, err := sugar.UnmarshallResultSet[myStruct](rows) + require.NoError(t, err) + require.Len(t, many, 1) + require.NotNil(t, many[0]) + require.Len(t, many[0].Ids, 2) + require.EqualValues(t, 42, many[0].Ids[0]) + require.EqualValues(t, 43, many[0].Ids[1]) + }) + t.Run("StructField", func(t *testing.T) { + type myStructField struct { + ID int32 `sql:"id"` + Str string `sql:"myStr"` + } + type myStruct struct { + ID int32 `sql:"id"` + Str string `sql:"myStr"` + StructField myStructField `sql:"structColumn"` + } + + rows, err := db.Query().QueryResultSet(ctx, ` + SELECT 42 as id, "myStr42" as myStr, AsStruct(22 as id, "myStr22" as myStr) as structColumn + `) + + many, err := sugar.UnmarshallResultSet[myStruct](rows) + require.NoError(t, err) + require.Len(t, many, 1) + require.NotNil(t, many[0]) + require.EqualValues(t, 42, many[0].ID) + require.EqualValues(t, "myStr42", many[0].Str) + require.EqualValues(t, 22, many[0].StructField.ID) + require.EqualValues(t, "myStr22", many[0].StructField.Str) + }) + t.Run("ListOfStructsField", func(t *testing.T) { + type myStructField struct { + ID int32 `sql:"id"` + Str string `sql:"myStr"` + } + type myStruct struct { + Values []myStructField `sql:"values"` + } + + rows, err := db.Query().QueryResultSet(ctx, + `SELECT AsList(AsStruct(22 as id, "myStr22" as myStr), AsStruct(42 as id, "myStr42" as myStr)) as values`, + ) + + many, err := sugar.UnmarshallResultSet[myStruct](rows) + require.NoError(t, err) + require.Len(t, many, 1) + require.NotNil(t, many[0]) + require.Len(t, many[0].Values, 2) + require.EqualValues(t, 22, many[0].Values[0].ID) + require.EqualValues(t, "myStr22", many[0].Values[0].Str) + require.EqualValues(t, 42, many[0].Values[1].ID) + require.EqualValues(t, "myStr42", many[0].Values[1].Str) + }) t.Run("UnexpectedColumn", func(t *testing.T) { type myStruct struct { ID int32 `sql:"id"` diff --git a/topic/topicreader/reader.go b/topic/topicreader/reader.go index 2e8c0fce3..d28a71f40 100644 --- a/topic/topicreader/reader.go +++ b/topic/topicreader/reader.go @@ -59,9 +59,10 @@ type MessageContentUnmarshaler = topicreadercommon.PublicMessageContentUnmarshal // Commit receive Message, Batch of single offset // It can be fast (by default) or sync and waite response from server -// see topicoptions.CommitMode for details -// -// for topicoptions.CommitModeSync mode sync the method can return ErrCommitToExpiredSession +// see topicoptions.CommitMode for details. +// Fast mode of commit (default) - store commit info to internal buffer only and send it to the server later. +// Close the reader for wait to send all commits to the server. +// For topicoptions.CommitModeSync mode sync the method can return ErrCommitToExpiredSession // it means about the message/batch was not committed because connection broken or partition routed to // other reader by server. // Client code should continue work normally @@ -160,9 +161,9 @@ type Batch = topicreadercommon.PublicBatch // ReadBatchOption is type for options of read batch type ReadBatchOption = topicreaderinternal.PublicReadBatchOption -// Close stop work with reader -// return when reader complete internal works, flush commit buffer, ets -// or when ctx cancelled +// Close stop work with reader. +// return when reader complete internal works, flush commit buffer. You should close the Reader after use and before +// exit from a program for prevent lost last commits. func (r *Reader) Close(ctx context.Context) error { // close must be non-concurrent with read and commit