From eb6cb0ce678226abb7c853a952a0aa0a6c466a5d Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Wed, 18 Sep 2024 23:54:36 +0300 Subject: [PATCH] fix linter --- CHANGELOG.md | 2 + .../topicreader_simple_iterators.go | 55 ++++++++++++++++ tests/integration/topic_helpers_test.go | 25 +++++++- topic/topicsugar/cdc-reader.go | 2 +- topic/topicsugar/iterators.go | 62 ++++++++++++++++--- 5 files changed, 137 insertions(+), 9 deletions(-) create mode 100644 examples/topic/topicreader/topicreader_simple_iterators.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 7288cc089..660339e8e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* refactored experimental topic iterators in topicsugar package + ## v3.80.9 * Fixed bug in experimental api: `ydb.ParamsBuilder().Param().Optional()` receive pointer and really produce optional value. diff --git a/examples/topic/topicreader/topicreader_simple_iterators.go b/examples/topic/topicreader/topicreader_simple_iterators.go new file mode 100644 index 000000000..0b740812b --- /dev/null +++ b/examples/topic/topicreader/topicreader_simple_iterators.go @@ -0,0 +1,55 @@ +//go:build go1.23 + +package topicreaderexamples + +import ( + "context" + "fmt" + + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar" + firestore "google.golang.org/genproto/firestore/bundle" +) + +// IterateOverMessagesAsString is simple example for easy start read messages +// it is not recommend way for heavy-load processing, batch processing usually will faster +func IterateOverMessagesAsString(ctx context.Context, reader *topicreader.Reader) error { + for msg, err := range topicsugar.StringIterator(ctx, reader) { + if err != nil { + return err + } + fmt.Println(msg.Data) + _ = reader.Commit(msg.Context(), msg) + } + + return nil +} + +// IterateOverStructUnmarshalledFromJSON is example for effective way for unmarshal json message content to value +func IterateOverStructUnmarshalledFromJSON(ctx context.Context, r *topicreader.Reader) error { + //nolint:tagliatelle + type S struct { + MyField int `json:"my_field"` + } + + for msg, err := range topicsugar.JSONIterator[S](ctx, r) { + if err != nil { + return err + } + fmt.Println(msg.Data.MyField) + } + + return nil +} + +// IterateOverProtobufMessages is example for effective way for unmarshal protobuf message content to value +func IterateOverProtobufMessages(ctx context.Context, r *topicreader.Reader) error { + for msg, err := range topicsugar.ProtobufIterator[*firestore.BundledDocumentMetadata](ctx, r) { + if err != nil { + return err + } + fmt.Println(msg.Data.GetName()) + } + + return nil +} diff --git a/tests/integration/topic_helpers_test.go b/tests/integration/topic_helpers_test.go index 61a2b0569..84d4d3220 100644 --- a/tests/integration/topic_helpers_test.go +++ b/tests/integration/topic_helpers_test.go @@ -46,6 +46,29 @@ func TestMessageReaderIterator(t *testing.T) { require.Equal(t, []string{"asd", "ddd", "ggg"}, results) } +func TestStringReaderIterator(t *testing.T) { + scope := newScope(t) + ctx := scope.Ctx + + err := scope.TopicWriter().Write(ctx, + topicwriter.Message{Data: strings.NewReader("asd")}, + topicwriter.Message{Data: strings.NewReader("ddd")}, + topicwriter.Message{Data: strings.NewReader("ggg")}, + ) + require.NoError(t, err) + + var results []string + for mess, err := range topicsugar.StringIterator(ctx, scope.TopicReader()) { + require.NoError(t, err) + + results = append(results, mess.Data) + if len(results) == 3 { + break + } + } + require.Equal(t, []string{"asd", "ddd", "ggg"}, results) +} + func TestMessageJsonUnmarshalIterator(t *testing.T) { scope := newScope(t) ctx := scope.Ctx @@ -71,7 +94,7 @@ func TestMessageJsonUnmarshalIterator(t *testing.T) { var results []testStruct expectedSeqno := int64(1) expectedOffset := int64(0) - for mess, err := range topicsugar.TopicUnmarshalJSONIterator[testStruct](ctx, scope.TopicReader()) { + for mess, err := range topicsugar.JSONIterator[testStruct](ctx, scope.TopicReader()) { require.NoError(t, err) require.Equal(t, expectedSeqno, mess.SeqNo) require.Equal(t, expectedOffset, mess.Offset) diff --git a/topic/topicsugar/cdc-reader.go b/topic/topicsugar/cdc-reader.go index 44d86fcd2..232e36f78 100644 --- a/topic/topicsugar/cdc-reader.go +++ b/topic/topicsugar/cdc-reader.go @@ -87,5 +87,5 @@ func UnmarshalCDCStream[T YDBCDCItem[K], K any]( return json.Unmarshal(data, dst) } - return TopicUnmarshalJSONFunc[YDBCDCMessage[T, K]](ctx, reader, unmarshal) + return IteratorFunc[YDBCDCMessage[T, K]](ctx, reader, unmarshal) } diff --git a/topic/topicsugar/iterators.go b/topic/topicsugar/iterators.go index 7f5ee5b2c..e3ff2388e 100644 --- a/topic/topicsugar/iterators.go +++ b/topic/topicsugar/iterators.go @@ -1,21 +1,26 @@ +//go:build go1.23 + package topicsugar import ( "context" "encoding/json" + "slices" + + "google.golang.org/protobuf/proto" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter" "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader" ) -// MessageReader is interface for topicreader.Message +// TopicMessageReader is interface for topicreader.Message // // Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental type TopicMessageReader interface { ReadMessage(ctx context.Context) (*topicreader.Message, error) } -// TopicMessagesIterator is typed representation of cdc event +// TopicMessageIterator iterator wrapper over topic reader // // Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental func TopicMessageIterator(ctx context.Context, r TopicMessageReader) xiter.Seq2[*topicreader.Message, error] { @@ -33,10 +38,38 @@ func TopicMessageIterator(ctx context.Context, r TopicMessageReader) xiter.Seq2[ } } -// TopicUnmarshalJSONIterator is typed representation of cdc event +// BytesIterator produce iterator over topic messages with Data as []byte, []byte is content of the message +func BytesIterator( + ctx context.Context, + r TopicMessageReader, +) xiter.Seq2[*TypedTopicMessage[[]byte], error] { + var unmarshalFunc TypedUnmarshalFunc[*[]byte] = func(data []byte, dst *[]byte) error { + *dst = slices.Clone(data) + + return nil + } + + return IteratorFunc[[]byte](ctx, r, unmarshalFunc) +} + +// StringIterator produce iterator over topic messages with Data is string, created from message content +func StringIterator( + ctx context.Context, + r TopicMessageReader, +) xiter.Seq2[*TypedTopicMessage[string], error] { + var unmarshalFunc TypedUnmarshalFunc[*string] = func(data []byte, dst *string) error { + *dst = string(data) + + return nil + } + + return IteratorFunc[string](ctx, r, unmarshalFunc) +} + +// JSONIterator produce iterator over topic messages with Data is T, created unmarshalled from message // // Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental -func TopicUnmarshalJSONIterator[T any]( +func JSONIterator[T any]( ctx context.Context, r TopicMessageReader, ) xiter.Seq2[*TypedTopicMessage[T], error] { @@ -44,13 +77,28 @@ func TopicUnmarshalJSONIterator[T any]( return json.Unmarshal(data, dst) } - return TopicUnmarshalJSONFunc[T](ctx, r, unmarshalFunc) + return IteratorFunc[T](ctx, r, unmarshalFunc) +} + +// ProtobufIterator produce iterator over topic messages with Data is T, created unmarshalled from message +// +// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental +func ProtobufIterator[T proto.Message]( + ctx context.Context, + r TopicMessageReader, +) xiter.Seq2[*TypedTopicMessage[T], error] { + var unmarshalFunc TypedUnmarshalFunc[*T] = func(data []byte, dst *T) error { + return proto.Unmarshal(data, *dst) + } + + return IteratorFunc[T](ctx, r, unmarshalFunc) } -// TopicUnmarshalJSONIterator is typed representation of cdc event +// IteratorFunc produce iterator over topic messages with Data is T, +// created unmarshalled from message by custom function // // Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental -func TopicUnmarshalJSONFunc[T any]( +func IteratorFunc[T any]( ctx context.Context, r TopicMessageReader, f TypedUnmarshalFunc[*T],